## Task I
#### Question: 
Build and populate necessary tables
#### Core Code with expected output:

In [1]:
import pyspark
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder \
    .appName("fifa") \
    .config("spark.driver.extraClassPath", "/opt/anaconda3/envs/myenv/lib/python3.9/site-packages/pyspark/jars/postgresql-42.7.4.jar") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/09 15:21:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Read one of data to get the headers
data_sample = pd.read_csv("./archive/players_15.csv", nrows=2)
column_names = list(data_sample.columns)

In [3]:
def infer_sql_type(dtype):
    """Map pandas data type to PostgreSQL"""
    if pd.api.types.is_integer_dtype(dtype):
        return "INT"
    elif pd.api.types.is_float_dtype(dtype):
        return "FLOAT"
    elif pd.api.types.is_bool_dtype(dtype):
        return "BOOLEAN"
    else:
        return "VARCHAR"

# Infer data type for each column in Postgres
column_types = {col: infer_sql_type(dtype) for col, dtype in zip(data_sample.columns, data_sample.dtypes)}

# Create a new column of year
column_types['year'] = "INT"

# Create a new column of gender
column_types['gender'] = "VARCHAR"

# Print the crrent result
for col, col_type in column_types.items():
    print(f"{col}: {col_type}")

sofifa_id: INT
player_url: VARCHAR
short_name: VARCHAR
long_name: VARCHAR
player_positions: VARCHAR
overall: INT
potential: INT
value_eur: FLOAT
wage_eur: FLOAT
age: INT
dob: VARCHAR
height_cm: INT
weight_kg: INT
club_team_id: FLOAT
club_name: VARCHAR
league_name: VARCHAR
league_level: INT
club_position: VARCHAR
club_jersey_number: INT
club_loaned_from: FLOAT
club_joined: VARCHAR
club_contract_valid_until: INT
nationality_id: INT
nationality_name: VARCHAR
nation_team_id: FLOAT
nation_position: VARCHAR
nation_jersey_number: INT
preferred_foot: VARCHAR
weak_foot: INT
skill_moves: INT
international_reputation: INT
work_rate: VARCHAR
body_type: VARCHAR
real_face: VARCHAR
release_clause_eur: FLOAT
player_tags: VARCHAR
player_traits: VARCHAR
pace: INT
shooting: INT
passing: INT
dribbling: INT
defending: INT
physic: INT
attacking_crossing: INT
attacking_finishing: INT
attacking_heading_accuracy: INT
attacking_short_passing: INT
attacking_volleys: INT
skill_dribbling: INT
skill_curve: INT
skil

24/10/09 15:21:59 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [4]:
# Create a new table names "fifa"
table = "fifa" 
columns = ",\n    ".join([f'"{col}" {dtype}' for col, dtype in column_types.items()])
create_table = f"""
CREATE TABLE IF NOT EXISTS {table} (
    id SERIAL PRIMARY KEY,
    {columns}
);
"""

print(create_table)


