# Assessment: Analyzing Soccer Data

TODO: add intro

## 0. Set-up

### a. Load linting tool, create spark session, etc.

In [1]:
%load_ext pycodestyle_magic

In [1]:
#%%pycodestyle
from pyspark.sql import SparkSession
import pyspark.sql.functions as func

# Create spark app
spark = (
    SparkSession
    .builder
    .appName("rb assessment app")
    .getOrCreate()
)


### a. Data Engineering for Task 1

In [3]:
#%%pycodestyle
import pyspark.sql.functions as func

bl_results_spark_df = (
    # read matches folder
    spark
    .read
    .json(
        "data/matches/*/",
        multiLine=True
    )
    # filter 1. Bundesliga 2015/16 matches
    .where(
        func.col("competition.competition_name").eqNullSafe("1. Bundesliga")
        & func.col("season.season_name").eqNullSafe("2015/2016")
    )
    # order by date to make sure that the ELO function is used approprietly
    .orderBy(func.to_date("match_date"))
    # perform the mapping to get one row for each team and match
    # the associated result is:
    # - win = 1
    # - draw = 0.5
    # - loss = 0
    .select(
        func.col(
            "home_team.home_team_name"
        ).alias("home_team_name"),
        func.col(
            "away_team.away_team_name"
        ).alias("away_team_name"),
        func.coalesce(
            func.when(func.expr("home_score > away_score"), 1),
            func.when(func.expr("home_score < away_score"), 0),
            func.lit(0.5)
        ).alias("home_team_result"),
        func.coalesce(
            func.when(func.expr("home_score > away_score"), 0),
            func.when(func.expr("home_score < away_score"), 1),
            func.lit(0.5)
        ).alias("away_team_result")
    )
)

# print schema for verification
bl_results_spark_df.printSchema()

# cache result to pandas df
bl_result_df = bl_results_spark_df.toPandas()
# display resulting df
print(bl_result_df)

root
 |-- home_team_name: string (nullable = true)
 |-- away_team_name: string (nullable = true)
 |-- home_team_result: double (nullable = false)
 |-- away_team_result: double (nullable = false)

    home_team_name            away_team_name  home_team_result  \
0    Bayern Munich              Hamburger SV               1.0   
1         Augsburg             Hertha Berlin               0.0   
2    Werder Bremen                Schalke 04               0.0   
3     FSV Mainz 05                Ingolstadt               0.0   
4     Darmstadt 98               Hannover 96               0.5   
..             ...                       ...               ...   
301   Darmstadt 98  Borussia Mönchengladbach               0.0   
302  Bayern Munich               Hannover 96               1.0   
303   FSV Mainz 05             Hertha Berlin               0.5   
304  Werder Bremen       Eintracht Frankfurt               1.0   
305      Wolfsburg             VfB Stuttgart               1.0   

     away_t

## 1. Task 1 - Elo Rating

### a. Develop a function to implement the ELO Rating System with arbitrary K and s.

In [4]:
#%%pycodestyle
import math
import operator


def predict(rating_a, rating_b, s=15):
    """Calculate the expected outcome for team A given ratings."""
    return 1 / (1 + math.pow(10, -(rating_a - rating_b) / s))


def update_elo(rating_a, rating_b, outcome, s=15, K=15):
    """Update the ELO rating of a team after a match."""
    pred = predict(rating_a, rating_b, s)
    return rating_a + K * (outcome - pred)


def process_match(ratings, home_team, away_team, home_result, away_result,
                  s, K):
    """Update ratings after processing a match."""
    # Update home team rating
    ratings[home_team] = update_elo(
        ratings[home_team], ratings[away_team], home_result, s, K
    )
    # Update away team rating
    ratings[away_team] = update_elo(
        ratings[away_team], ratings[home_team], away_result, s, K
    )
    return ratings


def initialize_ratings(teams, initial_rating=100):
    """Initialize the ratings for all teams."""
    return {team_name: initial_rating for team_name in teams}


