In [1]:
import pyspark, os

from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

appName = "Big Data Analytics"
master = "local"

path_to_jar = "/Users/mac/miniforge3/lib/python3.10/site-packages/pyspark/jars/postgresql-42.6.0.jar"
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/adoptopenjdk-11.jdk/Contents/Home'

conf = pyspark.SparkConf()\
    .set('spark.driver.host', '127.0.0.1')\
    .setAppName(appName)\
    .setMaster(master)\
    .set("spark.jars", path_to_jar)\
    .set("spark.driver.extraClassPath", path_to_jar)
# Create Spark Context with the new configurations rather than relying on the default one
sc = SparkContext.getOrCreate(conf=conf)

# You need to create SQL Context to conduct some database operations like what we will see later.
sqlContext = SQLContext(sc)

# If you have SQL context, you create the session from the Spark Context
spark = sqlContext.sparkSession.builder.getOrCreate()

23/11/16 21:22:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Task 1:

In [2]:
from pyspark.sql.functions import *
players_15 = spark.read.csv("data/players_15.csv",header=True, inferSchema= True).withColumn("year", lit(2015))
players_16 = spark.read.csv("data/players_16.csv",header=True, inferSchema= True).withColumn("year", lit(2016))
players_17 = spark.read.csv("data/players_17.csv",header=True, inferSchema= True).withColumn("year", lit(2017))
players_18 = spark.read.csv("data/players_18.csv",header=True, inferSchema= True).withColumn("year", lit(2018))
players_19 = spark.read.csv("data/players_19.csv",header=True, inferSchema= True).withColumn("year", lit(2019))
players_20 = spark.read.csv("data/players_20.csv",header=True, inferSchema= True).withColumn("year", lit(2020))
players_21 = spark.read.csv("data/players_21.csv",header=True, inferSchema= True).withColumn("year", lit(2021))
players_22 = spark.read.csv("data/players_22.csv",header=True, inferSchema= True).withColumn("year", lit(2022))

df=players_15.union(players_16).union(players_17).union(players_18).union(players_19).union(players_20).union(players_21).union(players_22)
df=df.withColumn("unique_id", monotonically_increasing_id())
df.show(1,vertical=True)



                                                                                

23/11/16 21:22:23 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24 00:00:00  
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | 241.0                
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

In [3]:
db_properties={}
#update your db username
db_properties['username']="postgres"
#update your db password
db_properties['password']="wsyt"
#make sure you got the right port number here
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
#make sure you had the Postgres JAR file in the right location
db_properties['driver']="org.postgresql.Driver"
db_properties['table']= "fifa"

df.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()
df = sqlContext.read.format("jdbc")\
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()

df.show(10)


[Stage 18:>                                                         (0 + 1) / 1]

+---------+--------------------+-----------------+--------------------+----------------+-------+---------+---------+--------+---+-------------------+---------+---------+------------+-------------------+--------------------+------------+-------------+------------------+----------------+-------------------+-------------------------+--------------+----------------+--------------+---------------+--------------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+------------------+--------------------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+-----

                                                                                

## Task 2 :

### What are the X clubs that have the highest number of players with contracts ending in 2023?

In [4]:
contracts_2023 = df.filter((df.year == 2022) & (df.club_contract_valid_until == "2023"))
result = contracts_2023.groupby("club_name").agg(count("unique_id").alias("number_of_players"))
result.sort(desc("number_of_players")).show(1)

[Stage 19:>                                                         (0 + 1) / 1]

+--------------------+-----------------+
|           club_name|number_of_players|
+--------------------+-----------------+
|En Avant de Guingamp|               19|
+--------------------+-----------------+
only showing top 1 row



                                                                                

### List the Y clubs with highest average number of players that are older than 27 years across all years

