# DA 231o Data Engineering at Scale Project



# Expected Goals Model & Player Analysis

### Import Libraries

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

import pyspark
pyspark.__version__

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, regexp_replace, when

# Load input Dataset with Goals

In [3]:
# Make Google Drive available to the Colab runtime; the CSV inputs are read from it below
from google.colab import drive

drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# Pandas copy of the events file, kept for visualization only.
# The model does not use this frame.
events_csv_path = '/content/drive/Shareddrives/DES_Project/events.csv'
data = pd.read_csv(events_csv_path)

In [5]:
# Quick overview of the pandas frame: row count, total of the is_goal column,
# and the number of columns minus one.
n_events = len(data)
n_goals = data.is_goal.sum()
n_columns_minus_one = len(data.columns) - 1

for value in (n_events, n_goals, n_columns_minus_one):
    print(value)

941009
24446
21


In [6]:
# Display the column names of the pandas events frame
data.columns

Index(['id_odsp', 'id_event', 'sort_order', 'time', 'text', 'event_type',
       'event_type2', 'side', 'event_team', 'opponent', 'player', 'player2',
       'player_in', 'player_out', 'shot_place', 'shot_outcome', 'is_goal',
       'location', 'bodypart', 'assist_method', 'situation', 'fast_break'],
      dtype='object')

In [7]:
# Start a Spark session for this project, or reuse one that is already running

spark = (
    SparkSession.builder
    .appName('xG Project Solution')
    .getOrCreate()
)

# Display the session object as the cell output
spark

In [8]:
# Load the events and the match info as Spark SQL dataframes, using the same
# reader options for both files.
# NOTE(review): "nullable" does not appear to be a recognised CSV reader option
# (the documented one is "nullValue"), so 'NA' cells seem to load as the literal
# string "NA" -- consistent with the NA values and string-typed columns in the
# recorded outputs. Confirm before switching: the UDFs applied later would then
# receive nulls.
csv_options = {"header": True, "inferSchema": True, "nullable": 'NA'}

eventsDF = spark.read.options(**csv_options).csv('/content/drive/Shareddrives/DES_Project/events.csv')
infoDF = spark.read.options(**csv_options).csv('/content/drive/Shareddrives/DES_Project/ginf.csv')

In [9]:
# Print the inferred schema of each Spark dataframe (events first, then match info)

for frame in (eventsDF, infoDF):
    frame.printSchema()

root
 |-- id_odsp: string (nullable = true)
 |-- id_event: string (nullable = true)
 |-- sort_order: integer (nullable = true)
 |-- time: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- event_type: integer (nullable = true)
 |-- event_type2: string (nullable = true)
 |-- side: integer (nullable = true)
 |-- event_team: string (nullable = true)
 |-- opponent: string (nullable = true)
 |-- player: string (nullable = true)
 |-- player2: string (nullable = true)
 |-- player_in: string (nullable = true)
 |-- player_out: string (nullable = true)
 |-- shot_place: string (nullable = true)
 |-- shot_outcome: string (nullable = true)
 |-- is_goal: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- bodypart: string (nullable = true)
 |-- assist_method: integer (nullable = true)
 |-- situation: string (nullable = true)
 |-- fast_break: integer (nullable = true)

root
 |-- id_odsp: string (nullable = true)
 |-- link_odsp: string (nullable = true)
 |-- adv_stat

In [10]:
# Verify if the data loaded is correct and in the proper format:
# preview the first 10 event rows, then display the type of the loaded object.

eventsDF.show(10)
type(eventsDF)

