Collaborators: Helen Yu, Lixin Xu

In [8]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import *
import matplotlib.pyplot as plt

# Spark session settings; both values may be adjusted freely.
appName = "Ingest Player Data"
master = "local"

# Pin the driver host to the loopback address and apply the name/master above.
conf = (
    pyspark.SparkConf()
    .set('spark.driver.host', '127.0.0.1')
    .setAppName(appName)
    .setMaster(master)
)

# Reuse the kernel's SparkContext if one already exists, then expose both the
# legacy SQLContext and the SparkSession built on top of it.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession.builder.getOrCreate()

# Construct Table and Save in Database

In [9]:
female_files = ['data/female_players_16.csv', 'data/female_players_17.csv',
                'data/female_players_18.csv', 'data/female_players_19.csv',
                'data/female_players_20.csv', 'data/female_players_21.csv',
                'data/female_players_22.csv']

male_files = ['data/players_15.csv', 'data/players_16.csv', 'data/players_17.csv',
              'data/players_18.csv', 'data/players_19.csv', 'data/players_20.csv',
              'data/players_21.csv', 'data/players_22.csv']


def _load_player_files(files, gender):
    """Read FIFA player CSV files and stack them into a single DataFrame.

    Each file gets two extra string columns:
      - ``year``: '20' + the two digits just before '.csv' in the path
        (e.g. 'data/players_17.csv' -> '2017');
      - ``gender``: the ``gender`` label passed in.

    Args:
        files: non-empty list of CSV paths ending in ``NN.csv``.
        gender: label stored in the ``gender`` column (e.g. 'Male').

    Returns:
        Spark DataFrame with the rows of all files.

    Raises:
        ValueError: if ``files`` is empty.
    """
    if not files:
        raise ValueError("files must contain at least one CSV path")

    stacked = None
    for path in files:
        year = '20' + path[-6:-4]   # two-digit suffix before '.csv'
        yearly = (
            spark.read.csv(path, header=True, inferSchema=True)
            .withColumn('year', lit(year))
            .withColumn('gender', lit(gender))
        )
        # unionByName matches columns by name rather than position, so a file
        # whose columns are ordered differently cannot shift values into the
        # wrong column.
        stacked = yearly if stacked is None else stacked.unionByName(yearly)
    return stacked


# Load all female and all male files
female_df = _load_player_files(female_files, 'Female')
male_df = _load_player_files(male_files, 'Male')

# Combine both male and female data
combined_df = female_df.unionByName(male_df)

