## Task-I: Build and populate necessary tables (30% of course project grade)

In [1]:
# Uncomment the following lines if you are using Windows
import findspark
findspark.init()
findspark.find()

import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.appName('fifa').getOrCreate()

In [3]:
file_paths = []
for year in range(15, 23):
    file_path = f"archive/players_{year}.csv"
    file_paths.append(file_path)

In [4]:
from pyspark.sql.functions import *
from functools import reduce
data_frames = []

for file_path in file_paths:
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    
    # Add a new column for the year.
    year = int(file_path.split("_")[1].split(".")[0])
    df = df.withColumn('year', lit(year))
    
    # for test
    df.show(1, vertical = True)
    
    data_frames.append(df)

merged_df = reduce(lambda x, y: x.union(y), data_frames)

merged_df.show(5, vertical = True)

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | 241.0                
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

-RECORD 0-------------------------------------------
 sofifa_id                   | 20801                
 player_url                  | https://sofifa.co... 
 short_name                  | Cristiano Ronaldo    
 long_name                   | Cristiano Ronaldo... 
 player_positions            | LW, LM, ST           
 overall                     | 94                   
 potential                   | 94                   
 value_eur                   | 8.7E7                
 wage_eur                    | 575000.0             
 age                         | 31                   
 dob                         | 1985-02-05           
 height_cm                   | 185                  
 weight_kg                   | 80                   
 club_team_id                | 243.0                
 club_name                   | Real Madrid CF       
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | LW             

-RECORD 0-------------------------------------------
 sofifa_id                   | 20801                
 player_url                  | https://sofifa.co... 
 short_name                  | Cristiano Ronaldo    
 long_name                   | Cristiano Ronaldo... 
 player_positions            | ST, LW               
 overall                     | 94                   
 potential                   | 94                   
 value_eur                   | 7.7E7                
 wage_eur                    | 400000.0             
 age                         | 33                   
 dob                         | 1985-02-05           
 height_cm                   | 187                  
 weight_kg                   | 83                   
 club_team_id                | 45.0                 
 club_name                   | Juventus             
 league_name                 | Italian Serie A      
 league_level                | 1                    
 club_position               | ST             

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | RW, ST, CF           
 overall                     | 93                   
 potential                   | 93                   
 value_eur                   | 1.035E8              
 wage_eur                    | 560000.0             
 age                         | 33                   
 dob                         | 1987-06-24           
 height_cm                   | 170                  
 weight_kg                   | 72                   
 club_team_id                | 241.0                
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CAM            

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | 241.0                
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

In [5]:
# Add a column unique_id so that every record can be uniquely identified
merged_df = merged_df.withColumn("unique_id", monotonically_increasing_id())
merged_df.show(5, vertical=True)

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | 241.0                
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

In [6]:
# Ingest the data from all years (2015-2022) into one Postgres Database table
postgres_url = "jdbc:postgresql://localhost:5432/postgres"
properties = {
    "user": "postgres",
    "password": "123",
    "driver": "org.postgresql.Driver"
}

table_name = "fifa.fifa_table" 

merged_df.write.jdbc(url=postgres_url, table=table_name, mode="overwrite", properties=properties)

## Task-II: Conduct analytics on your dataset (20% of course project grade)

In [7]:
from pyspark.sql.functions import col, when, count, avg, desc, dense_rank, rank, row_number
from pyspark.sql.window import Window

# Read data from PostgreSQL
jdbc_url = "jdbc:postgresql://localhost:5432/postgres"
db_properties = {"user": "postgres", "password": "123", "driver": "org.postgresql.Driver"}
df = spark.read.jdbc(url=jdbc_url, table="fifa.fifa_table", properties=db_properties)

In [9]:
def find_x(x):

    # players whose contracts end in 2023
    filtered_df = df.filter(df.year == 22)
    filtered_df = filtered_df.filter(df.club_contract_valid_until == 2023)

    # Task 1: X clubs with the highest number of players with contracts ending in 2023
    x_clubs = filtered_df.groupBy("club_name").agg(count("club_name").alias("player_count")) \
        .orderBy(desc("player_count")).limit(x)
    print(str(x)+' clubs that have the highest number of players with contracts ending in 2023')
    x_clubs.show()
    