+---------+----------+----------+----+--------------------+----------+-----------+----+-----------------+-----------------+-------------------+----------------+---------+----------+----------+------------+-------+--------+--------+-------------+---------+----------+
|  id_odsp|  id_event|sort_order|time|                text|event_type|event_type2|side|       event_team|         opponent|             player|         player2|player_in|player_out|shot_place|shot_outcome|is_goal|location|bodypart|assist_method|situation|fast_break|
+---------+----------+----------+----+--------------------+----------+-----------+----+-----------------+-----------------+-------------------+----------------+---------+----------+----------+------------+-------+--------+--------+-------------+---------+----------+
|UFot0hit/| UFot0hit1|         1|   2|Attempt missed. M...|         1|         12|   2|       Hamburg SV|Borussia Dortmund|      mladen petric|     gokhan tore|       NA|        NA|         6|       

pyspark.sql.dataframe.DataFrame

In [11]:
# Attach each match's country and date (from infoDF) to its events, keyed on id_odsp.
# A left join keeps every event even when no info row matches; such rows get
# null country/date.

matchInfoDF = infoDF.select('id_odsp', 'country', 'date')

# Drop any country/date columns left by a previous run of this cell so that
# re-running it does not add duplicate columns. drop() ignores columns that are
# absent, so the first run is unaffected.
eventsDF = eventsDF.drop('country', 'date').join(matchInfoDF, on='id_odsp', how='left')
eventsDF.printSchema()
eventsDF.show(10)

root
 |-- id_odsp: string (nullable = true)
 |-- id_event: string (nullable = true)
 |-- sort_order: integer (nullable = true)
 |-- time: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- event_type: integer (nullable = true)
 |-- event_type2: string (nullable = true)
 |-- side: integer (nullable = true)
 |-- event_team: string (nullable = true)
 |-- opponent: string (nullable = true)
 |-- player: string (nullable = true)
 |-- player2: string (nullable = true)
 |-- player_in: string (nullable = true)
 |-- player_out: string (nullable = true)
 |-- shot_place: string (nullable = true)
 |-- shot_outcome: string (nullable = true)
 |-- is_goal: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- bodypart: string (nullable = true)
 |-- assist_method: integer (nullable = true)
 |-- situation: string (nullable = true)
 |-- fast_break: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- date: timestamp (nullable = true)

+---------+---------

In [12]:
# Extract year from the timestamp and append the column

def extract_year(timestamp):
    """Return the year of `timestamp` formatted with "%Y", or None when it is missing.

    The left join on match info leaves `date` null for events without a
    matching info row; calling strftime on None would fail the Spark job.
    """
    return timestamp.strftime("%Y") if timestamp is not None else None


# No returnType is passed, so the resulting "year" column is a string column.
extract_year_udf = udf(extract_year)
eventsDF = eventsDF.withColumn("year", extract_year_udf("date"))
eventsDF.show(10)

+---------+----------+----------+----+--------------------+----------+-----------+----+-----------------+-----------------+-------------------+----------------+---------+----------+----------+------------+-------+--------+--------+-------------+---------+----------+-------+-------------------+----+
|  id_odsp|  id_event|sort_order|time|                text|event_type|event_type2|side|       event_team|         opponent|             player|         player2|player_in|player_out|shot_place|shot_outcome|is_goal|location|bodypart|assist_method|situation|fast_break|country|               date|year|
+---------+----------+----------+----+--------------------+----------+-----------+----+-----------------+-----------------+-------------------+----------------+---------+----------+----------+------------+-------+--------+--------+-------------+---------+----------+-------+-------------------+----+
|UFot0hit/| UFot0hit1|         1|   2|Attempt missed. M...|         1|         12|   2|       Hambur

In [13]:
# Create shotsDF dataframe by filtering event type == 1, then title-case the
# player, assisting player and country names.

def _to_title_case(value):
    """Return `value` in title case, passing missing values through as None."""
    return value.title() if value is not None else None


# A single UDF shared by all three columns, instead of one lambda per column
title_case_udf = udf(_to_title_case)

shotsDF = eventsDF.filter(eventsDF.event_type == 1)
for name_col in ("player", "player2", "country"):
    shotsDF = shotsDF.withColumn(name_col, title_case_udf(name_col))
shotsDF.show(10)

