Task-I: Build and populate necessary tables (30% of course project
grade)
• Ingest the data from all years (Male: 2015-2022 and Female: 2016-2022) into
one Postgres Database table.
o Conduct any column name changes to ensure data from various years are
properly aligned in the correct columns in your DB table.
• Add a new column for the year. Also, ensure every record can be uniquely
identified in the database table.
• Your tables should be created in schema with the name “fifa”.
• In your ReadMe.md, add a description of the features in the dataset.
• In your ReadMe.md file, comment on the benefit of using PostgreSQL DB table
compared to a NoSQL Database in this case.

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType  

appName = "FifaProject"
master = "local" 
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.jars", "/Users/kozasound/Desktop/14_763_AISTC/postgresql-42.6.2.jar") \
    .getOrCreate() 
print("Spark session started successfully with PostgreSQL driver")

Spark session started successfully with PostgreSQL driver


In [7]:
from pyspark.sql.functions import lit, when, col
from pyspark.sql.types import BooleanType, DoubleType, IntegerType

db_properties = {
    'user': 'fifaproject',
    'password': '18763', 
    'url': 'jdbc:postgresql://localhost:5432/fifaproject',
    'dbtable': 'fifa.players_data',
    'driver': 'org.postgresql.Driver'
}

male_files = [f'/Users/kozasound/Desktop/14_763_AISTC/shawnakk_Project_Option1/archive/players_{year}.csv' for year in range(15, 23)]
male_years = [2000 + year for year in range(15, 23)]
female_files = [f'/Users/kozasound/Desktop/14_763_AISTC/shawnakk_Project_Option1/archive/female_players_{year}.csv' for year in range(16, 23)]
female_years = [2000 + year for year in range(16, 23)]
files = female_files + male_files
years = female_years + male_years
genders = ['Female'] * len(female_files) + ['Male'] * len(male_files)

for file, year, gender in zip(files, years, genders):
    try:
        df = spark.read.csv(file, header=True, inferSchema=True)
        df = df.withColumn("year", lit(year)).withColumn("sex", lit(gender))
        df = df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in df.columns])
        
        columns_to_cast = {
            'value_eur': DoubleType(),
            'wage_eur': DoubleType(),
            'club_team_id': IntegerType(),
            'league_level': IntegerType(),
            'club_jersey_number': IntegerType(),
        }
        for column, col_type in columns_to_cast.items():
            if column in df.columns:
                df = df.withColumn(column, col(column).cast(col_type))
        if 'real_face' in df.columns:
            df = df.withColumn("real_face", when(col("real_face") == "Yes", True).when(col("real_face") == "No", False).otherwise(None).cast(BooleanType()))
        df.write.format("jdbc") \
            .mode("append") \
            .option("url", db_properties['url']) \
            .option("dbtable", db_properties['dbtable']) \
            .option("user", db_properties['user']) \
            .option("password", db_properties['password']) \
            .option("driver", db_properties['driver']) \
            .save()
        
        print(f"Data for year {year}, gender: {gender} written to PostgreSQL.")
    except Exception as e:
        print(f"An error occurred while processing year {year}, gender: {gender}: {e}")
try:
    sample_df = spark.read \
        .format("jdbc") \
        .option("url", db_properties['url']) \
        .option("dbtable", "(SELECT * FROM fifa.players_data LIMIT 5) AS sample") \
        .option("user", db_properties['user']) \
        .option("password", db_properties['password']) \
        .option("driver", db_properties['driver']) \
        .load()
    sample_df.show()
except Exception as e:
    print(f"An error occurred while reading sample data: {e}")



Data for year 2016, gender: Female written to PostgreSQL.
Data for year 2017, gender: Female written to PostgreSQL.
Data for year 2018, gender: Female written to PostgreSQL.
Data for year 2019, gender: Female written to PostgreSQL.
Data for year 2020, gender: Female written to PostgreSQL.
Data for year 2021, gender: Female written to PostgreSQL.
Data for year 2022, gender: Female written to PostgreSQL.


                                                                                

Data for year 2015, gender: Male written to PostgreSQL.


                                                                                

Data for year 2016, gender: Male written to PostgreSQL.


                                                                                

Data for year 2017, gender: Male written to PostgreSQL.


                                                                                

Data for year 2018, gender: Male written to PostgreSQL.


                                                                                

Data for year 2019, gender: Male written to PostgreSQL.


                                                                                

Data for year 2020, gender: Male written to PostgreSQL.


                                                                                

Data for year 2021, gender: Male written to PostgreSQL.


[Stage 116:>                                                        (0 + 1) / 1]

Data for year 2022, gender: Male written to PostgreSQL.
+---------+--------------------+----------+------------------+----------------+-------+---------+---------+--------+---+----------+---------+---------+------------+---------+-----------+------------+-------------+------------------+----------------+-----------+-------------------------+--------------+----------------+--------------+---------------+--------------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+------------------+--------------------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+-----------

                                                                                