In [5]:
from pyspark.sql.window import Window
def find_older_clubs(df, year, Y):
    older_club=df.filter(df.age > year).groupBy("club_name","year").agg(count("unique_id").alias("number_of_players"))
    avg_older_club=older_club.groupBy("club_name").agg(avg("number_of_players").alias("avg_number"))
    top_Y_clubs = avg_older_club.orderBy(desc("avg_number")).limit(Y)
    min_value_at_Y = top_Y_clubs.agg({"avg_number": "min"}).collect()[0][0]
    result = avg_older_club.filter(col("avg_number") >= min_value_at_Y)
    return result

find_older_clubs(df, 27, 2).show()
## Because we don't have to drop NA values accroding to the piazza, I just kept them. 
## But I also showed the highest average number of players with not-null club.

+-------------------+----------+
|          club_name|avg_number|
+-------------------+----------+
|               null|    109.25|
| Dorados de Sinaloa|      19.0|
|Matsumoto Yamaga FC|      19.0|
+-------------------+----------+



### What is the most frequent nation_position in the dataset for each year?

In [6]:
most_nation_position=df.groupBy("year","nation_position").agg(count("nation_position").alias("count"))
max_counts=most_nation_position.groupBy("year").agg(max("count").alias("max_count"))
result=most_nation_position.join(max_counts,"year").filter(col("count")==col("max_count")).drop("max_count")
result=result.orderBy(desc("count"))
result.show()

+----+---------------+-----+
|year|nation_position|count|
+----+---------------+-----+
|2018|            SUB|  600|
|2021|            SUB|  588|
|2020|            SUB|  588|
|2019|            SUB|  576|
|2017|            SUB|  564|
|2015|            SUB|  564|
|2016|            SUB|  511|
|2022|            SUB|  396|
+----+---------------+-----+



## Task 3:

### Data Clean

In [7]:
df_clean=df.drop('club_name',
 'league_name',
 'dob',
 'player_url',
 'short_name',
 'long_name',
 'player_positions',
 'player_face_url',
 'club_logo_url',
 'club_flag_url',
 'nation_logo_url',
 'nation_flag_url',
 'club_jersey_number',
 'club_loaned_from',
 'nationality_name',
 'nation_team_id',
 'nation_position',
 'nation_jersey_number',
 'release_clause_eur',
 'player_tags',
 'player_traits',
 'mentality_composure',
 'goalkeeping_speed',
 'unique_id',
 'ls',
 'st',
 'rs',
 'lw',
 'lf',
 'cf',
 'rf',
 'rw',
 'lam',
 'cam',
 'ram',
 'lm',
 'lcm',
 'cm',
 'rcm',
 'rm',
 'lwb',
 'ldm',
 'cdm',
 'rdm',
 'rwb',
 'lb',
 'lcb',
 'cb',
 'rcb',
 'rb',
 'gk',
 "club_joined",
 'club_position',
 'sofifa_id',
 'club_team_id',
 'nationality_id'
 )

df_clean=df_clean.dropna(subset=('pace'))

In [8]:
from pyspark.sql.functions import *

null_counts_plays_df = df_clean.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) \
                        for c in df_clean.columns])

null_counts_plays_df.show(truncate=False, vertical=True)



-RECORD 0---------------------------
 overall                     | 0    
 potential                   | 0    
 value_eur                   | 1649 
 wage_eur                    | 1374 
 age                         | 0    
 height_cm                   | 0    
 weight_kg                   | 0    
 league_level                | 1726 
 club_contract_valid_until   | 1382 
 preferred_foot              | 0    
 weak_foot                   | 0    
 skill_moves                 | 0    
 international_reputation    | 0    
 work_rate                   | 0    
 body_type                   | 0    
 real_face                   | 0    
 pace                        | 0    
 shooting                    | 0    
 passing                     | 0    
 dribbling                   | 0    
 defending                   | 0    
 physic                      | 0    
 attacking_crossing          | 0    
 attacking_finishing         | 0    
 attacking_heading_accuracy  | 0    
 attacking_short_passing     | 0    
 

                                                                                