+---------+----------+----------+----+--------------------+----------+-----------+----+-----------------+-----------------+-----------------+------------------+---------+----------+----------+------------+-------+--------+--------+-------------+---------+----------+-------+-------------------+----+
|  id_odsp|  id_event|sort_order|time|                text|event_type|event_type2|side|       event_team|         opponent|           player|           player2|player_in|player_out|shot_place|shot_outcome|is_goal|location|bodypart|assist_method|situation|fast_break|country|               date|year|
+---------+----------+----------+----+--------------------+----------+-----------+----+-----------------+-----------------+-----------------+------------------+---------+----------+----------+------------+-------+--------+--------+-------------+---------+----------+-------+-------------------+----+
|UFot0hit/| UFot0hit1|         1|   2|Attempt missed. M...|         1|         12|   2|       Hambur

In [14]:
# Keep the columns that describe a shot, plus `is_goal`.
# `is_goal` is the true label: it is the prediction target, not an input feature.

required_cols = ['location', 'bodypart', 'assist_method', 'situation']
input_cols = required_cols + ['is_goal']

# Create dataDF dataframe with the selected input columns
dataDF = shotsDF.select(input_cols)

# One StringIndexer per feature column; each writes a numeric "<column>_index" column
indexer = [
    StringIndexer(inputCol=feature, outputCol=feature + "_index")
    for feature in required_cols
]

# Fit all indexers as one pipeline and append the index columns to dataDF
pipeline = Pipeline(stages=indexer)
indexer_model = pipeline.fit(dataDF)
dataDF = indexer_model.transform(dataDF)

dataDF.show(10)

+--------+--------+-------------+---------+-------+--------------+--------------+-------------------+---------------+
|location|bodypart|assist_method|situation|is_goal|location_index|bodypart_index|assist_method_index|situation_index|
+--------+--------+-------------+---------+-------+--------------+--------------+-------------------+---------------+
|       9|       2|            1|        1|      0|           2.0|           1.0|                0.0|            0.0|
|      15|       1|            1|        1|      0|           0.0|           0.0|                0.0|            0.0|
|       9|       2|            1|        1|      1|           2.0|           1.0|                0.0|            0.0|
|      15|       1|            0|        1|      0|           0.0|           0.0|                1.0|            0.0|
|      15|       1|            0|        1|      0|           0.0|           0.0|                1.0|            0.0|
|       3|       1|            1|        1|      0|     

# Expected Goals with PySpark Machine Learning

In [15]:
# Feature columns are the StringIndexer outputs, one per entry of required_cols.
# They are named explicitly rather than sliced by position, so a change in
# dataDF's column order cannot pull in the wrong columns. `is_goal` is left out
# because it is the label.
required_features = [column + "_index" for column in required_cols]

# Pack the feature columns into the single vector column expected by Spark ML
assembler = VectorAssembler(inputCols=required_features, outputCol='features')
transformed_data = assembler.transform(dataDF)

transformed_data.show(10)

+--------+--------+-------------+---------+-------+--------------+--------------+-------------------+---------------+-----------------+
|location|bodypart|assist_method|situation|is_goal|location_index|bodypart_index|assist_method_index|situation_index|         features|
+--------+--------+-------------+---------+-------+--------------+--------------+-------------------+---------------+-----------------+
|       9|       2|            1|        1|      0|           2.0|           1.0|                0.0|            0.0|[2.0,1.0,0.0,0.0]|
|      15|       1|            1|        1|      0|           0.0|           0.0|                0.0|            0.0|        (4,[],[])|
|       9|       2|            1|        1|      1|           2.0|           1.0|                0.0|            0.0|[2.0,1.0,0.0,0.0]|
|      15|       1|            0|        1|      0|           0.0|           0.0|                1.0|            0.0|    (4,[2],[1.0])|
|      15|       1|            0|        1|     