Task-II: Conduct analytics on your dataset (20% of course project
grade)
Develop Python functions that run Spark to answer the following questions (given that x,
y and z) are user-entered parameters. Core analysis should be conducted via Spark
and data should be ingested from Postgres database.
• In Year X, what were the Y clubs that had the highest number of players with
contracts ending in year Z (or after)?
o X is a year between (2015 and 2022, inclusively).
o Y is a positive integer.
o Z is a year that can hold the value of 2023 or a year after it.
• In sports, maturity and energy of teams depend on the average age of team
players (among other factors). Therefore, it’s important to have a function that
can find clubs with such features.
o List the X clubs with the highest (or lowest) average player age for a given
year Y.
▪ X represents a positive integer, but you should handle a scenario if
X is not positive value.
▪ Y represents a year between 2015 and 2022 inclusively.
▪ Provide the user with the ability to choose if they want the highest
average age or the lowest average age.
▪ Make sure to handle this scenario as well: if the user requests 5
clubs with highest averages but there are 3 clubs that share the
same count at rank number 5, please include all of them in your
output
• What is the most popular nationality in the dataset for each year? (i.e. display the
most frequent nation for 2015, 2016, etc.).

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, max as max_, avg

print("Bullet Point 1")
def top_clubs_by_contracts(spark, x_year, y_clubs, z_contract):
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/fifaproject") \
        .option("dbtable", "fifa.players_data") \
        .option("user", "fifaproject") \
        .option("password", "18763") \
        .option("driver", "org.postgresql.Driver") \
        .load()
    
    filt_df = df.filter((col("year") == x_year) & (col("club_contract_valid_until") >= z_contract) & (col("sex") == "Male"))
    club_counts = filt_df.groupBy("club_name") \
                         .agg(count("sofifa_id").alias("player_count")) \
                         .orderBy(col("player_count").desc()) \
                         .limit(y_clubs)
    club_counts.show()

top_clubs_by_contracts(spark, 2020, 5, 2024)

print("Bullet Point 2")
def clubs_by_avg_age(spark, y_year, x_clubs, order='highest'):
    df = spark.read.format("jdbc") \
        .option("url", db_properties['url']) \
        .option("dbtable", db_properties['dbtable']) \
        .option("user", db_properties['user']) \
        .option("password", db_properties['password']) \
        .option("driver", db_properties['driver']) \
        .load()
    
    filt_df = df.filter((col("year") == y_year) & (col("sex") == "Male"))
    avg_age_df = filt_df.groupBy("club_name").agg(avg("age").alias("avg_age"))
    sorted_df = avg_age_df.orderBy(col("avg_age").desc() if order == 'highest' else col("avg_age").asc())
    top_df = sorted_df.limit(x_clubs)
    max_age = top_df.select("avg_age").collect()[-1][0] if top_df.count() == x_clubs else None
    res_df = sorted_df.filter(col("avg_age") >= max_age) if max_age else top_df
    res_df.show()

clubs_by_avg_age(spark, 2020, 5, order='lowest')

print("Bullet Point 3")
def most_popular_nationality(spark):
    df = spark.read.format("jdbc") \
        .option("url", db_properties['url']) \
        .option("dbtable", db_properties['dbtable']) \
        .option("user", db_properties['user']) \
        .option("password", db_properties['password']) \
        .option("driver", db_properties['driver']) \
        .load()
    
    nat_counts = df.filter(col("sex") == "Male").groupBy("year", "nationality_name").agg(count("*").alias("count"))
    max_per_year = nat_counts.groupBy("year").agg(max_("count").alias("max_count"))
    most_popular = nat_counts.alias("nc").join(
        max_per_year.alias("mpy"),
        (col("nc.year") == col("mpy.year")) & (col("nc.count") == col("mpy.max_count"))
    ).select(col("nc.year"), col("nc.nationality_name"), col("nc.count")).orderBy("nc.year")
    most_popular.show()

most_popular_nationality(spark)


Bullet Point 1
+-------------------+------------+
|          club_name|player_count|
+-------------------+------------+
|   Deportes Iquique|          12|
|Patriotas Boyacá FC|          12|
|          Al Ain FC|          11|
|  Alianza Petrolera|          11|
|     Atlético Huila|          11|
+-------------------+------------+

Bullet Point 2
+--------------------+------------------+
|           club_name|           avg_age|
+--------------------+------------------+
|            Barnsley|21.566666666666666|
|          Godoy Cruz|21.607142857142858|
|     Fortuna Sittard|             21.68|
|       SC Heerenveen|21.695652173913043|
|Futebol Clube de ...|22.291666666666668|
|       AFC Wimbledon|22.392857142857142|
|FC Würzburger Kic...|22.428571428571427|
|          RB Leipzig|22.454545454545453|
|        Silkeborg IF| 22.48148148148148|
|           Brentford|22.533333333333335|
|    Waasland-Beveren| 22.53846153846154|
|            OGC Nice|22.551724137931036|
|            KRC Genk|22

