In [12]:
# Stretch the notebook's main content container to the full browser width
# by injecting a one-off CSS rule into the page.
from IPython.display import HTML, display

display(
    HTML("<style>.container { width:100% !important; }</style>")
)

In [13]:
import pyspark
import re
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import *

appName = "Final Project - FIFA Data Analytics"
master = "local"

# Spark configuration: app name, local master, and the driver host pinned to the
# loopback address (127.0.0.1).
conf = (
    pyspark.SparkConf()
    .set('spark.driver.host', '127.0.0.1')
    .setAppName(appName)
    .setMaster(master)
)

# Create (or reuse) the SparkContext with this configuration rather than the default.
# Referenced through the `pyspark` package on purpose: the bare name `SparkContext`
# was never imported and only resolved because `from pyspark.sql.functions import *`
# happened to re-export it.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# SparkSession is the modern entry point for DataFrame/SQL work (it supersedes
# SQLContext). getOrCreate() picks up the already-running SparkContext above.
spark = SparkSession.builder.getOrCreate()

In [4]:
def read_csv(path):
    """Load a CSV file that has a header row into a Spark DataFrame.

    Column names come from the first line; column types are inferred by Spark
    (``inferSchema``).

    Parameters
    ----------
    path : str
        Location of the CSV file.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    csv_reader = spark.read.format("csv")
    csv_reader = csv_reader.option("inferSchema", "true")
    csv_reader = csv_reader.option("header", "true")
    return csv_reader.load(path)

def evaluate_skill(entry):
    """Evaluate a position-rating cell such as ``"89+3"`` or ``"89-1"`` to an int.

    Parameters
    ----------
    entry : str, int or None
        A plain integer (``"88"`` or ``88``), or an expression of the form
        ``<int>+<int>`` / ``<int>-<int>`` (surrounding whitespace allowed).

    Returns
    -------
    int or None
        The evaluated sum/difference, the integer itself, or ``None`` for a
        null cell (inside an ``IntegerType`` UDF that becomes SQL NULL).

    Raises
    ------
    ValueError
        If ``entry`` is a string in none of the accepted forms.
    """
    if entry is None:
        # Null CSV cells reach the UDF as None; re.match(None) used to raise
        # TypeError and fail the whole Spark job.
        return None
    if isinstance(entry, int):
        # If inferSchema typed the column as integer, values arrive as ints.
        return entry
    text = str(entry).strip()
    # Self-contained pattern: the function no longer depends on regex globals
    # that were only defined in a later notebook cell.
    match = re.fullmatch(r"(\d+)\s*([+-])\s*(\d+)", text)
    if match is None:
        return int(text)
    base, operator, delta = int(match.group(1)), match.group(2), int(match.group(3))
    return base + delta if operator == "+" else base - delta
    
def preprocess_df(df):
    """Normalise one season's raw player DataFrame.

    - Casts ID-like columns to strings. ``club_team_id`` / ``nation_team_id`` go
      through ``integer`` first, presumably so a value inferred as a double
      (e.g. ``241.0``) is rendered as ``"241"``.
    - Drops the columns listed in ``drop_columns``.
    - Replaces every position-rating column ``<pos>`` with ``<pos>_evaluated``,
      computed by ``evaluate_udf``, keeping the DataFrame's own column order.

    Relies on the notebook globals ``drop_columns``, ``player_position_col_set``
    and ``evaluate_udf``; run their cells first.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Raw season frame as returned by ``read_csv``.

    Returns
    -------
    pyspark.sql.DataFrame

    Raises
    ------
    ValueError
        If any position-rating column is missing from ``df``.
    """
    for id_col in ('club_team_id', 'nation_team_id'):
        df = df.withColumn(id_col, df[id_col].cast('integer').cast('string'))
    for id_col in ('club_contract_valid_until', 'sofifa_id', 'nationality_id'):
        df = df.withColumn(id_col, df[id_col].cast('string'))
    df = df.drop(*drop_columns)

    missing = set(player_position_col_set) - set(df.columns)
    if missing:
        raise ValueError(f"missing position-rating columns: {sorted(missing)}")

    # Walk the position columns in the DataFrame's column order instead of iterating
    # the set directly: set order for strings changes between interpreter sessions
    # (hash randomisation), which made the output column order -- and the layout of
    # the table written later -- non-reproducible.
    position_cols = [name for name in df.columns if name in player_position_col_set]
    for name in position_cols:
        df = df.withColumn(name + "_evaluated", evaluate_udf(df[name])).drop(name)
    return df

In [5]:
# Configuration constants used by the preprocessing and ingestion cells.

# Two-digit FIFA editions to load, '15' through '22'.
years = [str(edition) for edition in range(15, 23)]

# Position-rating columns that hold "<base>+<delta>"-style strings.
player_position_col_set = set(
    "ls st rs lw lf cf rf rw lam cam ram lm lcm cm rcm rm "
    "lwb ldm cdm rdm rwb lb lcb cb rcb rb gk".split()
)

# URL columns removed during preprocessing.
drop_columns = [
    'player_url',
    'player_face_url',
    'club_logo_url',
    'club_flag_url',
    'nation_logo_url',
    'nation_flag_url',
]

In [6]:
# Preprocessing utilities
from pyspark.sql.types import IntegerType

# Raw string literals: "\d" inside a plain string is an invalid escape sequence
# (DeprecationWarning; SyntaxWarning since Python 3.12). The compiled patterns are
# identical to the previous ones. Both match "<digits>+<digits>" / "<digits>-<digits>"
# at the start of a string.
skill_express_add = re.compile(r"(\d+)\+(\d+)")
skill_express_minus = re.compile(r"(\d+)-(\d+)")

# Spark UDF wrapping evaluate_skill; the IntegerType return means a Python None
# becomes SQL NULL in the resulting column.
evaluate_udf = udf(lambda exp: evaluate_skill(exp), IntegerType())

In [7]:
# Ingest male player data: one preprocessed DataFrame per FIFA edition, keyed by
# the two-digit edition string from `years`.
player_dfs = {
    year: preprocess_df(read_csv(f"../data/players_{year}.csv"))
    for year in years
}

# Tag every edition with its four-digit year ('15' -> '2015') and stack them.
# unionByName aligns columns by name; plain union() matches by position, so an
# edition whose columns came out in a different order would be silently
# misaligned instead of raising an error.
player_all = None
for year, df in player_dfs.items():
    df = df.withColumn("year", lit('20' + year))
    player_all = df if player_all is None else player_all.unionByName(df)

                                                                                

In [8]:
# Sanity-check the stacked frame: print its schema tree, then the first five
# rows laid out vertically (one field per line).
player_all.printSchema()
player_all.show(n=5, vertical=True)

root
 |-- sofifa_id: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: string (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: string (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: string (nullable = true)
 |-- club_contract_valid_until: string (nullable = true)
 |-- nationality_id: string (nullable = true)
 |-- nationality_name: string (nullable = 

22/03/24 12:53:08 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Traceback (most recent call last):                                  (0 + 1) / 1]
  File "/Users/xintongwu/miniforge3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/Users/xintongwu/miniforge3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/Users/xintongwu/miniforge3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/Users/xintongwu/miniforge3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | 241                  
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF                   
 club_jersey_number          | 10             

In [9]:
def writedb(db, db_name, url=None, user=None, password=None, mode="overwrite"):
    """Write a Spark DataFrame to a PostgreSQL table over JDBC.

    Parameters
    ----------
    db : pyspark.sql.DataFrame
        DataFrame to persist.
    db_name : str
        Target table, optionally schema-qualified (e.g. ``"fifa.player_all"``).
    url : str, optional
        JDBC URL. Defaults to ``$PG_JDBC_URL``, else the local ``postgres`` database.
    user : str, optional
        Database user. Defaults to ``$PGUSER``, else ``"xintongwu"`` (the value
        previously hard-coded here).
    password : str, optional
        Database password. Defaults to ``$PGPASSWORD``, else ``""``.
    mode : str, default "overwrite"
        Spark save mode passed to ``DataFrameWriter.mode``.
    """
    import os  # local import keeps this cell self-contained

    # Connection settings come from arguments or the environment instead of being
    # hard-coded, so each collaborator no longer needs a private copy of this cell.
    if url is None:
        url = os.environ.get("PG_JDBC_URL", "jdbc:postgresql://localhost:5432/postgres")
    if user is None:
        user = os.environ.get("PGUSER", "xintongwu")
    if password is None:
        password = os.environ.get("PGPASSWORD", "")

    (db.write.format("jdbc")
        .mode(mode)
        .option("url", url)
        .option("dbtable", db_name)
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .save())

In [10]:
# def writedb(db, db_name):
#     db.write.format("jdbc")\
#     .mode("overwrite")\
#     .option("url", "jdbc:postgresql://localhost:5432/postgres")\
#     .option("dbtable", db_name)\
#     .option("user", "jc")\
#     .option("password", "")\
#     .option("Driver", "org.postgresql.Driver")\
#     .save()

In [11]:
# Persist the stacked multi-season player table to PostgreSQL as fifa.player_all.
writedb(db=player_all, db_name="fifa.player_all")

                                                                                