In [9]:
from pyspark.ml.feature import Imputer
# missing_value = "NA"
# df_with_substituted_na = (df_clean\
#     .withColumn('value_eur', \
#                 when(df_clean.value_eur==missing_value,\
#                     regexp_replace(df_clean.value_eur,missing_value,None)) \
#                 .otherwise(df_clean.value_eur))\
#                 )
# df_with_substituted_na = (df_clean\
#     .withColumn('league_level', \
#                 when(df_clean.league_level==missing_value,\
#                     regexp_replace(df_clean.league_level,missing_value,None)) \
#                 .otherwise(df_clean.league_level))\
#                 )

columns_to_be_imputed = ["value_eur",'league_level','wage_eur','club_contract_valid_until']
value_not_in_dataset = -200

# Replace None/Missing Value with a value that can't be present in the dataset.
df_with_filled_na = df_clean.fillna(-200, columns_to_be_imputed)

#Create new columns with imputed values. New columns will be suffixed with "_imputed"
imputer = Imputer (
            inputCols=columns_to_be_imputed,
            outputCols=["{}_imputed".format(c) for c in columns_to_be_imputed])\
            .setStrategy("median").setMissingValue(value_not_in_dataset)

df_imputed = imputer.fit(df_with_filled_na).transform(df_with_filled_na)
# we will drop the old column without imputation. We have only one column to be imputed
df_imputed_enhanced = df_imputed.drop(columns_to_be_imputed[0],columns_to_be_imputed[1],columns_to_be_imputed[2],columns_to_be_imputed[3])
# We will rename our newly imputed column with the correct name
df_fully_imputed = df_imputed_enhanced.withColumnRenamed("value_eur_imputed","value_eur")
df_fully_imputed = df_fully_imputed.withColumnRenamed("league_level_imputed","league_level")
df_fully_imputed = df_fully_imputed.withColumnRenamed("wage_eur_imputed","wage_eur")
df_fully_imputed = df_fully_imputed.withColumnRenamed("club_contract_valid_until_imputed","club_contract_valid_until")

                                                                                

In [10]:
null_counts_plays_df = df_fully_imputed.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) \
                        for c in df_fully_imputed.columns])

null_counts_plays_df.show(truncate=False, vertical=True)

[Stage 47:>                                                         (0 + 1) / 1]

-RECORD 0--------------------------
 overall                     | 0   
 potential                   | 0   
 age                         | 0   
 height_cm                   | 0   
 weight_kg                   | 0   
 preferred_foot              | 0   
 weak_foot                   | 0   
 skill_moves                 | 0   
 international_reputation    | 0   
 work_rate                   | 0   
 body_type                   | 0   
 real_face                   | 0   
 pace                        | 0   
 shooting                    | 0   
 passing                     | 0   
 dribbling                   | 0   
 defending                   | 0   
 physic                      | 0   
 attacking_crossing          | 0   
 attacking_finishing         | 0   
 attacking_heading_accuracy  | 0   
 attacking_short_passing     | 0   
 attacking_volleys           | 0   
 skill_dribbling             | 0   
 skill_curve                 | 0   
 skill_fk_accuracy           | 0   
 skill_long_passing         

                                                                                

In [11]:
df_fully_imputed=df_fully_imputed.withColumn("wage_eur",df_fully_imputed.wage_eur.cast("int")).withColumn("value_eur",df_fully_imputed.value_eur.cast("int"))
df_fully_imputed.printSchema()

root
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- weak_foot: integer (nullable = true)
 |-- skill_moves: integer (nullable = true)
 |-- international_reputation: integer (nullable = true)
 |-- work_rate: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- real_face: string (nullable = true)
 |-- pace: integer (nullable = true)
 |-- shooting: integer (nullable = true)
 |-- passing: integer (nullable = true)
 |-- dribbling: integer (nullable = true)
 |-- defending: integer (nullable = true)
 |-- physic: integer (nullable = true)
 |-- attacking_crossing: integer (nullable = true)
 |-- attacking_finishing: integer (nullable = true)
 |-- attacking_heading_accuracy: integer (nullable = true)
 |-- attacking_short_passing: integer (nullable = true)
 |-- attacking_volleys