Task- III Machine Learning Modeling (30% of course project grade)
• Build a machine learning model that can predict the overall value for each player
based on their skillsets.
o Use proper feature engineering principles (including data cleaning and
data engineering)
o Build two versions: one in Spark and the other one in PyTorch or
Tensorflow.
o For each version, choose two different classifiers/regressors. You can use
the same two choices for Spark and PyTorch/Tensorflow, and neural
networks of substantial different structures (deep vs shallow, MLP vs
CNN) count as two different classifiers/regressors. For each
classifier/regressor, identify a few tunable parameters for your model and
tune the parameters (using proper metric(s)). Then, run the best model
(after tuning) on the test data set and record the test accuracy.
o In your ReadMe file, explain why you chose the classifiers/regressors and
provide comments on the impact of the tunable parameters on the
accuracy. Also, compare the selected models.

Data Cleaning and Data Engineering

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, when, regexp_extract
from pyspark.ml.feature import VectorAssembler, StandardScaler

#load data
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/fifaproject") \
    .option("dbtable", "fifa.players_data") \
    .option("user", "fifaproject") \
    .option("password", "18763") \
    .option("driver", "org.postgresql.Driver") \
    .load()

spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

task3_male_data = df.filter((col("sex") == "Male"))
task3_male_data = task3_male_data.drop('dob')

row_count = df.count()

print(f"Total number of rows with 'sex' == 'Male': {row_count}")

for col_name in task3_male_data.columns:
    if '.' in col_name:
        new_col_name = col_name.replace('.', '_')
        task3_male_data = task3_male_data.withColumnRenamed(col_name, new_col_name)

threshold = 0.1 * task3_male_data.count()
dropped_columns = []

for col_name in task3_male_data.columns:
    null_count = task3_male_data.filter(col(col_name).isNull()).count()
    if null_count > threshold:
        task3_male_data = task3_male_data.drop(col_name)
        dropped_columns.append(col_name)

task3_male_data = task3_male_data.na.drop()

def extract_base_value(col_name):
    return regexp_extract(col(col_name), r"(\d+)", 0).cast('int')

columns_to_process = ['ls', 'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw', 'lam', 'cam', 'ram', 
                      'lm', 'lcm', 'cm', 'rcm', 'rm', 'lwb', 'ldm', 'cdm', 'rdm', 'rwb', 
                      'lb', 'lcb', 'cb', 'rcb', 'rb', 'gk']

for col_name in columns_to_process:
    if col_name in task3_male_data.columns:
        task3_male_data = task3_male_data.withColumn(col_name, extract_base_value(col_name))

columns_to_keep = [col for col in task3_male_data.columns if col not in dropped_columns]
task3_male_data = task3_male_data.select(columns_to_keep)

numerical_cols = []
categorical_cols = []
binary_cols = []
nominal_cols = []

for col_name in task3_male_data.columns:
    if dict(task3_male_data.dtypes)[col_name] in ['int', 'double']:
        numerical_cols.append(col_name)
    else:
        categorical_cols.append(col_name)

for col_name in categorical_cols:
    unique_values = task3_male_data.select(col_name).distinct().count()
    if unique_values == 2:
        binary_cols.append(col_name)
    elif unique_values > 2:
        nominal_cols.append(col_name)

for col_name in binary_cols:
    task3_male_data = task3_male_data.withColumn(col_name, 
        when(col(col_name) == 'Right', 1).when(col(col_name) == 'Yes', 1).otherwise(0))

indexers = []
encoders = []

for col_name in nominal_cols:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_index")
    encoder = OneHotEncoder(inputCol=col_name + "_index", outputCol=col_name + "_onehot")
    indexers.append(indexer)
    encoders.append(encoder)

pipeline = Pipeline(stages=indexers + encoders)
task3_male_data = pipeline.fit(task3_male_data).transform(task3_male_data)

df_pandas = task3_male_data.select(numerical_cols).toPandas()
correlation_matrix = df_pandas.corr()
threshold = 0.95
to_drop = set()

for i in range(len(correlation_matrix.columns)):
    for j in range(i):
        if abs(correlation_matrix.iloc[i, j]) > threshold:
            colname = correlation_matrix.columns[i]
            to_drop.add(colname)

task3_male_data = task3_male_data.drop(*list(to_drop))

task3_male_data = task3_male_data.drop(*['short_name', 'player_positions', 'work_rate','league_name','nationality_name','club_position', 
                                         'player_url', 'long_name', 'club_name', 'club_joined', 'club_contract_valid_until', 
                                         'body_type', 'player_face_url', 'club_logo_url', 'club_flag_url', 'nation_flag_url', 'sex'])

feature_columns = [col for col in task3_male_data.columns if col != 'overall' and col != 'sofifa_id' and 'onehot' not in col]
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="vectorized_features")
df_with_assembled_features = vector_assembler.transform(task3_male_data)

