In [69]:
### import libraries
import os 
import pandas as pd
from tqdm import tqdm
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import lit, monotonically_increasing_id

import argparse

####  Help function

In [70]:
# For pandas
# Reducing dataframe memory usage :-
def ReduceMemory(df: pd.DataFrame):
    """
    This function reduces the associated dataframe's memory usage.
    It reassigns the data-types of columns according to their min-max values.
    It also displays the dataframe information after memory reduction.
    """;
    
    # Reducing float column memory usage:-
    for col in tqdm(df.iloc[0:2, 1:].select_dtypes('float').columns):
        col_min = np.amin(df[col].dropna());
        col_max = np.amax(df[col].dropna());
        
        if col_min >= np.finfo(np.float16).min and col_max <= np.finfo(np.float16).max: 
            df[col] = df[col].astype(np.float16)
        elif col_min >= np.finfo(np.float32).min and col_max <= np.finfo(np.float32).max : 
            df[col] = df[col].astype(np.float32)
        else: pass;

    # Reducing integer column memory usage:-
    for col in tqdm(df.iloc[0:2, 1:].select_dtypes('int').columns):
        col_min = df[col].min(); 
        col_max = df[col].max();
        
        if col_min >= np.iinfo(np.int8).min and col_max <= np.iinfo(np.int8).max:
            df[col] = df[col].astype(np.int8);
        elif col_min >= np.iinfo(np.int16).min and col_max <= np.iinfo(np.int16).max:
            df[col] = df[col].astype(np.int16);
        elif col_min >= np.iinfo(np.int32).min & col_max <= np.iinfo(np.int32).max:
            df[col] = df[col].astype(np.int32);
        else: pass;
        
    display(df.info());
    
    return df;

## Read Data

In [71]:
# init a spark session
appName = "Project1"
master = "local"


sc = SparkSession.builder.appName(appName).getOrCreate()
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession.builder.getOrCreate()

In [72]:
# fifa data folder should contain all the csv files from Fifa(Kaggle), 2015-2022
# assume that you are working in the same directory as the data folder


def load_data(input_path, output_path):
    full_data_path = os.path.join(output_path, "full_data.csv")

    if not os.path.exists(full_data_path):
        os.mkdir(output_path)
        data_path = input_path
        if os.path.exists(data_path):
            print("Data folder exists")
        else:
            print("Data folder does not exist")
            os.makedirs(data_path)
            print("Sussessfully created data folder")

        csv_files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.csv')]
        print(csv_files)
        combined_df = None
        for file in csv_files:
            year = file.split("players_")[1].split(".csv")[0]
            df = spark.read.csv(file, header=True, inferSchema=True)
            df = df.withColumn("year", lit(year)) # this is the unique column 'year'
            if combined_df is None:
                combined_df = df
            else:
                combined_df = combined_df.union(df)
        combined_df = combined_df.withColumn("id", monotonically_increasing_id())
        df = combined_df
        # Write the concatenated DataFrame to a new CSV file
        ReduceMemory(combined_df.toPandas()).to_csv(full_data_path)
    else: 
        df = spark.read.csv(full_data_path, header=True, inferSchema=True)
    return df

data_input = "../data/"
data_output = "../data/output/"
data = load_data(data_input, data_output)
if '_c0' in data.columns:
    data = data.drop('_c0')
df = data