In [12]:
df_fully_imputed = df_fully_imputed.withColumn("preferred_foot", when(col("preferred_foot") == "Left", 1).otherwise(0))
df_fully_imputed = df_fully_imputed.withColumn("real_face", when(col("real_face") == "Yes", 1).otherwise(0))

In [13]:
from pyspark.sql.types import *

# Define a Python function that maps strings to ordinal values
def map_work_rate_to_ordinal(work_rate):
    mapping = {'Low': 1, 'Medium': 2, 'High': 3}
    return mapping.get(work_rate, 0)  # Return 0 if the work_rate is not found

# Register the function as a UDF
map_work_rate_to_ordinal_udf = udf(map_work_rate_to_ordinal, IntegerType())

# Split the work_rate column and then apply the mapping to each part
df_features = df_fully_imputed.withColumn("attack_work_rate", map_work_rate_to_ordinal_udf(split(col("work_rate"), "/").getItem(0))) \
                     .withColumn("defense_work_rate", map_work_rate_to_ordinal_udf(split(col("work_rate"), "/").getItem(1)))
df_features = df_features.drop("work_rate")

In [14]:
def standardize_body_type(body_type):
    if 'Lean' in body_type:
        return 'Lean'
    elif 'Stocky' in body_type:
        return 'Stocky'
    elif 'Normal' in body_type:
        return 'Normal'
    else:
        return 'Unique'  # For any type that doesn't fit the above categories

# Register UDF
standardize_body_type_udf = udf(standardize_body_type, StringType())

# Apply the UDF to standardize the body_type column
df_features = df_features.withColumn("body_type", standardize_body_type_udf(col("body_type")))

df_features.select("body_type").distinct().show()

[Stage 50:>                                                         (0 + 1) / 1]

+---------+
|body_type|
+---------+
|   Stocky|
|     Lean|
|   Unique|
|   Normal|
+---------+



                                                                                

In [15]:
df_features.printSchema()

root
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- preferred_foot: integer (nullable = false)
 |-- weak_foot: integer (nullable = true)
 |-- skill_moves: integer (nullable = true)
 |-- international_reputation: integer (nullable = true)
 |-- body_type: string (nullable = true)
 |-- real_face: integer (nullable = false)
 |-- pace: integer (nullable = true)
 |-- shooting: integer (nullable = true)
 |-- passing: integer (nullable = true)
 |-- dribbling: integer (nullable = true)
 |-- defending: integer (nullable = true)
 |-- physic: integer (nullable = true)
 |-- attacking_crossing: integer (nullable = true)
 |-- attacking_finishing: integer (nullable = true)
 |-- attacking_heading_accuracy: integer (nullable = true)
 |-- attacking_short_passing: integer (nullable = true)
 |-- attacking_volleys: integer (nullable = true)
 |-- skil

In [16]:
from functools import reduce

def column_add(a,b):
     return  a.__add__(b)
    
def find_outliers(df):
    # Identifying the numerical columns in a spark dataframe
    numeric_columns = [column[0] for column in df.dtypes if column[1]=="int"]

    # Using the `for` loop to create new columns by identifying the outliers for each feature
    for column in numeric_columns:

        less_Q1 = 'less_Q1_{}'.format(column)
        more_Q3 = 'more_Q3_{}'.format(column)
        Q1 = 'Q1_{}'.format(column)
        Q3 = 'Q3_{}'.format(column)

        # Q1 : First Quartile ., Q3 : Third Quartile
        Q1 = df.approxQuantile(column,[0.25],relativeError=0)
        Q3 = df.approxQuantile(column,[0.75],relativeError=0)
        
        # IQR : Inter Quantile Range
        # We need to define the index [0], as Q1 & Q3 are a set of lists., to perform a mathematical operation
        # Q1 & Q3 are defined seperately so as to have a clear indication on First Quantile & 3rd Quantile
        IQR = Q3[0] - Q1[0]
        
        #selecting the data, with -1.5*IQR to + 1.5*IQR., where param = 1.5 default value
        less_Q1 =  Q1[0] - 1.5*IQR
        more_Q3 =  Q3[0] + 1.5*IQR
        
        isOutlierCol = 'is_outlier_{}'.format(column)
        
        df = df.withColumn(isOutlierCol,when((df[column] > more_Q3) | (df[column] < less_Q1), 1).otherwise(0))
    

    # Selecting the specific columns which we have added above, to check if there are any outliers
    selected_columns = [column for column in df.columns if column.startswith("is_outlier")]
    # Adding all the outlier columns into a new colum "total_outliers", to see the total number of outliers
    df = df.withColumn('total_outliers',reduce(column_add, ( df[col] for col in  selected_columns)))

    # Dropping the extra columns created above, just to create nice dataframe., without extra columns
    df = df.drop(*[column for column in df.columns if column.startswith("is_outlier")])

    return df