scaler = StandardScaler(inputCol="vectorized_features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_pipeline = Pipeline(stages=[scaler])
df_maleFIFA_scaled = scaler_pipeline.fit(df_with_assembled_features).transform(df_with_assembled_features)

df_maleFIFA_scaled.select("scaled_features").show(5, truncate=False)


Total number of rows with 'sex' == 'Male': 144323


24/11/14 23:29:33 WARN DAGScheduler: Broadcasting large task binary with size 22.8 MiB
24/11/14 23:29:40 WARN DAGScheduler: Broadcasting large task binary with size 22.8 MiB
24/11/14 23:29:41 WARN DAGScheduler: Broadcasting large task binary with size 22.8 MiB
[Stage 643:>                                                        (0 + 1) / 1]

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

PySpark Linear Regression and Random Forest Regression Training

In [8]:
from pyspark.ml.regression import RandomForestRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, abs, when


train_data, test_data = df_maleFIFA_scaled.select("sofifa_id", "scaled_features", "overall").randomSplit([0.8, 0.2], seed=42)


rf = RandomForestRegressor(featuresCol="scaled_features", labelCol="overall")
lr = LinearRegression(featuresCol="scaled_features", labelCol="overall")


rf_model = rf.fit(train_data)
lr_model = lr.fit(train_data)


rf_train_predictions = rf_model.transform(train_data)
rf_test_predictions = rf_model.transform(test_data)
lr_train_predictions = lr_model.transform(train_data)
lr_test_predictions = lr_model.transform(test_data)


evaluator = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="rmse")


rf_rmse_train = evaluator.evaluate(rf_train_predictions)
rf_rmse_test = evaluator.evaluate(rf_test_predictions)
lr_rmse_train = evaluator.evaluate(lr_train_predictions)
lr_rmse_test = evaluator.evaluate(lr_test_predictions)

print(f"Random Forest Train RMSE: {rf_rmse_train}")
print(f"Random Forest Test RMSE: {rf_rmse_test}")
print(f"Linear Regression Train RMSE: {lr_rmse_train}")
print(f"Linear Regression Test RMSE: {lr_rmse_test}")


def calculate_accuracy(predictions, threshold=0.05):
    correct_predictions = predictions.withColumn(
        "accuracy",
        when(abs(col("prediction") - col("overall")) / col("overall") <= threshold, 1).otherwise(0)
    )
    accuracy = correct_predictions.select("accuracy").agg({"accuracy": "avg"}).collect()[0][0]
    return accuracy * 100  


rf_accuracy_train = calculate_accuracy(rf_train_predictions)
rf_accuracy_test = calculate_accuracy(rf_test_predictions)
lr_accuracy_train = calculate_accuracy(lr_train_predictions)
lr_accuracy_test = calculate_accuracy(lr_test_predictions)

print(f"Random Forest Train Accuracy: {rf_accuracy_train}%")
print(f"Random Forest Test Accuracy: {rf_accuracy_test}%")
print(f"Linear Regression Train Accuracy: {lr_accuracy_train}%")
print(f"Linear Regression Test Accuracy: {lr_accuracy_test}%")


24/11/14 09:52:04 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/11/14 09:52:09 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/11/14 09:52:15 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/11/14 09:52:20 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/11/14 09:52:27 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/11/14 09:52:27 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/11/14 09:52:28 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
24/11/14 09:52:30 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
24/11/14 09:52:32 WARN Instrumentation: [f361cc90] regParam is zero, which might cause numerical instability and overfitting.
24/11/14 09:52:32 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/11/14 09:52:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netl

Random Forest Train RMSE: 1.6136317765839434
Random Forest Test RMSE: 1.6007248797426743
Linear Regression Train RMSE: 1.8376141742704286
Linear Regression Test RMSE: 1.819436355220917


24/11/14 09:53:06 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/11/14 09:53:12 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/11/14 09:53:17 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/11/14 09:53:23 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
[Stage 1195:>                                                       (0 + 1) / 1]

Random Forest Train Accuracy: 93.75315245010802%
Random Forest Test Accuracy: 93.96486445555051%
Linear Regression Train Accuracy: 91.63185092836682%
Linear Regression Test Accuracy: 91.89004997728306%


                                                                                

PySpark Linear Regression and Random Forest Regression Hyper-parameter Optimization

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, abs, when
import matplotlib.pyplot as plt

rf_param_grid = (ParamGridBuilder()
                 .addGrid(rf.maxDepth, [5, 10, 15])
                 .addGrid(rf.numTrees, [10, 20, 30])
                 .build())

lr_param_grid = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.01, 0.1, 0.5])
                 .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                 .build())

evaluator = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="rmse")

print("Starting cross-validation for Random Forest...")
rf_crossval = CrossValidator(estimator=rf, estimatorParamMaps=rf_param_grid, evaluator=evaluator, numFolds=3)
rf_model_tuned = rf_crossval.fit(train_data)
print("Random Forest cross-validation completed.")

