# Project - Option 1

## Task 1

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import numpy as np
import os
import sys
import pandas as pd
import seaborn as sb
import matplotlib.pyplot as plt
from functools import reduce
from sklearn.linear_model import LinearRegression
from pyspark.ml.regression import LinearRegression as spark_LR, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [2]:
appName = "Big Data Analytics"
master = "local[*]"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
 .set('spark.driver.host','127.0.0.1')\
   .setAppName(appName)\
    .setMaster(master)

spark = SparkSession.builder.config(conf = conf).getOrCreate()

In [3]:
db_properties={}
db_properties['username']="postgres"
db_properties['password']="systems"
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
db_properties['table']="fifa.fifa"
db_properties['driver']="org.postgresql.Driver"

In [4]:
df_male = spark.read.csv('./Data/players_15.csv', header = True)
combined_df = df_male.withColumn("year", lit(2015))
combined_df = combined_df.withColumn("record_id", monotonically_increasing_id()).select("record_id", *combined_df.columns)
combined_df = combined_df.withColumn("gender", lit("Male"))
folder_path = "./Data"
for file_name in os.listdir(folder_path):
    if file_name == "players_15.csv":
        continue
    year = "20" + file_name[-6:-4]
    file_path = os.path.join(folder_path, file_name)
    df_read = spark.read.csv(file_path, header = True)
    df_read = df_read.withColumn("year", lit(int(year)))
    df_read = df_read.withColumn("record_id", monotonically_increasing_id()).select("record_id", *df_read.columns)
    if "female" in file_name:
        df_read = df_read.withColumn("gender", lit("Female"))
    else:
        df_read = df_read.withColumn("gender", lit("Male"))
    combined_df = combined_df.union(df_read)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\Jainam\anaconda3\envs\cmu18763\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\Jainam\anaconda3\envs\cmu18763\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\Jainam\anaconda3\envs\cmu18763\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [16]:
# Write to PostgreSQL
combined_df.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()

                                                                                

In [4]:
# Read from PostgreSQL to verify
df_from_postgres = spark.read \
    .format("jdbc") \
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()

In [None]:
df_from_postgres.columns

