# Course Project 
*Option 1*
Daren Yao & Jinsong Yuan

## TASK I


In [1]:
# Uncomment the following lines if you are using Windows!
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
 .set('spark.driver.host','127.0.0.1')\
 .setAppName('SPARK-POSTGRES')\
 .setMaster("local[*]")\
 .set("spark.jars", "C:\spark\spark-3.5.2-bin-hadoop3\postgresql-42.7.4.jar")

# need to change the postgresql path 

# Create Spark Context with the new configurations rather than relying on the default 
sc = SparkContext.getOrCreate(conf=conf)
# You need to create SQL Context to conduct some database operations like what we will
sqlContext = SQLContext(sc)
# If you have SQL context, you create the session from the Spark Context
spark = sqlContext.sparkSession.builder.getOrCreate()



In [2]:
from pyspark.sql.functions import col, when, isnull, count, lit, avg, sum, isnan, desc, row_number, expr, regexp_extract
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType

In [3]:
# DO this everytime reopening the ipynb
# change the username, password, table name.
db_properties={}
db_properties['username']="postgres"  
#db_properties['password']="YYAAOOdaren200@"
db_properties['password']="714179"
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
db_properties['table']="fifa.fifa_uncleaned"   # remember to change the table name 
db_properties['driver']="org.postgresql.Driver"

#### A. Reading the data

In [4]:
df_list=[]
for i in range(15,23,1):
    df_list.append(spark.read.csv(r"./data/players_{}.csv".format(i),header=True, inferSchema= True))

In [5]:
df_list_female=[]
for i in range(16,23,1):
    df_list_female.append(spark.read.csv(r"./data/female_players_{}.csv".format(i),header=True, inferSchema= True))

#### B. Add new column

In [6]:
for i in range(15,23,1):
    df_list[i-15]=df_list[i-15].withColumn('year',lit(2000+i))
    df_list[i-15]=df_list[i-15].withColumn('gender',lit(1))

In [7]:
for i in range(16,23,1):
    df_list_female[i-16]=df_list_female[i-16].withColumn('year',lit(2000+i))
    df_list_female[i-16]=df_list_female[i-16].withColumn('gender',lit(0))

#### C. Merge the data

In [8]:
# union function needs to be used when two df have the same structure
def union_df(df_list,df_list_female):
    df_union=df_list[0]
    for i in range(15,22,1):
        df_union=df_union.union(df_list[i-14])
    for i in range(16,23,1):
        df_union=df_union.union(df_list_female[i-16])
    return df_union

In [9]:
# check if the dfs have the same structure
len(df_list[0].columns)

112

In [10]:
len(df_list_female[0].columns)

112

In [11]:
df_union=union_df(df_list,df_list_female)

In [12]:
df_union.summary().show()

+-------+-----------------+--------------------+----------+--------------------+----------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+------------------+-------------+------------------+--------------------+-----------+-------------------------+-----------------+----------------+-----------------+---------------+--------------------+--------------+------------------+------------------+------------------------+-------------+-----------+---------+--------------------+-----------+-------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+--------------------------+-----------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------------------+------------

In [13]:
df_union.groupBy('year').count().show()

+----+-----+
|year|count|
+----+-----+
|2015|16155|
|2016|15871|
|2017|17895|
|2018|18271|
|2019|18384|
|2020|18828|
|2021|19289|
|2022|19630|
+----+-----+



In [14]:
# saving the data count for each year
# number_of_data[0] --> row(2015,16155)
number_of_data=df_union.groupBy('year').count().collect()
number_of_data[0][1]

16155

In [15]:
df_union.groupBy('gender').count().show()

+------+------+
|gender| count|
+------+------+
|     1|142079|
|     0|  2244|
+------+------+



In [16]:
df_union.groupBy('gender').agg(count('club_team_id').alias('club_team_id_count')).show()

+------+------------------+
|gender|club_team_id_count|
+------+------------------+
|     1|            140449|
|     0|                 0|
+------+------------------+



