### Importing required packages

In [1]:
import pyspark
from pyspark.sql import SparkSession, SQLContext, DataFrame
from pyspark.sql.functions import isnan, when, count, col, to_timestamp, to_date, lit, udf
from pyspark.ml.feature import Imputer
from pyspark.sql.types import IntegerType

### Display spark dataframes such that they can be scrolled horizontally
reference: https://stackoverflow.com/questions/43427138/pyspark-show-dataframe-as-table-with-horizontal-scroll-in-ipython-notebook



In [2]:
from IPython.display import HTML, display

# Keep <pre> output on one line (no wrapping) so that wide Spark
# DataFrame.show() tables can be scrolled horizontally in the notebook.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Reading Data

In [3]:
appName = "Course Project"
master = "local"

conf = pyspark.SparkConf().set('spark.driver.host','127.0.0.1').setAppName(appName).setMaster(master)
# Use the package-qualified name: the bare `SparkContext` is not imported by the
# first cell, so it would raise NameError on a fresh kernel (Restart & Run All).
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# Kept so that any later cell using `sqlContext` still works.
sqlContext = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()

DATA_DIR = "../data"


def read_players_csv(season):
    """Read one season's players CSV, ``players_<season>.csv``, from DATA_DIR.

    Args:
        season: Two-digit season suffix used in the file name, e.g. 15 for
            ``players_15.csv``.

    Returns:
        A Spark DataFrame that takes column names from the header row and
        infers column types from the data.
    """
    return (spark.read
            .format("csv")
            .option("inferSchema", "true")
            .option("header", "true")
            .load(f"{DATA_DIR}/players_{season}.csv"))


players_15_df = read_players_csv(15)
players_16_df = read_players_csv(16)
players_17_df = read_players_csv(17)
players_18_df = read_players_csv(18)
players_19_df = read_players_csv(19)
players_20_df = read_players_csv(20)
players_21_df = read_players_csv(21)
players_22_df = read_players_csv(22)

                                                                                

### Printing schemas to check that column names match across seasons (before the dataset was updated on Kaggle, one column had a different name)