print("Starting cross-validation for Linear Regression...")
lr_crossval = CrossValidator(estimator=lr, estimatorParamMaps=lr_param_grid, evaluator=evaluator, numFolds=3)
lr_model_tuned = lr_crossval.fit(train_data)
print("Linear Regression cross-validation completed.")

rf_cv_metrics = rf_model_tuned.avgMetrics
lr_cv_metrics = lr_model_tuned.avgMetrics

plt.figure(figsize=(12, 6))
plt.plot(rf_cv_metrics, label="Random Forest RMSE across hyperparameters", marker="o")
plt.plot(lr_cv_metrics, label="Linear Regression RMSE across hyperparameters", marker="o")
plt.xlabel("Hyperparameter Combination Index")
plt.ylabel("RMSE")
plt.title("Cross-Validation RMSE for Different Hyperparameter Combinations")
plt.legend()
plt.grid(True)
plt.show()

rf_predictions = rf_model_tuned.transform(test_data)
lr_predictions = lr_model_tuned.transform(test_data)

rf_rmse_test = evaluator.evaluate(rf_predictions)
lr_rmse_test = evaluator.evaluate(lr_predictions)

print(f"Tuned Random Forest Test RMSE: {rf_rmse_test}")
print(f"Tuned Linear Regression Test RMSE: {lr_rmse_test}")

def calculate_accuracy(predictions, threshold=0.05):
    correct_predictions = predictions.withColumn(
        "accuracy",
        when(abs(col("prediction") - col("overall")) / col("overall") <= threshold, 1).otherwise(0)
    )
    accuracy = correct_predictions.select("accuracy").agg({"accuracy": "avg"}).collect()[0][0]
    return accuracy * 100  

rf_accuracy_test = calculate_accuracy(rf_predictions)
lr_accuracy_test = calculate_accuracy(lr_predictions)

print(f"Tuned Random Forest Test Accuracy: {rf_accuracy_test}%")
print(f"Tuned Linear Regression Test Accuracy: {lr_accuracy_test}%")


'\nfrom pyspark.ml.tuning import ParamGridBuilder, CrossValidator\nfrom pyspark.ml.evaluation import RegressionEvaluator\nfrom pyspark.sql.functions import col, abs, when\nimport matplotlib.pyplot as plt\n\nrf_param_grid = (ParamGridBuilder()\n                 .addGrid(rf.maxDepth, [5, 10, 15])\n                 .addGrid(rf.numTrees, [10, 20, 30])\n                 .build())\n\nlr_param_grid = (ParamGridBuilder()\n                 .addGrid(lr.regParam, [0.01, 0.1, 0.5])\n                 .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\n                 .build())\n\nevaluator = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="rmse")\n\nprint("Starting cross-validation for Random Forest...")\nrf_crossval = CrossValidator(estimator=rf, estimatorParamMaps=rf_param_grid, evaluator=evaluator, numFolds=3)\nrf_model_tuned = rf_crossval.fit(train_data)\nprint("Random Forest cross-validation completed.")\n\nprint("Starting cross-validation for Linear Regression...")\

PyTorch Multi Layer Perceptron PyTorch Neural Network

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

df_pandas = df_maleFIFA_scaled.select("scaled_features", "overall").toPandas()
features = np.array(df_pandas['scaled_features'].to_list())
labels = np.array(df_pandas['overall'])

X = torch.tensor(features, dtype=torch.float32)
y = torch.tensor(labels, dtype=torch.float32)

train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

train_data = TensorDataset(X_train, y_train)
test_data = TensorDataset(X_test, y_test)

class MLPModel(nn.Module):
    def __init__(self, input_dim, hidden_dim=128):
        super(MLPModel, self).__init__()
        self.hidden = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.output = nn.Linear(hidden_dim, 1)
        
    def forward(self, x):
        x = self.relu(self.hidden(x))
        x = self.output(x)
        return x

model = MLPModel(input_dim=X_train.shape[1])
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)

num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    avg_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss}")


test_loader = DataLoader(test_data, batch_size=64, shuffle=False)
def evaluate_model(model, data_loader, criterion):
    model.eval()
    total_loss = 0.0
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for inputs, targets in data_loader:
            outputs = model(inputs)
            loss = criterion(outputs.squeeze(), targets)
            total_loss += loss.item()
            all_preds.append(outputs.squeeze())
            all_targets.append(targets)
    all_preds = torch.cat(all_preds)
    all_targets = torch.cat(all_targets)
    rmse = torch.sqrt(torch.tensor(total_loss / len(data_loader)))
    correct = (torch.abs(all_preds - all_targets) / all_targets <= 0.05).float()
    accuracy = correct.mean().item() * 100
    return rmse.item(), accuracy

test_rmse, test_accuracy = evaluate_model(model, test_loader, criterion)
print(f"Test RMSE: {test_rmse}")
print(f"Test Accuracy: {test_accuracy}%")


24/11/14 09:54:42 WARN DAGScheduler: Broadcasting large task binary with size 22.8 MiB
                                                                                

