# Group project



## Task1

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F

import os             

In [4]:
# Initialize Spark session      
spark = SparkSession.builder \
    .appName("group_project_option1") \
    .config("spark.jars", "/opt/homebrew/Cellar/apache-spark/3.5.3/libexec/jars/postgresql-42.7.3.jar") \
    .getOrCreate()  

24/10/10 22:18:53 WARN Utils: Your hostname, ccydeMacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.146 instead (on interface en0)
24/10/10 22:18:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/10/10 22:18:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Combine all the data together and add a year column

In [5]:
def get_the_year(file):
    year_suffix = os.path.basename(file).split('_')[-1][0:2]
    year = 2000 + int(year_suffix)
    return year

#### Add a gender column since we would only select male players in Task 2

In [6]:
def get_the_gender(file):
    gender_suffix = os.path.basename(file).split('_')[0]
    #print(gender_suffix)
    if gender_suffix == 'female':
        return 'F'
    else:
        return 'M'

In [7]:
import os

csv_files = [f for f in os.listdir("/Users/ccy/Documents/CMU/Fall2024/Systems and tool chains/HW/Course project/FIFA") if f.endswith(".csv")]
df = None
for file in csv_files:
    file_path = os.path.join("/Users/ccy/Documents/CMU/Fall2024/Systems and tool chains/HW/Course project/FIFA", file)
    temp_df = spark.read.csv(file_path, header=True, inferSchema=True)
    temp_df = temp_df.withColumn('year', F.lit(get_the_year(file)))
    temp_df = temp_df.withColumn('gender', F.lit(get_the_gender(file)))
    if df is None:
        df = temp_df
    else:
        df = df.union(temp_df)