In [4]:
# Schema of the earliest season (FIFA 15); compare it with the latest season's schema printed in the next cell.
players_15_df.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: string (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: double (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: string (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_id: integer (nullable = tru

In [5]:
players_22_df.printSchema()

# Comparing two long schema printouts by eye cannot prove the columns match.
# Assert that every season has exactly the same column names, in the same
# order, as FIFA 15 -- a positional union would otherwise silently put values
# into the wrong columns.
for season, season_df in [(16, players_16_df), (17, players_17_df), (18, players_18_df),
                          (19, players_19_df), (20, players_20_df), (21, players_21_df),
                          (22, players_22_df)]:
    assert season_df.columns == players_15_df.columns, (
        f"players_{season} columns differ from players_15: "
        f"{sorted(set(season_df.columns) ^ set(players_15_df.columns))}"
    )

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: string (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: double (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: string (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_id: integer (nullable = tru

### Adding year column to every dataset

In [6]:
# Tag each season's rows with its year (players_15 -> 2015, ..., players_22 -> 2022)
# so the seasons stay distinguishable once they are combined.
(players_15_df_year, players_16_df_year, players_17_df_year, players_18_df_year,
 players_19_df_year, players_20_df_year, players_21_df_year, players_22_df_year) = (
    season_df.withColumn('year', lit(year))
    for season_df, year in zip(
        (players_15_df, players_16_df, players_17_df, players_18_df,
         players_19_df, players_20_df, players_21_df, players_22_df),
        range(2015, 2023),
    )
)

### Combining datasets

In [7]:
# Sum of the row counts of all eight seasons; the combined DataFrame built
# below should contain exactly this many rows.
sum(
    season_df.count()
    for season_df in (
        players_15_df_year, players_16_df_year, players_17_df_year, players_18_df_year,
        players_19_df_year, players_20_df_year, players_21_df_year, players_22_df_year,
    )
)


142079

In [8]:
# Combine all seasons by stacking their rows. `unionByName` pairs columns by
# name (plain `union` pairs them by position), so a column that is reordered in
# one season is still matched correctly, and a renamed one raises an error
# instead of silently shifting values into the wrong column.
players_df_all = (players_15_df_year
                  .unionByName(players_16_df_year)
                  .unionByName(players_17_df_year)
                  .unionByName(players_18_df_year)
                  .unionByName(players_19_df_year)
                  .unionByName(players_20_df_year)
                  .unionByName(players_21_df_year)
                  .unionByName(players_22_df_year))

In [9]:
# Verify that combining the seasons neither dropped nor duplicated rows: the
# combined count must equal the sum of the per-season counts.
expected_row_count = sum(
    season_df.count()
    for season_df in (
        players_15_df_year, players_16_df_year, players_17_df_year, players_18_df_year,
        players_19_df_year, players_20_df_year, players_21_df_year, players_22_df_year,
    )
)
combined_row_count = players_df_all.count()
assert combined_row_count == expected_row_count, (
    f"combined frame has {combined_row_count} rows, expected {expected_row_count}"
)
combined_row_count

142079

In [10]:
# Preview the first five rows of the combined DataFrame.
players_df_all.show(n=5)

22/03/25 18:39:37 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---------+--------------------+-----------------+--------------------+----------------+-------+---------+---------+--------+---+----------+---------+---------+------------+-------------------+--------------------+------------+-------------+------------------+----------------+-----------+-------------------------+--------------+----------------+--------------+---------------+--------------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+------------------+--------------------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-

In [11]:
# Schema of the combined DataFrame: the season columns plus the `year` column added earlier.
players_df_all.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: string (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: double (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: string (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_id: integer (nullable = tru

### Changing datatypes

In [12]:
# Each entry is (original column, new column, conversion). Entries are applied
# in order: the converted column is appended to the end of the schema, and the
# original column is dropped right after it has been used.
_column_conversions = [
    ("sofifa_id", "fifa_id", lambda c: c.cast("string")),
    ("dob", "date_of_birth", lambda c: to_date(c, "yyyy-MM-dd")),
    ("club_team_id", "club_team_id_str", lambda c: c.cast("string")),
    ("club_jersey_number", "club_jersey_number_str", lambda c: c.cast("string")),
    ("club_joined", "club_joined_date", lambda c: to_date(c, "yyyy-MM-dd")),
    ("nationality_id", "nationality_id_str", lambda c: c.cast("string")),
    ("nation_team_id", "nation_team_id_str", lambda c: c.cast("string")),
    ("nation_jersey_number", "nation_jersey_number_str", lambda c: c.cast("string")),
    ("release_clause_eur", "release_clause_euro", lambda c: c.cast("double")),
    ("mentality_composure", "mentality_composure_", lambda c: c.cast("integer")),
]

casted_types_df = players_df_all
for _source, _target, _convert in _column_conversions:
    casted_types_df = casted_types_df.withColumn(_target, _convert(col(_source))).drop(_source)
# Finally remove rows that are exact duplicates.
casted_types_df = casted_types_df.distinct()

In [13]:
# Preview the first five rows after the type conversions.
casted_types_df.show(n=5)



+--------------------+-----------+--------------------+----------------+-------+---------+---------+--------+---+---------+---------+--------------------+--------------------+------------+-------------+----------------+-------------------------+----------------+---------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+-----------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+---------------------------+-------------------------+-------------

                                                                                

### Preprocessing columns whose values require addition or subtraction (strings such as `89+3`)

In [14]:
# Evaluate string expressions such as "89+3" or "65-1" into a single integer.

def compute_arithmetic(val):
    """Evaluate an integer sum/difference written as a string.

    Accepts a plain integer ("72"), a single operation ("89+3", "65-1"), or a
    chain of them ("80+3-1"). Whitespace is ignored and a leading sign is
    allowed ("-5").

    Args:
        val: The expression to evaluate. ``None`` (a null cell when this is
            used as a Spark UDF) and blank strings are treated as missing.
            Non-string values are converted with ``str()`` first.

    Returns:
        The evaluated ``int``, or ``None`` when ``val`` is missing.

    Raises:
        ValueError: If ``val`` is not a well-formed integer expression.
    """
    # Imported locally so the function stays self-contained when Spark ships it to workers.
    import re

    if val is None:
        return None
    # Drop all whitespace so "89 - 3" and "89-3" are parsed identically.
    text = re.sub(r"\s+", "", str(val))
    if not text:
        return None
    if not re.fullmatch(r"[+-]?\d+(?:[+-]\d+)*", text):
        raise ValueError(f"cannot evaluate {val!r} as an integer expression")
    # Each match is one signed term, e.g. "89+3-1" -> ["89", "+3", "-1"].
    return sum(int(term) for term in re.findall(r"[+-]?\d+", text))

In [15]:
# Spark UDF that forwards each column value to compute_arithmetic and
# declares an integer result type.
udf_compute_arithmetic = udf(lambda rating: compute_arithmetic(rating), returnType=IntegerType())

In [16]:
# Columns ls ... gk are evaluated with compute_arithmetic (through the UDF);
# each column is replaced in place, so the column order is unchanged.
position_rating_cols = ["ls", "st", "rs", "lw", "lf", "cf", "rf", "rw",
                        "lam", "cam", "ram", "lm", "lcm", "cm", "rcm", "rm",
                        "lwb", "ldm", "cdm", "rdm", "rwb",
                        "lb", "lcb", "cb", "rcb", "rb", "gk"]

parsed_df = casted_types_df
for rating_col in position_rating_cols:
    parsed_df = parsed_df.withColumn(rating_col, udf_compute_arithmetic(col(rating_col)))

In [18]:
# Preview the first five rows after the arithmetic columns were evaluated.
parsed_df.show(n=5)



+--------------------+-----------+--------------------+----------------+-------+---------+---------+--------+---+---------+---------+--------------------+--------------------+------------+-------------+----------------+-------------------------+----------------+---------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+-----------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+---------------------------+-------------------------+-------------

                                                                                

### Imputing values

In [19]:
# Scan every column for missing values. The two date columns are left out of
# the scan (presumably because isnan() cannot be applied to dates).
drop_types_df = parsed_df.drop("date_of_birth", "club_joined_date")
scan_cols = drop_types_df.columns

# NaN count per column.
drop_types_df.select([count(when(isnan(name), name)).alias(name) for name in scan_cols]).show()

# NaN-or-null count per column.
drop_types_df.select(
    [count(when(isnan(name) | col(name).isNull(), name)).alias(name) for name in scan_cols]
).show()

                                                                                

+----------+----------+---------+----------------+-------+---------+---------+--------+---+---------+---------+---------+-----------+------------+-------------+----------------+-------------------------+----------------+---------------+--------------+---------+-----------+------------------------+---------+---------+---------+-----------+-------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+---------------------------+-------------------------+------------------------+------------------+--------------------+--------



+----------+----------+---------+----------------+-------+---------+---------+--------+---+---------+---------+---------+-----------+------------+-------------+----------------+-------------------------+----------------+---------------+--------------+---------+-----------+------------------------+---------+---------+---------+-----------+-------------+-----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+---------------------------+-------------------------+------------------------+------------------+--------------------+-------

                                                                                

Although some columns, such as nation_position, are missing for ~133k records, I am not dropping them, 
because the reason for the gaps is known.

Many players only play for clubs and are not part of national teams. This information might be useful 
for analysis on national players.

Fields such as league_level are not imputed, because a value probably doesn't make sense for players who only play for 
national teams.

In [20]:
# Mean-impute the numeric columns listed below into new `<name>_imputed` columns.
imputing_cols = ['value_eur', 'wage_eur', 'pace', 'shooting', 'passing',
                 'dribbling', 'defending', 'physic', 'mentality_composure_']

# The Imputer below treats -1 as "missing", so first replace the missing
# entries of these columns with that sentinel.
filled_df = parsed_df.fillna(-1, subset=imputing_cols)

imputer = Imputer(
    inputCols=imputing_cols,
    outputCols=[f"{name}_imputed" for name in imputing_cols],
    strategy="mean",
    missingValue=-1,
)

imputed_df = imputer.fit(filled_df).transform(filled_df)

                                                                                

In [21]:
# Missing goalkeeping_speed and release_clause_euro values are set to 0.
imputed_filled_df = imputed_df.na.fill(0, subset=['goalkeeping_speed', 'release_clause_euro'])

In [22]:
# Preview the first five rows after imputation and zero-filling.
imputed_filled_df.show(n=5)



+--------------------+-----------+--------------------+----------------+-------+---------+---------+--------+---+---------+---------+--------------------+--------------------+------------+-------------+----------------+-------------------------+----------------+---------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+-----------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+---------------------------+-------------------------+-------------

                                                                                Traceback (most recent call last):
  File "/opt/homebrew/Cellar/apache-spark/3.2.0/libexec/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/opt/homebrew/Cellar/apache-spark/3.2.0/libexec/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/opt/homebrew/Cellar/apache-spark/3.2.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/homebrew/Cellar/apache-spark/3.2.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 562, in read_int
    length = stream.read(4)
ConnectionResetError: [Errno 54] Connection reset by peer


In [23]:
# Re-count NaN-or-null values per column after imputation; the two date
# columns are excluded, as in the first scan.
drop_types_df = imputed_filled_df.drop("date_of_birth", "club_joined_date")
drop_types_df.select([
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in drop_types_df.columns
]).show()



+----------+----------+---------+----------------+-------+---------+---------+--------+---+---------+---------+---------+-----------+------------+-------------+----------------+-------------------------+----------------+---------------+--------------+---------+-----------+------------------------+---------+---------+---------+-----------+-------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+---------------------------+-------------------------+------------------------+------------------+--------------------+--------

                                                                                

In [24]:
# Keep only the `_imputed` versions of the mean-imputed columns: drop the raw
# originals, which still contain the -1 missing-value sentinel. Reusing
# `imputing_cols` keeps this list in sync with the imputation step instead of
# repeating the column names by hand.
players_df_final = imputed_filled_df.drop(*imputing_cols)

Missing categorical values for national-team/club attributes are not imputed with the mode: a missing value there means the player has no such affiliation, so filling it in would fabricate data.

### Writing to postgres

In [25]:
from pyspark import SparkContext, SparkConf, SQLContext
import os

# Connection settings for the target PostgreSQL database. Values are read from
# environment variables when set, so credentials need not be hard-coded in the
# notebook; the previous literal values remain the fallbacks.
db_properties = {}
db_properties['username'] = os.environ.get("POSTGRES_USER", "postgres")
# Spark's JDBC data source expects the login under the key "user".
db_properties['user'] = db_properties['username']
db_properties['password'] = os.environ.get("POSTGRES_PASSWORD", "postgres")
# Make sure the host and port in this URL match your PostgreSQL server.
db_properties['url'] = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://localhost:5432/postgres")
db_properties['driver'] = "org.postgresql.Driver"

In [26]:
# Write the final DataFrame to the FIFA.PLAYERS table through Spark's JDBC data
# source, taking every connection setting from `db_properties` rather than
# repeating (and possibly diverging from) those values here.
# mode("overwrite") replaces whatever the table currently holds.
(players_df_final.write
    .format("jdbc")
    .mode("overwrite")
    .option("url", db_properties['url'])
    .option("dbtable", "FIFA.PLAYERS")
    .option("user", db_properties['username'])
    .option("password", db_properties['password'])
    .option("driver", db_properties['driver'])
    .save())

                                                                                