In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract, when,round

In [9]:
# Names of event columns kept together in one list (order preserved).
# NOTE(review): presumably the subset of columns of interest for the shot
# analysis; the list is not referenced by the other visible cells — confirm.
columns_t = [
    'id',
    'match_id',
    'index',
    'player_id',
    'player',
    'position',
    'type',
    'shot_body_part',
    'play_pattern',
    'shot_type',
    'shot_technique',
    'shot_outcome',
    'shot_statsbomb_xg',
    'location',
    'shot_end_location',
    'shot_freeze_frame',
    'shot_key_pass_id',
    'under_pressure',
    'shot_one_on_one',
    'shot_deflected',
    'shot_aerial_won',
    'shot_redirect',
    'shot_follows_dribble',
    'shot_saved_off_target',
    'shot_first_time',
    'shot_open_goal',
]

In [29]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('Clean_event_data')
    .getOrCreate()
)

In [30]:
# Load the raw event export: semicolon-separated, first row holds the column
# names, and column types are inferred by Spark.
events = spark.read.csv(
    'Data/events.csv',
    sep=';',
    header=True,
    inferSchema=True,
)

                                                                                

In [31]:
# Number of events per type, most frequent first (up to 50 rows, untruncated).
(
    events
    .groupBy('type')
    .count()
    .orderBy(col('count').desc())
    .show(n=50, truncate=False)
)



+-----------------+-------+
|type             |count  |
+-----------------+-------+
|Pass             |2434509|
|Ball Receipt*    |2293313|
|Carry            |1868736|
|Pressure         |792485 |
|Ball Recovery    |249876 |
|Duel             |187690 |
|Clearance        |110176 |
|Block            |93904  |
|Dribble          |82105  |
|Goal Keeper      |74663  |
|Foul Committed   |73087  |
|Foul Won         |69643  |
|Miscontrol       |68593  |
|Shot             |61608  |
|Dispossessed     |60954  |
|Interception     |57124  |
|Dribbled Past    |50262  |
|Substitution     |15324  |
|Half End         |9972   |
|Half Start       |9972   |
|Injury Stoppage  |9467   |
|50/50            |8314   |
|Tactical Shift   |6604   |
|Starting XI      |4900   |
|Shield           |3357   |
|Referee Ball-Drop|2574   |
|Player Off       |2411   |
|Player On        |2390   |
|Bad Behaviour    |1990   |
|Camera On        |1837   |
|Error            |1091   |
|Offside          |873    |
|Camera off       |4

                                                                                

In [39]:
# Rows whose shot_key_pass_id is populated.
# NOTE(review): despite the name, this selects on a shot column rather than on
# type == 'Pass' — confirm this is the intended subset.
events_pass = events.where(col('shot_key_pass_id').isNotNull())

# Rows whose event type is exactly 'Shot'.
events_shot = events.where(col('type') == 'Shot')

In [41]:
# Row counts of the two subsets; each count() runs a Spark job.
size_pass, size_shot = events_pass.count(), events_shot.count()

                                                                                

### Split Location

In [13]:
# `location` holds text such as "[105.6, 44.0]": x is captured between "[" and
# the first comma, y between ", " and the closing "]". Both are cast to float.
location_text = col("location")
shot_x = regexp_extract(location_text, r'\[(.*?),', 1).cast("float")
shot_y = regexp_extract(location_text, r', (.*?)\]', 1).cast("float")

events_shot = (
    events_shot
    .withColumn("shot_location_x", shot_x)
    .withColumn("shot_location_y", shot_y)
)

# Quick visual check of the parsed coordinates against the raw text.
events_shot.select('location', 'shot_location_x', 'shot_location_y').show(5)

+-------------+---------------+---------------+
|     location|shot_location_x|shot_location_y|
+-------------+---------------+---------------+
|[105.6, 44.0]|          105.6|           44.0|
| [98.6, 25.2]|           98.6|           25.2|
|[105.5, 47.3]|          105.5|           47.3|
|[113.4, 38.7]|          113.4|           38.7|
|[106.2, 36.8]|          106.2|           36.8|
+-------------+---------------+---------------+
only showing top 5 rows