combined_df.printSchema()
print(len(combined_df.columns))
print(combined_df.count())

                                                                                

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: string (nullable = true)
 |-- wage_eur: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: date (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: string (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: string (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: string (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: string (nullable = true)
 |-- club_contract_valid_until: string (nullable = true)
 |-- nationality_id: integer (nullable = true)
 |



144323


                                                                                

## !!! Change the database properties below to match your own PostgreSQL setup !!!

In [10]:
import os

# PostgreSQL connection used to write (and later read back) the FIFA table.
# The defaults are the values this notebook was developed with. Edit them for
# your own database, or set the FIFA_DB_* environment variables so that real
# credentials do not have to be stored in the notebook.
db_properties = {
    "username": os.environ.get("FIFA_DB_USER", "postgres"),
    "password": os.environ.get("FIFA_DB_PASSWORD", "password"),
    "url": os.environ.get("FIFA_DB_URL", "jdbc:postgresql://localhost:5433/14787db"),
    "table": os.environ.get("FIFA_DB_TABLE", "fifa"),
    "driver": "org.postgresql.Driver",
}

# mode("overwrite") replaces any existing contents of the table, so re-running
# this cell does not append duplicate rows.
(
    combined_df.write.format("jdbc")
    .mode("overwrite")
    .option("url", db_properties["url"])
    .option("dbtable", db_properties["table"])
    .option("user", db_properties["username"])
    .option("password", db_properties["password"])
    .option("driver", db_properties["driver"])
    .save()
)

                                                                                

# Spark Analysis

In [11]:
# Read the ingested table back from PostgreSQL and keep only the male players,
# which is the subset the analysis functions below are run on.
jdbc_options = {
    "url": db_properties['url'],
    "dbtable": db_properties['table'],
    "user": db_properties['username'],
    "password": db_properties['password'],
    "Driver": db_properties['driver'],
}
df = sqlContext.read.format("jdbc").options(**jdbc_options).load()

is_male = df['gender'] == 'Male'
df = df.filter(is_male)
print(df.count())

142079


## Function 1
In year X, which Y clubs had the highest number of players with contracts ending in year Z or later?
1.  X is a year between 2015 and 2022, inclusive.
2.  Y is a positive integer.
3.  Z is 2023 or any later year.

In [12]:
# Columns:  year, club_name, club_contract_valid_until

def top_clubs_by_contract_end_year(df, X:str, Y:int, Z:str):
    """Show the Y clubs with the most players whose contracts run until year Z or later, in season X.

    Args:
        df: Spark DataFrame with ``year``, ``club_name`` and
            ``club_contract_valid_until`` columns; ``year`` is compared
            against X as a string (e.g. '2017').
        X: season year as a string, '2015' to '2022' inclusive.
        Y: number of clubs to list; must be a positive int.
        Z: earliest contract end year as a string, '2023' or later.

    Raises:
        TypeError: if X or Z is not a str, or Y is not an int.
        ValueError: if X, Y or Z is outside its allowed range.
    """
    # isdecimal() rejects inputs such as ' 2017' or '+2017': int() would accept
    # them, but they would then match no rows in the string ``year`` column.
    if type(X) != str:
        raise TypeError("X must be a string")
    elif not (X.isdecimal() and 2015 <= int(X) <= 2022):
        raise ValueError("X must be a year between 2015 and 2022 inclusively.")

    if type(Y) != int:
        raise TypeError("Y must be a positive integer")
    elif Y <= 0:
        raise ValueError("Y must be a positive integer")

    if type(Z) != str:
        raise TypeError("Z must be a string")
    elif not (Z.isdecimal() and int(Z) >= 2023):
        raise ValueError("Z must be a year that is 2023 or later.")

    # club_contract_valid_until is a string column in the ingested schema, so it
    # is cast to a number; a plain string comparison is lexicographic.
    in_season = df["year"] == X
    has_club = df["club_name"].isNotNull()
    contract_ok = df["club_contract_valid_until"].cast("double") >= int(Z)
    filtered_df = df.filter(in_season & has_club & contract_ok)

    club_count_df = (
        filtered_df.groupBy("club_name")
        .agg(count("club_name").alias("Player_Count"))
        # Sorting ties by name makes the cut at Y deterministic.
        .orderBy(desc("Player_Count"), asc("club_name"))
        .limit(Y)
    )

    club_count_df.show()


# Example: in 2017, the 5 clubs with the most players contracted until 2023 or later.
X, Y, Z = '2017', 5, '2023'
top_clubs_by_contract_end_year(df, X=X, Y=Y, Z=Z)

+--------------------+------------+
|           club_name|Player_Count|
+--------------------+------------+
|         Envigado FC|          14|
|   Alianza Petrolera|          12|
|Club Atlético Tem...|          12|
|Club de Deportes ...|          12|
| Seattle Sounders FC|          12|
+--------------------+------------+



## Function 2
In sports, the maturity and energy of a team depend (among other factors) on the
average age of its players. It is therefore useful to have a function that can
find clubs with such features.

List the X clubs with the highest (or lowest) average player age for a given
year Y.
- X is a positive integer; handle the case where X is not a positive
value.
- Y is a year between 2015 and 2022, inclusive.
- Let the user choose whether they want the highest or the lowest
average age.
- Handle ties as well: if the user requests the 5 clubs with the highest
averages but 3 clubs share the same average at rank 5, include all of
them in the output.

In [13]:
def club_by_avg_age (df, X, Y, orderby):
    
    if type(X) != int:
        raise TypeError("X must be an integer")
    elif X <= 0:
        raise ValueError("X should be a positive integer.")
    
    if type(Y) != str:
        raise TypeError("Y must be an string")
    elif not (2015 <= int(Y) <= 2022):
        raise ValueError("Y should be between 2015 and 2022 inclusively.")
    
    if orderby != 'highest' and orderby != 'lowest':
        raise ValueError("orderby must either be 'highest' or 'lowest'.")

    mask_1 = df["year"] == Y
    df = df.filter(mask_1)  

    avg_age_df = df.groupBy("club_name") \
        .agg(avg("age").alias("Average_Age")) \
        .orderBy(desc("Average_Age"))

    sort_order = desc("Average_Age") if orderby == 'highest' else asc("Average_Age")
    threshold_avg_age = avg_age_df.select("Average_Age") \
        .distinct() \
        .orderBy(sort_order) \
        .limit(5) \
        .collect()[-1]["Average_Age"]

    result_df = avg_age_df.filter(col("Average_Age") >= threshold_avg_age)

    result_df.show()


# Example: the 5 clubs with the highest average player age in 2020.
X, Y, orderby = 5, '2020', 'highest'
club_by_avg_age(df, X=X, Y=Y, orderby=orderby)

+--------------------+-----------+
|           club_name|Average_Age|
+--------------------+-----------+
|           Fortaleza|       32.6|
|            Cruzeiro|       31.6|
|Club Athletico Pa...|       31.4|
|            Botafogo|       31.4|
|Associação Chapec...|       31.4|
|             Avaí FC|       31.2|
|          Fluminense|       31.2|
| Ceará Sporting Club|       31.2|
|               Goiás|       31.0|
|               Bahia|       31.0|
|              Grêmio|       31.0|
+--------------------+-----------+



## Function 3
What is the most popular nationality in the dataset for each year? (i.e. display the most frequent nation for 2015, 2016, etc.)

In [14]:
from pyspark.sql.window import Window

def most_popular_nationality(df):
    """Show, for every year, the nationality with the most players.

    If several nationalities tie for first place in a year, all of them are
    listed. rank() is used instead of row_number(), which would keep an
    arbitrary one. Rows with a null nationality are ignored, and the result
    is sorted by year.

    Args:
        df: Spark DataFrame with ``year`` and ``nationality_name`` columns.
    """
    count_df = (
        df.dropna(subset=["nationality_name"])
        .groupBy("year", "nationality_name")
        .count()
        .withColumnRenamed("count", "nationality_count")
    )

    window = Window.partitionBy("year").orderBy(desc("nationality_count"))

    result_df = (
        count_df
        .withColumn("rank", rank().over(window))
        .filter(col("rank") == 1)
        .select("year", "nationality_name", col("nationality_count").alias("max_count"))
        .orderBy("year", "nationality_name")
    )

    result_df.show()

# Most frequent nationality for each year in `df`.
most_popular_nationality(df=df)

+----+----------------+---------+
|year|nationality_name|max_count|
+----+----------------+---------+
|2015|         England|     1627|
|2016|         England|     1519|
|2017|         England|     1627|
|2018|         England|     1633|
|2019|         England|     1625|
|2020|         England|     1670|
|2021|         England|     1685|
|2022|         England|     1719|
+----+----------------+---------+

