In [1]:
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler

from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np
import os


In [2]:
# navigate to dataset in order to extract csv files
curr_dir = os.getcwd()

folder_path = os.path.join(curr_dir, "fifa_dataset")
files = os.listdir(folder_path)

male_files = [file for file in files if file.startswith('p') and file.endswith('.csv')]

appName = "FIFA Dataset Ingestion"
master = "local"



In [3]:
# start spark session - throttle memory threshold to 6GB
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SystemsToolChains") \
    .config("spark.driver.memory", "6g") \
    .getOrCreate()


# combine all years csv for male players
master_df = None
for csv_file in male_files:
    file_path = os.path.join(folder_path, csv_file)

    current_df = spark.read.csv(file_path, header=True, inferSchema=True)
    year = f"20{csv_file.split('_')[-1].split('.')[0]}"
    current_df = current_df.withColumn("year", lit(year).cast("int"))
    if not master_df:
        master_df = current_df
    else:
        master_df = master_df.union(current_df)

master_df = master_df.withColumn("unique_id", monotonically_increasing_id())

master_df.printSchema()
    
    

23/11/16 01:27:17 WARN Utils: Your hostname, Nithanths-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 10.0.0.101 instead (on interface en0)
23/11/16 01:27:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/16 01:27:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: date (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: double (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: date (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_id: integer (nullable = true)
 

In [4]:
# drop columns with over 50% null values
null_percentage = [(col_name, (master_df.filter(col(col_name).isNull()).count() / master_df.count()) * 100.0)
                   for col_name in master_df.columns]

drop_50 = [col_name for col_name, percentage in null_percentage if percentage > 50.0]
master_df = master_df.drop(*drop_50)


# drop uninformative features
irrelevant_features = [
    'sofifa_id', 'player_url', 'short_name', 'long_name', 'player_positions', 'dob',
    'player_face_url', 'club_name', 'club_logo_url', 'club_flag_url', 'nation_team_id', 
    'nation_logo_url', 'nation_flag_url', 'club_jersey_number', 
    'club_loaned_from', 'release_clause_eur', 'player_tags', 
    'player_traits', 'real_face', 'nationality_id', 
    'nation_jersey_number', 'year' 
] 

master_df = master_df.drop(*irrelevant_features)


master_df.printSchema()


                                                                                

root
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: double (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_joined: date (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_name: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- weak_foot: integer (nullable = true)
 |-- skill_moves: integer (nullable = true)
 |-- international_reputation: integer (nullable = true)
 |-- work_rate: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- pace: integer (nullable = true)
 |-- shooting: integer (nullable = true)
 |-- passing: integer (nullable = true)

In [5]:
# clean up body type string feature
master_df = master_df.withColumn("body_type", trim(regexp_replace("body_type", r'\(.*\)', '')))
master_df.select('body_type').show(15)

+---------+
|body_type|
+---------+
|   Unique|
|   Unique|
|   Unique|
|   Unique|
|   Unique|
|   Unique|
|   Unique|
|   Unique|
|   Unique|
|   Unique|
|   Unique|
|   Normal|
|   Unique|
|   Unique|
|   Unique|
+---------+
only showing top 15 rows



In [6]:
from pyspark.sql.functions import udf, col, year
from pyspark.sql.types import IntegerType

def calculate_contract_length_spark(df):
    """
    Calculate the length of the contract 
    'club_joined' is a date from which the year will be extracted.
    'club_contract_valid_until' is an integer representing a year.
    """

    # Define UDF to calculate total contract length
    def contract_length_udf(joined_year, valid_until_year):
        if joined_year is not None and valid_until_year is not None:
            return valid_until_year - joined_year
        else:
            return None  

    # Register UDF to run
    contract_length_udf_spark = udf(contract_length_udf, IntegerType())

    # Extract year from 'club_joined' and cast 'club_contract_valid_until' to integer for calculation
    df = df.withColumn('joined_year', year(col('club_joined'))) \
           .withColumn('valid_until_year', col('club_contract_valid_until').cast(IntegerType()))

    df = df.withColumn('contract_length', contract_length_udf_spark(col('joined_year'), col('valid_until_year')))

    df = df.drop('joined_year', 'valid_until_year', 'club_joined', 'club_contract_valid_until')


    return df

master_df = calculate_contract_length_spark(master_df)

master_df.select('contract_length').show(15)

+---------------+
|contract_length|
+---------------+
|              2|
|              9|
|              2|
|              8|
|             10|
|              9|
|              4|
|             12|
|             11|
|             14|
|              7|
|             14|
|              8|
|             10|
|             12|
+---------------+
only showing top 15 rows



                                                                                

In [7]:
# preprocessing skill columns by handling all suffixes
def combine_skill_values(value):
    if isinstance(value, str):
        if '+' in value:
            parts = value.split('+')
            return int(parts[0]) + int(parts[1])
        elif '-' in value:
            parts = value.split('-')
            return int(parts[0]) - int(parts[1])
        else:
            return int(value)
    return value

skill_columns = [
    "ls", "st", "rs", "lw", "lf", "cf", "rf", "rw",
    "lam", "cam", "ram", "lm", "lcm", "cm", "rcm", "rm",
    "lwb", "ldm", "cdm", "rdm", "rwb", "lb", "lcb", "cb",
    "rcb", "rb", "gk"
]

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

map_skill = udf(combine_skill_values, IntegerType())


for col_name in skill_columns:
    master_df = master_df.withColumn(col_name, map_skill(master_df[col_name]))
master_df.show(15)

23/11/16 01:28:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+---------+---------+--------+---+---------+---------+------------+--------------------+------------+-------------+----------------+--------------+---------+-----------+------------------------+-------------+---------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+-------------------+---------------------------+-------------------------+------------------------+------------------+--------------------+-------------------+-----------------------+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---

In [8]:
# clean up the categorical columns and visualize both numerical and categorical features
master_df = master_df.withColumn('mentality_composure', col('mentality_composure').cast(IntegerType()))
numerical_cols = [f.name for f in master_df.schema.fields if isinstance(f.dataType, (IntegerType, DoubleType, FloatType))]
categorical_cols = [f.name for f in master_df.schema.fields if isinstance(f.dataType, (StringType))]
numerical_cols.remove("overall")
print(numerical_cols)
print(categorical_cols)


['potential', 'value_eur', 'wage_eur', 'age', 'height_cm', 'weight_kg', 'club_team_id', 'league_level', 'weak_foot', 'skill_moves', 'international_reputation', 'pace', 'shooting', 'passing', 'dribbling', 'defending', 'physic', 'attacking_crossing', 'attacking_finishing', 'attacking_heading_accuracy', 'attacking_short_passing', 'attacking_volleys', 'skill_dribbling', 'skill_curve', 'skill_fk_accuracy', 'skill_long_passing', 'skill_ball_control', 'movement_acceleration', 'movement_sprint_speed', 'movement_agility', 'movement_reactions', 'movement_balance', 'power_shot_power', 'power_jumping', 'power_stamina', 'power_strength', 'power_long_shots', 'mentality_aggression', 'mentality_interceptions', 'mentality_positioning', 'mentality_vision', 'mentality_penalties', 'mentality_composure', 'defending_marking_awareness', 'defending_standing_tackle', 'defending_sliding_tackle', 'goalkeeping_diving', 'goalkeeping_handling', 'goalkeeping_kicking', 'goalkeeping_positioning', 'goalkeeping_reflexes

In [9]:
# checking unique count of categorical columns to decide on encoding strategy
for col_name in categorical_cols:
    print(f"Unique values in {col_name}:")
    unique_values = master_df.select(col_name).distinct().collect()
    unique_values_list = [row[col_name] for row in unique_values]
    print(unique_values_list)
    print(len(unique_values_list))
    print("\n")

Unique values in league_name:


                                                                                

['Spain Primera Division', 'UAE Arabian Gulf League', 'English Premier League', 'Turkish Süper Lig', 'Swiss Super League', 'Korean K League 1', 'French Ligue 1', 'Ukrainian Premier League', 'Croatian Prva HNL', None, 'Scottish Premiership', 'Romanian Liga I', 'Chinese Super League', 'Greek Super League', 'French Ligue 2', 'South African Premier Division', 'Mexican Liga MX', 'Portuguese Liga ZON SAGRES', 'Danish Superliga', 'German 1. Bundesliga', 'Swedish Allsvenskan', 'German 2. Bundesliga', 'Campeonato Brasileiro Série A', 'Russian Premier League', 'Holland Eredivisie', 'Paraguayan Primera División', 'English League Championship', 'Polish T-Mobile Ekstraklasa', 'German 3. Bundesliga', 'Liga de Fútbol Profesional Boliviano', 'Chilian Campeonato Nacional', 'Cypriot First Division', 'Japanese J. League Division 1', 'Hungarian Nemzeti Bajnokság I', 'Austrian Football Bundesliga', 'USA Major League Soccer', 'Norwegian Eliteserien', 'Belgian Jupiler Pro League', 'Indian Super League', 'Col

In [10]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, FeatureHasher
from pyspark.ml import Pipeline

def preprocess_and_split_spark_df(df, target_col_name, numerical_cols, categorical_cols, train_ratio=0.7, val_ratio=0.15):
    # Update categorical_cols to exclude features too large for one hot encoding
    categorical_cols = [col for col in categorical_cols if col not in ['nationality_name', 'league_name']]

    # Splitting into features and target
    feature_df = df.drop(target_col_name)
    target_df = df.select("unique_id", target_col_name)

    # Impute missing values in numerical columns
    numerical_imputer = Imputer(inputCols=numerical_cols, outputCols=numerical_cols, strategy="mean")

    # normalize numerical features
    numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
    numerical_scaler = StandardScaler(inputCol="numerical_features", outputCol="scaled_numerical_features")

    # Impute missing values in categorical columns with the mode of feature
    for col_name in categorical_cols:
        mode = feature_df.groupBy(col_name).count().orderBy('count', ascending=False).first()[0]
        feature_df = feature_df.withColumn(col_name, when(col(col_name).isNull(), mode).otherwise(col(col_name)))

    # StringIndexer and OneHotEncoder for categorical columns
    indexers = [StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="keep") for col_name in categorical_cols]
    encoder = OneHotEncoder(inputCols=[col_name + "_index" for col_name in categorical_cols],
                            outputCols=[col_name + "_encoded" for col_name in categorical_cols])

    # Feature Hashers for 'nationality_name' and 'league_name'
    hasher_nationality = FeatureHasher(inputCols=["nationality_name"], outputCol="hashed_nationality", numFeatures=100)
    hasher_league = FeatureHasher(inputCols=["league_name"], outputCol="hashed_league", numFeatures=50)

    # Combine all preprocessors into a pipeline
    pipeline = Pipeline(stages=[numerical_imputer, numerical_assembler, numerical_scaler] + indexers + [encoder, hasher_nationality, hasher_league])
    transformed_feature_df = pipeline.fit(feature_df).transform(feature_df)

    # Join the transformed features with the target 
    joined_df = transformed_feature_df.join(target_df, "unique_id")

    # Apply VectorAssembler to generate comprehensive feature vectors
    assembler_inputs = [col + "_encoded" for col in categorical_cols] + ["scaled_numerical_features", "hashed_nationality", "hashed_league"]
    feature_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
    final_df = feature_assembler.transform(joined_df).select("features", target_col_name)

    
    final_df = final_df.drop('unique_id')

    # Train-Validation-Test Split
    train_df, val_df, test_df = final_df.randomSplit([train_ratio, val_ratio, 1 - train_ratio - val_ratio])

    return train_df, val_df, test_df


train_df, val_df, test_df = preprocess_and_split_spark_df(master_df, 'overall', numerical_cols, categorical_cols)


                                                                                

In [11]:
# sample of transformed and preprocessed dataframes
train_df.show(5)
val_df.show(5)
test_df.show(5)

                                                                                

+--------------------+-------+
|            features|overall|
+--------------------+-------+
|(273,[0,29,31,40,...|     74|
|(273,[0,29,31,40,...|     70|
|(273,[0,29,31,40,...|     70|
|(273,[0,29,31,40,...|     70|
|(273,[0,29,31,40,...|     70|
+--------------------+-------+
only showing top 5 rows

+--------------------+-------+
|            features|overall|
+--------------------+-------+
|(273,[0,29,31,40,...|     70|
|(273,[0,29,31,40,...|     70|
|(273,[0,29,31,40,...|     71|
|(273,[0,29,31,40,...|     72|
|(273,[0,29,31,40,...|     72|
+--------------------+-------+
only showing top 5 rows

+--------------------+-------+
|            features|overall|
+--------------------+-------+
|(273,[0,29,31,40,...|     73|
|(273,[0,29,31,40,...|     73|
|(273,[0,29,31,40,...|     71|
|(273,[0,29,31,40,...|     71|
|(273,[0,29,31,40,...|     70|
+--------------------+-------+
only showing top 5 rows



In [12]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Running Random Forest
rf = RandomForestRegressor(featuresCol='features', labelCol='overall')

rf_model = rf.fit(train_df)

train_predictions_rf = rf_model.transform(train_df)

# evaluator for regression with RMSE metric
evaluator_rf = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="rmse")

# Evaluating model on the training set
train_rmse_rf = evaluator_rf.evaluate(train_predictions_rf)
print("Training Set - Root Mean Squared Error (RMSE):", train_rmse_rf)

# Transform the model on the test dataset
test_predictions_rf = rf_model.transform(test_df)

# Evaluate the model on the test set
test_rmse_rf = evaluator_rf.evaluate(test_predictions_rf)
print("Test Set - Root Mean Squared Error (RMSE):", test_rmse_rf)

# Optionally, evaluate using MAE (Mean Absolute Error)
evaluator_mae_rf = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="mae")
test_mae_rf = evaluator_mae_rf.evaluate(test_predictions_rf)
print("Test Set - Mean Absolute Error (MAE):", test_mae_rf)

23/11/16 01:29:55 WARN DAGScheduler: Broadcasting large task binary with size 1083.0 KiB
                                                                                

Training Set - Root Mean Squared Error (RMSE): 1.6739712983854318


                                                                                

Test Set - Root Mean Squared Error (RMSE): 1.6832906478928829




Test Set - Mean Absolute Error (MAE): 1.1994771721117659


                                                                                

In [13]:
# Define the hyperparameter grid
numTreesList = [10, 20, 40]
maxDepthList = [5, 10]

# Variables to store the best model's information
best_rmse_rf = float("inf")
best_model_rf = None
best_params_rf = {}

# Iterate over all combinations of hyperparameters
for numTrees in numTreesList:
    for maxDepth in maxDepthList:
        # Define the Random Forest model
        rf = RandomForestRegressor(featuresCol='features', labelCol='overall', numTrees=numTrees, maxDepth=maxDepth)

        # Fit the model on the training data
        model_rf = rf.fit(train_df)

        # Evaluate on the validation set
        val_predictions_rf = model_rf.transform(val_df)
        val_rmse_rf = evaluator_rf.evaluate(val_predictions_rf)

        # Log progress
        print(f"Params: numTrees={numTrees}, maxDepth={maxDepth}, Validation RMSE={val_rmse_rf}")

        # Update the best model if the current one is better
        if val_rmse_rf < best_rmse_rf:
            best_rmse_rf = val_rmse_rf
            best_model_rf = model_rf
            best_params_rf = {'numTrees': numTrees, 'maxDepth': maxDepth}

# Log the best parameters and RMSE
print(f"Best Parameters: {best_params_rf}, Best Validation RMSE: {best_rmse_rf}")

                                                                                

Params: numTrees=10, maxDepth=5, Validation RMSE=1.708622260013104


23/11/16 01:30:39 WARN DAGScheduler: Broadcasting large task binary with size 1085.1 KiB
23/11/16 01:30:39 WARN DAGScheduler: Broadcasting large task binary with size 1292.9 KiB
23/11/16 01:30:40 WARN DAGScheduler: Broadcasting large task binary with size 1709.1 KiB
23/11/16 01:30:42 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/11/16 01:30:44 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB
                                                                                

Params: numTrees=10, maxDepth=10, Validation RMSE=0.9463339504372835


23/11/16 01:31:06 WARN DAGScheduler: Broadcasting large task binary with size 1083.0 KiB
                                                                                

Params: numTrees=20, maxDepth=5, Validation RMSE=1.6507870532346285


23/11/16 01:31:20 WARN DAGScheduler: Broadcasting large task binary with size 1083.0 KiB
23/11/16 01:31:20 WARN DAGScheduler: Broadcasting large task binary with size 1290.9 KiB
23/11/16 01:31:21 WARN DAGScheduler: Broadcasting large task binary with size 1706.2 KiB
23/11/16 01:31:23 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
23/11/16 01:31:26 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/11/16 01:31:34 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
                                                                                

Params: numTrees=20, maxDepth=10, Validation RMSE=0.9170978803514371


23/11/16 01:32:08 WARN DAGScheduler: Broadcasting large task binary with size 1079.2 KiB
23/11/16 01:32:09 WARN DAGScheduler: Broadcasting large task binary with size 1287.2 KiB
                                                                                

Params: numTrees=40, maxDepth=5, Validation RMSE=1.6348742241498835


23/11/16 01:32:24 WARN DAGScheduler: Broadcasting large task binary with size 1079.2 KiB
23/11/16 01:32:25 WARN DAGScheduler: Broadcasting large task binary with size 1287.2 KiB
23/11/16 01:32:26 WARN DAGScheduler: Broadcasting large task binary with size 1702.4 KiB
23/11/16 01:32:27 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
23/11/16 01:32:31 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/11/16 01:32:40 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
23/11/16 01:33:01 WARN DAGScheduler: Broadcasting large task binary with size 9.2 MiB
23/11/16 01:33:25 WARN DAGScheduler: Broadcasting large task binary with size 1180.3 KiB
23/11/16 01:33:29 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB

Params: numTrees=40, maxDepth=10, Validation RMSE=0.9044286239464913
Best Parameters: {'numTrees': 40, 'maxDepth': 10}, Best Validation RMSE: 0.9044286239464913


                                                                                

In [14]:
# Evaluate the best model on the test set
test_predictions_rf = best_model_rf.transform(test_df)
test_rmse_rf = evaluator_rf.evaluate(test_predictions_rf)
print("Test Set - Best Model RMSE:", test_rmse_rf)

# Optionally, evaluate using MAE
evaluator_mae_rf = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="mae")
test_mae_rf = evaluator_mae_rf.evaluate(test_predictions_rf)
print("Test Set - Best Model MAE:", test_mae_rf)

                                                                                

Test Set - Best Model RMSE: 0.9169455616192772




Test Set - Best Model MAE: 0.6552045636918004


                                                                                

Best Random Forest Model has 40 trees in its classifier and 10 as the max depth of its trees. The RF model performs really well for regression achieving an RMSE of 0.917 and an MAE of 0.66 on the test set.

In [15]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Define the Linear Regression model
lr = LinearRegression(featuresCol='features', labelCol='overall')

# Fit the model on the training dataset
lr_model = lr.fit(train_df)

# Transform the model on the training dataset to check its performance there
train_predictions = lr_model.transform(train_df)

23/11/16 01:33:50 WARN Instrumentation: [cc9914aa] regParam is zero, which might cause numerical instability and overfitting.
23/11/16 01:33:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/16 01:33:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
23/11/16 01:33:53 WARN Instrumentation: [cc9914aa] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

In [16]:
# Define an evaluator for regression with RMSE metric
evaluator = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="rmse")

# Evaluate the model on the training set
train_rmse = evaluator.evaluate(train_predictions)
print("Training Set - Root Mean Squared Error (RMSE):", train_rmse)
# Transform the model on the test dataset
test_predictions = lr_model.transform(test_df)

# Evaluate the model on the test set
test_rmse = evaluator.evaluate(test_predictions)
print("Test Set - Root Mean Squared Error (RMSE):", test_rmse)

# Optionally, you can also evaluate using MAE (Mean Absolute Error)
evaluator_mae = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="mae")
test_mae = evaluator_mae.evaluate(test_predictions)
print("Test Set - Mean Absolute Error (MAE):", test_mae)

                                                                                

Training Set - Root Mean Squared Error (RMSE): 1.7255131267280805


                                                                                

Test Set - Root Mean Squared Error (RMSE): 1.748399764438042




Test Set - Mean Absolute Error (MAE): 1.3628229667579255


                                                                                

In [17]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression

# perform cross validation
evaluator = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="rmse")
best_rmse = float("inf")
best_model = None
best_params = {}

for regParam in [0.001, 0.01, 0.1]:
    for maxIter in [100, 150]:
       
        lr = LinearRegression(featuresCol='features', labelCol='overall', regParam=regParam, maxIter=maxIter)

        # Fit the model
        model = lr.fit(train_df)

        # Perform evaluation on the validation set
        val_predictions = model.transform(val_df)
        rmse = evaluator.evaluate(val_predictions)

        # Log progress
        print(f"Params: regParam={regParam}, maxIter={maxIter}, RMSE={rmse}")

        # Update best model if current model is better
        if rmse < best_rmse:
            best_rmse = rmse
            best_model = model
            best_params = {'regParam': regParam, 'maxIter': maxIter}

# Log best parameters
print(f"Best Params: {best_params}, Best RMSE: {best_rmse}")


                                                                                

Params: regParam=0.001, maxIter=100, RMSE=1.7444107277932375


                                                                                

Params: regParam=0.001, maxIter=150, RMSE=1.7444107277932561


                                                                                

Params: regParam=0.01, maxIter=100, RMSE=1.7446970837524292


                                                                                

Params: regParam=0.01, maxIter=150, RMSE=1.7446970837524405


                                                                                

Params: regParam=0.1, maxIter=100, RMSE=1.7518591391100211




Params: regParam=0.1, maxIter=150, RMSE=1.7518591391100236
Best Params: {'regParam': 0.001, 'maxIter': 100}, Best RMSE: 1.7444107277932375


                                                                                

In [18]:
# Use the fitted model to make predictions on the test dataset
test_predictions = best_model.transform(test_df)

# Evaluate the model on the test set
test_rmse = evaluator.evaluate(test_predictions)
print("Test Set - Root Mean Squared Error (RMSE):", test_rmse)

# Optionally, evaluate using MAE (Mean Absolute Error)
evaluator_mae = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="mae")
test_mae = evaluator_mae.evaluate(test_predictions)
print("Test Set - Mean Absolute Error (MAE):", test_mae)


                                                                                

Test Set - Root Mean Squared Error (RMSE): 1.748300325981113




Test Set - Mean Absolute Error (MAE): 1.3628200056802837


                                                                                

It seems like the Linear Regression model with regularization of 0.001 and max iterations of 100 performed the best during tuning, with a slightly improved RMSE of 1.744. After applying those parameters to the test set for performance, we get a similar RMSE of 1.7488 and MAE of 1.362 on the test set. So Linear Regression overall does well here with minimal error, but not as well as the random forest model.

In [19]:
# Converting the Spark DF to Tensors (both PyTorch and TensorFlow)
train_pandas_df = train_df.toPandas()
val_pandas_df = val_df.toPandas()
test_pandas_df = test_df.toPandas()

                                                                                

In [20]:
spark.stop()

In [21]:
import torch

# convert a DataFrame to tensors for PyTorch
def convert_to_tensors(df):
    features = torch.tensor(df['features'].apply(lambda x: x.toArray()).tolist()).float()
    targets = torch.tensor(df['overall'].values).float()
    return features, targets


train_features_tensor, train_targets_tensor = convert_to_tensors(train_pandas_df)
val_features_tensor, val_targets_tensor = convert_to_tensors(val_pandas_df)
test_features_tensor, test_targets_tensor = convert_to_tensors(test_pandas_df)


  features = torch.tensor(df['features'].apply(lambda x: x.toArray()).tolist()).float()


In [22]:
import tensorflow as tf
import numpy as np

# convert a DataFrame to tensors for TensorFlow
def convert_to_tf_tensors(df):
    features = np.array(df['features'].apply(lambda x: x.toArray()).tolist())
    targets = df['overall'].values
    return tf.convert_to_tensor(features, dtype=tf.float32), tf.convert_to_tensor(targets, dtype=tf.float32)


train_features_tensor_tf, train_targets_tensor_tf = convert_to_tf_tensors(train_pandas_df)
val_features_tensor_tf, val_targets_tensor_tf = convert_to_tf_tensors(val_pandas_df)
test_features_tensor_tf, test_targets_tensor_tf = convert_to_tf_tensors(test_pandas_df)


In [23]:
print(train_features_tensor.shape, train_targets_tensor.shape)
print(val_features_tensor.shape, val_targets_tensor.shape)
print(test_features_tensor.shape, test_targets_tensor.shape)
print("\n")
print(train_features_tensor_tf.shape, train_targets_tensor_tf.shape)
print(val_features_tensor_tf.shape, val_targets_tensor_tf.shape)
print(test_features_tensor_tf.shape, test_targets_tensor_tf.shape)

torch.Size([99552, 273]) torch.Size([99552])
torch.Size([21332, 273]) torch.Size([21332])
torch.Size([21195, 273]) torch.Size([21195])


(99552, 273) (99552,)
(21332, 273) (21332,)
(21195, 273) (21195,)


In [38]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

class VanillaNet(nn.Module):
    def __init__(self):
        super(VanillaNet, self).__init__()
        self.fc1 = nn.Linear(273, 128)  # 273 input features
        self.fc2 = nn.Linear(128, 1)    # Regression task

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x


In [39]:
# Creating datasets and loaders
train_dataset = TensorDataset(train_features_tensor, train_targets_tensor)
val_dataset = TensorDataset(val_features_tensor, val_targets_tensor)
test_dataset = TensorDataset(test_features_tensor, test_targets_tensor)

train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=64, shuffle=False)

# Initialize model, loss, and optimizer
model = VanillaNet()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
for epoch in range(100):  # 100 epochs
    model.train()
    total_train_loss = 0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets.view(-1, 1))
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()

    avg_train_loss = total_train_loss / len(train_loader)

    # Validation phase
    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for inputs, targets in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, targets.view(-1, 1))
            total_val_loss += loss.item()

    avg_val_loss = total_val_loss / len(val_loader)

    # Logging training and validation loss
    print(f'Epoch [{epoch+1}/{100}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')

# Evaluate on test set
model.eval()
total_test_loss = 0
with torch.no_grad():
    for inputs, targets in DataLoader(dataset=test_dataset, batch_size=64, shuffle=False):
        outputs = model(inputs)
        loss = criterion(outputs, targets.view(-1, 1))
        total_test_loss += loss.item()

avg_test_loss = total_test_loss / len(DataLoader(dataset=test_dataset, batch_size=64))
test_rmse = torch.sqrt(torch.tensor(avg_test_loss))
print(f'Test RMSE: {test_rmse.item()}')


Epoch [1/100], Train Loss: 70.0349, Val Loss: 5.3386
Epoch [2/100], Train Loss: 4.7723, Val Loss: 4.2843
Epoch [3/100], Train Loss: 3.9878, Val Loss: 3.7910
Epoch [4/100], Train Loss: 3.5737, Val Loss: 3.4365
Epoch [5/100], Train Loss: 3.3788, Val Loss: 3.4007
Epoch [6/100], Train Loss: 3.3049, Val Loss: 3.1396
Epoch [7/100], Train Loss: 3.2446, Val Loss: 3.2163
Epoch [8/100], Train Loss: 3.2138, Val Loss: 3.1080
Epoch [9/100], Train Loss: 3.2043, Val Loss: 3.3817
Epoch [10/100], Train Loss: 3.1824, Val Loss: 3.1364
Epoch [11/100], Train Loss: 3.2109, Val Loss: 3.1842
Epoch [12/100], Train Loss: 3.2024, Val Loss: 3.2559
Epoch [13/100], Train Loss: 3.1858, Val Loss: 3.1937
Epoch [14/100], Train Loss: 3.1980, Val Loss: 3.2746
Epoch [15/100], Train Loss: 3.1686, Val Loss: 3.5703
Epoch [16/100], Train Loss: 3.1827, Val Loss: 3.0753
Epoch [17/100], Train Loss: 3.1719, Val Loss: 3.6027
Epoch [18/100], Train Loss: 3.1910, Val Loss: 3.1065
Epoch [19/100], Train Loss: 3.1620, Val Loss: 3.0932
E

In [40]:
best_rmse = float('inf')
best_lr = None
best_optimizer_type = None

for lr in [0.001, 0.01, 0.1]:
    for optimizer_type in [optim.Adam, optim.SGD]:
        model = VanillaNet()
        optimizer = optimizer_type(model.parameters(), lr=lr)

        print(f"Training with LR: {lr} and Optimizer: {'Adam' if optimizer_type == optim.Adam else 'SGD'}")
        for epoch in range(100):  # 100 epochs for tuning
            model.train()
            for inputs, targets in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, targets.view(-1, 1))
                loss.backward()
                optimizer.step()

        # Evaluate on validation set
        model.eval()
        with torch.no_grad():
            val_outputs = model(val_features_tensor)
            val_loss = torch.sqrt(criterion(val_outputs, val_targets_tensor.view(-1, 1)))
            val_rmse = val_loss.item()
        print(f"Validation RMSE: {val_rmse}")

        if val_rmse < best_rmse:
            best_rmse = val_rmse
            best_lr = lr
            best_optimizer_type = optimizer_type
            best_model_state = model.state_dict()

print(f'Best Learning Rate: {best_lr}, Best Optimizer: {best_optimizer_type.__name__}, Best RMSE: {best_rmse}')


Training with LR: 0.001 and Optimizer: Adam
Validation RMSE: 0.7278169989585876
Training with LR: 0.001 and Optimizer: SGD
Validation RMSE: 1.7770209312438965
Training with LR: 0.01 and Optimizer: Adam
Validation RMSE: 0.7353687882423401
Training with LR: 0.01 and Optimizer: SGD
Validation RMSE: 7.022190093994141
Training with LR: 0.1 and Optimizer: Adam
Validation RMSE: 2.38685941696167
Training with LR: 0.1 and Optimizer: SGD
Validation RMSE: 7.022299766540527
Best Learning Rate: 0.001, Best Optimizer: Adam, Best RMSE: 0.7278169989585876


In [42]:
tuned_model = VanillaNet()
tuned_model.load_state_dict(best_model_state)

# Evaluate the best model on test set
tuned_model.eval()
with torch.no_grad():
    test_outputs = tuned_model(test_features_tensor)
    test_loss = torch.sqrt(criterion(test_outputs, test_targets_tensor.view(-1, 1)))
    print(f'Test RMSE with Best Model: {test_loss.item()}')

Test RMSE with Best Model: 0.7311059832572937


After tuning our vanilla feedforward neural network to a learning rate of 0.001 and the Adam optimizer, we again found a very minimal RMSE of 0.727 which became 0.7311 on the test set. The vanilla feedforward neural net does about as well as the SparkML random forest model.

In [43]:
class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(273, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = self.fc4(x)
        return x


In [44]:
# Initialize model, loss, and optimizer
model = MLP()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
for epoch in range(100):  # 100 epochs
    model.train()
    total_train_loss = 0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets.view(-1, 1))
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()

    avg_train_loss = total_train_loss / len(train_loader)

    # Validation phase
    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for inputs, targets in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, targets.view(-1, 1))
            total_val_loss += loss.item()

    avg_val_loss = total_val_loss / len(val_loader)
    print(f'Epoch [{epoch+1}/{100}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')

# Evaluate on test set
model.eval()
total_test_loss = 0
with torch.no_grad():
    for inputs, targets in DataLoader(dataset=test_dataset, batch_size=64, shuffle=False):
        outputs = model(inputs)
        loss = criterion(outputs, targets.view(-1, 1))
        total_test_loss += loss.item()

avg_test_loss = total_test_loss / len(DataLoader(dataset=test_dataset, batch_size=64))
test_rmse = torch.sqrt(torch.tensor(avg_test_loss))
print(f'Test RMSE: {test_rmse.item()}')

Epoch [1/100], Train Loss: 75.3744, Val Loss: 3.3501
Epoch [2/100], Train Loss: 2.6339, Val Loss: 2.3258
Epoch [3/100], Train Loss: 2.1581, Val Loss: 1.9718
Epoch [4/100], Train Loss: 2.0068, Val Loss: 1.8178
Epoch [5/100], Train Loss: 1.8927, Val Loss: 1.6306
Epoch [6/100], Train Loss: 1.6628, Val Loss: 2.9055
Epoch [7/100], Train Loss: 1.5502, Val Loss: 1.8685
Epoch [8/100], Train Loss: 1.4828, Val Loss: 1.6158
Epoch [9/100], Train Loss: 1.4611, Val Loss: 1.2947
Epoch [10/100], Train Loss: 1.4074, Val Loss: 1.8310
Epoch [11/100], Train Loss: 1.3814, Val Loss: 1.7881
Epoch [12/100], Train Loss: 1.3105, Val Loss: 1.1843
Epoch [13/100], Train Loss: 1.2568, Val Loss: 1.2222
Epoch [14/100], Train Loss: 1.2084, Val Loss: 1.5930
Epoch [15/100], Train Loss: 1.1798, Val Loss: 1.1070
Epoch [16/100], Train Loss: 1.1220, Val Loss: 1.1322
Epoch [17/100], Train Loss: 1.0807, Val Loss: 0.9338
Epoch [18/100], Train Loss: 1.0299, Val Loss: 1.1811
Epoch [19/100], Train Loss: 0.9712, Val Loss: 0.9122
E

In [45]:
# Hyperparameter tuning
best_rmse = float('inf')
best_lr = None
best_optimizer_type = None

for lr in [0.001, 0.01, 0.1]:
    for optimizer_type in [optim.Adam, optim.SGD]:
        model = MLP()
        optimizer = optimizer_type(model.parameters(), lr=lr)

        print(f"Training with LR: {lr} and Optimizer: {'Adam' if optimizer_type == optim.Adam else 'SGD'}")
        for epoch in range(100):  # 50 epochs for tuning
            model.train()
            for inputs, targets in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, targets.view(-1, 1))
                loss.backward()
                optimizer.step()

        # Evaluate on validation set
        model.eval()
        with torch.no_grad():
            val_outputs = model(val_features_tensor)
            val_loss = torch.sqrt(criterion(val_outputs, val_targets_tensor.view(-1, 1)))
            val_rmse = val_loss.item()
        print(f"Validation RMSE: {val_rmse}")

        if val_rmse < best_rmse:
            best_rmse = val_rmse
            best_lr = lr
            best_optimizer_type = optimizer_type
            best_model_state = model.state_dict()

print(f'Best Learning Rate: {best_lr}, Best Optimizer: {best_optimizer_type.__name__}, Best RMSE: {best_rmse}')

Training with LR: 0.001 and Optimizer: Adam
Validation RMSE: 0.7242836356163025
Training with LR: 0.001 and Optimizer: SGD
Validation RMSE: 7.0218586921691895
Training with LR: 0.01 and Optimizer: Adam
Validation RMSE: 0.8764960765838623
Training with LR: 0.01 and Optimizer: SGD
Validation RMSE: nan
Training with LR: 0.1 and Optimizer: Adam
Validation RMSE: 7.022477626800537
Training with LR: 0.1 and Optimizer: SGD
Validation RMSE: nan
Best Learning Rate: 0.001, Best Optimizer: Adam, Best RMSE: 0.7242836356163025


In [46]:
tuned_model = MLP()
tuned_model.load_state_dict(best_model_state)

# Evaluate the best model on test set
tuned_model.eval()
with torch.no_grad():
    test_outputs = tuned_model(test_features_tensor)
    test_loss = torch.sqrt(criterion(test_outputs, test_targets_tensor.view(-1, 1)))
    print(f'Test RMSE with Best Model: {test_loss.item()}')

Test RMSE with Best Model: 0.7273406386375427


Finally, after tuning our custom MLP model, we found the same parameters to be effective (a learning rate of 0.001 and an Adam optimizer), which gave extremely similar resutls with an RMSE of 0.724. When run against the test set, we see that error remain the same aroud 0.727 approximately. With that the models ranked by performance are 1. MLP, 2. Feedforward, 3. Random Forest, 4. Linear Regression. All models achieved a sub 2 RMSE after hyperparamter tuning which is quite robust and effective for this supervised regression problem. It seems like the dataset has been cleaned, preprocessed, and normalized quite well to be able to predict the overall rating of a FIFA player!