In [17]:
df_with_outlier_handling = find_outliers(df_features)
df_with_outlier_handling.show(1, vertical=True)

[Stage 165:>                                                        (0 + 1) / 1]

-RECORD 0--------------------------------
 overall                     | 93        
 potential                   | 95        
 age                         | 27        
 height_cm                   | 169       
 weight_kg                   | 67        
 preferred_foot              | 1         
 weak_foot                   | 3         
 skill_moves                 | 4         
 international_reputation    | 5         
 body_type                   | Normal    
 real_face                   | 1         
 pace                        | 93        
 shooting                    | 89        
 passing                     | 86        
 dribbling                   | 96        
 defending                   | 27        
 physic                      | 63        
 attacking_crossing          | 84        
 attacking_finishing         | 94        
 attacking_heading_accuracy  | 71        
 attacking_short_passing     | 89        
 attacking_volleys           | 85        
 skill_dribbling             | 96 

                                                                                

In [18]:
df_with_outlier_handling.groupby("total_outliers").count().show()

[Stage 166:>                                                        (0 + 1) / 1]

+--------------+-----+
|total_outliers|count|
+--------------+-----+
|            12|   49|
|             1|39192|
|            13|   30|
|             6| 2619|
|            16|    3|
|             3|15884|
|             5| 5160|
|            15|    2|
|             9|  282|
|            17|    2|
|             4| 8274|
|             8|  580|
|             7| 1173|
|            10|  166|
|            11|   79|
|            14|   26|
|             2|30914|
|             0|21852|
|            18|    1|
+--------------+-----+



                                                                                

In [19]:
df_with_substituted_na_and_outliers = df_with_outlier_handling.\
        filter(df_with_outlier_handling['total_Outliers']<=4)
print(df_features.count(),df_with_substituted_na_and_outliers.count())

[Stage 172:>                                                        (0 + 1) / 1]

126288 116116


                                                                                

In [20]:
import numpy as np
df_with_substituted_na_and_outliers=df_with_substituted_na_and_outliers.drop('total_outliers')
features = [col for col in df_with_substituted_na_and_outliers.columns if col not in ["body_type",'overall']]

# Convert to Pandas DataFrame for correlation computation
correlation_matrix = df_with_substituted_na_and_outliers.select(features).toPandas().corr()
columns_drop = set()
for i in range(len(correlation_matrix.columns)):
    for j in range(i + 1, len(correlation_matrix.columns)):
        corr_value = correlation_matrix.iloc[i, j]
        if np.abs(corr_value) > 0.8:  # Use Python's built-in abs function
            columns_drop.add(correlation_matrix.columns[j])

columns_drop

                                                                                

{'attacking_crossing',
 'attacking_finishing',
 'attacking_short_passing',
 'attacking_volleys',
 'defending_marking_awareness',
 'defending_sliding_tackle',
 'defending_standing_tackle',
 'mentality_interceptions',
 'mentality_positioning',
 'mentality_vision',
 'movement_acceleration',
 'movement_sprint_speed',
 'power_long_shots',
 'power_shot_power',
 'power_strength',
 'skill_ball_control',
 'skill_dribbling',
 'skill_long_passing'}

In [21]:
for c in columns_drop:
    df_with_substituted_na_and_outliers = df_with_substituted_na_and_outliers.drop(c)

