In [191]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import col, when, lit, to_date
from pyspark.sql.types import IntegerType, StringType, DoubleType, DateType

In [192]:
spark = SparkSession.builder.getOrCreate()

## Task-I: Build and populate necessary tables 

In [193]:
jdbc_url = "jdbc:postgresql://localhost:5432/postgres"

In [194]:
years = [16, 17, 18, 19, 20, 21, 22]

In [195]:
path = "FIFA/players_15.csv"
df = spark.read.csv(path, header=True, inferSchema=True)
df = df.withColumn("year", lit(2015))
df = df.withColumn("gender", lit("M"))
df.write.jdbc(url=jdbc_url, table="public.FIFA", mode="overwrite")

In [196]:
for year in years:
    male_path = f"FIFA/players_{year}.csv"
    male_df = spark.read.csv(male_path, header=True, inferSchema=True)
    male_df = male_df.withColumn("year", lit(year + 2000).cast(IntegerType()))
    male_df = male_df.withColumn("gender", lit("M"))
    male_df.write.jdbc(url=jdbc_url, table="public.FIFA", mode="append")

#### Since there are many 100% empty columns in csv files of female players, we need to deal with missing values before insertion.

In [197]:
for year in years:
    female_path = f"FIFA/female_players_{year}.csv"
    female_df = spark.read.csv(female_path, header=True, inferSchema=True)
    female_df = female_df.withColumn("year", lit(year).cast(IntegerType()))
    female_df = female_df.withColumn("gender", lit("F"))
    for column_name in female_df.columns:
        male_column_type = df.schema[column_name].dataType
        if isinstance(male_column_type, IntegerType):
            female_df = female_df.withColumn(column_name, when(col(column_name).isNull(), lit(0)).otherwise(col(column_name).cast(IntegerType())))
        elif isinstance(male_column_type, StringType):
            female_df = female_df.withColumn(column_name, when(col(column_name).isNull(), lit("NA")).otherwise(col(column_name).cast(StringType())))
        elif isinstance(male_column_type, DoubleType):
            female_df = female_df.withColumn(column_name, when(col(column_name).isNull(), lit(0.0)).otherwise(col(column_name).cast(DoubleType())))
        elif isinstance(male_column_type, DateType):
            female_df = female_df.withColumn(column_name, when(col(column_name).isNull(), to_date(lit("1970-01-01"), "yyyy-MM-dd")).otherwise(col(column_name).cast(DateType())))
            
    female_df.write.jdbc(url=jdbc_url, table="public.FIFA", mode="append")

#### Read Data back from PostgresDB

In [198]:
df = spark.read.jdbc(url=jdbc_url, table="public.fifa")
output_path = "fifa_data.csv"
df.write.csv(output_path, header=True, mode="overwrite")

                                                                                

## Task-II: Conduct analytics on your dataset

**In Year X, what were the Y clubs that had the highest number of players with contracts ending in year Z (or after)?**  
- X is a year between (2015 and 2022, inclusively).  
- Y is a positive integer.  
- Z is a year that can hold the value of 2023 or a year after it.  

In [217]:
sub_df = df.filter((df['gender'] == "M") & (df['club_name'].isNotNull()))

In [231]:
def get_top_clubs_by_player_contracts(df, year, club_num, end_year):
    if year < 2015 or year > 2022:
        print("Please provide a valid joining year!")
        return
    if end_year < 2023:
        print("Please provide a valid contract ending year after 2023!")
        return
    if club_num <= 0:
        print("Please provide a positive number of clubs!")
        return
    
    df.createOrReplaceTempView("temp")
    query = f"""
        SELECT club_name, COUNT(sofifa_id) AS player_count
        FROM temp
        WHERE club_contract_valid_until >= {end_year} 
        AND year = {year}
        GROUP BY club_name
        ORDER BY player_count DESC
        LIMIT {club_num};"""
    top_clubs = spark.sql(query)
    top_clubs.show()