In [16]:
# Split the data into training and testing data (weights 0.8 / 0.2).
# A fixed seed makes the split, and every metric computed from it, repeatable
# across runs.

SPLIT_SEED = 42
(training_data, test_data) = transformed_data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

print("Total Data", transformed_data.count())
print("Total Training Data", training_data.count())
print("Total Test Data", test_data.count())

print("Training Dataset")
training_data.show(5)

Total Data 229135
Total Training Data 183495
Total Test Data 45640
Training Dataset
+--------+--------+-------------+---------+-------+--------------+--------------+-------------------+---------------+-----------------+
|location|bodypart|assist_method|situation|is_goal|location_index|bodypart_index|assist_method_index|situation_index|         features|
+--------+--------+-------------+---------+-------+--------------+--------------+-------------------+---------------+-----------------+
|      10|       1|            0|        1|      0|           5.0|           0.0|                1.0|            0.0|[5.0,0.0,1.0,0.0]|
|      10|       1|            0|        1|      0|           5.0|           0.0|                1.0|            0.0|[5.0,0.0,1.0,0.0]|
|      10|       1|            0|        1|      0|           5.0|           0.0|                1.0|            0.0|[5.0,0.0,1.0,0.0]|
|      10|       1|            0|        1|      0|           5.0|           0.0|                1.0

# Gradient-Boosted Trees (GBTs) Classifier

In [17]:
# Gradient-boosted tree classifier: predicts `is_goal` from the assembled
# `features` vector, with maxDepth set to 5.
gbt_params = dict(labelCol='is_goal', featuresCol='features', maxDepth=5)
gbt = GBTClassifier(**gbt_params)

In [18]:
# Alternative classifier (disabled): uncomment the lines below to use a random forest in place of the GBT
# from pyspark.ml.classification import RandomForestClassifier
# gbt = RandomForestClassifier(labelCol='is_goal', 
#                             featuresCol='features',
#                             maxDepth=5)

In [19]:
# Fit the model on training data.
# `gbt` is the classifier configured earlier; `model` is the fitted result.
model = gbt.fit(training_data)

# Predictions with Test Data

In [20]:
# Predictions with test data: apply the fitted model to the held-out split and
# preview the first 5 rows of the result.
predictions = model.transform(test_data)
predictions.show(5)

+--------+--------+-------------+---------+-------+--------------+--------------+-------------------+---------------+-----------------+--------------------+--------------------+----------+
|location|bodypart|assist_method|situation|is_goal|location_index|bodypart_index|assist_method_index|situation_index|         features|       rawPrediction|         probability|prediction|
+--------+--------+-------------+---------+-------+--------------+--------------+-------------------+---------------+-----------------+--------------------+--------------------+----------+
|      10|       1|            0|        1|      0|           5.0|           0.0|                1.0|            0.0|[5.0,0.0,1.0,0.0]|[0.58228792672286...|[0.76216317673189...|       0.0|
|      10|       1|            0|        1|      0|           5.0|           0.0|                1.0|            0.0|[5.0,0.0,1.0,0.0]|[0.58228792672286...|[0.76216317673189...|       0.0|
|      10|       1|            0|        1|      0|    

# Evaluate our model

In [21]:
# Evaluator that scores the `prediction` column against the `is_goal` label
# using the accuracy metric.
# NOTE(review): the recorded outputs show 24,446 goals against 229,135 shots, so
# goals look like a small minority class and accuracy may sit close to what
# always predicting "no goal" would achieve. Consider also reporting a
# probability-based metric -- confirm with the team.
evaluator_params = dict(metricName='accuracy', labelCol='is_goal', predictionCol='prediction')
evaluator = MulticlassClassificationEvaluator(**evaluator_params)

# Calculate Test Accuracy

In [22]:
# Score the test-set predictions with the evaluator defined earlier and report the result
accuracy = evaluator.evaluate(predictions)
print('Test Accuracy = ', accuracy)

Test Accuracy =  0.9086985100788781