In [22]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
stage_1 = StringIndexer(inputCol= 'body_type', outputCol= 'body_type_index')
stage_2 = OneHotEncoder(inputCol= 'body_type_index', outputCol= 'body_type_encoded')
pipeline = Pipeline(stages=[stage_1, stage_2])

# fit the pipeline model and transform the data as defined

pipeline_model = pipeline.fit(df_with_substituted_na_and_outliers)
df_encoded = pipeline_model.transform(df_with_substituted_na_and_outliers)
df_encoded=df_encoded.drop('body_type','body_type_index')
df_encoded.show()

[Stage 179:>                                                        (0 + 1) / 1]

+-------+---------+---+---------+---------+--------------+---------+-----------+------------------------+---------+----+--------+-------+---------+---------+------+--------------------------+-----------+-----------------+----------------+------------------+----------------+-------------+-------------+--------------------+-------------------+------------------+--------------------+-------------------+-----------------------+--------------------+----+---------+------------+--------+-------------------------+----------------+-----------------+-----------------+
|overall|potential|age|height_cm|weight_kg|preferred_foot|weak_foot|skill_moves|international_reputation|real_face|pace|shooting|passing|dribbling|defending|physic|attacking_heading_accuracy|skill_curve|skill_fk_accuracy|movement_agility|movement_reactions|movement_balance|power_jumping|power_stamina|mentality_aggression|mentality_penalties|goalkeeping_diving|goalkeeping_handling|goalkeeping_kicking|goalkeeping_positioning|goalkeep

                                                                                

### Spark Version:

In [23]:
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import *
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class OutcomeCreater(Transformer, DefaultParamsReadable, DefaultParamsWritable):

    def __init__(self):
        super(OutcomeCreater, self).__init__()

    def _transform(self, df):
        # Renaming the column 'overall' to 'outcome'
        df = df.withColumnRenamed('overall', 'outcome')
        return df

class ColumnDropper(Transformer): # this transformer drops unnecessary columns
    def __init__(self, columns_to_drop = None):
        super().__init__()
        self.columns_to_drop=columns_to_drop
    def _transform(self, dataset):
        output_df = dataset
        for col_name in self.columns_to_drop:
            output_df = output_df.drop(col_name)
        return output_df

columns = [column for column in df_encoded.columns if column != 'overall']

vector_assembler = VectorAssembler(inputCols=columns, outputCol="features")
stage_outcome = OutcomeCreater()
stage_column_dropper = ColumnDropper(columns_to_drop = columns)
# Define the pipeline
pipeline = Pipeline(stages=[vector_assembler, stage_outcome,stage_column_dropper])

# Fit the pipeline on the data
pipeline_model = pipeline.fit(df_encoded)

# Transform the data
df_transformed = pipeline_model.transform(df_encoded)

# Select the desired columns and display the DataFrame
df_transformed.printSchema()
df_transformed.show()

root
 |-- outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



[Stage 180:>                                                        (0 + 1) / 1]