In [73]:
df.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: date (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: double (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: double (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: double (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: date (nullable = true)
 |-- club_contract_valid_until: double (nullable = true)
 |-- nationality_id: integer (nullable = true)
 |--

## Using pyspark to read table and write to PostgreSQL

In [74]:
db_properties={}
db_properties['username']="postgres"
db_properties['password']="010323"
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
db_properties['table']="fifa"
db_properties['driver']="org.postgresql.Driver"

In [75]:
df.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()

                                                                                

In [76]:
df_read = sqlContext.read.format("jdbc")\
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()
    
df_read.show()

+---------+--------------------+--------------+--------------------+----------------+-------+---------+---------+--------+---+----------+---------+---------+------------+--------------------+--------------------+------------+-------------+------------------+--------------------+-----------+-------------------------+--------------+-------------------+--------------+---------------+--------------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+------------------+-----------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----

                                                                                

In [77]:
df_read.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: date (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: double (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: double (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: double (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: date (nullable = true)
 |-- club_contract_valid_until: double (nullable = true)
 |-- nationality_id: integer (nullable = true)
 |--

## Task II 

### Question 1
Find X clubs that have the highest number of players with contracts ending in 2023

We find that En Avant de Guingamp have the most players whose contract end in 2023 in FIFA 2022 dataset.

#### In spark SQL

In [78]:
df_read.createOrReplaceTempView("df_view")

In [79]:
sqlWay1 = spark.sql("""
SELECT dv.club_name, COUNT(*) AS player_count
FROM   df_view dv
WHERE  dv.year = 22 AND dv.club_contract_valid_until = 2023
GROUP BY dv.club_name
ORDER BY player_count DESC
LIMIT 10;
                    """)

In [80]:
sqlWay1.show()

+--------------------+------------+
|           club_name|player_count|
+--------------------+------------+
|En Avant de Guingamp|          19|
| Club Atlético Lanús|          17|
|       Lechia Gdańsk|          17|
|            Barnsley|          16|
|        Kasimpaşa SK|          16|
|        Bengaluru FC|          16|
|        FC Barcelona|          15|
|  SV Wehen Wiesbaden|          15|
|          CA Osasuna|          15|
|      Zagłębie Lubin|          15|
+--------------------+------------+



#### In original pyspark

In [81]:
from pyspark.sql import functions as F

def find_top_clubs_expiring_contracts(df, num_clubs):

  top_clubs = (df.filter((F.col('year') == '22') & 
                        (F.col('club_contract_valid_until') == 2023))
               .groupBy('club_team_id', 'club_name')
               .agg(F.count('sofifa_id').alias('num_players'))
               .orderBy(F.desc('num_players'))
               .limit(num_clubs))
               
  top_names = [row.club_name for row in top_clubs.collect()]
  
  return top_clubs, top_names

In [82]:
clubs, names = find_top_clubs_expiring_contracts(df_read, 10)

In [83]:
print(f"{names} Club with highest number of players with contracts ending in 2023 in FIFA 2022")

['En Avant de Guingamp', 'Club Atlético Lanús', 'Lechia Gdańsk', 'Kasimpaşa SK', 'Barnsley', 'Bengaluru FC', 'FC Barcelona', 'Zagłębie Lubin', 'SV Wehen Wiesbaden', 'KAA Gent'] Club with highest number of players with contracts ending in 2023 in FIFA 2022


In [84]:
clubs.show()

+------------+--------------------+-----------+
|club_team_id|           club_name|num_players|
+------------+--------------------+-----------+
|        62.0|En Avant de Guingamp|         19|
|    110395.0| Club Atlético Lanús|         17|
|    111091.0|       Lechia Gdańsk|         17|
|    111339.0|        Kasimpaşa SK|         16|
|      1932.0|            Barnsley|         16|
|    113302.0|        Bengaluru FC|         16|
|       241.0|        FC Barcelona|         15|
|    110749.0|      Zagłębie Lubin|         15|
|       492.0|  SV Wehen Wiesbaden|         15|
|       674.0|            KAA Gent|         15|
+------------+--------------------+-----------+



### Question 2 


#### In spark SQL

In [85]:
sqlWay2 = spark.sql("""
WITH club_counts AS (
  SELECT 
    dv.club_name, 
    dv.year, 
    COUNT(*) AS player_count
  FROM df_view dv
  WHERE dv.age > 27
  GROUP BY dv.club_name, dv.year
),

club_averages AS (
  SELECT 
    cc.club_name,
    AVG(cc.player_count) AS avg_player_count
  FROM club_counts cc
  GROUP BY cc.club_name  
)

SELECT
  ca.club_name,
  ca.avg_player_count AS average_count
FROM club_averages ca
WHERE (
  SELECT COUNT(*) 
  FROM club_averages ca2 
  WHERE ca2.avg_player_count > ca.avg_player_count
) < 10
ORDER BY ca.avg_player_count DESC;
""")

In [86]:
sqlWay2.show()

+--------------------+-------------+
|           club_name|average_count|
+--------------------+-------------+
|                null|       109.25|
|  Dorados de Sinaloa|         19.0|
| Matsumoto Yamaga FC|         19.0|
| Shanghai Shenhua FC|         18.5|
|          Qingdao FC|         18.0|
|Club Deportivo Jo...|         17.5|
|            Altay SK|         17.0|
|         Guaireña FC|         17.0|
|İstanbul Başakşeh...|       16.625|
|      BB Erzurumspor|         16.5|
|        Club Olimpia|         16.5|
|      Sport Huancayo|         16.5|
+--------------------+-------------+



#### In original pyspark

In [87]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def find_top_clubs_by_avg_older_players(df, num_clubs):

  window = Window.orderBy(F.desc('avg_count'))
  df = df.repartition(100)
  top_clubs = (df.filter(F.col('age') > 27)
               .groupBy('year', 'club_team_id', 'club_name')
               .agg(F.count('sofifa_id').alias('count'))
               .groupBy('club_team_id', 'club_name')
               .agg(F.avg('count').alias('avg_count'))
               .withColumn('rank', F.rank().over(window))
               .filter(F.col('rank') <= num_clubs)
               .select('club_team_id', 'club_name'))
               
  top_names = [row.club_name for row in top_clubs.collect()]



  return top_clubs, top_names

In [88]:
clubs, names = find_top_clubs_by_avg_older_players(df_read, 10)

23/11/16 15:34:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 1

In [89]:
clubs.show()

23/11/16 15:34:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 1

+------------+--------------------+
|club_team_id|           club_name|
+------------+--------------------+
|        null|                null|
|    111032.0|  Dorados de Sinaloa|
|    113158.0| Matsumoto Yamaga FC|
|    110955.0| Shanghai Shenhua FC|
|    112961.0|          Qingdao FC|
|    110974.0|Club Deportivo Jo...|
|    101006.0|            Altay SK|
|    114688.0|         Guaireña FC|
|    101014.0|İstanbul Başakşeh...|
|    114511.0|      Sport Huancayo|
|       749.0|      BB Erzurumspor|
|    101108.0|        Club Olimpia|
+------------+--------------------+



23/11/16 15:34:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/16 15:34:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [90]:
print(f"{names} clubs with highest avg players over 27:")

[None, 'Dorados de Sinaloa', 'Matsumoto Yamaga FC', 'Shanghai Shenhua FC', 'Qingdao FC', 'Club Deportivo Jorge Wilstermann', 'Altay SK', 'Guaireña FC', 'İstanbul Başakşehir FK', 'Sport Huancayo', 'BB Erzurumspor', 'Club Olimpia'] clubs with highest avg players over 27:


### Question 3

#### In spark SQL

In [91]:
sqlWay3 = spark.sql("""
WITH yearly_counts AS (
    SELECT dv.year, dv.nation_position, COUNT(*) AS position_count
    FROM df_view dv
    WHERE dv.nation_position IS NOT NULL
    GROUP BY dv.year, dv.nation_position
),
max_counts AS (
    SELECT yc.year, MAX(yc.position_count) AS max_count
    FROM yearly_counts yc
    GROUP BY yc.year
)
SELECT mc.year, yc.nation_position, mc.max_count
FROM max_counts mc
JOIN yearly_counts yc ON mc.year = yc.year AND mc.max_count = yc.position_count
ORDER BY mc.year;

""")

In [92]:
sqlWay3.show()

+----+---------------+---------+
|year|nation_position|max_count|
+----+---------------+---------+
|  15|            SUB|      564|
|  16|            SUB|      511|
|  17|            SUB|      564|
|  18|            SUB|      600|
|  19|            SUB|      576|
|  20|            SUB|      588|
|  21|            SUB|      588|
|  22|            SUB|      396|
+----+---------------+---------+



#### In original pyspark

In [93]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def most_frequent_nation_position(df):

  window = Window.partitionBy('year').orderBy(F.desc('count'))

  top_positions = (df.filter(F.col('nation_position').isNotNull())
                  .groupBy('year', 'nation_position')
                  .agg(F.count('sofifa_id').alias('count'))
                  .withColumn('rank', F.rank().over(window))
                  .filter(F.col('rank') == 1)
                  .select('year', 'nation_position'))

  return top_positions

In [94]:
most_frequent_nation_position(df_read).show()

+----+---------------+
|year|nation_position|
+----+---------------+
|  15|            SUB|
|  16|            SUB|
|  17|            SUB|
|  18|            SUB|
|  19|            SUB|
|  20|            SUB|
|  21|            SUB|
|  22|            SUB|
+----+---------------+

