### Task 1:

In [1]:
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler

from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np

In [2]:
import findspark
findspark.init()
findspark.find()

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("project") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()
columns = [
    "sofifa_id", "player_url", "short_name", "long_name", "player_positions", "overall", 
    "potential", "value_eur", "wage_eur", "age", "dob", "height_cm", "weight_kg", 
    "club_team_id", "club_name", "league_name", "league_level", "club_position", 
    "club_jersey_number", "club_loaned_from", "club_joined", "club_contract_valid_until", 
    "nationality_id", "nationality_name", "nation_team_id", "nation_position", 
    "nation_jersey_number", "preferred_foot", "weak_foot", "skill_moves", 
    "international_reputation", "work_rate", "body_type", "real_face", 
    "release_clause_eur", "player_tags", "player_traits", "pace", "shooting", 
    "passing", "dribbling", "defending", "physic", "attacking_crossing", 
    "attacking_finishing", "attacking_heading_accuracy", "attacking_short_passing", 
    "attacking_volleys", "skill_dribbling", "skill_curve", "skill_fk_accuracy", 
    "skill_long_passing", "skill_ball_control", "movement_acceleration", 
    "movement_sprint_speed", "movement_agility", "movement_reactions", 
    "movement_balance", "power_shot_power", "power_jumping", "power_stamina", 
    "power_strength", "power_long_shots", "mentality_aggression", 
    "mentality_interceptions", "mentality_positioning", "mentality_vision", 
    "mentality_penalties", "mentality_composure", "defending_marking_awareness", 
    "defending_standing_tackle", "defending_sliding_tackle", "goalkeeping_diving", 
    "goalkeeping_handling", "goalkeeping_kicking", "goalkeeping_positioning", 
    "goalkeeping_reflexes", "goalkeeping_speed", "ls", "st", "rs", "lw", "lf", 
    "cf", "rf", "rw", "lam", "cam", "ram", "lm", "lcm", "cm", "rcm", "rm", 
    "lwb", "ldm", "cdm", "rdm", "rwb", "lb", "lcb", "cb", "rcb", "rb", "gk", 
    "player_face_url", "club_logo_url", "club_flag_url", "nation_logo_url", 
    "nation_flag_url"
]
df_male_16 = spark.read.csv('./fifa/players_15.csv',header=True).toDF(*columns)
df_male_16 = spark.read.csv('./fifa/players_16.csv',header=True).toDF(*columns)
df_male_17 = spark.read.csv('./fifa/players_17.csv',header=True).toDF(*columns)
df_male_18 = spark.read.csv('./fifa/players_18.csv',header=True).toDF(*columns)
df_male_19 = spark.read.csv('./fifa/players_19.csv',header=True).toDF(*columns)
df_male_20 = spark.read.csv('./fifa/players_20.csv',header=True).toDF(*columns)
df_male_21 = spark.read.csv('./fifa/players_21.csv',header=True).toDF(*columns)
df_male_22 = spark.read.csv('./fifa/players_22.csv',header=True).toDF(*columns)
df_female_16 = spark.read.csv('./fifa/female_players_16.csv',header=True).toDF(*columns)
df_female_17 = spark.read.csv('./fifa/female_players_17.csv',header=True).toDF(*columns)
df_female_18 = spark.read.csv('./fifa/female_players_18.csv',header=True).toDF(*columns)
df_female_19 = spark.read.csv('./fifa/female_players_19.csv',header=True).toDF(*columns)
df_female_20 = spark.read.csv('./fifa/female_players_20.csv',header=True).toDF(*columns)
df_female_21 = spark.read.csv('./fifa/female_players_21.csv',header=True).toDF(*columns)
df_female_22 = spark.read.csv('./fifa/female_players_22.csv',header=True).toDF(*columns)

df_male_16 = df_male_16.withColumn("year", lit(2016))
df_male_17 = df_male_17.withColumn("year", lit(2017))
df_male_18 = df_male_18.withColumn("year", lit(2018))
df_male_19 = df_male_19.withColumn("year", lit(2019))
df_male_20 = df_male_20.withColumn("year", lit(2020))
df_male_21 = df_male_21.withColumn("year", lit(2021))
df_male_22 = df_male_22.withColumn("year", lit(2022))