def elo(results, s=15, K=15, R_0=100):
    """Calculate the final ELO ratings for all teams."""
    # Initialize ratings
    ratings = initialize_ratings(results["home_team_name"].unique(), R_0)
    for row in results.itertuples(index=False):
        # Update ratings based on the match result
        process_match(
            ratings,
            row.home_team_name,
            row.away_team_name,
            row.home_team_result,
            row.away_team_result,
            s, K
        )
    # Sort the ratings in descending order
    return dict(
        sorted(ratings.items(), key=operator.itemgetter(1), reverse=True)
    )


### b. Apply the rating system to 1. Bundesliga 2015/2016 Season with starting values R0 = 100, s = 15 and K = 15.

In [5]:
elo(bl_result_df)

{'Borussia Mönchengladbach': 128.73684750507124,
 'Bayern Munich': 126.03882090784246,
 'Werder Bremen': 122.53431008967196,
 'Bayer Leverkusen': 119.86744451903735,
 'Schalke 04': 116.7404982618847,
 'Eintracht Frankfurt': 115.13792895836448,
 'Borussia Dortmund': 113.14276236567889,
 'FC Köln': 112.7931080154221,
 'Hannover 96': 112.683637686516,
 'Darmstadt 98': 104.6708369599969,
 'Hoffenheim': 104.14444368636025,
 'Ingolstadt': 104.05781546247819,
 'Hamburger SV': 103.82508894341434,
 'Wolfsburg': 103.54107736006338,
 'Augsburg': 100.16514926109396,
 'FSV Mainz 05': 98.21672472241177,
 'Hertha Berlin': 96.401623690483,
 'VfB Stuttgart': 88.0062841309003}

### c. Develop an approach that finds the optimal values for s and K based on that season and display the final ranking table at the end of the season.

We could take the assumption that the optimal values for parameters s and K are the one that would minimize the discrepancy between the predicted outcome and actual match result for each step of that particular season.

Let's define our loss function as the average brier score of our ELO Rating System.

In [6]:
#%%pycodestyle

def brier_score(results, s=15, K=15, R_0=100):
    """
    Calculate the Brier score based on the predicted outcomes and actual
    results.
    """
    ratings = initialize_ratings(results["home_team_name"].unique(), R_0)
    total_brier_score = 0
    for row in results.itertuples(index=False):
        # Get teams predictions
        home_rating = ratings[row.home_team_name]
        away_rating = ratings[row.away_team_name]

        home_pred = predict(home_rating, away_rating, s)
        away_pred = 1 - home_pred

        # Calculate Brier score for both teams
        total_brier_score += (home_pred - row.home_team_result) ** 2
        total_brier_score += (away_pred - row.away_team_result) ** 2

        # Update ratings after the match
        process_match(
            ratings,
            row.home_team_name,
            row.away_team_name,
            row.home_team_result,
            row.away_team_result,
            s, K
        )

    # Return the average Brier score
    return total_brier_score / (2 * len(results))

brier_score(bl_result_df)


0.23717873793811076

We are going to try finding good values for s and K params for brier_score in terms of brier score using bayesian optimization.
It is a sampling method based on exploration and optimization.

In [17]:
#%%pycodestyle
from functools import partial
from bayes_opt import BayesianOptimization

# Bounded region of parameter space
pbounds = {'s': (15, 300), 'K': (15, 30)}

def neg_brier_score(s, K):
    return - brier_score(bl_result_df, s=s, K=K)

optimizer = BayesianOptimization(
    f=neg_brier_score,
    pbounds=pbounds,
    random_state=1,
    verbose=1
)

optimizer.maximize(
    init_points=32,
    n_iter=128,
)