Epoch [1/20], Loss: 401.4917938233582
Epoch [2/20], Loss: 15.706721324990266
Epoch [3/20], Loss: 6.836238778275585
Epoch [4/20], Loss: 3.3112486896723725
Epoch [5/20], Loss: 1.86684951871416
Epoch [6/20], Loss: 1.4007103569678727
Epoch [7/20], Loss: 1.2366277145135722
Epoch [8/20], Loss: 1.1777973684987593
Epoch [9/20], Loss: 1.1290401318031216
Epoch [10/20], Loss: 1.1032842197194876
Epoch [11/20], Loss: 1.0817701754561306
Epoch [12/20], Loss: 1.0613238231942892
Epoch [13/20], Loss: 1.054746926900389
Epoch [14/20], Loss: 1.0456731467532705
Epoch [15/20], Loss: 1.0310680531055967
Epoch [16/20], Loss: 1.0201650545037286
Epoch [17/20], Loss: 1.0133134761140874
Epoch [18/20], Loss: 1.007603878594953
Epoch [19/20], Loss: 0.9940403473819549
Epoch [20/20], Loss: 0.9908718523499159
Test RMSE: 1.1512486934661865
Test Accuracy: 98.0150580406189%


PyTorch Multi Layer Perceptron Neural Network Hyper-parameter Optimization

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt

class MLPModel(nn.Module):
    def __init__(self, input_dim, hidden_dim=128):
        super(MLPModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, 1)
        
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

df_pandas = df_maleFIFA_scaled.select("scaled_features", "overall").toPandas()
features = np.array(df_pandas['scaled_features'].to_list())
labels = np.array(df_pandas['overall'])

X = torch.tensor(features, dtype=torch.float32)
y = torch.tensor(labels, dtype=torch.float32)

train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

train_data = TensorDataset(X_train, y_train)
test_data = TensorDataset(X_test, y_test)

learning_rates = [0.001, 0.01, 0.1]
batch_sizes = [32, 64, 128]
hidden_dims = [64, 128, 256]
num_epochs = 20

best_model = None
best_loss = float('inf')
best_params = {}
losses_per_config = {}

for lr in learning_rates:
    for batch_size in batch_sizes:
        for hidden_dim in hidden_dims:
            print(f"Starting optimization with lr={lr}, batch_size={batch_size}, hidden_dim={hidden_dim}")
            
            model = MLPModel(input_dim=X_train.shape[1], hidden_dim=hidden_dim)
            criterion = nn.MSELoss()
            optimizer = optim.Adam(model.parameters(), lr=lr)
            train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
            
            epoch_losses = []
            for epoch in range(num_epochs):
                model.train()
                running_loss = 0.0
                for inputs, targets in train_loader:
                    optimizer.zero_grad()
                    outputs = model(inputs)
                    loss = criterion(outputs.squeeze(), targets)
                    loss.backward()
                    optimizer.step()
                    running_loss += loss.item()
                avg_loss = running_loss / len(train_loader)
                epoch_losses.append(avg_loss)
            
            losses_per_config[(lr, batch_size, hidden_dim)] = epoch_losses

            if avg_loss < best_loss:
                best_loss = avg_loss
                best_model = model
                best_params = {'learning_rate': lr, 'batch_size': batch_size, 'hidden_dim': hidden_dim}
            
            print(f"Finished optimization with lr={lr}, batch_size={batch_size}, hidden_dim={hidden_dim}, avg_loss={avg_loss}")

print("Best Hyperparameters:", best_params)
print("Best Training Loss:", best_loss)

plt.figure(figsize=(12, 8))
for config, losses in losses_per_config.items():
    plt.plot(range(1, num_epochs + 1), losses, label=f"lr={config[0]}, batch_size={config[1]}, hidden_dim={config[2]}")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training Loss over Epochs for Each Hyperparameter Configuration")
plt.legend()
plt.show()

test_loader = DataLoader(test_data, batch_size=best_params['batch_size'], shuffle=False)
test_rmse, test_accuracy = evaluate_model(best_model, test_loader, criterion)
print(f"Test RMSE: {test_rmse}")
print(f"Test Accuracy: {test_accuracy}%")


'\nimport torch\nimport torch.nn as nn\nimport torch.optim as optim\nfrom torch.utils.data import DataLoader, TensorDataset\nimport numpy as np\nimport matplotlib.pyplot as plt\n\nclass MLPModel(nn.Module):\n    def __init__(self, input_dim, hidden_dim=128):\n        super(MLPModel, self).__init__()\n        self.fc1 = nn.Linear(input_dim, hidden_dim)\n        self.relu = nn.ReLU()\n        self.fc2 = nn.Linear(hidden_dim, 1)\n        \n    def forward(self, x):\n        x = self.relu(self.fc1(x))\n        x = self.fc2(x)\n        return x\n\ndf_pandas = df_maleFIFA_scaled.select("scaled_features", "overall").toPandas()\nfeatures = np.array(df_pandas[\'scaled_features\'].to_list())\nlabels = np.array(df_pandas[\'overall\'])\n\nX = torch.tensor(features, dtype=torch.float32)\ny = torch.tensor(labels, dtype=torch.float32)\n\ntrain_size = int(0.8 * len(X))\nX_train, X_test = X[:train_size], X[train_size:]\ny_train, y_test = y[:train_size], y[train_size:]\n\ntrain_data = TensorDataset(X_tr