In [241]:
get_top_clubs_by_player_contracts(sub_df, 2019, 3, 2023)

+--------------------+------------+
|           club_name|player_count|
+--------------------+------------+
|Jaguares de Córdo...|          18|
|    Rionegro Águilas|          15|
|     Deportivo Pasto|          15|
+--------------------+------------+



**List the X clubs with the highest (or lowest) average player age for a given year Y.**  
- X represents a positive integer, but you should handle a scenario if X is not positive value.
- Y represents a year between 2015 and 2022 inclusively.
- Provide the user with the ability to choose if they want the highest average age or the lowest average age.
- *Make sure to handle this scenario as well: if the user requests 5 clubs with highest averages but there are 3 clubs that share the same count at rank number 5, please include all of them in your output*

In [224]:
def find_clubs_by_average_age(df, club_num, year, highest=True):
    if club_num <= 0:
        print("Please provide a positive number of clubs!")
        return
    if year < 2015 or year > 2022:
        print("Please provide a valid year! 2015 <= year <= 2022")
        return
    sub_df = df.filter((df['year'] == year))
    sub_df.createOrReplaceTempView("temp")
    order = "DESC" if highest else "ASC"
    query_cal_avg = """
        SELECT club_name, AVG(age) AS average_age
        FROM temp
        GROUP BY club_name;"""
    sub_df_with_avg = spark.sql(query_cal_avg)

    sub_df_with_avg.createOrReplaceTempView("temp1")
    group_query = f"""
        SELECT average_age, COUNT(club_name) FROM temp1
        GROUP BY average_age
        ORDER BY average_age {order}
        LIMIT {club_num};"""
    top_values = spark.sql(group_query)
    top_values_list = [row['average_age'] for row in top_values.collect()]
    
    select_query = f"""
        SELECT club_name, average_age FROM temp1
        WHERE average_age IN ({','.join(map(str, top_values_list))})
        ORDER BY average_age {order};"""
    result = spark.sql(select_query)
    result.show()

In [225]:
find_clubs_by_average_age(sub_df, 3, 2020, highest=True)

+--------------------+-----------+
|           club_name|average_age|
+--------------------+-----------+
|           Fortaleza|       32.6|
|            Cruzeiro|       31.6|
|Club Athletico Pa...|       31.4|
|            Botafogo|       31.4|
|Associação Chapec...|       31.4|
+--------------------+-----------+



In [226]:
find_clubs_by_average_age(sub_df, 3, 2020, highest=False)

+--------------------+------------------+
|           club_name|       average_age|
+--------------------+------------------+
|    Bolton Wanderers| 20.26086956521739|
|             UCD AFC|20.428571428571427|
|FC Bayern München II|20.892857142857142|
+--------------------+------------------+



In [208]:
def find_most_popular_nationality(df):
    df.createOrReplaceTempView("temp")
    query1 = """
        SELECT nationality_name, year, COUNT(sofifa_id) AS count FROM temp
        WHERE gender = "M"
        GROUP BY nationality_name, year;"""
    result = spark.sql(query1)
    result.createOrReplaceTempView("temp1")
    query2 = """
        SELECT temp1.year, temp1.nationality_name, temp1.count
        FROM temp1 
        JOIN (
            SELECT year, MAX(count) AS max_count
            FROM temp1
            GROUP BY year
        ) AS max ON temp1.year = max.year AND temp1.count = max.max_count
        ORDER BY temp1.year;"""
    result = spark.sql(query2)
    result.show()

In [209]:
find_most_popular_nationality(sub_df)

+----+----------------+-----+
|year|nationality_name|count|
+----+----------------+-----+
|2015|         England| 1627|
|2016|         England| 1519|
|2017|         England| 1627|
|2018|         England| 1633|
|2019|         England| 1625|
|2020|         England| 1670|
|2021|         England| 1685|
|2022|         England| 1719|
+----+----------------+-----+