|   iter    |  target   |     K     |     s     |
-------------------------------------------------
| [35m7        [39m | [35m-0.178   [39m | [35m18.07    [39m | [35m265.3    [39m |
| [35m8        [39m | [35m-0.1779  [39m | [35m15.41    [39m | [35m206.1    [39m |
| [35m44       [39m | [35m-0.1779  [39m | [35m22.83    [39m | [35m300.0    [39m |
| [35m59       [39m | [35m-0.1779  [39m | [35m15.0     [39m | [35m200.4    [39m |
| [35m94       [39m | [35m-0.1779  [39m | [35m18.18    [39m | [35m241.4    [39m |
| [35m130      [39m | [35m-0.1779  [39m | [35m20.45    [39m | [35m271.1    [39m |


In [18]:
res = optimizer.max
print(res)

{'target': np.float64(-0.17791904423717828), 'params': {'K': np.float64(20.44873714712105), 's': np.float64(271.1405558615955)}}


In [19]:
elo(bl_result_df, s=res["params"]["s"], K=res["params"]["K"])

{'Bayern Munich': np.float64(239.10714701168303),
 'Borussia Dortmund': np.float64(202.74555857602832),
 'Bayer Leverkusen': np.float64(156.15384473399382),
 'Borussia Mönchengladbach': np.float64(135.36332335175678),
 'Schalke 04': np.float64(117.98494348740996),
 'FSV Mainz 05': np.float64(106.89958701018837),
 'FC Köln': np.float64(95.41208312462189),
 'Hertha Berlin': np.float64(95.35379411470757),
 'Wolfsburg': np.float64(83.21128855250517),
 'Augsburg': np.float64(81.1916679150901),
 'Hoffenheim': np.float64(80.0920946039107),
 'Werder Bremen': np.float64(77.97082248179228),
 'Hamburger SV': np.float64(74.23642223799513),
 'Darmstadt 98': np.float64(71.19245828494051),
 'Ingolstadt': np.float64(71.0165776348987),
 'Eintracht Frankfurt': np.float64(70.6169902683531),
 'VfB Stuttgart': np.float64(37.66955722879909),
 'Hannover 96': np.float64(19.194387968792206)}

As we can see, the final ranking at the end of the season is strongly correlated with the actual league table.
But, we could try optimizing the parameters a bit further using gradient descent from those starting parameters.
Gradient descent is a standard practice in machine learning to find parameters that allow a function to reach a minimum in terms of loss.

In [35]:
#%%pycodestyle
import torch
from torch.optim import Adam


# Define the PyTorch version of predict
def predict_torch(rating_a, rating_b, s):
    return 1 / (1 + torch.pow(10, -(rating_a - rating_b) / s))


# Define the PyTorch version of update_elo
def update_elo_torch(rating_a, rating_b, outcome, s, K):
    pred = predict_torch(rating_a, rating_b, s)
    return rating_a + K * (outcome - pred)


# Convert the brier_score function to work with PyTorch tensors
def brier_score_torch(results_df, s, K):
    # Initialize ratings with PyTorch tensors
    unique_teams = results_df["home_team_name"].unique()
    ratings = {
        team_name: torch.tensor(100.0, requires_grad=False)
        for team_name in unique_teams
    }

    total_brier_score = torch.tensor(0.0, dtype=torch.float32)

    for row in results_df.itertuples(index=False):
        home_rating = ratings[row.home_team_name]
        away_rating = ratings[row.away_team_name]

        # Use the torch-based predict function
        home_pred = predict_torch(home_rating, away_rating, s)
        away_pred = 1 - home_pred

        home_team_result = torch.tensor(
            [row.home_team_result], dtype=torch.float32
        )
        away_team_result = torch.tensor(
            [row.away_team_result], dtype=torch.float32
        )

        # Accumulate Brier score and ensure correct shape
        total_brier_score = total_brier_score \
            + torch.pow(home_pred - home_team_result, 2)
        total_brier_score = total_brier_score \
            + torch.pow(away_pred - away_team_result, 2)

        # Update ratings with torch-based update function
        ratings[row.home_team_name] = update_elo_torch(
            home_rating, away_rating, row.home_team_result, s, K
        )
        ratings[row.away_team_name] = update_elo_torch(
            away_rating, home_rating, row.away_team_result, s, K
        )

    # Return the average Brier score as a tensor
    return total_brier_score / (2 * len(results_df))


# Initial values for s and K as torch tensors (requires gradients)
s = torch.tensor([res["params"]["s"]], requires_grad=True)
K = torch.tensor([res["params"]["K"]], requires_grad=True)

# Define the optimizer
optimizer = Adam([s, K], lr=0.001)

# Number of gradient descent steps
n_iterations = 500

# Perform gradient descent
for i in range(n_iterations):
    optimizer.zero_grad()  # Zero the gradients from the previous iteration

    # Calculate the Brier score
    loss = brier_score_torch(bl_result_df, s, K)

    # Backpropagation to compute gradients
    loss.backward()

    # Perform a step of gradient descent
    optimizer.step()

    # Optionally, print progress
    if i % 100 == 0:
        print(f"Iteration {i}: Brier score = {loss.item()}, s = {s.item()}, K = {K.item()}")

# Final optimized values of s and K
print(f"Optimized s: {s.item()}, Optimized K: {K.item()}")


Iteration 0: Brier score = 0.17795345066574975, s = 271.14154310709904, K = 20.447738120511808
Iteration 100: Brier score = 0.17795254370403202, s = 271.2349039633821, K = 20.35309214544007
Iteration 200: Brier score = 0.17795199812485302, s = 271.3126315462579, K = 20.273932400837435
Iteration 300: Brier score = 0.17795170632470167, s = 271.3735168370143, K = 20.21162764201154
Iteration 400: Brier score = 0.17795156975407675, s = 271.4180023744578, K = 20.16588990044537
Optimized s: 271.4477628525792, Optimized K: 20.135156178137716


In [23]:
elo(bl_result_df, s=s.item(), K=K.item())

{'Bayern Munich': 238.11614695578442,
 'Borussia Dortmund': 202.10731690794827,
 'Bayer Leverkusen': 155.58162112932683,
 'Borussia Mönchengladbach': 134.8976205950394,
 'Schalke 04': 117.80211535227875,
 'FSV Mainz 05': 106.9566223178877,
 'Hertha Berlin': 95.6603558234831,
 'FC Köln': 95.37158298186573,
 'Wolfsburg': 83.44786249701849,
 'Augsburg': 81.22629511106862,
 'Hoffenheim': 80.08479250444809,
 'Werder Bremen': 77.92219184991505,
 'Hamburger SV': 74.46172196837092,
 'Darmstadt 98': 71.39588595656109,
 'Ingolstadt': 71.31005191484432,
 'Eintracht Frankfurt': 70.64756396595439,
 'VfB Stuttgart': 38.3064811665141,
 'Hannover 96': 19.632316449702756}

Gradient descent does not change the final rankings, but it found a slightly lower K value and higher s value indincating slower ELO updates and more conservative outcome predictions.

## 2. Task 2 - Research Problem Corners

### a. Use the whole data set of Free Statsbomb Data to help you find general offensive trends.

In [3]:
events = (
    spark
    .read
    .json(
        "data/events/",
        multiLine=True
    )
    .withColumn("match_id", func.regexp_extract(func.input_file_name(), "/(\\d+)\\.json$", 1))
)

In [4]:
matches = (
    spark
    .read
    .json(
        "data/matches/*/",
        multiLine=True
    )
)

In [6]:
from pyspark.sql import Window

window_spec_xg = Window.partitionBy("match_id", "possession").orderBy("minute", "second")
window_spec_events = Window.partitionBy("match_id", "period").orderBy("minute", "second")

start_y = func.col("location").getItem(1)
end_x = func.col("pass.end_location").getItem(0)
end_y = func.col("pass.end_location").getItem(1)
right_side_corner = (start_y > 40)
inswinging = (
    (right_side_corner & func.col("pass.body_part.name").eqNullSafe("Left Foot"))
    | (~right_side_corner & func.col("pass.body_part.name").eqNullSafe("Right Foot"))
)

corners = (
    events
    .withColumn(
        "xg",
        func.coalesce(
            func.first("shot.statsbomb_xg", ignorenulls=True).over(window_spec_xg.rowsBetween(Window.currentRow, 5)),
            func.lit(0)
        )
    )
    .withColumn(
        "next_event_id",
        func.lag("id", -1).over(window_spec_events)
    )
    .withColumn(
        "next_event_team",
        func.lag("team.name", -1).over(window_spec_events)
    )
    .where("pass.type.name='Corner'")
    .join(matches, on="match_id", how="left")
    .select(
        "match_id",
        func.col("season.season_name").alias("season"),
        func.col("id").alias("event_id"),
        func.col("team.name").alias("team"), 
        "next_event_id",
        "next_event_team",
        right_side_corner.alias("y_symetry_applied"),
        end_x.alias("end_x"), 
        # horizontal symmetry
        func.when(right_side_corner, 80 - end_y).otherwise(end_y).alias("end_y"),
        # horizontal symmetry
        func.when(inswinging, "Inswinging").otherwise("Outswinging").alias("technique"),
        "xg"
    )
)

corners.write.parquet("data/processed/corners", mode="overwrite")

In [7]:
corners = (
    spark
    .read
    .parquet("data/processed/corners")
)

In [4]:
# Step 1: Prepare the data (use 'end_x' and 'end_y')
X = corners.where("technique='Outswinging'").toPandas()[['end_x', 'end_y']].values  # Convert to NumPy array for sklearn

from sklearn.mixture import GaussianMixture
from sklearn.model_selection import GridSearchCV


def gmm_bic_score(estimator, X):
    """Callable to pass to GridSearchCV that will use the BIC score."""
    # Make it negative since GridSearchCV expects a score to maximize
    return -estimator.bic(X)


param_grid = {
    "n_components": range(1, 7),
    "covariance_type": ["full", "tied", "spherical"],
}
grid_search = GridSearchCV(
    GaussianMixture(), param_grid=param_grid, scoring=gmm_bic_score
)
grid_search.fit(X)

  _data = np.array(data, dtype=dtype, copy=copy,


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Ellipse
from scipy import linalg

color_iter = sns.color_palette("tab10", 10)[::-1]
Y_ = grid_search.predict(X)

fig, ax = plt.subplots()

for i, (mean, cov, color) in enumerate(
    zip(
        grid_search.best_estimator_.means_,
        grid_search.best_estimator_.covariances_,
        color_iter,
    )
):
    if grid_search.best_estimator_.covariance_type == "diag":
        v = cov
        w = np.eye(len(cov))
    else:
        v, w = linalg.eigh(cov)
    if not np.any(Y_ == i):
        continue
    plt.scatter(X[Y_ == i, 0], X[Y_ == i, 1], 0.8, color=color)

    angle = np.arctan2(w[0][1], w[0][0])
    angle = 180.0 * angle / np.pi  # convert to degrees
    v = 2.0 * np.sqrt(2.0) * np.sqrt(v)
    ellipse = Ellipse(mean, v[0], v[1], angle=180.0 + angle, color=color, label=f"Component {i}")
    ellipse.set_clip_box(fig.bbox)
    ellipse.set_alpha(0.5)
    ax.add_artist(ellipse)

plt.title(
    f"Selected GMM: {grid_search.best_params_['covariance_type']} model, "
    f"{grid_search.best_params_['n_components']} components"
)
plt.xlim(60, 120)
plt.ylim(0, 80)
plt.legend()
plt.show()

In [151]:
df = corners.where("technique='Inswinging'").toPandas()
components = [f"Component {i}" for i in range(len(grid_search.best_estimator_.means_))]
df[components] = grid_search.predict_proba(df[['end_x', 'end_y']].values)
df[components].multiply(df["xg"], axis="rows").sum() / df[components].sum()

Component 0    0.026208
Component 1    0.001939
Component 2    0.018886
Component 3    0.006123
Component 4    0.026458
Component 5    0.003990
dtype: float64

In [152]:
df[components].sum() / df[components].sum().sum() * 100

Component 0    34.327230
Component 1    10.735656
Component 2    29.139784
Component 3     4.681218
Component 4    12.071740
Component 5     9.044373
dtype: float64

In [8]:
import pyspark.sql.types as types
schema = types.StructType([
    types.StructField('event_uuid', types.StringType()),
    types.StructField(
        'freeze_frame',
        types.ArrayType(
            types.StructType([
                types.StructField('actor', types.BooleanType()),
                types.StructField('keeper', types.BooleanType()),
                types.StructField('location', types.ArrayType(types.DoubleType())),
                types.StructField('teammate', types.BooleanType())
            ])
        )
    ), 
    types.StructField('visible_area', types.ArrayType(types.DoubleType()))
])

three_sixty = (
    spark
    .read
    .schema(schema)
    .json("data/three-sixty/", multiLine=True)
)

In [3]:
from pyspark.sql import Window

players = (
    corners.alias("corner")
    .join(
        three_sixty.alias("event"),
        on=func.expr("corner.event_id=event.event_uuid"),
        how="left"
    )
    .select(
        "event.event_uuid",
        func.posexplode("event.freeze_frame").alias("player_id", "player")
    )
)

team = (
    players
    .where("player.teammate AND not player.keeper AND not player.actor")
     .withColumn(
        "n_players_in_box", 
        func.sum(
            func.when(
                func.col("player.location").getItem(0).between(114, 120)
                & func.col("player.location").getItem(1).between(30, 50),
                1
            ).otherwise(0)
        ).over(Window.partitionBy("event_uuid"))
    )
    .alias("team")
)
opponent = (
    players
    .where("not player.teammate AND not player.keeper")
    .withColumn(
        "n_players_in_box", 
        func.sum(
            func.when(
                func.col("player.location").getItem(0).between(114, 120)
                & func.col("player.location").getItem(1).between(30, 50),
                1
            ).otherwise(0)
        ).over(Window.partitionBy("event_uuid"))
    )
    .alias("opponent")
)
pairing = (
    team.join(opponent, "event_uuid", "left")
    .select(
        "event_uuid",
        func.col("team.player_id"),
        func.col("opponent.player_id").alias("opponent_player_id"),
        func.col("team.n_players_in_box").alias("n_team_players_in_box"),
        func.col("opponent.n_players_in_box").alias("n_opponent_players_in_box"),
        func.sqrt(
            (func.col("team.player.location").getItem(0) - func.col("opponent.player.location").getItem(0)) ** 2
            + (func.col("team.player.location").getItem(1) - func.col("opponent.player.location").getItem(1)) ** 2
        ).alias("distance")
    )
    .withColumn(
        "closest_distance",
        func.min("distance").over(Window.partitionBy("event_uuid", "player_id"))
    )
    .groupBy("event_uuid")
    .agg(
        func.median("closest_distance").alias("median_marking_distance"),
        func.first("n_team_players_in_box").alias("n_team_players_in_box"),
        func.first("n_opponent_players_in_box").alias("n_opponent_players_in_box")
    )
)

pairing.write.parquet("data/processed/pairing", mode="overwrite")

SyntaxError: invalid syntax. Perhaps you forgot a comma? (3601044214.py, line 47)

In [2]:
pairing = (
    spark
    .read
    .parquet("data/processed/pairing")
)

In [30]:
df = (
    corners.alias("corner")
    .join(
        pairing.alias("event"),
        on=func.expr("corner.event_id=event.event_uuid"),
        how="inner"
    )
    .drop("event_uuid")
    #.join(
    #    three_sixty.alias("next_event"),
    #    on=func.expr("corner.next_event_id=next_event.event_uuid"),
    #    how="inner"
    #)
    .select(
        "corner.*",
        "event.*",
        #func.col("event.freeze_frame"),
        #func.col("next_event.freeze_frame").alias("next_freeze_frame")
    )
)

df.write.parquet("data/processed/corners_v2", mode="overwrite")

df = (
    spark
    .read
    .parquet("data/processed/corners_v2")
)

In [31]:
# Step 1: Prepare the data (use 'end_x' and 'end_y')
X = df.toPandas()[['median_marking_distance', 'n_team_players_in_box', 'n_opponent_players_in_box']].values  # Convert to NumPy array for sklearn

from sklearn.mixture import GaussianMixture
from sklearn.model_selection import GridSearchCV


def gmm_bic_score(estimator, X):
    """Callable to pass to GridSearchCV that will use the BIC score."""
    # Make it negative since GridSearchCV expects a score to maximize
    return -estimator.bic(X)


param_grid = {
    "n_components": [2],
    "covariance_type": ["full", "tied", "spherical"],
}
grid_search = GridSearchCV(
    GaussianMixture(), param_grid=param_grid, scoring=gmm_bic_score
)
grid_search.fit(X)

In [32]:
grid_search.best_estimator_.means_

array([[3.15204602, 0.78435947, 3.77071265],
       [1.7594572 , 1.91636021, 4.1663924 ]])