# Task I

In [1]:
#IMPORTING ALL THE LIBRARIES 
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, BooleanType
from pyspark.sql.types import StructType, StructField, TimestampType, DateType, StringType
from pyspark.sql.functions import col, trim
from pyspark.sql.functions import unix_timestamp, current_timestamp
from pyspark.sql.functions import lit
from pyspark.sql.functions import when
from pyspark.sql.functions import count, avg, first, year
import seaborn as sb
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
import matplotlib.pyplot as plt
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import monotonically_increasing_id

In [31]:
#DOWNLOADING THE DATASET FROM GITHUB REPOSITORY
!wget "https://github.com/jjanbol/Test-Toolchains/raw/refs/heads/main/Archive.zip"

/bin/bash: wget: command not found


In [32]:
#UNZIPPING FOR DATA INGESTION
!unzip -o Archive.zip

unzip:  cannot find or open Archive.zip, Archive.zip.zip or Archive.zip.ZIP.


In [44]:
#CHECKING THAT FILES WERE UNZIPPED CORRECTLY
!ls

Archive.zip           female_players_20.csv players_19.csv
[34m__MACOSX[m[m              female_players_21.csv players_20.csv
[34marchive[m[m               female_players_22.csv players_21.csv
female_players_16.csv players_15.csv        players_22.csv
female_players_17.csv players_16.csv        postgresql-42.7.4.jar
female_players_18.csv players_17.csv        project.ipynb
female_players_19.csv players_18.csv


In [60]:
# INITIALIZING SPARK SESSION 
jdbc_jar_path = "postgresql-42.7.4.jar"

appName = "Project"
master = "local"

conf = SparkConf() \
    .set('spark.driver.host', '127.0.0.1') \
    .setAppName(appName) \
    .setMaster(master) \
    .set("spark.jars", jdbc_jar_path)

sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession.builder.getOrCreate()

In [61]:
#DEFINING FUNCTION TO CREATE DATAFRAMES

schema = StructType([
    StructField("sofifa_id", IntegerType(), True),  
    StructField("player_url", StringType(), True),  
    StructField("short_name", StringType(), True),  
    StructField("long_name", StringType(), True),  
    StructField("player_positions", StringType(), True), 
    StructField("overall", IntegerType(), True),  
    StructField("potential", IntegerType(), True),
    StructField("value_eur", IntegerType(), True),  
    StructField("wage_eur", IntegerType(), True),  
    StructField("age", IntegerType(), True),  
    StructField("dob", DateType(), True),  
    StructField("height_cm", IntegerType(), True),  
    StructField("weight_kg", IntegerType(), True),  
    StructField("club_team_id", IntegerType(), True),
    StructField("club_name", StringType(), True),  
    StructField("league_name", StringType(), True),
    StructField("league_level", IntegerType(), True), 
    StructField("club_position", StringType(), True),  
    StructField("club_jersey_number", IntegerType(), True),  
    StructField("club_loaned_from", StringType(), True),  
    StructField("club_joined", DateType(), True),  
    StructField("club_contract_valid_until", DateType(), True),  
    StructField("nationality_id", IntegerType(), True),  
    StructField("nationality_name", StringType(), True), 
    StructField("nation_team_id", IntegerType(), True),  
    StructField("nation_position", StringType(), True),  
    StructField("nation_jersey_number", IntegerType(), True),  
    StructField("preferred_foot", StringType(), True),  
    StructField("weak_foot", IntegerType(), True),  
    StructField("skill_moves", IntegerType(), True), 
    StructField("international_reputation", IntegerType(), True),  
    StructField("work_rate", StringType(), True),  
    StructField("body_type", StringType(), True),  
    StructField("real_face", BooleanType(), True),  
    StructField("release_clause_eur", IntegerType(), True),  
    StructField("player_tags", StringType(), True),  
    StructField("player_traits", StringType(), True),  
    StructField("pace", IntegerType(), True),  
    StructField("shooting", IntegerType(), True),
    StructField("passing", IntegerType(), True), 
    StructField("dribbling", IntegerType(), True),
    StructField("defending", IntegerType(), True),  
    StructField("physic", IntegerType(), True),  
    StructField("attacking_crossing", IntegerType(), True),  
    StructField("attacking_finishing", IntegerType(), True),  
    StructField("attacking_heading_accuracy", IntegerType(), True),  
    StructField("attacking_short_passing", IntegerType(), True),  
    StructField("attacking_volleys", IntegerType(), True),  
    StructField("skill_dribbling", IntegerType(), True),  
    StructField("skill_curve", IntegerType(), True),  
    StructField("skill_fk_accuracy", IntegerType(), True),  
    StructField("skill_long_passing", IntegerType(), True),  
    StructField("skill_ball_control", IntegerType(), True),  
    StructField("movement_acceleration", IntegerType(), True),  
    StructField("movement_sprint_speed", IntegerType(), True),  
    StructField("movement_agility", IntegerType(), True),  
    StructField("movement_reactions", IntegerType(), True),  
    StructField("movement_balance", IntegerType(), True),  
    StructField("power_shot_power", IntegerType(), True),  
    StructField("power_jumping", IntegerType(), True),  
    StructField("power_stamina", IntegerType(), True),  
    StructField("power_strength", IntegerType(), True), 
    StructField("power_long_shots", IntegerType(), True),  
    StructField("mentality_aggression", IntegerType(), True),
    StructField("mentality_interceptions", IntegerType(), True),
    StructField("mentality_positioning", IntegerType(), True),  
    StructField("mentality_vision", IntegerType(), True),  
    StructField("mentality_penalties", IntegerType(), True),
    StructField("mentality_composure", IntegerType(), True), 
    StructField("defending_marking_awareness", IntegerType(), True),  
    StructField("defending_standing_tackle", IntegerType(), True),  
    StructField("defending_sliding_tackle", IntegerType(), True),  
    StructField("goalkeeping_diving", IntegerType(), True),  
    StructField("goalkeeping_handling", IntegerType(), True),
    StructField("goalkeeping_kicking", IntegerType(), True),  
    StructField("goalkeeping_positioning", IntegerType(), True),  
    StructField("goalkeeping_reflexes", IntegerType(), True),  
    StructField("goalkeeping_speed", IntegerType(), True),  
    StructField("ls", IntegerType(), True),  
    StructField("st", IntegerType(), True),  
    StructField("rs", IntegerType(), True),  
    StructField("lw", IntegerType(), True),  
    StructField("lf", IntegerType(), True),  
    StructField("cf", IntegerType(), True),  
    StructField("rf", IntegerType(), True),  
    StructField("rw", IntegerType(), True),  
    StructField("lam", IntegerType(), True), 
    StructField("cam", IntegerType(), True), 
    StructField("ram", IntegerType(), True), 
    StructField("lm", IntegerType(), True),  
    StructField("lcm", IntegerType(), True), 
    StructField("cm", IntegerType(), True),  
    StructField("rcm", IntegerType(), True), 
    StructField("rm", IntegerType(), True),  
    StructField("lwb", IntegerType(), True), 
    StructField("ldm", IntegerType(), True), 
    StructField("cdm", IntegerType(), True), 
    StructField("rdm", IntegerType(), True), 
    StructField("rwb", IntegerType(), True), 
    StructField("lb", IntegerType(), True),  
    StructField("lcb", IntegerType(), True),  
    StructField("cb", IntegerType(), True),  
    StructField("rcb", IntegerType(), True), 
    StructField("rb", IntegerType(), True),  
    StructField("gk", IntegerType(), True),  
    StructField("player_face_url", StringType(), True), 
    StructField("club_logo_url", StringType(), True),  
    StructField("club_flag_url", StringType(), True),  
    StructField("nation_logo_url", StringType(), True),
    StructField("nation_flag_url", StringType(), True)
])


files = ["female_players_16.csv", "female_players_17.csv", "female_players_18.csv", "female_players_19.csv", 
         "female_players_20.csv", "female_players_21.csv", "female_players_22.csv", 
         "players_15.csv", "players_16.csv", "players_17.csv", "players_18.csv", "players_19.csv", 
         "players_20.csv", "players_21.csv", "players_22.csv"]

def create_dataframe(csv):
    directory = csv
    years = ["15", "16", "17", "18", "19", "20", "21", "22"]
    for year in years:
        if (year in csv) and ("female" in csv):
            df = spark.read.csv(directory,header=True, schema= schema)
            df = df.withColumn('year', lit(int("20"+year)))
            df = df.withColumn('gender', lit("female"))
            return df
        elif (year in csv) and ("female" not in csv):
            df = spark.read.csv(directory,header=True, schema= schema)
            df = df.withColumn('year', lit(int("20"+year)))
            df = df.withColumn('gender', lit("male"))
            return df
                               

#CREATING DATAFRAMES WITH YEAR AND GENDER COLUMNS
female_players_16 = create_dataframe("female_players_16.csv")
female_players_17 = create_dataframe("female_players_17.csv")
female_players_18 = create_dataframe("female_players_18.csv")
female_players_19 = create_dataframe("female_players_19.csv")
female_players_20 = create_dataframe("female_players_20.csv")
female_players_21 = create_dataframe("female_players_21.csv")
female_players_22 = create_dataframe("female_players_22.csv")
players_15 = create_dataframe("players_15.csv")
players_16 = create_dataframe("players_16.csv")
players_17 = create_dataframe("players_17.csv")
players_18 = create_dataframe("players_18.csv")
players_19 = create_dataframe("players_19.csv")
players_20 = create_dataframe("players_20.csv")
players_21 = create_dataframe("players_21.csv")
players_22 = create_dataframe("players_22.csv")


dataframes = [female_players_16, female_players_17,female_players_18 ,female_players_19,
              female_players_20 ,female_players_21,female_players_22,
              players_15, players_16, players_17, players_18, players_19, players_20,
              players_21, players_22]


In [62]:
# WRITING THE DATAFRAME TO GOOGLE CLOUD SQL
db_url = "jdbc:postgresql://34.135.160.25:5432/postgres" 

db_properties = {
    "user": "postgres",
    "password": "Tools2024",
    "driver": "org.postgresql.Driver"
}


for df in dataframes:
    df.write.jdbc(url=db_url, table="fifa.players", mode="append", properties=db_properties)

                                                                                

In [72]:
# PULLING THE DATAFRAME FROM GOOGLE CLOUD SQL AS A TEST
df = spark.read.jdbc(url=db_url, table="fifa.players", properties=db_properties)
# CACHING DATAFRAME TO MEMORY, THIS WILL IMPROVE PERFORMANCE
df.cache()  

DataFrame[unique_id: int, sofifa_id: int, player_url: string, short_name: string, long_name: string, player_positions: string, overall: int, potential: int, value_eur: int, wage_eur: int, age: int, dob: date, height_cm: int, weight_kg: int, club_team_id: int, club_name: string, league_name: string, league_level: int, club_position: string, club_jersey_number: int, club_loaned_from: string, club_joined: date, club_contract_valid_until: date, nationality_id: int, nationality_name: string, nation_team_id: int, nation_position: string, nation_jersey_number: int, preferred_foot: string, weak_foot: int, skill_moves: int, international_reputation: int, work_rate: string, body_type: string, real_face: boolean, release_clause_eur: int, player_tags: string, player_traits: string, pace: int, shooting: int, passing: int, dribbling: int, defending: int, physic: int, attacking_crossing: int, attacking_finishing: int, attacking_heading_accuracy: int, attacking_short_passing: int, attacking_volleys: i

In [51]:
df.columns

['unique_id',
 'sofifa_id',
 'player_url',
 'short_name',
 'long_name',
 'player_positions',
 'overall',
 'potential',
 'value_eur',
 'wage_eur',
 'age',
 'dob',
 'height_cm',
 'weight_kg',
 'club_team_id',
 'club_name',
 'league_name',
 'league_level',
 'club_position',
 'club_jersey_number',
 'club_loaned_from',
 'club_joined',
 'club_contract_valid_until',
 'nationality_id',
 'nationality_name',
 'nation_team_id',
 'nation_position',
 'nation_jersey_number',
 'preferred_foot',
 'weak_foot',
 'skill_moves',
 'international_reputation',
 'work_rate',
 'body_type',
 'real_face',
 'release_clause_eur',
 'player_tags',
 'player_traits',
 'pace',
 'shooting',
 'passing',
 'dribbling',
 'defending',
 'physic',
 'attacking_crossing',
 'attacking_finishing',
 'attacking_heading_accuracy',
 'attacking_short_passing',
 'attacking_volleys',
 'skill_dribbling',
 'skill_curve',
 'skill_fk_accuracy',
 'skill_long_passing',
 'skill_ball_control',
 'movement_acceleration',
 'movement_sprint_speed',


# Task II

In [2]:
# INITIALIZING SPARK SESSION 
jdbc_jar_path = "postgresql-42.7.4.jar"

appName = "Project"
master = "local"

conf = SparkConf() \
    .set('spark.driver.host', '127.0.0.1') \
    .setAppName(appName) \
    .setMaster(master) \
    .set("spark.jars", jdbc_jar_path)

sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession.builder.getOrCreate()

24/10/10 00:15:12 WARN Utils: Your hostname, Janbols-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.126 instead (on interface en0)
24/10/10 00:15:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/10/10 00:15:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
#PULLING THE DATAFRAME FROM GOOGLE CLOUDSQL
db_url = "jdbc:postgresql://34.135.160.25:5432/postgres"

db_properties = {
    "user": "postgres",
    "password": "Tools2024",
    "driver": "org.postgresql.Driver"
}
df = spark.read.jdbc(url=db_url, table="fifa.players", properties=db_properties)

24/10/10 00:15:25 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [36]:
#CACHING DATAFRAME TO MEMORY, THIS WILL IMPROVE PERFORMANCE
df.cache()

DataFrame[unique_id: int, sofifa_id: int, player_url: string, short_name: string, long_name: string, player_positions: string, overall: int, potential: int, value_eur: int, wage_eur: int, age: int, dob: date, height_cm: int, weight_kg: int, club_team_id: int, club_name: string, league_name: string, league_level: int, club_position: string, club_jersey_number: int, club_loaned_from: string, club_joined: date, club_contract_valid_until: date, nationality_id: int, nationality_name: string, nation_team_id: int, nation_position: string, nation_jersey_number: int, preferred_foot: string, weak_foot: int, skill_moves: int, international_reputation: int, work_rate: string, body_type: string, real_face: boolean, release_clause_eur: int, player_tags: string, player_traits: string, pace: int, shooting: int, passing: int, dribbling: int, defending: int, physic: int, attacking_crossing: int, attacking_finishing: int, attacking_heading_accuracy: int, attacking_short_passing: int, attacking_volleys: i

## Task 2 Q 1

In [61]:


def contract_expiry_analysis(df,year_x,y,year_z):

    df_male = df.filter(df['gender'] == 'male') #only using the male player data

    filtered_df = df_male.filter(df_male['year'] == year_x) # Filtering the dataframe to only include the year X

    filtered_yrz_df = filtered_df.filter(year(filtered_df['club_contract_valid_until']) >= year_z)
                                                                     #filter for contracts that end in or after year Z


    club_ordered_df = filtered_yrz_df.groupBy("club_name") #grouping the filtered dataframe by the club names
    player_count_df = club_ordered_df.agg(count("*").alias("player_count")) # Counting the number of players


    ordered_players_df = player_count_df.orderBy("player_count", ascending=False) # Ordering the dataframe in descending order of player count
    limited_ordered_df = ordered_players_df.limit(y) #limiting the output to Y Clubs



    limited_ordered_df.show()


In [62]:
year_x = 2022# Enter a year X (between 2015 and 2022)
y = 5# Enter a positive integer Y - number of clubs we want to analyze
year_z = 2023 # Enter a year Z, contract end year
contract_expiry_analysis(df, year_x,y,year_z)

[Stage 152:>                                                        (0 + 1) / 1]

+--------------------+------------+
|           club_name|player_count|
+--------------------+------------+
|        Leeds United|          29|
|          Sevilla FC|          29|
|        FC Barcelona|          27|
|       VfB Stuttgart|          27|
|Borussia Möncheng...|          27|
+--------------------+------------+



                                                                                

## Task 2 Q 2

In [25]:

  
def average_age_analysis(df,x,year_y,find_lowest=True):
    if x <= 0:  # Accounting for an invalid x input
        print("Invalid x")
        return
    
    df_male = df.filter(df['gender'] == 'male') #only using the male player data
    year_filtered_df = df_male.filter(df_male['year'] == year_y) # Filtering the dataframe to only include the year Y

    club_grouped_df = year_filtered_df.groupBy("club_name") # Grouping the data frame by the names of the clubs
    
    avg_age_df = club_grouped_df.agg(avg("age").alias("average_age")) # Aggregating the average player age, and storing it under an aliased column

  
    arranged_df = avg_age_df.orderBy("average_age", ascending=find_lowest) # Arranging the average age column depending on the input condition of ascending or descending 


    limited_df = arranged_df.limit(x) #Limiting the arranged dataframe to 'X' entries

    # Accounting for multiple occurrences of the same average age for the Xth ranking

    last_avg_age = limited_df.tail(1)[0]['average_age'] #Finding the value of the Xth ranking
    
    tied_clubs_df = arranged_df.filter(arranged_df['average_age'] == last_avg_age) # Filtering the prior dataframe to include every entry that has the same value of average age
    unordered_final_df = limited_df.union(tied_clubs_df).distinct() #union of the limited dataframe with the one consisting of the tied clubs

    ordered_final_df = unordered_final_df.orderBy("average_age", ascending=find_lowest) # accounting for the case in which the union function does not order the data as we wished


    ordered_final_df.show()
    



In [26]:
x = 5# Enter a positivr integer x - number of clubs we want to rank
year_y = 2015 # Enter a year to find the data for
find_lowest = False # 'True' if we arrange in ascending, 'False' if in descending
average_age_analysis(df,x, year_y,find_lowest)

[Stage 219:>                                                        (0 + 1) / 1]

+--------------------+------------------+
|           club_name|       average_age|
+--------------------+------------------+
|           Cruz Azul|28.071428571428573|
|        Arsenal Tula|             28.04|
|Podbeskidzie Biel...|27.962962962962962|
|       Fenerbahçe SK|             27.88|
|Leones Negros de ...| 27.79310344827586|
+--------------------+------------------+



                                                                                

## Task 2 Q 3

In [65]:

def popular_nationality(df):

    df_male = df.filter(df['gender'] == 'male') #only using the male player data
   
    nationality_counts = df_male.groupBy("year", "nationality_name").agg(count("*").alias("nat_count")) # grouping the data by year and nationality
                                                                                                   # and counting the number of occurrences

    
    ordered_nat_df = nationality_counts.orderBy('nat_count', ascending =False) # Ordering the nationality counts in descending order

    
    final_df = ordered_nat_df.groupBy('year').agg(first('nationality_name')) # Picking out the maximum count of nationality for each year
    final_df.show()
    





In [66]:
popular_nationality(df)

[Stage 166:>                                                        (0 + 1) / 1]

+----+-----------------------+
|year|first(nationality_name)|
+----+-----------------------+
|2015|                England|
|2016|                England|
|2017|                England|
|2018|                England|
|2019|                England|
|2020|                England|
|2021|                England|
|2022|                England|
+----+-----------------------+



                                                                                