CREATE TABLE IF NOT EXISTS fifa (
    id SERIAL PRIMARY KEY,
    "sofifa_id" INT,
    "player_url" VARCHAR,
    "short_name" VARCHAR,
    "long_name" VARCHAR,
    "player_positions" VARCHAR,
    "overall" INT,
    "potential" INT,
    "value_eur" FLOAT,
    "wage_eur" FLOAT,
    "age" INT,
    "dob" VARCHAR,
    "height_cm" INT,
    "weight_kg" INT,
    "club_team_id" FLOAT,
    "club_name" VARCHAR,
    "league_name" VARCHAR,
    "league_level" INT,
    "club_position" VARCHAR,
    "club_jersey_number" INT,
    "club_loaned_from" FLOAT,
    "club_joined" VARCHAR,
    "club_contract_valid_until" INT,
    "nationality_id" INT,
    "nationality_name" VARCHAR,
    "nation_team_id" FLOAT,
    "nation_position" VARCHAR,
    "nation_jersey_number" INT,
    "preferred_foot" VARCHAR,
    "weak_foot" INT,
    "skill_moves" INT,
    "international_reputation" INT,
    "work_rate" VARCHAR,
    "body_type" VARCHAR,
    "real_face" VARCHAR,
    "release_clause_eur" FLOAT,
    "player_tags" VARCHAR,

In [5]:
# Read male data
total_data = spark.read.csv('/Users/jiayi/Desktop/System-ToolChain-AI/Course Project/archive/players_15.csv',header=True, inferSchema=True).toDF(*column_names)
total_data = total_data.withColumn("year", lit(2015))
total_data = total_data.withColumn("gender", lit("male"))

for i in range(16,23):
    data = spark.read.csv(f'/Users/jiayi/Desktop/System-ToolChain-AI/Course Project/archive/players_{i}.csv',header=True, inferSchema=True)
    data = data.toDF(*column_names)
    data = data.withColumn("year", lit(2000+i))
    data = data.withColumn("gender", lit("male"))
    total_data = total_data.union(data)
print("total male players: ",total_data.count())

# Read female data
for i in range(16,23):
    data = spark.read.csv(f'/Users/jiayi/Desktop/System-ToolChain-AI/Course Project/archive/female_players_{i}.csv',header=True, inferSchema=True)
    data = data.toDF(*column_names)
    data = data.withColumn("year", lit(2000+i))
    data = data.withColumn("gender", lit("female"))
    total_data = total_data.union(data)
    
print("total female players: ",total_data.count())   

# Ensure every record can be uniquely identified in the database table
print("total unique records: ",total_data.distinct().count())

total male players:  142079
total female players:  144323


24/10/09 15:22:18 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

total unique records:  144323


                                                                                

In [6]:
# Ingest data to PostgresDB
postgres_url = "jdbc:postgresql://localhost:5432/jiayi"
postgres_properties = {
    "user": "jiayi", 
    "password": "Liu123jiayi",
    "driver": "org.postgresql.Driver"
}

# Write data to DB
total_data.write \
    .jdbc(url=postgres_url, table="fifa", mode="overwrite", properties=postgres_properties)

# Read data from DB
data_postgres = spark.read \
    .jdbc(url=postgres_url, table="fifa", properties=postgres_properties)

data_postgres.printSchema()
data_postgres.show(5, vertical=True)

                                                                                

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: string (nullable = true)
 |-- wage_eur: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: date (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: string (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: string (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: string (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: string (nullable = true)
 |-- club_contract_valid_until: string (nullable = true)
 |-- nationality_id: integer (nullable = true)
 |

In [7]:
data_postgres.tail(5)

                                                                                

[Row(sofifa_id=262371, player_url='https://sofifa.com/player/262371/craig-dundas/220002', short_name='C. Dundas', long_name='Craig Dundas', player_positions='ST', overall=56, potential=56, value_eur=None, wage_eur='1000.0', age=40, dob=datetime.date(1981, 2, 16), height_cm=188, weight_kg=89, club_team_id='110799.0', club_name='Sutton United', league_name='English League Two', league_level='4', club_position='RES', club_jersey_number='14', club_loaned_from=None, club_joined='2019-07-01', club_contract_valid_until='2022', nationality_id=14, nationality_name='England', nation_team_id=None, nation_position=None, nation_jersey_number=None, preferred_foot='Right', weak_foot=3, skill_moves=2, international_reputation=1, work_rate='Medium/Low', body_type='Normal (185+)', real_face='No', release_clause_eur=None, player_tags=None, player_traits=None, pace=50, shooting=56, passing=39, dribbling=53, defending=23, physic=63, attacking_crossing=34, attacking_finishing=58, attacking_heading_accuracy=

## Task II

### Question 1: 
In Year X, what were the Y clubs that had the highest number of players with
contracts ending in year Z (or after)?
- X is a year between (2015 and 2022, inclusively).
- Y is a positive integer.
- Z is a year that can hold the value of 2023 or a year after it.

#### Core Code with expected output:

***Experiment***

In [8]:
# Explore "club_contract_valid_until" column to see possible values of Z
data_postgres.select("club_contract_valid_until").distinct().show()

+-------------------------+
|club_contract_valid_until|
+-------------------------+
|                     2016|
|                     2020|
|                     2031|
|                     2026|
|                     2019|
|                     2017|
|                     2014|
|                     2028|
|                     2018|
|                     2022|
|                     2025|
|                     2023|
|                     2021|
|                     2024|
|                     2015|
|                     2027|
|                     NULL|
+-------------------------+



*Result*

Possible values for Z: 2031, 2026, 2028, 2025, 2023, 2024, 2027

***Query Function***

In [13]:
from pyspark.sql.functions import col
def query_no_clubs(X, Y, Z):
    # Filter the data as needed and group by club_name
    club_counts = data_postgres.filter(f"club_contract_valid_until == {Z}") \
                               .filter(f"year == {X}") \
                               .filter("gender == 'male'") \
                               .groupBy("club_name") \
                               .count() \
                               .sort("count", ascending=False) \
                               .limit(Y)
    return club_counts

***Function Testing***

In [14]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("year", IntegerType(), True),
    StructField("contract_ending", IntegerType(), True),
    StructField("no_of_clubs", IntegerType(), True)
])

# Create an empty DataFrame with the defined schema
resultDF = spark.createDataFrame([], schema=schema)


data =[]
X = [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022]
Z = [2031, 2026, 2028, 2025, 2023, 2024, 2027]

# Specify y: #clubs that had the highest number of players 
# For example, y=5
y = 5
for i in X:
    for j in Z:
        no_of_clubs = query_no_clubs(i,y,j)
        print(f"In year {i} and club contract valid until {j}:")
        no_of_clubs.show()

In year 2015 and club contract valid until 2031:
+---------+-----+
|club_name|count|
+---------+-----+
+---------+-----+

In year 2015 and club contract valid until 2026:
+---------+-----+
|club_name|count|
+---------+-----+
+---------+-----+

In year 2015 and club contract valid until 2028:
+---------+-----+
|club_name|count|
+---------+-----+
+---------+-----+

In year 2015 and club contract valid until 2025:
+---------+-----+
|club_name|count|
+---------+-----+
+---------+-----+

In year 2015 and club contract valid until 2023:
+---------+-----+
|club_name|count|
+---------+-----+
+---------+-----+

In year 2015 and club contract valid until 2024:
+---------+-----+
|club_name|count|
+---------+-----+
+---------+-----+

In year 2015 and club contract valid until 2027:
+---------+-----+
|club_name|count|
+---------+-----+
+---------+-----+

In year 2016 and club contract valid until 2031:
+---------+-----+
|club_name|count|
+---------+-----+
+---------+-----+

In year 2016 and club co

### Question 2: 
In sports, maturity and energy of teams depend on the average age of team players (among other factors). Therefore, it’s important to have a function that can find clubs with such features.
- List the X clubs with the highest (or lowest) average player age for a given year Y.
    - X represents a positive integer, but you should handle a scenario if X is not positive value.
    - Y represents a year between 2015 and 2022 inclusively.
    - Provide the user with the ability to choose if they want the highest average age or the lowest average age.
    - Make sure to handle this scenario as well: if the user requests 5 clubs with highest averages but there are 3 clubs that share the same count at rank number 5, please include all of them in your output

#### Core Code with expected output:

***Query Function***

In [15]:
def top_avg_age_clubs(X, Y, filter):
    avg_by_group = data_postgres.filter(f"year == {Y}") \
                            .filter("gender == 'male'") \
                            .groupBy("club_name") \
                            .agg(avg("age").alias("average_age"))
    
    # Find the distinct avg age
    avg_by_group.select("average_age").distinct().orderBy("average_age")
    
    # Filter from the highest avg age
    if filter == "highest":
        top = avg_by_group.select("average_age").distinct().sort("average_age", ascending=False).limit(X).collect()
        top_boundary = top[X-1][0]
        return avg_by_group.filter(f"average_age >= {top_boundary}").sort("average_age", ascending=False).show()
    
    # Filter from the lowest avg age
    else:
        bottom = avg_by_group.select("average_age").distinct().sort("average_age", ascending=True).limit(X).collect()
        bottom_boundary = bottom[X-1][0]
        return avg_by_group.filter(f"average_age <= {bottom_boundary}").sort("average_age", ascending=True).show()

***Function Testing***

In [16]:
top_avg_age_clubs(5, 2015, "highest")

+--------------------+------------------+
|           club_name|       average_age|
+--------------------+------------------+
|           Cruz Azul|28.071428571428573|
|        Arsenal Tula|             28.04|
|Podbeskidzie Biel...|27.962962962962962|
|       Fenerbahçe SK|             27.88|
|Leones Negros de ...| 27.79310344827586|
+--------------------+------------------+



### Question 3: 
What is the most popular nationality in the dataset for each year? (i.e. display the
most frequent nation for 2015, 2016, etc.).

#### Core Code with expected output:

***Query Function***

In [17]:
def display_popular_nation():
    # Find the minimum and maximum year in the db
    max_year = data_postgres.agg(max("year")).collect()[0][0]
    min_year = data_postgres.agg(min("year")).collect()[0][0]

    # Group nations for each years
    for i in range(min_year,max_year + 1):
        no_nations = data_postgres.filter(f"year == {i}") \
                            .filter("gender == 'male'") \
                            .groupBy("nationality_name") \
                            .count()
        
        max_count = no_nations.agg(max("count")).collect()[0][0]
        print(f"The most popularity nationality & its total players in year {i}:")
        no_of_nations = no_nations.filter(col("count") == max_count).show()
    return no_nations

#### Function Testing:

In [18]:
no_nations = display_popular_nation()
for row in no_nations.collect():
    print(row)

The most popularity nationality & its total players in year 2015:
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1627|
+----------------+-----+

The most popularity nationality & its total players in year 2016:
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1519|
+----------------+-----+

The most popularity nationality & its total players in year 2017:
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1627|
+----------------+-----+

The most popularity nationality & its total players in year 2018:
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1633|
+----------------+-----+

The most popularity nationality & its total players in year 2019:
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1625|
+----------------+-----+

The most popularity nationality & its to