df.printSchema()

                                                                                

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: string (nullable = true)
 |-- wage_eur: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: date (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: string (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: string (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: string (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: string (nullable = true)
 |-- club_contract_valid_until: string (nullable = true)
 |-- nationality_id: integer (nullable = true)
 |

In [8]:
df.show(5)

24/10/10 22:19:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---------+--------------------+-----------------+--------------------+----------------+-------+---------+---------+--------+---+----------+---------+---------+------------+-------------------+--------------------+------------+-------------+------------------+----------------+-----------+-------------------------+--------------+----------------+--------------+---------------+--------------------+--------------+---------+-----------+------------------------+-----------+---------+---------+------------------+--------------------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+----------

#### Write the table into Postgres Table

In [9]:
url = "jdbc:postgresql://localhost:5432/postgres"
properties = {
    "user": "postgres",
    "password": "",
    "driver": "org.postgresql.Driver"
}
df.write.jdbc(url=url, table="fifa.players", mode="overwrite", properties=properties)

                                                                                

## Task2

#### Read back the table from Postgres

In [10]:
players_df = spark.read.jdbc(url = url,table='fifa.players',properties = properties)
players_df.show(5)

[Stage 32:>                                                         (0 + 1) / 1]

+---------+--------------------+-----------+-------------+----------------+-------+---------+---------+--------+---+----------+---------+---------+------------+--------------------+--------------------+------------+-------------+------------------+----------------+-----------+-------------------------+--------------+----------------+--------------+---------------+--------------------+--------------+---------+-----------+------------------------+-------------+--------------+---------+------------------+-----------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+

                                                                                

#### 2-1 Analyzing Top Clubs Based on Contract Durations

In [11]:
def highest_contract_players(year_x, y_clubs, year_z):
    # load the male data from the players_df
    male_data = players_df.filter(F.col('gender') == 'M')

    # convert the club_joined (yyyy-MM--dd) column from string into date format
    male_data = male_data.withColumn('club_join_date', F.to_date('club_joined', 'yyyy-MM-dd')) # convert to dat format and store it in new column
    male_data = male_data.drop('club_joined') # drop the old column
    male_data = male_data.withColumnRenamed('club_join_date', 'club_joined') # rename the new column to old name.

    # create a new column to store the year of club_join (yyyy)
    male_data = male_data.withColumn('club_joined_year', F.year('club_joined'))

    # convert the "club_contract_valid_until" from string into int format
    male_data = male_data.withColumn('club_contract_valid_until_int', F.col('club_contract_valid_until').cast('int'))
    male_data = male_data.drop('club_contract_valid_until')
    male_data = male_data.withColumnRenamed('club_contract_valid_until_int', 'club_contract_valid_until')

    # filter out the data that are not in the given period
    filtered_df = male_data.filter(((F.col('club_contract_valid_until') >= year_z) & (F.col('club_joined_year') <= year_x)))
    distinct_filtered_df = filtered_df.dropDuplicates(['sofifa_id', 'long_name']) # remove duplicates


    # group the data by the club_name and count the number of contract
    club_df = distinct_filtered_df.groupBy('club_name').agg(F.count('*').alias('players_count'))
    club_df.orderBy(F.col('players_count').desc()).limit(y_clubs).show()

In [12]:
highest_contract_players(2017,5,2023)

+--------------------+-------------+
|           club_name|players_count|
+--------------------+-------------+
|            FC Seoul|           19|
|         Dynamo Kyiv|           19|
|   Alianza Petrolera|           17|
|      Deportivo Cali|           17|
|Jaguares de Córdo...|           17|
+--------------------+-------------+



#### 2-2 Analyzing Top Clubs Based on Player Age for Specific Years

In [16]:
def average_age_clubs(num_clubs, year_y, highest=True):
    # # load the male data from the players_df
    male_data = players_df.filter(F.col('gender') == 'M')

    # filter out years are not in the given year
    filtered_df = male_data.filter(F.col('year') == year_y)

    # group the dataframe by club_name and calculate by average age
    club_avg_age = filtered_df.groupBy('club_name').agg(F.avg(F.col('age')).alias('avg_age'))

    # check if num_clubs is a positive number. If not, set highest to False and make num_clubs positive.
    if num_clubs < 0:
        highest = False
        num_clubs = abs(num_clubs) # make num_clubs positive if it is negative.
    elif num_clubs == 0:
        print("No clubs available in the given year.")
        return "No clubs available in the given year."

    # show the oldest club_name or the youngest club_name
    if highest == True:
        top_old_club = club_avg_age.orderBy(F.col('avg_age').desc()).limit(num_clubs) # sort the club_avg_age df by average age in descending order
        least_oldest_avg_age = top_old_club.collect()[-1]['avg_age'] # find the least oldest average age in the top n selected rank.)
        final_top_old_club = club_avg_age.filter(F.col('avg_age') >= least_oldest_avg_age) # select all the club that have the average age older than the least average age.
        final_top_old_club.orderBy(F.col('avg_age').desc()).show()

    else:
        top_young_club = club_avg_age.orderBy(F.col('avg_age').asc()).limit(num_clubs)
        least_youngest_avg_age = top_young_club.collect()[-1]['avg_age']
        final_top_young_club = club_avg_age.filter(F.col('avg_age') <= least_youngest_avg_age)
        final_top_young_club.orderBy(F.col('avg_age').asc()).show()

In [18]:
average_age_clubs(5, 2017, highest=True)
average_age_clubs(5, 2017, highest=False)

+--------------------+------------------+
|           club_name|           avg_age|
+--------------------+------------------+
|      Ventforet Kofu|29.392857142857142|
|América Futebol C...|              29.2|
|Club Athletico Pa...|              29.0|
|Sport Club do Recife|              29.0|
|            Criciúma|              29.0|
|             Vitória|              29.0|
+--------------------+------------------+

+----------------+------------------+
|       club_name|           avg_age|
+----------------+------------------+
|Sevilla Atlético|19.923076923076923|
|    Swindon Town| 21.37037037037037|
|   CD Huachipato| 21.40740740740741|
| FC Nordsjælland| 21.40740740740741|
|       FC Twente|21.586206896551722|
+----------------+------------------+



#### 2-3 What is the most popular nationality in the dataset for each year?


In [15]:
from pyspark.sql import Window

male_df = players_df.filter(F.col('gender') == 'M')
# Group by 'year' and 'nationality_name' and count occurrences
nationality_year_counts_df = male_df.groupBy('year', 'nationality_name') \
    .agg(F.count('*').alias('nationality_count'))

# Define a window partitioned by 'year' and ordered by 'nationality_count' in descending order
window_spec = Window.partitionBy('year').orderBy(F.desc('nationality_count'))

# Add a rank to each row within the window
nationality_year_counts_df = nationality_year_counts_df \
    .withColumn('rank', F.row_number().over(window_spec))

# Filter to get the top nationality per year
top_nationalities_df = nationality_year_counts_df \
    .filter(F.col('rank') == 1) \
    .select('year', 'nationality_name', 'nationality_count')

# Display the result
top_nationalities_df.show()


+----+----------------+-----------------+
|year|nationality_name|nationality_count|
+----+----------------+-----------------+
|2015|         England|             1627|
|2016|         England|             1519|
|2017|         England|             1627|
|2018|         England|             1633|
|2019|         England|             1625|
|2020|         England|             1670|
|2021|         England|             1685|
|2022|         England|             1719|
+----+----------------+-----------------+



In [19]:
nationality_year_counts_df.filter(F.col('year') == 2020).show(5)

+----+----------------+-----------------+----+
|year|nationality_name|nationality_count|rank|
+----+----------------+-----------------+----+
|2020|         England|             1670|   1|
|2020|         Germany|             1218|   2|
|2020|           Spain|             1045|   3|
|2020|          France|             1004|   4|
|2020|       Argentina|              886|   5|
+----+----------------+-----------------+----+
only showing top 5 rows