find_x(5)

5 clubs that have the highest number of players with contracts ending in 2023
+--------------------+------------+
|           club_name|player_count|
+--------------------+------------+
|En Avant de Guingamp|          19|
| Club Atlético Lanús|          17|
|       Lechia Gdańsk|          17|
|            Barnsley|          16|
|        Kasimpaşa SK|          16|
+--------------------+------------+



In [10]:
def find_y(y):
    # drop row with club_name is null
    filtered_df = df.filter(col("club_name").isNotNull())
    
    # players whose age > 27
    older_players = filtered_df.filter(col("age") > 27)

    # players whose age > 27 for each club per year
    club_per_year = older_players.groupBy("club_name", "year").agg(count("sofifa_id").alias("older_players_count"))

    # average count per club 
    # Notice: taking into account clubs not available in certain years
    avg_club_counts = club_per_year.groupBy("club_name").agg(avg("older_players_count").alias("average_older_players"))

    # rank clubs based on average count
    window_spec = Window.orderBy(desc("average_older_players"))
    ranked_clubs = avg_club_counts.withColumn("rank", rank().over(window_spec))
    top_y_clubs = ranked_clubs.filter(col("rank") <= y)
    print('Top {} clubs with the highest average number of players older than 27 years:'.format(y))
    top_y_clubs.show()

find_y(5)

Top 5 clubs with the highest average number of players older than 27 years:
+--------------------+---------------------+----+
|           club_name|average_older_players|rank|
+--------------------+---------------------+----+
|  Dorados de Sinaloa|                 19.0|   1|
| Matsumoto Yamaga FC|                 19.0|   1|
| Shanghai Shenhua FC|                 18.5|   3|
|          Qingdao FC|                 18.0|   4|
|Club Deportivo Jo...|                 17.5|   5|
+--------------------+---------------------+----+



In [22]:
def find_frequent_nation_position(substitute):
    # drop row with nation_position is SUB (substitute)
    if substitute:
        filtered_df = df
    else:
        filtered_df = df.filter(col("nation_position") != "SUB")
    
    window_spec = Window.partitionBy("year").orderBy(desc("count"))

    # count nation_position per year
    position_freq = filtered_df.groupBy("year", "nation_position").agg(count("nation_position").alias("count"))
    
    # rank frequency per year
    position_freq_ranked = position_freq.withColumn("rank", rank().over(window_spec))
    most_frequent_positions = position_freq_ranked.filter(col("rank") == 1).select("year", "nation_position", "count").orderBy("year")
    
    print('The most frequent nation_position in the dataset for each year', end = " -- ")
    if(substitute):
        print("with substitute")
    else:
        print("without substitute")
    most_frequent_positions.show(most_frequent_positions.count()) # To display the complete data

find_frequent_nation_position(True)
find_frequent_nation_position(False)

The most frequent nation_position in the dataset for each year -- with substitute
+----+---------------+-----+
|year|nation_position|count|
+----+---------------+-----+
|  15|            SUB|  564|
|  16|            SUB|  511|
|  17|            SUB|  564|
|  18|            SUB|  600|
|  19|            SUB|  576|
|  20|            SUB|  588|
|  21|            SUB|  588|
|  22|            SUB|  396|
+----+---------------+-----+

The most frequent nation_position in the dataset for each year -- without substitute
+----+---------------+-----+
|year|nation_position|count|
+----+---------------+-----+
|  15|            LCB|   47|
|  15|            RCB|   47|
|  15|             GK|   47|
|  16|            RCB|   46|
|  16|             GK|   46|
|  17|             GK|   47|
|  17|            LCB|   47|
|  17|            RCB|   47|
|  18|            LCB|   50|
|  18|             GK|   50|
|  18|            RCB|   50|
|  19|            RCB|   48|
|  19|             GK|   48|
|  19|            LC