PyTorch Convolutional Neural Network 

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

class CNNModel(nn.Module):
    def __init__(self, input_dim, conv1_out=32, conv2_out=64):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=conv1_out, kernel_size=3)
        self.conv2 = nn.Conv1d(in_channels=conv1_out, out_channels=conv2_out, kernel_size=3)
        self.fc1 = nn.Linear(conv2_out * (input_dim - 4), 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 1)
        
    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = x.view(x.size(0), -1)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

df_pandas = df_maleFIFA_scaled.select("scaled_features", "overall").toPandas()
features = np.array(df_pandas['scaled_features'].to_list())
labels = np.array(df_pandas['overall'])

X = torch.tensor(features, dtype=torch.float32)
y = torch.tensor(labels, dtype=torch.float32)

train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

train_data = TensorDataset(X_train, y_train)
test_data = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

model = CNNModel(input_dim=X_train.shape[1])

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    avg_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss}")

def evaluate_model(model, data_loader, criterion, threshold=0.1):
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_samples = 0
    with torch.no_grad():
        for inputs, targets in data_loader:
            outputs = model(inputs)
            loss = criterion(outputs.squeeze(), targets)
            total_loss += loss.item()
            predictions = outputs.squeeze()
            correct_predictions = (torch.abs(predictions - targets) / targets <= threshold).float()
            total_correct += correct_predictions.sum().item()
            total_samples += len(targets)

    avg_loss = total_loss / len(data_loader)
    rmse = np.sqrt(avg_loss)
    accuracy = (total_correct / total_samples) * 100

    return rmse, accuracy

test_rmse, test_accuracy = evaluate_model(model, test_loader, criterion)
print(f"Test RMSE: {test_rmse}")
print(f"Test Accuracy: {test_accuracy}%")

train_rmse, train_accuracy = evaluate_model(model, train_loader, criterion)
print(f"Train RMSE: {train_rmse}")
print(f"Train Accuracy: {train_accuracy}%")


24/11/14 10:03:55 WARN DAGScheduler: Broadcasting large task binary with size 22.8 MiB
                                                                                

Epoch [1/20], Loss: 30.030838017372318
Epoch [2/20], Loss: 1.3464040856654336
Epoch [3/20], Loss: 1.255457040420994
Epoch [4/20], Loss: 1.1651119395474159
Epoch [5/20], Loss: 1.0923448641152278
Epoch [6/20], Loss: 0.999741376697147
Epoch [7/20], Loss: 0.9361867991677166
Epoch [8/20], Loss: 0.8392286824375174
Epoch [9/20], Loss: 0.8030362333513235
Epoch [10/20], Loss: 0.7571272055489303
Epoch [11/20], Loss: 0.7096649416679304
Epoch [12/20], Loss: 0.6861014712302789
Epoch [13/20], Loss: 0.6642446117317009
Epoch [14/20], Loss: 0.6206975517445528
Epoch [15/20], Loss: 0.6093787457386073
Epoch [16/20], Loss: 0.5677181321701574
Epoch [17/20], Loss: 0.5594481528167887
Epoch [18/20], Loss: 0.5466648838290188
Epoch [19/20], Loss: 0.5200288699545564
Epoch [20/20], Loss: 0.5166397211107895
Test RMSE: 1.2684226837496366
Test Accuracy: 99.9733820062362%
Train RMSE: 0.9943061919845065
Train Accuracy: 99.97338175319182%


24/11/14 11:37:27 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 935280 ms exceeds timeout 120000 ms
24/11/14 11:37:27 WARN SparkContext: Killing executors is not supported by current scheduler.
24/11/14 11:37:30 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

PyTorch Convolutional Neural Network Hyperparameter optimization

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt

class CNNModel(nn.Module):
    def __init__(self, input_dim, conv1_out=32, conv2_out=64):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=conv1_out, kernel_size=3)
        self.conv2 = nn.Conv1d(in_channels=conv1_out, out_channels=conv2_out, kernel_size=3)
        self.fc1 = nn.Linear(conv2_out * (input_dim - 4), 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = x.view(x.size(0), -1)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

df_pandas = df_maleFIFA_scaled.select("scaled_features", "overall").toPandas()
features = np.array(df_pandas['scaled_features'].to_list())
labels = np.array(df_pandas['overall'])

X = torch.tensor(features, dtype=torch.float32)
y = torch.tensor(labels, dtype=torch.float32)

train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

train_data = TensorDataset(X_train, y_train)
test_data = TensorDataset(X_test, y_test)

batch_sizes = [32, 64]
conv1_out_channels = [16, 32]
conv2_out_channels = [32, 64]
learning_rate = 0.001
num_epochs = 20

best_model = None
best_loss = float('inf')
best_params = {}
losses_per_config = {}

for batch_size in batch_sizes:
    for conv1_out in conv1_out_channels:
        for conv2_out in conv2_out_channels:
            print(f"Starting optimization with batch_size={batch_size}, conv1_out={conv1_out}, conv2_out={conv2_out}")

            model = CNNModel(input_dim=X_train.shape[1], conv1_out=conv1_out, conv2_out=conv2_out)
            criterion = nn.MSELoss()
            optimizer = optim.Adam(model.parameters(), lr=learning_rate)

            train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
            epoch_losses = []

            for epoch in range(num_epochs):
                model.train()
                running_loss = 0.0
                for inputs, targets in train_loader:
                    optimizer.zero_grad()
                    outputs = model(inputs)
                    loss = criterion(outputs.squeeze(), targets)
                    loss.backward()
                    optimizer.step()
                    running_loss += loss.item()
                avg_loss = running_loss / len(train_loader)
                epoch_losses.append(avg_loss)

            losses_per_config[(batch_size, conv1_out, conv2_out)] = epoch_losses

            if avg_loss < best_loss:
                best_loss = avg_loss
                best_model = model
                best_params = {
                    'batch_size': batch_size,
                    'conv1_out_channels': conv1_out,
                    'conv2_out_channels': conv2_out
                }

            print(f"Finished optimization with batch_size={batch_size}, conv1_out={conv1_out}, conv2_out={conv2_out}, avg_loss={avg_loss}")

print("Best Hyperparameters:", best_params)
print("Best Training Loss:", best_loss)

plt.figure(figsize=(12, 8))
for config, losses in losses_per_config.items():
    plt.plot(range(1, num_epochs + 1), losses, label=f"batch_size={config[0]}, conv1_out={config[1]}, conv2_out={config[2]}")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training Loss over Epochs for Each Hyperparameter Configuration")
plt.legend()
plt.show()

def evaluate_model(model, data_loader, criterion):
    model.eval()
    total_loss = 0.0
    all_preds = []
    all_targets = []

    with torch.no_grad():
        for inputs, targets in data_loader:
            outputs = model(inputs)
            loss = criterion(outputs.squeeze(), targets)
            total_loss += loss.item()
            all_preds.append(outputs.squeeze())
            all_targets.append(targets)

    all_preds = torch.cat(all_preds)
    all_targets = torch.cat(all_targets)
    rmse = torch.sqrt(torch.tensor(total_loss / len(data_loader)))
    correct = (torch.abs(all_preds - all_targets) / all_targets <= 0.1).float()
    accuracy = correct.mean().item() * 100

    return rmse.item(), accuracy

test_loader = DataLoader(test_data, batch_size=best_params['batch_size'], shuffle=False)
test_rmse, test_accuracy = evaluate_model(best_model, test_loader, criterion)
print(f"Test RMSE: {test_rmse}")
print(f"Test Accuracy: {test_accuracy}%")

train_loader = DataLoader(train_data, batch_size=best_params['batch_size'], shuffle=True)
train_rmse, train_accuracy = evaluate_model(best_model, train_loader, criterion)
print(f"Train RMSE: {train_rmse}")
print(f"Train Accuracy: {train_accuracy}%")


'\nimport torch\nimport torch.nn as nn\nimport torch.optim as optim\nfrom torch.utils.data import DataLoader, TensorDataset\nimport numpy as np\nimport matplotlib.pyplot as plt\n\nclass CNNModel(nn.Module):\n    def __init__(self, input_dim, conv1_out=32, conv2_out=64):\n        super(CNNModel, self).__init__()\n        self.conv1 = nn.Conv1d(in_channels=1, out_channels=conv1_out, kernel_size=3)\n        self.conv2 = nn.Conv1d(in_channels=conv1_out, out_channels=conv2_out, kernel_size=3)\n        self.fc1 = nn.Linear(conv2_out * (input_dim - 4), 128)\n        self.relu = nn.ReLU()\n        self.fc2 = nn.Linear(128, 1)\n\n    def forward(self, x):\n        x = x.unsqueeze(1)\n        x = self.relu(self.conv1(x))\n        x = self.relu(self.conv2(x))\n        x = x.view(x.size(0), -1)\n        x = self.relu(self.fc1(x))\n        x = self.fc2(x)\n        return x\n\ndf_pandas = df_maleFIFA_scaled.select("scaled_features", "overall").toPandas()\nfeatures = np.array(df_pandas[\'scaled_featu

Task- IV Deploy your code to the Cloud: (10% of the course grade)
• Run a version of your code for the three tasks above on the cloud.
• In this version, you may skip the creation of the Database on the cloud (i.e. on
the cloud version, you don’t need to write data to table for simplicity). You may
ingest the data from CSVs directly.
• If you run the PostgreSQL on the cloud: you will receive 10% extra-credit.