### Split End Location

In [14]:
# `shot_end_location` holds text such as "[120.0, 31.9, 0.5]" (x, y, z) or
# "[112.1, 36.8]" (x, y only). Parse each coordinate into its own column.
end_location = col("shot_end_location")
three_values_pattern = r', (.*?), (.*?)\]'  # matches only when a third value exists

end_x = regexp_extract(end_location, r'\[(.*?),', 1).cast("float")
end_y = regexp_extract(end_location, r', (.*?)(,|\])', 1).cast("float")
# z is the second capture of the three-value pattern; 0.0 when there is no z.
end_z = when(
    end_location.rlike(three_values_pattern),
    regexp_extract(end_location, three_values_pattern, 2).cast("float"),
).otherwise(0.0)

events_shot = (
    events_shot
    .withColumn("shot_end_location_x", end_x)
    .withColumn("shot_end_location_y", end_y)
    .withColumn("shot_end_location_z", end_z)
)

# Keep z at 2 decimal places.
events_shot = events_shot.withColumn(
    "shot_end_location_z",
    round(col("shot_end_location_z"), 2),
)

# Quick visual check of the parsed coordinates against the raw text.
events_shot.select(
    'shot_end_location',
    'shot_end_location_x',
    'shot_end_location_y',
    'shot_end_location_z',
).show(10)

+------------------+-------------------+-------------------+-------------------+
| shot_end_location|shot_end_location_x|shot_end_location_y|shot_end_location_z|
+------------------+-------------------+-------------------+-------------------+
|[120.0, 31.9, 0.5]|              120.0|               31.9|                0.5|
|[117.9, 37.0, 0.3]|              117.9|               37.0|                0.3|
|[120.0, 35.9, 1.3]|              120.0|               35.9|                1.3|
|[118.8, 38.4, 0.9]|              118.8|               38.4|                0.9|
|     [112.1, 36.8]|              112.1|               36.8|                0.0|
|     [109.0, 38.9]|              109.0|               38.9|                0.0|
|[119.0, 37.0, 1.4]|              119.0|               37.0|                1.4|
|     [113.3, 42.2]|              113.3|               42.2|                0.0|
|     [118.7, 33.6]|              118.7|               33.6|                0.0|
|[118.4, 42.8, 0.2]|        

In [15]:
# Split location 
# split shot end location # consider ground shots
# Distance to goal
# Angle to goal
# solution for shot_freeze_frame

# Exclude penalty shootouts
# players between shooter and goal
# drop type
# distribution of xg based on shot type etc
# function to plot all shots
# % of the shot outcome
# shot_statsbomb_xg
# some predictions are negative, fix it
# make sure all predictions are strictly between 0 and 1