In [17]:
df_union.select('club_team_id').distinct().count()

1017

In [18]:
#overwrite it to PostgresSQL database
df_union.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()

#### D. Data Cleaning

##### D.1 _Dropping columns with high nullValue ratio_

In [19]:
# read back the df
df_uncleaned  = sqlContext.read.format("jdbc")\
 .option("url", db_properties['url'])\
 .option("dbtable", db_properties['table'])\
 .option("user", db_properties['username'])\
 .option("password", db_properties['password'])\
 .option("Driver", db_properties['driver'])\
 .load()

In [20]:
missing_values = ['NA', 'NULL', 'NAN', 'NaN', 'na', 'null', 'nan', '']
def replace_missing_values(df, missing_values):
    for c in df.columns:
        df = df.withColumn(c, when(col(c).isin(missing_values), None).otherwise(col(c)))
    return df
# Transferring all the nullValues into None
df_null_transferred = replace_missing_values(df_uncleaned, missing_values)

In [21]:
# Function to calculate the null ratio

def show_null_ratio(df):
    total_count = df.count()
    columns = df.columns
    null_count_expr = [
        count(when(col(c).isNull() , c)).alias(f'{c}_null_count') for c in columns
    ]
    
    null_counts_df = df.agg(*null_count_expr)
    
    # Calculate the percentage of null values in each column
    for c in columns:
        null_counts_df = null_counts_df.withColumn(c, col(f'{c}_null_count') / total_count)
        null_counts_df=null_counts_df.drop(f'{c}_null_count')
    
    null_counts_df.show(vertical=True)
    # return the null_ratio df for further use
    return null_counts_df

In [22]:
df_uncleaned.printSchema