+-------+--------------------+
|outcome|            features|
+-------+--------------------+
|     83|[83.0,28.0,170.0,...|
|     83|[83.0,27.0,171.0,...|
|     83|[83.0,29.0,186.0,...|
|     83|[85.0,27.0,188.0,...|
|     83|[84.0,27.0,181.0,...|
|     83|[87.0,26.0,176.0,...|
|     82|[82.0,34.0,183.0,...|
|     82|[82.0,32.0,188.0,...|
|     82|[82.0,30.0,180.0,...|
|     82|[82.0,32.0,175.0,...|
|     82|[84.0,26.0,185.0,...|
|     82|[85.0,27.0,169.0,...|
|     82|[84.0,26.0,187.0,...|
|     82|[83.0,27.0,189.0,...|
|     82|[82.0,29.0,180.0,...|
|     82|[84.0,25.0,193.0,...|
|     82|[84.0,26.0,178.0,...|
|     82|[85.0,27.0,185.0,...|
|     75|[75.0,30.0,189.0,...|
|     82|[87.0,21.0,172.0,...|
+-------+--------------------+
only showing top 20 rows



                                                                                

### LinearRegression (Spark)

In [24]:
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import *
spark.sparkContext.setLogLevel("ERROR")

train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(labelCol="outcome", featuresCol="features")

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1])  # Regularization
             .addGrid(lr.elasticNetParam, [0.0, 1.0])  # Elastic net mixing
             .addGrid(lr.maxIter, [10, 50])  # Number of iterations
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(labelCol="outcome"), numFolds=3)

# Fit the model on training data
cv_model = cv.fit(train_data)  # Assuming trainingData is the training dataset

# Get the best model from cross validation
best_lr_model = cv_model.bestModel
print(f"Coefficients: {best_lr_model.coefficients} Intercept: {best_lr_model.intercept}")
# Apply the best model to test data
test_predictions = best_lr_model.transform(test_data)  # Assuming testData is the test dataset

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="outcome", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(test_predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")


                                                                                

Coefficients: [0.47991383550384736,0.534257778324787,0.0037523409827978542,0.0194747464773174,0.09823776091555862,0.04845610245358452,0.468596996090899,-0.7363019066206801,-0.15282312286561434,0.04829928980890646,0.002763643074050007,0.050015456438580706,0.07143761489644053,0.027396856972187744,0.057335852653977906,0.05341241245725065,-0.003491433909142846,-0.0032754254101197687,-0.0007031329286457859,0.11899721948448333,-0.009006258079080082,0.0013716315423922907,0.014037168210148016,-0.011799212848366138,-0.0011298863623053607,0.009680658709717668,0.009444665805624542,0.012950387910164585,0.008362553967452387,0.010684293594393482,-0.04126073681720642,1.9715348403233892e-07,-0.17215922217124902,2.66581117342262e-05,-0.011424800110923044,-0.2121744819953004,-0.2216535242161276,0.14097672436045208,-0.0012487731484845306,0.2535979231861699] Intercept: 96.53258982972481


[Stage 219:>                                                        (0 + 1) / 1]

Root Mean Squared Error (RMSE) on test data = 1.798313575118126


                                                                                

### DecisionTree (Spark)

In [25]:
spark.sparkContext.setLogLevel("ERROR")
from pyspark.ml.regression import DecisionTreeRegressor

# Initialize the Decision Tree Regressor
dt = DecisionTreeRegressor(labelCol="outcome", featuresCol="features")

# Create the parameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [5, 10])  # Tune maxDepth
             .addGrid(dt.maxBins, [32, 64])  # Tune maxBins
             .build())

# Define the evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="outcome", predictionCol="prediction")

# Set up 3-fold cross-validation
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Fit the model on the training data
cvModel = crossval.fit(train_data)  # Assuming 'train_data' is the training dataset

# Extract the best model
bestModel = cvModel.bestModel

# Print out the best model's parameters
print(f"Best parameters: MaxDepth - {bestModel.getMaxDepth()}, MaxBins - {bestModel.getMaxBins()}")

# Compute RMSE on test data
predictions = bestModel.transform(test_data)  # Assuming 'test_data' is the test dataset
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")



                                                                                

Best parameters: MaxDepth - 10, MaxBins - 64


[Stage 484:>                                                        (0 + 1) / 1]

Root Mean Squared Error (RMSE) on test data = 0.763074523919655


                                                                                

### Pytorch Version:

In [26]:
import torch  # now import the pytorch module
from torch import nn
import numpy as np 
from torch.utils.data import Dataset, DataLoader

to_array = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))
df_train_pandas = train_data.withColumn('features', to_array('features')).toPandas()
df_test_pandas = test_data.withColumn('features', to_array('features')).toPandas()

x_train = torch.from_numpy(np.array(df_train_pandas['features'].values.tolist(),np.float32))
y_train = torch.from_numpy(np.array(df_train_pandas['outcome'].values.tolist(),np.float32))
x_test = torch.from_numpy(np.array(df_test_pandas['features'].values.tolist(),np.float32))
y_test = torch.from_numpy(np.array(df_test_pandas['outcome'].values.tolist(),np.float32))

                                                                                

In [27]:
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader


class MyDataset(Dataset): 
    def __init__(self,x,y):
        self.x = x
        self.y = y.float()

    def __len__(self):
        return self.x.shape[0]

    def __getitem__(self, idx):
        return (self.x[idx],self.y[idx])
    
train_dataset = MyDataset(x_train,y_train)
test_dataset = MyDataset(x_test,y_test)

In [28]:
batch_size = 64  
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)


def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        for features, labels in train_loader:
            # Forward pass
            outputs = model(features)
            loss = criterion(outputs.view(-1), labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')


# Evaluation Function
def evaluate_model(model, test_loader, criterion):
    model.eval()
    with torch.no_grad():
        total_loss = 0
        for features, labels in test_loader:
            outputs = model(features)
            loss = criterion(outputs.view(-1), labels)
            total_loss += loss.item()
    return total_loss / len(test_loader)

### ShallowMLP (Pytorch)

In [45]:
input_size = x_train.shape[1]
class ShallowMLP(nn.Module):
    def __init__(self, input_size):
        super(ShallowMLP, self).__init__()
        # self.fc1 = nn.Linear(input_size, 32)  # Single hidden layer
        # self.fc2 = nn.Linear(32, 1)
        self.fc1 = nn.Linear(input_size, 64)  # Increased number of neurons
        self.dropout = nn.Dropout(0.25)      # Added dropout for regularization
        self.fc2 = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)  # Apply dropout
        x = self.fc2(x)
        return x

# Initialize the shallow MLP
shallow_mlp = ShallowMLP(input_size)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(shallow_mlp.parameters(), lr=0.001) # reduced learning rate
train_model(shallow_mlp, train_loader, criterion, optimizer, num_epochs=10)
rmse_shallow = np.sqrt(evaluate_model(shallow_mlp, test_loader, criterion))
print(f'Shallow MLP RMSE: {rmse_shallow:.4f}')

Epoch [1/10], Loss: 6054.4639
Epoch [2/10], Loss: 253.8969
Epoch [3/10], Loss: 471.9241
Epoch [4/10], Loss: 526.9057
Epoch [5/10], Loss: 538.2729
Epoch [6/10], Loss: 579.7512
Epoch [7/10], Loss: 760.5797
Epoch [8/10], Loss: 213.6251
Epoch [9/10], Loss: 450.1411
Epoch [10/10], Loss: 235.3135
Shallow MLP RMSE: 8.2044


### DeepMLP (Pytorch)

In [44]:
class DeepMLP(nn.Module):
    def __init__(self, input_size):
        super(DeepMLP, self).__init__()
        # self.fc1 = nn.Linear(input_size, 64)
        # self.fc2 = nn.Linear(64, 64)
        # self.fc3 = nn.Linear(64, 32)
        # self.fc4 = nn.Linear(32, 1)
        self.fc1 = nn.Linear(input_size, 128)  # Increased number of neurons
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 64)
        self.dropout = nn.Dropout(0.5)         # Higher dropout for a deeper network
        self.fc4 = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.dropout(x)  # Apply dropout
        x = torch.relu(self.fc3(x))
        x = self.fc4(x)
        return x
    
mlp = DeepMLP(input_size)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(mlp.parameters(), lr=0.005)

train_model(mlp, train_loader, criterion, optimizer, num_epochs=8)

rmse = np.sqrt(evaluate_model(mlp, test_loader, criterion))
print(f'MLP Model RMSE: {rmse:.4f}')


Epoch [1/8], Loss: 61.9404
Epoch [2/8], Loss: 23.6483
Epoch [3/8], Loss: 35.2200
Epoch [4/8], Loss: 44.9551
Epoch [5/8], Loss: 30.1873
Epoch [6/8], Loss: 50.1474
Epoch [7/8], Loss: 20.4708
Epoch [8/8], Loss: 14.1512
MLP Model RMSE: 6.3689