In [23]:
# doing a regression model for xg based on the location x and y
# using multiple models
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor, DecisionTreeRegressor, GeneralizedLinearRegression, IsotonicRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# Pack the two shot coordinates into a single "features" vector and keep only
# that vector plus the target column.
feature_columns = ["shot_location_x", "shot_location_y"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(events_shot).select("features", "shot_statsbomb_xg")

# 80/20 train/test split; the fixed seed keeps the split repeatable.
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# All regressors use the same label and feature columns and are otherwise
# left at their default hyper-parameters.
column_args = {"labelCol": "shot_statsbomb_xg", "featuresCol": "features"}
lr = LinearRegression(**column_args)
rf = RandomForestRegressor(**column_args)
gbt = GBTRegressor(**column_args)
dt = DecisionTreeRegressor(**column_args)
glr = GeneralizedLinearRegression(**column_args)
ir = IsotonicRegression(**column_args)

In [24]:
# Display name -> estimator; insertion order fixes the order of every list below.
named_models = {
    "Linear Regression": lr,
    "Random Forest": rf,
    "Gradient Boosted Trees": gbt,
    "Decision Tree": dt,
    "Generalized Linear Regression": glr,
    "Isotonic Regression": ir,
}
models = list(named_models.values())
model_names = list(named_models.keys())

# RMSE between the target column and each model's "prediction" column.
evaluator = RegressionEvaluator(
    labelCol="shot_statsbomb_xg",
    predictionCol="prediction",
    metricName="rmse",
)

# One single-stage pipeline per estimator, fitted on the training split.
model_pipelines = [Pipeline(stages=[estimator]) for estimator in models]
fitted_models = [pipe.fit(train_data) for pipe in model_pipelines]

# Score every fitted model on the held-out split.
predictions = [fitted.transform(test_data) for fitted in fitted_models]
rmse = [evaluator.evaluate(scored) for scored in predictions]

# Report one line per model.
for name, score in zip(model_names, rmse):
    print(f"{name} RMSE: {score}")

24/12/19 15:31:54 WARN Instrumentation: [c7ade2e6] regParam is zero, which might cause numerical instability and overfitting.
24/12/19 15:31:56 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/19 15:31:59 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/12/19 15:33:02 WARN Instrumentation: [9a48c959] regParam is zero, which might cause numerical instability and overfitting.

Linear Regression RMSE: 0.1352502063904327
Random Forest RMSE: 0.10375812622622416
Gradient Boosted Trees RMSE: 0.09228774312790226
Decision Tree RMSE: 0.10711748587468688
Generalized Linear Regression RMSE: 0.1352502063904327
Isotonic Regression RMSE: 0.1314889382718718


                                                                                

In [40]:
from pyspark.sql.functions import col, round, sqrt, format_number
# Aliased so Python's builtin abs() is not shadowed in the notebook namespace.
from pyspark.sql.functions import abs as spark_abs

# Per-shot absolute error of the Gradient Boosted Trees predictions
# (index 2 in `models`).
#
# The previous version labelled this column "rmse", but sqrt((xg - pred)**2)
# for a single row is just |xg - pred|. RMSE is an aggregate over many rows
# and is already reported by the evaluator. The column is therefore named
# `abs_error` and computed with abs() directly.
#
# xg and pred are rounded to 7 decimals first, so the error shown is exactly
# the difference of the two displayed values. format_number renders it as a
# string with 7 decimals.
(
    predictions[2]
    .select(
        col('shot_statsbomb_xg').alias('xg'),
        col('prediction').alias('pred'),
    )
    .withColumn('xg', round('xg', 7))
    .withColumn('pred', round('pred', 7))
    .withColumn('abs_error', format_number(spark_abs(col('xg') - col('pred')), 7))
    .show(50)
)


[Stage 283:>                                                        (0 + 1) / 1]

+---------+---------+---------+
|       xg|     pred|     rmse|
+---------+---------+---------+
|0.0027236|0.0056996|0.0029760|
|0.0055647|0.0076678|0.0021031|
|0.0034012|0.0080524|0.0046512|
|0.0035249|0.0032214|0.0003035|
|0.0045202|0.0092226|0.0047024|
|0.0061508|0.0127276|0.0065768|
|0.0058455|0.0080524|0.0022069|
|0.0063212| 0.008532|0.0022108|
| 0.005861|0.0130612|0.0072002|
|0.0112379|0.0160351|0.0047972|
|0.0078782|0.0130612|0.0051830|
|0.0142051|0.0160351|0.0018300|
|0.0179917|0.0168994|0.0010923|
|0.0131978|0.0150162|0.0018184|
|0.0062868| 0.007324|0.0010372|
|0.0283282| 0.007324|0.0210042|
|0.0252688|0.0168994|0.0083694|
|0.0117378|0.0261074|0.0143696|
|0.0142668|0.0270567|0.0127899|
|0.0202336|0.0284603|0.0082267|
|0.0391188|0.0284603|0.0106585|
|0.0071529|0.0098645|0.0027116|
|0.0226101|0.0216741|0.0009360|
|0.0428493|0.0289398|0.0139095|
|0.0202187|0.0208941|0.0006754|
|0.0147919|0.0216741|0.0068822|
|0.0309246|0.0284603|0.0024643|
|0.0202455| 0.027293|0.0070475|
|0.04233

                                                                                

24/12/19 18:22:40 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 3633125 ms exceeds timeout 120000 ms
24/12/19 18:22:40 WARN SparkContext: Killing executors is not supported by current scheduler.
24/12/19 18:22:47 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$