['record_id',
 'sofifa_id',
 'player_url',
 'short_name',
 'long_name',
 'player_positions',
 'overall',
 'potential',
 'value_eur',
 'wage_eur',
 'age',
 'dob',
 'height_cm',
 'weight_kg',
 'club_team_id',
 'club_name',
 'league_name',
 'league_level',
 'club_position',
 'club_jersey_number',
 'club_loaned_from',
 'club_joined',
 'club_contract_valid_until',
 'nationality_id',
 'nationality_name',
 'nation_team_id',
 'nation_position',
 'nation_jersey_number',
 'preferred_foot',
 'weak_foot',
 'skill_moves',
 'international_reputation',
 'work_rate',
 'body_type',
 'real_face',
 'release_clause_eur',
 'player_tags',
 'player_traits',
 'pace',
 'shooting',
 'passing',
 'dribbling',
 'defending',
 'physic',
 'attacking_crossing',
 'attacking_finishing',
 'attacking_heading_accuracy',
 'attacking_short_passing',
 'attacking_volleys',
 'skill_dribbling',
 'skill_curve',
 'skill_fk_accuracy',
 'skill_long_passing',
 'skill_ball_control',
 'movement_acceleration',
 'movement_sprint_speed',


In [6]:
df_from_postgres.count()

144323

## Task 2

In [4]:
def read_from_spark(spark, db_properties):
    df_from_postgres = spark.read \
        .format("jdbc") \
        .option("url", db_properties['url'])\
        .option("dbtable", db_properties['table'])\
        .option("user", db_properties['username'])\
        .option("password", db_properties['password'])\
        .option("Driver", db_properties['driver'])\
        .load()
    df = df_from_postgres.filter(df_from_postgres["gender"] == "Male")
    return df

In [8]:
def get_top_clubs_with_contracts_ending(spark, db_properties, X, Y, Z):
    df = read_from_spark(spark, db_properties)
    df_filtered = df.filter(col("year") == X)
    df_expiring = df_filtered.filter(col("club_contract_valid_until").cast("int") >= Z)
    result = df_expiring.groupBy("club_name") \
        .count() \
        .orderBy(col("count").desc()) \
        .limit(Y)
    return result.collect()

In [9]:
def find_clubs_by_average_age(spark, db_properties, X, Y, highest=True):
    if X <= 0:
        return "X must be a positive integer"
    if Y < 2015 or Y > 2022:
        return "Y must be a year between 2015 and 2022 inclusively"
    df = read_from_spark(spark, db_properties)
    
    # Filter data for specified year Y
    df_filtered = df.filter(col("year") == Y)
    avg_age_per_club = df_filtered.groupBy("club_name") \
        .agg(round(avg("age").cast("float"),2).alias("average_age"))
    if highest:
        sorted_clubs = avg_age_per_club.orderBy(desc("average_age"))
    else:
        sorted_clubs = avg_age_per_club.orderBy(asc("average_age"))

    top_clubs = sorted_clubs.limit(X)
    last_club = top_clubs.collect()[-1]
    threshold_age = last_club["average_age"]
    if highest:
        result_clubs = sorted_clubs.filter(col("average_age") >= threshold_age).collect()
    else:
        result_clubs = sorted_clubs.filter(col("average_age") <= threshold_age).collect()
    return result_clubs

In [10]:
def get_most_popular_nationality(spark, db_properties):
    df = read_from_spark(spark, db_properties)
    # df_filtered = df.filter((col("year") >= 2015) & (col("year") <= 2022))
    nationality_counts = df.groupBy("year", "nationality_name") \
        .agg(count("*").alias("count"))
    # Create a window partitioned by year and ordered by count descending
    window = Window.partitionBy("year").orderBy(desc("count"))
    
    # Add row number within each year partition
    ranked_nationalities = nationality_counts.withColumn("rank", row_number().over(window))
    # Filter for the top nationality for each year
    most_popular_nationalities = ranked_nationalities.filter(col("rank") == 1) \
        .select("year", "nationality_name", "count") \
        .orderBy("year")
    
    return most_popular_nationalities.collect()

## Task-2.1

In [11]:
top_clubs = get_top_clubs_with_contracts_ending(spark=spark, db_properties=db_properties, X=2021, Y=10, Z=2023)

In [12]:
top_clubs

[Row(club_name='GwangJu FC', count=28),
 Row(club_name='Zamora Fútbol Club', count=27),
 Row(club_name='Club Plaza de Deportes Colonia', count=27),
 Row(club_name='Gangwon FC', count=26),
 Row(club_name='Sociedad Deportiva Aucas', count=26),
 Row(club_name='SL Benfica', count=26),
 Row(club_name='Club Atlético Nacional Potosí', count=26),
 Row(club_name='Club Deportivo El Nacional', count=26),
 Row(club_name='Busan IPark', count=26),
 Row(club_name='Club Social, Cultural y Deportivo de Blooming', count=25)]

<h1>Task-2.2</h1>

In [13]:
clubs_by_age = find_clubs_by_average_age(spark=spark, db_properties=db_properties, X=10, Y=2017, highest=False)

In [14]:
clubs_by_age

[Row(club_name='Sevilla Atlético', average_age=19.920000076293945),
 Row(club_name='Swindon Town', average_age=21.3700008392334),
 Row(club_name='CD Huachipato', average_age=21.40999984741211),
 Row(club_name='FC Nordsjælland', average_age=21.40999984741211),
 Row(club_name='FC Twente', average_age=21.59000015258789),
 Row(club_name='Envigado FC', average_age=21.610000610351562),
 Row(club_name='KRC Genk', average_age=21.6299991607666),
 Row(club_name='Crewe Alexandra', average_age=21.81999969482422),
 Row(club_name='Barnsley', average_age=21.8700008392334),
 Row(club_name='Ajax', average_age=21.969999313354492)]

<h1>Task-2.3</h1>

In [15]:
popular_nationalities = get_most_popular_nationality(spark=spark, db_properties=db_properties)

In [16]:
popular_nationalities

[Row(year=2015, nationality_name='England', count=1627),
 Row(year=2016, nationality_name='England', count=1519),
 Row(year=2017, nationality_name='England', count=1627),
 Row(year=2018, nationality_name='England', count=1633),
 Row(year=2019, nationality_name='England', count=1625),
 Row(year=2020, nationality_name='England', count=1670),
 Row(year=2021, nationality_name='England', count=1685),
 Row(year=2022, nationality_name='England', count=1719)]

# Task III

## Preprocessing

Refer to preprocessing.ipynb for detailed investigation on correlations and outliers

In [86]:
#Setup column categories
col_names = ['record_id',
 'sofifa_id',
 'player_url',
 'short_name',
 'long_name',
 'player_positions',
 'overall',
 'potential',
 'value_eur',
 'wage_eur',
 'age',
 'dob',
 'height_cm',
 'weight_kg',
 'club_team_id',
 'club_name',
 'league_name',
 'league_level',
 'club_position',
 'club_jersey_number',
 'club_loaned_from',
 'club_joined',
 'club_contract_valid_until',
 'nationality_id',
 'nationality_name',
 'nation_team_id',
 'nation_position',
 'nation_jersey_number',
 'preferred_foot',
 'weak_foot',
 'skill_moves',
 'international_reputation',
 'work_rate',
 'body_type',
 'real_face',
 'release_clause_eur',
 'player_tags',
 'player_traits',
 'pace',
 'shooting',
 'passing',
 'dribbling',
 'defending',
 'physic',
 'attacking_crossing',
 'attacking_finishing',
 'attacking_heading_accuracy',
 'attacking_short_passing',
 'attacking_volleys',
 'skill_dribbling',
 'skill_curve',
 'skill_fk_accuracy',
 'skill_long_passing',
 'skill_ball_control',
 'movement_acceleration',
 'movement_sprint_speed',
 'movement_agility',
 'movement_reactions',
 'movement_balance',
 'power_shot_power',
 'power_jumping',
 'power_stamina',
 'power_strength',
 'power_long_shots',
 'mentality_aggression',
 'mentality_interceptions',
 'mentality_positioning',
 'mentality_vision',
 'mentality_penalties',
 'mentality_composure',
 'defending_marking_awareness',
 'defending_standing_tackle',
 'defending_sliding_tackle',
 'goalkeeping_diving',
 'goalkeeping_handling',
 'goalkeeping_kicking',
 'goalkeeping_positioning',
 'goalkeeping_reflexes',
 'goalkeeping_speed',
 'ls',
 'st',
 'rs',
 'lw',
 'lf',
 'cf',
 'rf',
 'rw',
 'lam',
 'cam',
 'ram',
 'lm',
 'lcm',
 'cm',
 'rcm',
 'rm',
 'lwb',
 'ldm',
 'cdm',
 'rdm',
 'rwb',
 'lb',
 'lcb',
 'cb',
 'rcb',
 'rb',
 'gk',
 'player_face_url',
 'club_logo_url',
 'club_flag_url',
 'nation_logo_url',
 'nation_flag_url',
 'year',
 'gender']

ordinal_cols = ['work_rate']
nominal_cols = ['body_type']
continuous_cols = ['potential', 'value_eur', 'wage_eur', 'age', 'height_cm', 'weight_kg',
                   'weak_foot', 'skill_moves', 'international_reputation',
                   'attacking_crossing','attacking_finishing', 'attacking_heading_accuracy', 'attacking_short_passing', 'attacking_volleys',
                   'movement_sprint_speed', 'movement_agility', 'movement_reactions', 'movement_balance',
                   'power_shot_power', 'power_jumping', 'power_stamina', 'power_strength', 'power_long_shots',
                   'mentality_aggression', 'mentality_interceptions', 'mentality_positioning', 'mentality_vision', 'mentality_penalties',
                   'defending_marking_awareness', 'defending_standing_tackle', 'defending_sliding_tackle']

main_traits_cols = ['pace', 'shooting', 'passing', 'dribbling', 'defending', 'physic', 
                            'goalkeeping_diving', 'goalkeeping_handling', 'goalkeeping_kicking',
                            'goalkeeping_positioning', 'goalkeeping_reflexes', 'goalkeeping_speed']

skill_cols = ['skill_dribbling', 'skill_curve', 'skill_fk_accuracy', 'skill_long_passing', 'skill_ball_control']

position_cols = ['ls', 'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw', 'lam', 'cam', 'ram', 'lm', 'lcm', 'cm',
                   'rcm', 'rm', 'lwb', 'ldm', 'cdm', 'rdm', 'rwb', 'lb', 'lcb', 'cb', 'rcb', 'rb', 'gk']

needed_cols = ['record_id', 'player_positions'] #needed for preprocessing but will be dropped

columns_to_drop = ['sofifa_id', 'player_url', 'short_name', 'long_name',
                   'dob', 'club_name', 'league_name', 'club_position', 'club_jersey_number', 'club_loaned_from',
                   'club_joined','club_contract_valid_until', 'nationality_id', 'nationality_name', 'nation_team_id',
                   'nation_position', 'nation_jersey_number', 'preferred_foot', 'real_face', 'player_tags', 'player_traits',
                   'player_face_url', 'club_logo_url', 'club_flag_url', 'nation_logo_url', 'nation_flag_url',
                   'year', 'gender', 'league_level']

null_cols_drop = ['release_clause_eur']

corr_cols_drop = ['movement_acceleration']

dropped_null_rows = ['value_eur', 'wage_eur', 'trait6']

imputed_cols = ['mentality_composure', 'club_team_id']

In [83]:
class NullImputer(Transformer):
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        output_df = dataset.withColumn("club_team_id_imputed", when(col("club_team_id").isNull(), -1).otherwise(col("club_team_id")))
        return output_df
    
class MLImputer(Transformer):
    def __init__(self):
        super().__init__()
    
    def _transform(self, dataset):
        s = dataset.select(['record_id','mentality_composure', 'mentality_aggression', 'mentality_interceptions', 'mentality_positioning', 'mentality_vision', 'mentality_penalties']).toPandas()
       
        # Prepare the data
        train_data = s.dropna(subset=['mentality_composure'])
        X = train_data[['mentality_aggression', 'mentality_interceptions', 'mentality_positioning', 'mentality_vision', 'mentality_penalties']]
        Y = train_data['mentality_composure']

        # Fit the linear regression model
        model = LinearRegression()
        model.fit(X, Y)

        # Predict missing values
        X_missing = s[s['mentality_composure'].isnull()][['mentality_aggression', 'mentality_interceptions', 'mentality_positioning', 'mentality_vision', 'mentality_penalties']]
        predicted_values = model.predict(X_missing)
        rounded_predicted_values = np.round(predicted_values).astype(int)
        # Impute the predicted values
        s.loc[s['mentality_composure'].isnull(), 'mentality_composure'] = rounded_predicted_values
        s = s.rename(columns = {'mentality_composure': 'mentality_composure_imputed'})
        spark_s = spark.createDataFrame(s[['record_id','mentality_composure_imputed']])
        spark_df = dataset.join(spark_s, on = 'record_id', how = 'left')
        return spark_df
    
class AverageSkillCreator(Transformer):
    def __init__(self, average_skills_cols = None):
        super().__init__()
        self.average_skills_cols = average_skills_cols

    def _transform(self, dataset):
        spark_df = dataset
        spark_df = spark_df.withColumn(
        'corr_av_skills', 
        aggregate(
            array(*[col(pos) for pos in self.average_skills_cols]),
            lit(0.0),
            lambda acc, x: acc + x,
            lambda acc: acc / size(array(*[col(pos) for pos in self.average_skills_cols]))
        )
        )

        return spark_df
        
class AveragePositionCreator(Transformer):
    def __init__(self, position_cols = None):
        super().__init__()
        self.position_cols = position_cols

    # Define UDF for safe evaluation
    def safe_eval(self, x):
        try:
            return float(eval(str(x)))
        except:
            return None
        
    def _transform(self, dataset):
        attacking_positions = ['ls', 'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw',
                           'lam', 'cam', 'ram']
        midfield_positions = ['lam', 'cam', 'ram', 'lm', 'lcm', 'cm', 'rcm', 'rm',
                            'ldm', 'cdm', 'rdm']
        defensive_positions = ['rwb', 'lb', 'lcb', 'cb', 'lwb', 'rcb', 'rb',
                            'ldm', 'cdm', 'rdm']
        gk_positions = ['gk']

        spark_df = dataset
        safe_eval_udf = udf(self.safe_eval, FloatType())

        #Apply safe_eval to each position
        for pos in position_cols:
            spark_df = spark_df.withColumn(pos, safe_eval_udf(pos))

         #Add new columns 'average_val_attacking', 'average_val_midfield', 'average_val_defensive', 'average_val_gk'
        spark_df = spark_df.withColumn(
            'average_val_attacking', 
            aggregate(
                array(*[col(pos) for pos in attacking_positions]),
                lit(0.0),
                lambda acc, x: acc + x,
                lambda acc: acc / size(array(*[col(pos) for pos in attacking_positions]))
            )
        )
        spark_df = spark_df.withColumn(
            'average_val_midfield', 
            aggregate(
                array(*[col(pos) for pos in midfield_positions]),
                lit(0.0),
                lambda acc, x: acc + x,
                lambda acc: acc / size(array(*[col(pos) for pos in midfield_positions]))
            )
        )
        spark_df = spark_df.withColumn(
            'average_val_defensive', 
            aggregate(
                array(*[col(pos) for pos in defensive_positions]),
                lit(0.0),
                lambda acc, x: acc + x,
                lambda acc: acc / size(array(*[col(pos) for pos in defensive_positions]))
            )
        )
        spark_df = spark_df.withColumn(
            'average_val_gk', 
            aggregate(
                array(*[col(pos) for pos in gk_positions]),
                lit(0.0),
                lambda acc, x: acc + x,
                lambda acc: acc / size(array(*[col(pos) for pos in gk_positions]))
            )
        )

        return spark_df

def assign_traits(player_positions, *traits):
        #Trait1 = pace/goalkeeping_diving
        #Trait2 = shooting/goalkeeping_handling
        #Trait3 = passing/goalkeeping_kicking
        #Trait4 = dribbling/goalkeeping_positioning
        #Trait5 = defending/goalkeeping_reflexes
        #Trait6 = physic/goalkeeping_speed

        traits = [float(t) if t is not None else None for t in traits]
        if player_positions and 'GK' in player_positions:
            return traits[6:12]
        else:
            return traits[:6]

class MergeMainTraits(Transformer):
    def __init__(self, main_traits_cols = None):
        super().__init__()
        self.main_traits_cols = main_traits_cols
        
    def _transform(self, dataset):
        output_df = dataset
        assign_traits_udf = udf(assign_traits, ArrayType(FloatType()))
        output_df = output_df.withColumn('traits', 
                                       assign_traits_udf(col('player_positions'), *[col(trait) for trait in self.main_traits_cols]))
        for i in range(1, 7):
            output_df = output_df.withColumn(f'trait{i}', col('traits').getItem(i-1))
        
        output_df = output_df.drop('traits')
        return output_df
    
class DropOutliers(Transformer):
    def __init__(self, continuous_cols = None):
        super().__init__()
        self.continuous_cols = continuous_cols

    def column_add(self, a,b):
        return  a.__add__(b)
    
    def find_outliers(self, df, continuous_cols):
        # Identifying the numerical columns in a spark dataframe

        # Using the `for` loop to create new columns by identifying the outliers for each feature
        for column in continuous_cols:

            less_Q1 = 'less_Q1_{}'.format(column)
            more_Q3 = 'more_Q3_{}'.format(column)
            Q1 = 'Q1_{}'.format(column)
            Q3 = 'Q3_{}'.format(column)

            # Q1 : First Quartile ., Q3 : Third Quartile
            Q1 = df.approxQuantile(column,[0.25],relativeError=0)
            Q3 = df.approxQuantile(column,[0.75],relativeError=0)
            
            # IQR : Inter Quantile Range
            # We need to define the index [0], as Q1 & Q3 are a set of lists., to perform a mathematical operation
            # Q1 & Q3 are defined seperately so as to have a clear indication on First Quantile & 3rd Quantile
            IQR = Q3[0] - Q1[0]
            
            #selecting the data, with -1.5*IQR to + 1.5*IQR., where param = 1.5 default value
            less_Q1 =  Q1[0] - 1.5*IQR
            more_Q3 =  Q3[0] + 1.5*IQR
            
            isOutlierCol = 'is_outlier_{}'.format(column)
            
            df = df.withColumn(isOutlierCol,when((df[column] > more_Q3) | (df[column] < less_Q1), 1).otherwise(0))
        

        # Selecting the specific columns which we have added above, to check if there are any outliers
        selected_columns = [column for column in df.columns if column.startswith("is_outlier")]
        # Adding all the outlier columns into a new colum "total_outliers", to see the total number of outliers
        df = df.withColumn('total_outliers',reduce(self.column_add, ( df[col] for col in  selected_columns)))

        # Dropping the extra columns created above, just to create nice dataframe., without extra columns
        df = df.drop(*[column for column in df.columns if column.startswith("is_outlier")])

        return df

    def _transform(self, dataset):
        df_outliers = self.find_outliers(dataset, continuous_cols)
        df_no_outliers = df_outliers.filter(df_outliers['total_outliers']<=6) #Drop all rows with more than 6 outliers
        df_no_outliers = df_no_outliers.drop("total_outliers")
        return df_no_outliers    
     
class DropNullRows(Transformer):
    def __init__(self, dropped_null_rows = None):
        super().__init__()
        self.null_cols_drop = dropped_null_rows

    def _transform(self, dataset):
        output_df = dataset.na.drop(subset=self.null_cols_drop)
        return output_df

class FeatureTypeCaster(Transformer): # this transformer will cast the columns as appropriate types  
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        output_df = dataset
        for col_name in (continuous_cols + imputed_cols):
            output_df = output_df.withColumn(col_name,col(col_name).cast(DoubleType()))
        return output_df
    
class ColumnDropper(Transformer): # this transformer drops unnecessary columns
    def __init__(self, columns_to_drop = None):
        super().__init__()
        self.columns_to_drop=columns_to_drop
    def _transform(self, dataset):
        output_df = dataset
        for col_name in self.columns_to_drop:
            output_df = output_df.drop(col_name)
        return output_df

In [85]:
def get_preprocessing_pipeline():
    #Drop all initial columns that are not needed
    stage_dropper = ColumnDropper(columns_to_drop = columns_to_drop + null_cols_drop + corr_cols_drop)

    # Stage where columns are casted as appropriate types
    stage_typecaster = FeatureTypeCaster()

    #Engineer features
    stage_maintraits = MergeMainTraits(main_traits_cols)
    trait_cols = ['trait1', 'trait2', 'trait3', 'trait4', 'trait5', 'trait6']

    stage_average_positions = AveragePositionCreator(position_cols)
    avg_position_cols = ['average_val_attacking', 'average_val_midfield', 'average_val_defensive', 'average_val_gk']
   
    stage_average_skills = AverageSkillCreator(skill_cols)
    avg_skill_cols = ["corr_av_skills"]

    #Impute features
    imputed_id_cols = [x+"_imputed" for x in imputed_cols]
    stage_imputer = NullImputer()
    stage_ml_imputer = MLImputer()
    
    #Stage where null rows are dropped based on certain features
    stage_null_dropper = DropNullRows(dropped_null_rows)

    #Stage where outlier rows are dropped for continuous features
    stage_outlier_dropper = DropOutliers(continuous_cols)

    # Stage where nominal columns are transformed to index columns using StringIndexer
    nominal_id_cols = [x+"_index" for x in nominal_cols]
    nominal_onehot_cols = [x+"_encoded" for x in nominal_cols]
    stage_nominal_indexer = StringIndexer(inputCols = nominal_cols, outputCols = nominal_id_cols )

    # Stage where the index columns are further transformed using OneHotEncoder
    stage_nominal_onehot_encoder = OneHotEncoder(inputCols=nominal_id_cols, outputCols=nominal_onehot_cols)

    # Stage where ordinal columns are transformed to index columns using StringIndexer
    ordinal_id_cols = [x+"_index" for x in ordinal_cols]
    stage_ordinal_indexer = StringIndexer(inputCols = ordinal_cols, outputCols = ordinal_id_cols )

    # Stage where all relevant features are assembled into a vector (and dropping a few)
    feature_cols = continuous_cols + nominal_onehot_cols + ordinal_id_cols + trait_cols + avg_position_cols + avg_skill_cols + imputed_id_cols
    stage_vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="vectorized_features")

    # Stage where we scale the columns
    stage_scaler = StandardScaler(inputCol= 'vectorized_features', outputCol= 'features')
    
    # Removing all unnecessary columbs, only keeping the 'features' and 'outcome' columns
    stage_column_dropper = ColumnDropper(columns_to_drop = feature_cols + main_traits_cols + 
                                         position_cols + skill_cols + imputed_cols + needed_cols + ['vectorized_features'])
    
    # Connect the columns into a pipeline
    '''
    pipeline = Pipeline(stages=[stage_dropper, stage_typecaster, stage_maintraits, stage_average_positions, stage_average_skills,
                                stage_imputer, stage_ml_imputer, stage_null_dropper, stage_outlier_dropper,
                                stage_nominal_indexer, stage_nominal_onehot_encoder, stage_ordinal_indexer,
                                stage_vector_assembler, stage_scaler, stage_column_dropper])
    '''

    pipeline = Pipeline(stages=[stage_dropper, stage_typecaster, stage_average_skills, stage_imputer, stage_ml_imputer])

    
    return pipeline

