In [1]:
import pyspark
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("GenericAppName") \
    .getOrCreate()

#Access SparkContext from your SparkSession
print("APP Name :"+ spark.sparkContext.appName);
print("Master :"+ spark.sparkContext.master);

22/10/25 22:54:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
APP Name :PySparkShell
Master :local[*]


In [122]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
# These are all the databases per year we will analyze.
files = ["players_15", "players_16", "players_17", "players_18",
         "players_19", "players_20", "players_21", "players_22"]

# Reading the .csv files and adding year column for each corresponding database year.
df = spark.read.csv("./archive/"+files[0]+".csv", header=True, inferSchema= True).withColumn("year", lit(2015))
for i in range(1,len(files)):
    df = df.union(spark.read.csv("./archive/"+files[i]+".csv", header=True, inferSchema= True).withColumn("year", lit(2015+i)))

# Adding id column with unique id for every entry. 
# This makes it so the dataframe has only one partition, added to avoid very large numbers for the ids.
df = df.coalesce(1) 
df = df.withColumn("id", monotonically_increasing_id())

# I decided to add the columns before ingesting into postgres so they algo get ingested.

                                                                                

In [125]:
db_properties={}
#update your db username
db_properties['username']="postgres"
#update your db password
db_properties['password']="bigdata"
#make sure you got the right port number here
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
#make sure you had the Postgres JAR file in the right location
db_properties['driver']="org.postgresql.Driver"
# Using table in fifa schema.
db_properties['table']= "fifa.Players"

# Data ingestion into postgres table
df.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()


                                                                                

In [126]:
# Reading database back from postgres.
df_read = sqlContext.read.format("jdbc")\
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()


In [339]:
# Table schema
df_read.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: timestamp (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: double (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: timestamp (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_id: integer (nullable

In [340]:
# Shows id column doesn't repeat.
print(df_read.count())
print(df_read.select('id').distinct().count())

142079
142079


In [341]:
# Shows 5 entries.
df_read.show(5, vertical=True)

[Stage 469:>                                                        (0 + 1) / 1]

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24 00:00:00  
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | 241.0                
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

                                                                                

In [180]:
def player_contracts(df, x):
    # Function prints the x clubs that have the most players with contracts ending in 2023
    df = df.filter((col("year") == 2022) & (col("club_contract_valid_until") == 2023))\
    .groupBy('club_name').count().sort(col('count').desc())
    df.withColumnRenamed('club_name', 'club').withColumnRenamed('count', 'contracts ending 2023').show(x)
    return

In [181]:
player_contracts(df, 7)

+--------------------+---------------------+
|                club|contracts ending 2023|
+--------------------+---------------------+
|En Avant de Guingamp|                   19|
| Club Atlético Lanús|                   17|
|       Lechia Gdańsk|                   17|
|            Barnsley|                   16|
|        Kasimpaşa SK|                   16|
|        Bengaluru FC|                   16|
|        FC Barcelona|                   15|
+--------------------+---------------------+
only showing top 7 rows



In [254]:
def players_over_27(df, y):
    # Function prints the y clubs that have the most average players aged over 27.
    df = df.filter(col("age") > 27).groupBy('club_name', 'year').count()\
        .groupBy('club_name').avg('count').sort(col('avg(count)').desc()) # Calculates the average over the years.
    df.withColumnRenamed('club_name', 'club').withColumnRenamed('avg(count)', 'average players over 27').show(y)
    return

In [255]:
players_over_27(df, 6)

+--------------------+-----------------------+
|                club|average players over 27|
+--------------------+-----------------------+
|                null|                 109.25|
| Matsumoto Yamaga FC|                   19.0|
|  Dorados de Sinaloa|                   19.0|
| Shanghai Shenhua FC|                   18.5|
|          Qingdao FC|                   18.0|
|Club Deportivo Jo...|                   17.5|
+--------------------+-----------------------+
only showing top 6 rows



[Stage 318:>                                                        (0 + 1) / 1]                                                                                

In [325]:
def most_frequent_position(df):
    # Function prints the most frequent position across national teams every year.
    years = df.select('year').distinct().collect()
    position = []
    for i in range(len(years)):
        position_count = df.groupBy('nation_position','year').count().filter(col("year") == years[i][0]).sort(col('count').desc())
        position.append(position_count.head(1)[0])
    out = spark.createDataFrame(position, schema=position_count.schema)
    out.show()
    return 

In [326]:
# Test the function. It shows the most frequent position is 'null' since data cleaning hasn't been carried out yet.
most_frequent_position(df)

+---------------+----+-----+
|nation_position|year|count|
+---------------+----+-----+
|           null|2015|15074|
|           null|2016|14608|
|           null|2017|16515|
|           null|2018|16804|
|           null|2019|16981|
|           null|2020|17356|
|           null|2021|17817|
|           null|2022|18480|
+---------------+----+-----+