<bound method DataFrame.printSchema of DataFrame[sofifa_id: int, player_url: string, short_name: string, long_name: string, player_positions: string, overall: int, potential: int, value_eur: string, wage_eur: string, age: int, dob: date, height_cm: int, weight_kg: int, club_team_id: string, club_name: string, league_name: string, league_level: string, club_position: string, club_jersey_number: string, club_loaned_from: string, club_joined: string, club_contract_valid_until: string, nationality_id: int, nationality_name: string, nation_team_id: double, nation_position: string, nation_jersey_number: int, preferred_foot: string, weak_foot: int, skill_moves: int, international_reputation: int, work_rate: string, body_type: string, real_face: string, release_clause_eur: string, player_tags: string, player_traits: string, pace: int, shooting: int, passing: int, dribbling: int, defending: int, physic: int, attacking_crossing: int, attacking_finishing: int, attacking_heading_accuracy: int, att

In [23]:
# A table showing the null ratio of each columns vertically
null_ratio_df=show_null_ratio(df_null_transferred)

-RECORD 0-------------------------------------------
 sofifa_id                   | 0.0                  
 player_url                  | 0.0                  
 short_name                  | 0.0                  
 long_name                   | 0.0                  
 player_positions            | 0.0                  
 overall                     | 0.0                  
 potential                   | 0.0                  
 value_eur                   | 0.028373855864969547 
 wage_eur                    | 0.026468407668909323 
 age                         | 0.0                  
 dob                         | 0.0                  
 height_cm                   | 0.0                  
 weight_kg                   | 0.0                  
 club_team_id                | 0.02684256840559024  
 club_name                   | 0.02684256840559024  
 league_name                 | 0.02684256840559024  
 league_level                | 0.029510195880074554 
 club_position               | 0.0268425684055

In [24]:
#getting a list of ratios
ratio_row = null_ratio_df.collect()[0]
col_toomuchnull = [null_ratio_df.columns[i] for i in range(len(ratio_row)) if ratio_row[i] > 0.5]
print(col_toomuchnull)
# now we know the columns with nullValue ratio over 50%.

['club_loaned_from', 'nation_team_id', 'nation_position', 'nation_jersey_number', 'player_tags', 'player_traits', 'goalkeeping_speed', 'nation_logo_url']


In [25]:
# dropping the columns with high nullValue ratio
df_afterdrop=df_null_transferred
for c in col_toomuchnull:
    df_afterdrop=df_afterdrop.drop(c)

Besides, all of the url colums are useless

In [26]:
columns_to_drop = [col_name for col_name in df_afterdrop.columns if 'url' in col_name]

# drop the columns that contain url
df_afterdrop = df_afterdrop.drop(*columns_to_drop)

##### D.2 _Checking the dtypes_

In [27]:
# Getting a list of the rest null cols
col_ratio = show_null_ratio(df_afterdrop).collect()[0]

-RECORD 0-------------------------------------------
 sofifa_id                   | 0.0                  
 short_name                  | 0.0                  
 long_name                   | 0.0                  
 player_positions            | 0.0                  
 overall                     | 0.0                  
 potential                   | 0.0                  
 value_eur                   | 0.028373855864969547 
 wage_eur                    | 0.026468407668909323 
 age                         | 0.0                  
 dob                         | 0.0                  
 height_cm                   | 0.0                  
 weight_kg                   | 0.0                  
 club_team_id                | 0.02684256840559024  
 club_name                   | 0.02684256840559024  
 league_name                 | 0.02684256840559024  
 league_level                | 0.029510195880074554 
 club_position               | 0.02684256840559024  
 club_jersey_number          | 0.0268425684055

In [28]:
col_have_null = [df_afterdrop.columns[i] for i in range(len(col_ratio)) if col_ratio[i] > 0]
print(col_have_null)
# now we know the columns that have null value.

['value_eur', 'wage_eur', 'club_team_id', 'club_name', 'league_name', 'league_level', 'club_position', 'club_jersey_number', 'club_joined', 'club_contract_valid_until', 'release_clause_eur', 'pace', 'shooting', 'passing', 'dribbling', 'defending', 'physic', 'mentality_composure']


In [29]:
df_afterdrop.select(col_have_null).show(3)

+---------+--------+------------+----------+------------------+------------+-------------+------------------+-----------+-------------------------+------------------+----+--------+-------+---------+---------+------+-------------------+
|value_eur|wage_eur|club_team_id| club_name|       league_name|league_level|club_position|club_jersey_number|club_joined|club_contract_valid_until|release_clause_eur|pace|shooting|passing|dribbling|defending|physic|mentality_composure|
+---------+--------+------------+----------+------------------+------------+-------------+------------------+-----------+-------------------------+------------------+----+--------+-------+---------+---------+------+-------------------+
|  45000.0|  2000.0|       361.0| Stevenage|English League Two|           4|          RES|                31| 2016-08-12|                     2019|             98000|  65|      49|     37|       47|       15|    45|                 43|
|  60000.0|  1000.0|      1446.0|AC Horsens|  Danish Sup

In [30]:
# checking the true dtype (because there's a lot of NULL in this column, I'm not sure if it's a int)
df_afterdrop.groupby('mentality_composure').count().orderBy(col('count').desc()).show(truncate=False)

+-------------------+-----+
|mentality_composure|count|
+-------------------+-----+
|NULL               |32026|
|60                 |4248 |
|65                 |4160 |
|58                 |4038 |
|55                 |3996 |
|62                 |3966 |
|64                 |3705 |
|59                 |3642 |
|63                 |3498 |
|68                 |3381 |
|66                 |3348 |
|57                 |3323 |
|56                 |3255 |
|67                 |3237 |
|61                 |3165 |
|70                 |3017 |
|52                 |2878 |
|54                 |2756 |
|50                 |2749 |
|69                 |2648 |
+-------------------+-----+
only showing top 20 rows



In [31]:
df_afterdrop.select(col_have_null).dtypes

[('value_eur', 'string'),
 ('wage_eur', 'string'),
 ('club_team_id', 'string'),
 ('club_name', 'string'),
 ('league_name', 'string'),
 ('league_level', 'string'),
 ('club_position', 'string'),
 ('club_jersey_number', 'string'),
 ('club_joined', 'string'),
 ('club_contract_valid_until', 'string'),
 ('release_clause_eur', 'string'),
 ('pace', 'int'),
 ('shooting', 'int'),
 ('passing', 'int'),
 ('dribbling', 'int'),
 ('defending', 'int'),
 ('physic', 'int'),
 ('mentality_composure', 'string')]

_Some of the columns that have null value has a wrong dtype_

In [32]:
col_int=['value_eur','wage_eur','club_team_id','league_level','club_jersey_number','club_contract_valid_until','release_clause_eur','pace','shooting','passing','dribbling','defending','physic','mentality_composure']

In [33]:
# Transferring the data type
def cast_columns(df, columns_to_cast):
    for col_name in columns_to_cast:
        df = df.withColumn(col_name, df[col_name].cast(IntegerType()))
    return df

df_transferType = cast_columns(df_afterdrop, col_int)

In [34]:
df_transferType.describe().show()

+-------+-----------------+----------+--------------------+----------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+------------------+-------------+------------------+-----------+-------------------------+-----------------+----------------+--------------+------------------+------------------+------------------------+-------------+-----------+---------+-------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+--------------------------+-----------------------+-----------------+------------------+------------------+------------------+------------------+------------------+---------------------+---------------------+------------------+------------------+------------------+------------------+-----------------+------------------+--

Except the columns that has nullValue, some other columns are inferred incorrectly

In [35]:
def add_schema_row(df, num_rows=5):
    # Get column names and data types
    schema_info = [(field.name, str(field.dataType)) for field in df.schema.fields]
    columns = [name for name, dtype in schema_info]
    data_types = [dtype for name, dtype in schema_info]
    
    # Create a row containing the data type
    schema_row = Row(*data_types)
    schema_df = spark.createDataFrame([schema_row], columns)
    
    # Getting the first few rows of data
    data_df = df.limit(num_rows)
    
    combined_df = schema_df.union(data_df)
    
    return combined_df

# Creating a df to easily check whether the data type is inferred correctly.
combined_df = add_schema_row(df_transferType, num_rows=5)
combined_df.show(truncate=False)

# I used GPT for constructing the code to show the df with the data type[1].
# [1]OpenAI. (2024). *ChatGPT* [Large language model]. https://chat.openai.com/


+-------------+------------+--------------+----------------+-------------+-------------+-------------+-------------+-------------+----------+-------------+-------------+-------------+----------------+---------------------------+-------------+-------------+------------------+------------+-------------------------+--------------+-------------------+--------------+-------------+-------------+------------------------+-------------+----------------+------------+------------------+-------------+-------------+-------------+-------------+-------------+-------------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-------------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+-

There are several potential problems with the datatype. Further work could be done when optimizing different ML models. Among them, the position rating (i.e. ls          |st          |rs          |lw .....) should be IntegerType() to represent the players capability of different roles.

In [36]:
# dob is a DataType(), maybe could also be changed into Timestamp? 
# club_joined could be changed to timestamp (now it's StringType)
# body_type is like Normal (170-) ,Normal (170-185). Maybe it could also be changed to integers?

In [37]:
# The Position Ratings has a format of n+3, which means the boost a player can gain in the FIFA game. 
# For example, player performance can be influenced by team chemistry and player morale, leading to temporary boosts.
# However, it got inferred as StringType(), which isn't helpful for ML modeling.
# Therefore, I transferred it into IntegerType():
position_columns = [
    'ls', 'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw',
    'lam', 'cam', 'ram', 'lm', 'lcm', 'cm', 'rcm',
    'rm', 'lwb', 'ldm', 'cdm', 'rdm', 'rwb', 'lb',
    'lcb', 'cb', 'rcb', 'rb', 'gk'
]
# Pyspark has a built-in funcion for this : imported before (regexp_extract)

# Split the string and calculate the sum (by overwriting the original columns)
# colums have scenarios that players position ratings are N+m, N-m or N
# Pattern to extract the base number (N)
pattern_base = r'^(\d+)'

# Pattern to extract the modifier with its sign (+M or -M)
pattern_modifier = r'([+-]\d+)$'

df_position_calculated=df_transferType
for col_name in position_columns:
    # Extract the base rating
    base = regexp_extract(col(col_name), pattern_base, 1).cast(IntegerType())
    
    # Extract the modifier (if any), including the sign
    modifier = regexp_extract(col(col_name), pattern_modifier, 1).cast(IntegerType())
    
    # Compute the effective rating
    df_position_calculated = df_position_calculated.withColumn(
        col_name,
        base + when(modifier.isNotNull(), modifier).otherwise(0)
    )

# I used GPT for constructing the code. Also, GPT inspired me to use regexp_extract[2].
# [2]OpenAI. (2024). *ChatGPT* [Large language model]. https://chat.openai.com/

In [38]:
df_position_calculated.select(position_columns).show()
df_transferType.select(position_columns).show()

+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| ls| st| rs| lw| lf| cf| rf| rw|lam|cam|ram| lm|lcm| cm|rcm| rm|lwb|ldm|cdm|rdm|rwb| lb|lcb| cb|rcb| rb| gk|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 50| 50| 50| 48| 48| 48| 48| 48| 48| 48| 48| 47| 40| 40| 40| 47| 33| 30| 30| 30| 33| 32| 28| 28| 28| 32| 17|
| 39| 39| 39| 43| 40| 40| 40| 43| 42| 42| 42| 45| 41| 41| 41| 45| 49| 45| 45| 45| 49| 50| 48| 48| 48| 50| 14|
| 50| 50| 50| 53| 52| 52| 52| 53| 54| 54| 54| 54| 49| 49| 49| 54| 39| 36| 36| 36| 39| 35| 28| 28| 28| 35| 15|
| 44| 44| 44| 44| 44| 44| 44| 44| 47| 47| 47| 47| 48| 48| 48| 47| 48| 50| 50| 50| 48| 48| 50| 50| 50| 48| 14|
| 19| 19| 19| 18| 18| 18| 18| 18| 20| 20| 20| 20| 21| 21| 21| 20| 19| 21| 21| 21| 19| 19| 20| 20| 20| 19| 50|
| 18| 18| 18| 16| 16| 16| 16| 16| 18| 18| 18| 17| 18| 18| 18| 17| 16| 18| 18| 18| 16| 16| 20| 20| 20| 16| 50|
| 19| 19| 

##### D.3 _Null Value Handling_

D.3.a _Female players_

All of the female players have a lot of null columns. Therefore, female players' data should be seperately treated since treating the whole data with imputation may cause some unexpected bias. 


In [39]:
df_female=df_position_calculated.filter(col('gender')==0)
df_male=df_position_calculated.filter(col('gender')==1)

In [40]:
#getting a list of the rest null cols
col_ratio_female = show_null_ratio(df_female).collect()[0]

-RECORD 0------------------------------------------
 sofifa_id                   | 0.0                 
 short_name                  | 0.0                 
 long_name                   | 0.0                 
 player_positions            | 0.0                 
 overall                     | 0.0                 
 potential                   | 0.0                 
 value_eur                   | 0.9808377896613191  
 wage_eur                    | 0.9795008912655971  
 age                         | 0.0                 
 dob                         | 0.0                 
 height_cm                   | 0.0                 
 weight_kg                   | 0.0                 
 club_team_id                | 1.0                 
 club_name                   | 1.0                 
 league_name                 | 1.0                 
 league_level                | 1.0                 
 club_position               | 1.0                 
 club_jersey_number          | 1.0                 
 club_joined

In [41]:
# showing the lists that female players has a very high ratio of nullValue
col_have_null = [df_female.columns[i] for i in range(len(col_ratio_female)) if col_ratio_female[i]>0.9]
print(col_have_null)

['value_eur', 'wage_eur', 'club_team_id', 'club_name', 'league_name', 'league_level', 'club_position', 'club_jersey_number', 'club_joined', 'club_contract_valid_until', 'release_clause_eur']


In [42]:
col_female_high_null=['value_eur', 'wage_eur', 'club_team_id', 'club_name', 'league_name', 'league_level', 'club_position', 'club_jersey_number', 'club_joined', 'club_contract_valid_until', 'release_clause_eur']

In [43]:
df_female.select(col_female_high_null).printSchema()

root
 |-- value_eur: integer (nullable = true)
 |-- wage_eur: integer (nullable = true)
 |-- club_team_id: integer (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_joined: string (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- release_clause_eur: integer (nullable = true)



In [44]:
# filling all the null string columns with Unknown
df_female = df_female.fillna({'club_name': 'Unknown','league_name': 'Unknown','club_position': 'Unknown','club_joined': 'Unknown'})

- Although it may cause bias without seperately treating null columns, for most of the integer columns in the female df, there isn't much other data for correspondence.
- So we still have to put them back together

In [45]:
df_all_gender=df_male.union(df_female)

D.3.b All players

In [46]:
schema = df_all_gender.schema

# obtaining the string cols
string_columns = [field.name for field in schema.fields if isinstance(field.dataType, StringType)]

# obtaining the numeric cols
numeric_columns = [field.name for field in schema.fields if isinstance(field.dataType, (IntegerType, FloatType, DoubleType))]


In [47]:
# Finding cols with nullValues in StringTypes
string_columns_with_null = []
for col_name in string_columns:
    null_count = df_all_gender.filter(isnull(col(col_name))).count()
    if null_count > 0:
        string_columns_with_null.append(col_name)


In [48]:
for col_name in string_columns_with_null:
    # It seems that there isn't a built-in function to compute mode
    mode_df = df_all_gender.groupBy(col_name).agg(count(col_name).alias('count')).orderBy(desc('count'))
    mode = mode_df.first()[0]
    
    # fill it
    df_all_gender = df_all_gender.fillna({col_name: mode})

In [49]:
df_string_filled=df_all_gender

In [50]:
# Finding cols with nullValues in numeric columns
numeric_columns_with_null = []
for col_name in numeric_columns:
    null_count = df_all_gender.filter(isnull(col(col_name))).count()
    if null_count > 0:
        numeric_columns_with_null.append(col_name)

In [51]:
# obtain median for integers
for col_name in numeric_columns_with_null:
    median = df_string_filled.approxQuantile(col_name, [0.5], 0.001)[0]
    df_string_filled = df_string_filled.fillna({col_name: median})

In [52]:
df_filled=df_string_filled

In [53]:
df_filled.show(4)

+---------+------------+--------------------+----------------+-------+---------+---------+--------+---+----------+---------+---------+------------+--------------------+--------------------+------------+-------------+------------------+-----------+-------------------------+--------------+----------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+-------------------+---------------------------+------

In [54]:
df_filled.show(1,vertical=True)

-RECORD 0-------------------------------------------
 sofifa_id                   | 227290               
 short_name                  | M. Dmitrović         
 long_name                   | Marko Dmitrović      
 player_positions            | GK                   
 overall                     | 72                   
 potential                   | 78                   
 value_eur                   | 2500000              
 wage_eur                    | 6000                 
 age                         | 24                   
 dob                         | 1992-01-24           
 height_cm                   | 194                  
 weight_kg                   | 90                   
 club_team_id                | 100831               
 club_name                   | AD Alcorcón          
 league_name                 | Spanish Segunda D... 
 league_level                | 2                    
 club_position               | GK                   
 club_jersey_number          | 1              

In [55]:
# Checking whether there are any nullValue

'''missing_values_count = [
    sum(
        when(isnull(c), 1).otherwise(0)
    ).alias(c)
    for c in df_filled.columns
]

df_filled.select(missing_values_count).show()
'''

'missing_values_count = [\n    sum(\n        when(isnull(c), 1).otherwise(0)\n    ).alias(c)\n    for c in df_filled.columns\n]\n\ndf_filled.select(missing_values_count).show()\n'

In [56]:
df_cleaned=df_filled

#### E. Data Ingestion

In [57]:
#overwrite it to PostgresSQL database
df_cleaned.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", 'fifa.fifa_cleaned')\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()

In [58]:
db_properties['table']='fifa.fifa_cleaned'

In [59]:
df_read = sqlContext.read.format("jdbc")\
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()

df_read.show(1, vertical=True)

-RECORD 0-------------------------------------------
 sofifa_id                   | 227290               
 short_name                  | M. Dmitrović         
 long_name                   | Marko Dmitrović      
 player_positions            | GK                   
 overall                     | 72                   
 potential                   | 78                   
 value_eur                   | 2500000              
 wage_eur                    | 6000                 
 age                         | 24                   
 dob                         | 1992-01-24           
 height_cm                   | 194                  
 weight_kg                   | 90                   
 club_team_id                | 100831               
 club_name                   | AD Alcorcón          
 league_name                 | Spanish Segunda D... 
 league_level                | 2                    
 club_position               | GK                   
 club_jersey_number          | 1              

In [60]:
x = 2015  
y = 5   
z = 2018

In [61]:
from pyspark.sql import functions as F
df_filter=df_read.filter((df_read['year']==x)&(df_read['club_contract_valid_until']>=z))
df_agg=df_filter.groupBy('club_name').agg(F.count('sofifa_id').alias('player_count'))
df_club_order=df_agg.orderBy(F.desc('player_count')).limit(y)
df_club_order.select('club_name').show()
 
                                          

+----------------+
|       club_name|
+----------------+
|         Unknown|
|  Fortaleza CEIF|
|   CD Huachipato|
| Boyacá Chicó FC|
|Rionegro Águilas|
+----------------+



In [62]:
from pyspark.sql import Window
df_agg=df_read.groupBy('year','nationality_name').agg(F.count('sofifa_id').alias('players_count'))
windows=Window.partitionBy('year')
windows_desc=windows.orderBy(F.desc('players_count'))
df_year_nation_rank=df_agg.withColumn('rank',F.row_number().over(windows_desc))
df_rank1=df_year_nation_rank.filter(F.col('rank')==1).select('year', 'nationality_name', 'players_count').show()

+----+----------------+-------------+
|year|nationality_name|players_count|
+----+----------------+-------------+
|2015|         England|         1627|
|2016|         England|         1540|
|2017|         England|         1650|
|2018|         England|         1656|
|2019|         England|         1648|
|2020|         England|         1693|
|2021|         England|         1708|
|2022|         England|         1742|
+----+----------------+-------------+



In [63]:
df=df_read

In [64]:
import sys
from pyspark.sql.functions import avg, desc,asc, col
x=5
y=2015
sequence='desc'
if x<=0:
    print('error')
    sys.exit()
df_year=df.filter(df['year']==y)
df_club=df_year.groupBy('club_name')
df_avg=df_club.agg(avg('age').alias('avg_age'))
if sequence=='desc':
    df_seq=df_avg.orderBy(desc('avg_age'))
elif sequence=='asc':
    df_seq=df_avg.orderBy(asc('avg_age'))


In [65]:
df_rows=df_seq.limit(x).collect()
last_row=df_rows[-1]
df_same=df_seq.filter(col('avg_age')==last_row['avg_age']).collect()

df_other=[row for row in df_rows if row['avg_age']!=last_row['avg_age']]
df_final=df_other+df_same
df_final

[Row(club_name='Cruz Azul', avg_age=28.071428571428573),
 Row(club_name='Arsenal Tula', avg_age=28.04),
 Row(club_name='Podbeskidzie Bielsko-Biała', avg_age=27.962962962962962),
 Row(club_name='Fenerbahçe SK', avg_age=27.88),
 Row(club_name='Leones Negros de la UdeG', avg_age=27.79310344827586)]