df_female_16 = df_female_16.withColumn("year", lit(2016))
df_female_17 = df_female_17.withColumn("year", lit(2017))
df_female_18 = df_female_18.withColumn("year", lit(2018))
df_female_19 = df_female_19.withColumn("year", lit(2019))
df_female_20 = df_female_20.withColumn("year", lit(2020))
df_female_21 = df_female_21.withColumn("year", lit(2021))
df_female_22 = df_female_22.withColumn("year", lit(2022))

df_male_combined = df_male_16.union(df_male_17) \
    .union(df_male_18) \
    .union(df_male_19) \
    .union(df_male_20) \
    .union(df_male_21) \
    .union(df_male_22)
df_male_combined = df_male_combined.withColumn("gender", lit("male"))
df_female_combined = df_female_16.union(df_female_17) \
    .union(df_female_18) \
    .union(df_female_19) \
    .union(df_female_20) \
    .union(df_female_21) \
    .union(df_female_22)
df_female_combined = df_female_combined.withColumn("gender", lit("female"))
df_all_players = df_male_combined.union(df_female_combined)

In [3]:
df_all_players.show(5)

+---------+--------------------+-----------------+--------------------+----------------+-------+---------+-----------+--------+---+----------+---------+---------+------------+-----------------+--------------------+------------+-------------+------------------+----------------+-----------+-------------------------+--------------+----------------+--------------+---------------+--------------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+------------------+--------------------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import re
# since start from ls to gk, it contains + and -, we just evaluate it before loading into sql
# evaluate_expression is helped by gpt
def evaluate_expression(expr_str):
    try:
        cleaned_expr = re.sub(r'[^0-9+\-]', '', expr_str)
        return eval(cleaned_expr)
    except:
        return None
evaluate_expression_udf = udf(evaluate_expression, IntegerType())

columns_to_process = ['ls', 'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw', 'lam', 'cam', 'ram', 'lm', 
                      'lcm', 'cm', 'rcm', 'rm', 'lwb', 'ldm', 'cdm', 'rdm', 'rwb', 'lb', 
                      'lcb', 'cb', 'rcb', 'rb', 'gk']

int_columns = ['sofifa_id', 'overall', 'potential', 'value_eur', 'wage_eur', 'age', 
               'height_cm', 'weight_kg', 'club_team_id', 'club_jersey_number','club_loaned_from', 'club_contract_valid_until','nationality_id',
               'nation_team_id', 'nation_jersey_number', 'weak_foot', 'skill_moves', 
               'international_reputation', 'pace', 'shooting', 'passing', 'dribbling', 'defending',
               'physic', 'attacking_crossing', 'attacking_finishing', 'attacking_heading_accuracy', 
               'attacking_short_passing', 'attacking_volleys', 'skill_dribbling', 'skill_curve',
               'skill_fk_accuracy', 'skill_long_passing', 'skill_ball_control', 'movement_acceleration',
               'movement_sprint_speed', 'movement_agility', 'movement_reactions', 'movement_balance', 
               'power_shot_power', 'power_jumping', 'power_stamina', 'power_strength', 
               'power_long_shots', 'mentality_aggression', 'mentality_interceptions', 
               'mentality_positioning', 'mentality_vision', 'mentality_penalties', 'mentality_composure', 
               'defending_marking_awareness', 'defending_standing_tackle', 'defending_sliding_tackle', 
               'goalkeeping_diving', 'goalkeeping_handling', 'goalkeeping_kicking', 'goalkeeping_positioning',
               'goalkeeping_reflexes', 'goalkeeping_speed']
for col_name in int_columns:
    df_all_players = df_all_players.withColumn(col_name, col(col_name).cast(IntegerType()))

for col_name in columns_to_process:
    df_all_players = df_all_players.withColumn(col_name, evaluate_expression_udf(col(col_name)))

df_all_players.show(5)

+---------+--------------------+-----------------+--------------------+----------------+-------+---------+---------+--------+---+----------+---------+---------+------------+-----------------+--------------------+------------+-------------+------------------+----------------+-----------+-------------------------+--------------+----------------+--------------+---------------+--------------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+------------------+--------------------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+---