In [38]:
df = read_from_spark(spark, db_properties)

In [84]:
pipeline = get_preprocessing_pipeline()
preprocess_model = pipeline.fit(df)
df_clean = preprocess_model.transform(df)
df_clean.show()

Py4JJavaError: An error occurred while calling o10396.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 25.0 failed 1 times, most recent failure: Lost task 2.0 in stage 25.0 (TID 27) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:695)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:660)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:636)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:582)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:541)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 34 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:695)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:660)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:636)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:582)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:541)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 34 more


In [None]:
#80/20 split into Train and Test
train_data, test_data = df_clean.randomSplit([0.8, 0.2], seed=42)

In [None]:
def get_ML_pipeline(ML_model):
    if ML_model == "LR":
        ml = spark_LR(featuresCol='features', labelCol='target')
        
        paramGrid = (ParamGridBuilder()
                .addGrid(ml.regParam, [0.01, 0.5, 2.0])
                .addGrid(ml.maxIter, [1, 5, 10])
                .build())
        
    elif ML_model == "RF":
        ml = RandomForestRegressor(featuresCol='features', labelCol='target')

        paramGrid = (ParamGridBuilder()
                .addGrid(ml.numTrees, [10, 20, 30])
                .addGrid(ml.maxDepth, [5, 10, 15])
                .build())
    
    evaluator = RegressionEvaluator(predictionCol='prediction', 
            labelCol='target', metricName='r2')
    
    stage_ML = CrossValidator(estimator=ml, estimatorParamMaps=paramGrid, 
                           evaluator=evaluator, numFolds=5)
    
    pipeline = Pipeline(stages=[stage_ML])

    return pipeline

In [None]:
ML_model = ["LR", "RF"]

evaluator = RegressionEvaluator(predictionCol='prediction', 
            labelCol='target', metricName='r2')

for model in ML_model:
    ML_pipeline = get_preprocessing_pipeline(model)
    ML_model_pipeline = ML_pipeline.fit(train_data)
    model_df_train = ML_model_pipeline.transform(train_data)
    model_df_test = ML_model_pipeline.transform(test_data)

    r2_train = evaluator.evaluate(model_df_train)
    r2_test = evaluator.evaluate(model_df_test)
    print(f"{model} train r2 score: {r2_train * 100:.2f}%")
    print(f"{model} test r2 score: {r2_test * 100:.2f}%")