In [5]:
df_all_players.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: integer (nullable = true)
 |-- wage_eur: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: string (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: integer (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: string (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: integer (nullable = true)
 |-- club_joined: string (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_id: integer (nullable = 

In [6]:
df_male_players = df_all_players.filter(df_all_players.gender == "male")

df_male_players.show()

+---------+--------------------+-----------------+--------------------+----------------+-------+---------+---------+--------+---+----------+---------+---------+------------+-------------------+--------------------+------------+-------------+------------------+----------------+-----------+-------------------------+--------------+----------------+--------------+---------------+--------------------+--------------+---------+-----------+------------------------+-------------+----------------+---------+------------------+--------------------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-

In [6]:
db_properties={}
db_properties['username']="postgres"
db_properties['password']="88062601"
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
db_properties['table']="fifa.players"
db_properties['driver']="org.postgresql.Driver"

df_all_players.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()

### Task 2:

In [7]:
from pyspark.sql import functions as F

def top_clubs_ending(X, Y, Z):
    # handle the invalid input
    if not (2015 <= X <= 2022):
        raise ValueError("Parameter X must be between 2015 and 2022, inclusively.")
    if Y <= 0:
        raise ValueError("Parameter Y must be a positive integer.")
    if Z < 2023:
        raise ValueError("Parameter Z must be 2023 or later.")

    filtered_df = df_male_players.filter((df_male_players.year == X) & (df_male_players.club_contract_valid_until >= Z))
    return filtered_df.groupBy("club_name").count().orderBy(desc("count")).limit(Y)

In [8]:
top_clubs_ending(2020,3,2023).show()

+-----------+-----+
|  club_name|count|
+-----------+-----+
| SL Benfica|   22|
|  Al Ain FC|   20|
|Sporting CP|   20|
+-----------+-----+



In [9]:
def clubs_age_most_least(X, Y, highest=True):
    # x is the number of clubs
    # y is the year we need to search for
    # highest true means to find the highest or find the lowest

    spark = SparkSession.builder.getOrCreate()
    
    # Define the schema for the result DataFrame
    schema = StructType([
        StructField("club_name", StringType(), True),
        StructField("average_age", DoubleType(), True)
    ])
    # If the x is invalid, we return an empty df
    if X <= 0:
        return spark.createDataFrame([], schema)
    # filter the df and sort
    filtered_df = df_male_players.filter(F.col("year") == Y)
    avg_age_df = filtered_df.groupBy("club_name").agg(F.avg("age").alias("average_age"))
    sorted_df = avg_age_df.orderBy(F.desc("average_age") if highest else F.asc("average_age"))
    # handle the scenario  if the user requests 5 clubs with highest averages but there are 3 clubs that share the same count
    # Here is helped by gpt
    top_x = sorted_df.limit(X).collect()
    if top_x:
        cutoff_age = top_x[-1]["average_age"]
        condition = F.col("average_age") >= cutoff_age if highest else F.col("average_age") <= cutoff_age
        result_df = sorted_df.filter(condition)
    else:
        result_df = sorted_df
    return result_df


In [10]:
clubs_age_most_least(3,2019,True).show()

+--------------------+-----------+
|           club_name|average_age|
+--------------------+-----------+
|              Paraná|       31.6|
|            Cruzeiro|       30.6|
|Club Athletico Pa...|       30.4|
|            Botafogo|       30.4|
|Associação Chapec...|       30.4|
+--------------------+-----------+



In [11]:
from pyspark.sql.window import Window

def most_popular_nationality():
    nationality_count_df = df_male_players.groupBy("year", "nationality_name").agg(count("*").alias("count")).orderBy("year", desc("count"))
    # using the window can keep the code clean and efficient
    # The use of window is from gpt
    window_spec = Window.partitionBy("year").orderBy(desc("count"))
    ranked_df = nationality_count_df.withColumn("rank", row_number().over(window_spec))
    most_popular_nationality_df = ranked_df.filter(col("rank") == 1).select("year", "nationality_name", "count")
    return most_popular_nationality_df


In [12]:
most_popular_nationality().show()

+----+----------------+-----+
|year|nationality_name|count|
+----+----------------+-----+
|2016|         England| 1519|
|2017|         England| 1627|
|2018|         England| 1633|
|2019|         England| 1625|
|2020|         England| 1670|
|2021|         England| 1685|
|2022|         England| 1719|
+----+----------------+-----+

