In [22]:
import itertools
import os
import time
from datetime import datetime, timedelta, timezone

import numpy as np
import pandas as pd
import pytz
import requests
from IPython.display import HTML, display
from pandas import DataFrame
from scipy.optimize import minimize
from scipy.stats import poisson


In [23]:
# HTTP headers passed to requests.get when the match data is downloaded.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
}

# NumPy printing: no premature line breaks, show every array entry, and use
# fixed-point instead of scientific notation for small values.
np.set_printoptions(linewidth=np.inf, threshold=np.inf, suppress=True)

# Avoid line breaks by Jupyter (following https://stackoverflow.com/a/70433850/7395592)
display(HTML("<style>div.jp-OutputArea-output pre {white-space: pre;}</style>"))

In [24]:
def get_nested_value(d, keys, default=None):
    """Follow a chain of keys through nested dictionaries.

    Parameters
    ----------
    d : dict
        Outermost dictionary to start from.
    keys : iterable
        Keys to look up, outermost first.
    default : object, optional
        Value returned when the path cannot be followed to the end.

    Returns
    -------
    object
        The value stored under the full key path, or ``default`` when any key
        is missing or an intermediate value is not a dictionary.
    """
    current = d
    for key in keys:
        # Stop at the first broken link. Continuing the walk on ``default``
        # would wrongly search a dict-valued default for the remaining keys.
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current

In [25]:
def get_all_match_data(
    uefa_config: dict, limit: int = 100, max_pages: int = 5, timeout: float = 30
):
    """Download match records for every configured season and flatten them.

    Parameters
    ----------
    uefa_config : dict
        Must provide ``"competition_id"``, ``"season_years"`` and
        ``"url_matches"``.
    limit : int, optional
        Page size sent as the ``limit`` query parameter.
    max_pages : int, optional
        Maximum number of pages requested per season. Paging stops earlier as
        soon as a page comes back empty.
    timeout : float, optional
        Seconds to wait for each HTTP request before giving up.

    Returns
    -------
    pandas.DataFrame
        One row per match whose round status is not ``"UPCOMING"``.

    Raises
    ------
    requests.HTTPError
        If the server answers a request with an error status.
    """
    competition_id = uefa_config["competition_id"]
    season_years = uefa_config["season_years"]
    # Read the URL from the argument, not from a notebook-level global, so the
    # function works no matter which cells ran before it.
    url_matches = uefa_config["url_matches"]

    data = []
    for season_year in season_years:

        params = {"limit": limit}
        if competition_id:
            params["competitionId"] = competition_id
        if season_year:
            params["seasonYear"] = season_year

        for page in range(max_pages):
            response = requests.get(
                url_matches,
                params={**params, "offset": page * limit},
                headers=headers,
                timeout=timeout,  # never hang forever on a stalled connection
            )
            # Fail loudly on HTTP errors instead of parsing an error payload
            response.raise_for_status()

            data_iter = response.json()
            if not data_iter:
                break

            data.extend(data_iter)

    # The reference time is the same for every record, so take it once
    current_date = datetime.now(timezone.utc)

    data_reduced = []
    for record in data:

        # Skip matches of upcoming rounds before doing any work on them
        status = get_nested_value(record, ["round", "status"])
        if status == "UPCOMING":
            continue

        datetime_str = get_nested_value(record, ["kickOffTime", "dateTime"])
        utc_offset_hours = get_nested_value(
            record, ["kickOffTime", "utcOffsetInHours"]
        )
        if datetime_str is None:
            is_completed = False
            parsed_date = None
        else:
            # NOTE(review): the offset is added to the parsed timestamp while
            # its tzinfo stays unchanged - confirm this shift is intended.
            # A missing offset is treated as zero instead of raising.
            parsed_date = datetime.fromisoformat(
                datetime_str.replace("Z", "+00:00")
            ) + timedelta(hours=utc_offset_hours or 0)
            # True once the (shifted) kick-off time lies in the past
            is_completed = parsed_date < current_date

        home_score_total = get_nested_value(record, ["score", "total", "home"])
        away_score_total = get_nested_value(record, ["score", "total", "away"])

        home_team_code = get_nested_value(record, ["homeTeam", "teamCode"])
        away_team_code = get_nested_value(record, ["awayTeam", "teamCode"])

        if home_score_total is None or away_score_total is None:
            # No (complete) score yet: nothing to summarise or compare
            score_summary = None
            result = None
        else:
            score_summary = (
                f"{home_team_code} {home_score_total} - "
                f"{away_score_total} {away_team_code}"
            )
            if home_score_total > away_score_total:
                result = "H"
            elif home_score_total < away_score_total:
                result = "A"
            else:
                result = "D"

        record_reduced = {
            "id": record["id"],
            "season_year": record["seasonYear"],
            "date": parsed_date,
            # Context
            "phase": get_nested_value(record, ["round", "phase"]),
            "mode": get_nested_value(record, ["round", "mode"]),
            "status": status,
            "is_completed": is_completed,
            # Home Team
            "home_team": get_nested_value(record, ["homeTeam", "id"]),
            "home_team_name": get_nested_value(
                record, ["homeTeam", "internationalName"]
            ),
            "home_team_code": home_team_code,
            "home_score": get_nested_value(record, ["score", "regular", "home"]),
            "home_score_total": home_score_total,
            # Away Team
            "away_team": get_nested_value(record, ["awayTeam", "id"]),
            "away_team_name": get_nested_value(
                record, ["awayTeam", "internationalName"]
            ),
            "away_team_code": away_team_code,
            "away_score": get_nested_value(record, ["score", "regular", "away"]),
            "away_score_total": away_score_total,
            # Score/Result
            "score": score_summary,
            "result": result,
        }
        data_reduced.append(record_reduced)

    return pd.DataFrame(data_reduced)

In [26]:
def get_filtered_match_data(df: DataFrame, year_min: int = 1996):
    """Select the completed, scored matches used to fit the model.

    Parameters
    ----------
    df : pandas.DataFrame
        Match table with at least the columns ``id``, ``season_year``,
        ``date``, ``is_completed``, ``home_score_total``,
        ``away_score_total``, ``home_team``, ``away_team``,
        ``home_team_name`` and ``away_team_name``. It is not modified.
    year_min : int, optional
        Earliest season year to keep (inclusive).

    Returns
    -------
    pandas.DataFrame
        Columns ``id``, ``home_team``, ``away_team`` (team names),
        ``home_goals``, ``away_goals``, ``time_diff`` and ``date``.
        ``time_diff`` is the number of whole days between each match and the
        most recent match that was kept.
    """
    keep = (
        (pd.to_numeric(df["season_year"]) >= year_min)
        & df["is_completed"].astype(bool)
        # NaN scores compare as False, so unscored matches drop out here
        & (df["home_score_total"] >= 0)
    )
    # Work on an explicit copy: assigning columns on a filtered slice triggers
    # SettingWithCopyWarning and leaves it unclear which frame gets modified.
    matches = df.loc[keep].copy()

    matches["date"] = pd.to_datetime(matches["date"])
    matches["time_diff"] = (matches["date"].max() - matches["date"]).dt.days

    # Drop the team-id columns first so the team-name columns can take over
    # the names "home_team" / "away_team".
    matches = matches.drop(columns=["home_team", "away_team"]).rename(
        columns={
            "home_score_total": "home_goals",
            "away_score_total": "away_goals",
            "home_team_name": "home_team",
            "away_team_name": "away_team",
        }
    )

    return matches[
        [
            "id",
            "home_team",
            "away_team",
            "home_goals",
            "away_goals",
            "time_diff",
            "date",
        ]
    ]

In [27]:
def rho_correction(x, y, lambda_x, mu_y, rho):
    """Vectorised low-score correction factor for each match.

    Parameters
    ----------
    x, y : array-like
        Home and away goals per match.
    lambda_x, mu_y : array-like
        Expected home and away goals per match (same shape as ``x``).
    rho : float
        Score-correction parameter.

    Returns
    -------
    numpy.ndarray
        Float array shaped like ``x``. Scorelines 0-0, 0-1, 1-0 and 1-1 get
        their specific factor; every other scoreline gets 1. Values are
        floored at ``1e-10`` so the caller can take a logarithm safely.
    """
    x = np.asarray(x)
    y = np.asarray(y)
    lambda_x = np.asarray(lambda_x, dtype=float)
    mu_y = np.asarray(mu_y, dtype=float)

    # Force a float result. Inheriting an integer dtype from the goal counts
    # would truncate every fractional correction to 0 or 1.
    result = np.ones(x.shape, dtype=float)

    zero_zero_mask = (x == 0) & (y == 0)
    zero_one_mask = (x == 0) & (y == 1)
    one_zero_mask = (x == 1) & (y == 0)
    one_one_mask = (x == 1) & (y == 1)

    result[zero_zero_mask] = 1 - (lambda_x[zero_zero_mask] * mu_y[zero_zero_mask] * rho)
    result[zero_one_mask] = 1 + (lambda_x[zero_one_mask] * rho)
    result[one_zero_mask] = 1 + (mu_y[one_zero_mask] * rho)
    result[one_one_mask] = 1 - rho

    # Avoid log(0) and log(negative)
    return np.maximum(result, 1e-10)

In [28]:
def solve_parameters(
    dataset,
    init_vals=None,
    options=None,
    constraints=None,
    **kwargs
):
    """Fit attack/defence strengths, rho and home advantage by maximum likelihood.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Needs the columns ``home_team``, ``away_team``, ``home_goals`` and
        ``away_goals``. Every team must appear both as home and as away team.
    init_vals : array-like, optional
        Starting point laid out as ``n_teams`` attack values, ``n_teams``
        defence values, rho, home advantage. Drawn at random when omitted, so
        repeated runs may differ unless NumPy's global seed is set.
    options : dict, optional
        Passed to ``scipy.optimize.minimize``. Defaults to
        ``{"disp": False, "maxiter": 500}``.
    constraints : list of dict, optional
        Passed to ``scipy.optimize.minimize``. Defaults to a single equality
        constraint that makes the attack parameters sum to the number of teams.
    **kwargs
        Forwarded to ``scipy.optimize.minimize``.

    Returns
    -------
    dict
        Keys ``attack_<team>``, ``defence_<team>``, ``rho`` and ``home_adv``.

    Raises
    ------
    ValueError
        If the sets of home teams and away teams differ.
    """
    teams = np.sort(dataset["home_team"].unique())
    # check for no weirdness in dataset
    away_teams = np.sort(dataset["away_team"].unique())
    if not np.array_equal(teams, away_teams):
        raise ValueError("Something's not right")
    n_teams = len(teams)

    if options is None:
        options = {"disp": False, "maxiter": 500}
    if constraints is None:
        # Constrain exactly the attack block, whatever the number of teams.
        # A fixed slice of 20 only matches a 20-team dataset.
        constraints = [
            {"type": "eq", "fun": lambda x: np.sum(x[:n_teams]) - n_teams}
        ]
    if init_vals is None:
        # random initialisation of model parameters
        init_vals = np.concatenate(
            (
                np.random.uniform(0, 1, n_teams),  # attack strength
                np.random.uniform(-1, 0, n_teams),  # defence strength (low <= high)
                np.array([0, 1.0]),  # rho (score correction), gamma (home advantage)
            )
        )

    # Everything below is constant across optimiser iterations, so compute it
    # once instead of rebuilding lookup dicts on every objective evaluation.
    team_index = {team: idx for idx, team in enumerate(teams)}
    home_idx = dataset["home_team"].map(team_index).to_numpy(dtype=int)
    away_idx = dataset["away_team"].map(team_index).to_numpy(dtype=int)
    # Float goal arrays keep the rho correction from being computed in an
    # integer array.
    home_goals = dataset["home_goals"].to_numpy(dtype=float)
    away_goals = dataset["away_goals"].to_numpy(dtype=float)

    def dc_log_like_vectorized(
        home_goals, away_goals, alpha_x, beta_x, alpha_y, beta_y, rho, gamma
    ):
        # Expected goals: home attack + away defence + home advantage, and
        # away attack + home defence, both on the log scale.
        lambda_x = np.exp(alpha_x + beta_y + gamma)
        mu_y = np.exp(alpha_y + beta_x)

        rho_corr = rho_correction(home_goals, away_goals, lambda_x, mu_y, rho)
        # Floor the probabilities so the logarithm stays finite
        poisson_pmf_x = np.maximum(poisson.pmf(home_goals, lambda_x), 1e-10)
        poisson_pmf_y = np.maximum(poisson.pmf(away_goals, mu_y), 1e-10)

        return np.log(rho_corr) + np.log(poisson_pmf_x) + np.log(poisson_pmf_y)

    def estimate_parameters(params):
        attack = params[:n_teams]
        defence = params[n_teams : (2 * n_teams)]
        rho, gamma = params[-2:]

        log_likes = dc_log_like_vectorized(
            home_goals,
            away_goals,
            attack[home_idx],
            defence[home_idx],
            attack[away_idx],
            defence[away_idx],
            rho,
            gamma,
        )
        # minimize() minimises, so return the negative log-likelihood
        return -np.sum(log_likes)

    opt_output = minimize(
        estimate_parameters,
        init_vals,
        options=options,
        constraints=constraints,
        **kwargs
    )

    return dict(
        zip(
            ["attack_" + team for team in teams]
            + ["defence_" + team for team in teams]
            + ["rho", "home_adv"],
            opt_output.x,
        )
    )

In [29]:
def solve_parameters_decay(
    dataset,
    xi=0.001,
    init_vals=None,
    options=None,
    constraints=None,
    **kwargs
):
    """Fit the model with exponentially down-weighted older matches.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Needs the columns ``home_team``, ``away_team``, ``home_goals``,
        ``away_goals`` and ``time_diff``. Every team must appear both as home
        and as away team.
    xi : float, optional
        Decay rate. Each match's log-likelihood is weighted by
        ``exp(-xi * time_diff)``.
    init_vals : array-like, optional
        Starting point laid out as ``n_teams`` attack values, ``n_teams``
        defence values, rho, home advantage. Drawn at random when omitted, so
        repeated runs may differ unless NumPy's global seed is set.
    options : dict, optional
        Passed to ``scipy.optimize.minimize``. Defaults to
        ``{"disp": True, "maxiter": 500}``.
    constraints : list of dict, optional
        Passed to ``scipy.optimize.minimize``. Defaults to a single equality
        constraint that makes the attack parameters sum to the number of teams.
    **kwargs
        Forwarded to ``scipy.optimize.minimize``.

    Returns
    -------
    dict
        Keys ``attack_<team>``, ``defence_<team>``, ``rho`` and ``home_adv``.

    Raises
    ------
    ValueError
        If the sets of home teams and away teams differ.
    """
    teams = np.sort(dataset["home_team"].unique())
    # check for no weirdness in dataset
    away_teams = np.sort(dataset["away_team"].unique())
    if not np.array_equal(teams, away_teams):
        raise ValueError("Something's not right")
    n_teams = len(teams)

    if options is None:
        options = {"disp": True, "maxiter": 500}
    if constraints is None:
        # Constrain exactly the attack block, whatever the number of teams.
        # A fixed slice of 20 only matches a 20-team dataset.
        constraints = [
            {"type": "eq", "fun": lambda x: np.sum(x[:n_teams]) - n_teams}
        ]
    if init_vals is None:
        # random initialisation of model parameters
        init_vals = np.concatenate(
            (
                np.random.uniform(0, 1, n_teams),  # attack strength
                np.random.uniform(-1, 0, n_teams),  # defence strength (low <= high)
                np.array([0, 1.0]),  # rho (score correction), gamma (home advantage)
            )
        )

    # Everything below is constant across optimiser iterations, so compute it
    # once instead of rebuilding lookup dicts on every objective evaluation.
    team_index = {team: idx for idx, team in enumerate(teams)}
    home_idx = dataset["home_team"].map(team_index).to_numpy(dtype=int)
    away_idx = dataset["away_team"].map(team_index).to_numpy(dtype=int)
    # Float goal arrays keep the rho correction from being computed in an
    # integer array.
    home_goals = dataset["home_goals"].to_numpy(dtype=float)
    away_goals = dataset["away_goals"].to_numpy(dtype=float)
    match_age = dataset["time_diff"].to_numpy(dtype=float)

    def dc_log_like_decay_vectorized(
        x, y, alpha_x, beta_x, alpha_y, beta_y, rho, gamma, t, xi
    ):
        # Expected goals: home attack + away defence + home advantage, and
        # away attack + home defence, both on the log scale.
        lambda_x = np.exp(alpha_x + beta_y + gamma)
        mu_y = np.exp(alpha_y + beta_x)

        rho_corr = rho_correction(x, y, lambda_x, mu_y, rho)
        # Floor the probabilities so the logarithm stays finite
        poisson_pmf_x = np.maximum(poisson.pmf(x, lambda_x), 1e-10)
        poisson_pmf_y = np.maximum(poisson.pmf(y, mu_y), 1e-10)

        # Older matches (larger t) contribute less to the likelihood
        decay_factor = np.exp(-xi * t)
        return decay_factor * (
            np.log(rho_corr) + np.log(poisson_pmf_x) + np.log(poisson_pmf_y)
        )

    def estimate_parameters(params):
        attack = params[:n_teams]
        defence = params[n_teams : (2 * n_teams)]
        rho, gamma = params[-2:]

        log_likes = dc_log_like_decay_vectorized(
            home_goals,
            away_goals,
            attack[home_idx],
            defence[home_idx],
            attack[away_idx],
            defence[away_idx],
            rho,
            gamma,
            # The match ages must be supplied; without them the call fails
            # with a missing-argument TypeError.
            t=match_age,
            xi=xi,
        )
        # minimize() minimises, so return the negative log-likelihood
        return -np.sum(log_likes)

    opt_output = minimize(
        estimate_parameters,
        init_vals,
        options=options,
        constraints=constraints,
        **kwargs
    )

    return dict(
        zip(
            ["attack_" + team for team in teams]
            + ["defence_" + team for team in teams]
            + ["rho", "home_adv"],
            opt_output.x,
        )
    )

In [30]:
def calc_means(param_dict, home_team, away_team):
    """Return ``[home_mean, away_mean]``, the expected goals of both sides.

    The home mean is ``exp(attack_home + defence_away + home_adv)`` and the
    away mean is ``exp(defence_home + attack_away)``, with all terms read from
    ``param_dict``.
    """
    home_log_mean = (
        param_dict["attack_" + home_team]
        + param_dict["defence_" + away_team]
        + param_dict["home_adv"]
    )
    away_log_mean = (
        param_dict["defence_" + home_team] + param_dict["attack_" + away_team]
    )
    return [np.exp(home_log_mean), np.exp(away_log_mean)]


def rho_correction_old(x, y, lambda_x, mu_y, rho):
    """Scalar low-score correction factor for a single scoreline ``x``-``y``.

    Only 0-0, 0-1, 1-0 and 1-1 are adjusted; every other scoreline gets 1.0.
    """
    if x == 0:
        if y == 0:
            return 1 - (lambda_x * mu_y * rho)
        if y == 1:
            return 1 + (lambda_x * rho)
    elif x == 1:
        if y == 0:
            return 1 + (mu_y * rho)
        if y == 1:
            return 1 - rho
    return 1.0


def dixon_coles_simulate_match(params_dict, home_team, away_team, max_goals=10):
    """Build the scoreline probability matrix for one fixture.

    Entry ``[i, j]`` is the probability of the home side scoring ``i`` and the
    away side scoring ``j`` goals, for 0..``max_goals`` goals each. It is the
    product of two independent Poisson probabilities, with the four lowest
    scorelines rescaled by the rho correction.
    """
    home_mean, away_mean = calc_means(params_dict, home_team, away_team)

    def goal_distribution(mean):
        # Poisson probability of scoring 0, 1, ..., max_goals goals
        return np.array([poisson.pmf(goals, mean) for goals in range(0, max_goals + 1)])

    score_probs = np.outer(goal_distribution(home_mean), goal_distribution(away_mean))

    # 2x2 block of factors for the scorelines 0-0, 0-1, 1-0 and 1-1
    low_score_factors = np.array(
        [
            [
                rho_correction_old(
                    home_goals, away_goals, home_mean, away_mean, params_dict["rho"]
                )
                for away_goals in range(2)
            ]
            for home_goals in range(2)
        ]
    )
    score_probs[:2, :2] = score_probs[:2, :2] * low_score_factors
    return score_probs

In [31]:
def test_match(
    params: dict,
    home_team: str = "England",
    away_team: str = "Denmark",
    max_goals: int = 6,
):
    """Print the scoreline matrix of one fixture and its most likely score.

    The matrix is printed with three decimals; the second printed line is the
    ``(home_goals, away_goals)`` index of its largest entry.
    """
    score_matrix = dixon_coles_simulate_match(
        params, home_team, away_team, max_goals=max_goals
    )

    with np.printoptions(precision=3, suppress=True):
        print(score_matrix)

    # Position of the largest entry = most probable scoreline
    most_likely_score = np.unravel_index(np.argmax(score_matrix), score_matrix.shape)
    print(most_likely_score)

In [32]:
def get_1x2_probs(match_score_matrix):
    """Collapse a scoreline matrix into home-win / away-win / draw sums.

    Rows are treated as home goals and columns as away goals, so the strict
    lower triangle is summed for "H", the strict upper triangle for "A" and
    the diagonal for "D".
    """
    home_win = np.sum(np.tril(match_score_matrix, -1))
    away_win = np.sum(np.triu(match_score_matrix, 1))
    draw = np.sum(np.diag(match_score_matrix))
    return {"H": home_win, "A": away_win, "D": draw}


def build_temp_model(dataset, time_diff, xi=0.000, init_params=None):
    """Score a time-decay model on a short hold-out window.

    Matches with ``time_diff - 2 <= dataset["time_diff"] <= time_diff`` form
    the test set; matches with a larger ``time_diff`` form the training set.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Needs the columns ``home_team``, ``away_team``, ``home_goals``,
        ``away_goals`` and ``time_diff``. It is not modified.
    time_diff : int
        Upper edge of the hold-out window, in the units of the ``time_diff``
        column.
    xi : float, optional
        Decay rate handed to ``solve_parameters_decay``.
    init_params : array-like, optional
        Starting values handed to ``solve_parameters_decay``.

    Returns
    -------
    float
        Sum over the test matches of the log probability the fitted model
        gives to the outcome that actually happened (home win, away win or
        draw). ``0`` when the hold-out window contains no match.
    """
    in_window = (dataset["time_diff"] <= time_diff) & (
        dataset["time_diff"] >= (time_diff - 2)
    )
    test_dataset = dataset[in_window]
    if len(test_dataset) == 0:
        return 0

    # Copy before editing so the caller's frame is untouched and pandas does
    # not warn about assigning on a slice.
    train_dataset = dataset[dataset["time_diff"] > time_diff].copy()
    # Re-base match ages so they are measured from the hold-out window
    train_dataset["time_diff"] = train_dataset["time_diff"] - time_diff
    params = solve_parameters_decay(train_dataset, xi=xi, init_vals=init_params)

    predictive_score = 0.0
    for row in test_dataset.itertuples():
        outcome_probs = get_1x2_probs(
            dixon_coles_simulate_match(params, row.home_team, row.away_team)
        )
        # The dataset has no result column, so derive the outcome from goals
        if row.home_goals > row.away_goals:
            outcome = "H"
        elif row.home_goals < row.away_goals:
            outcome = "A"
        else:
            outcome = "D"
        predictive_score += np.log(outcome_probs[outcome])
    return predictive_score

In [33]:
def get_predicted_match_data(
    source_matches: DataFrame, params: dict, sim_date: datetime
):
    """Predict every match on or after ``sim_date`` and score the predictions.

    Parameters
    ----------
    source_matches : DataFrame
        Match table containing at least the columns dropped, renamed and
        selected below.
    params : dict
        Fitted model parameters passed to ``dixon_coles_simulate_match``.
    sim_date : datetime
        Matches dated earlier than this are excluded.

    Returns
    -------
    DataFrame
        Columns ``id``, ``date``, ``home_team``, ``away_team``, ``home_goals``
        and ``away_goals`` (predicted), ``Result`` (predicted H/A/D),
        ``home_goalsActual``, ``away_goalsActual``, ``ResultActual`` (empty
        string when the actual home goals are null) and ``ModelPerformance``
        (5 for an exact score, 2 for the right result only, otherwise 0).
    """
    # The team-id columns are dropped so the team-name columns can be renamed
    # to "home_team" / "away_team" below.
    future_matches_ = source_matches.drop(
        ["is_completed", "home_score", "away_score", "home_team", "away_team"], axis=1
    )

    future_matches_ = future_matches_.loc[future_matches_["date"] >= sim_date]

    future_matches_ = future_matches_.rename(
        columns={
            "home_score_total": "home_goals",
            "away_score_total": "away_goals",
            "home_team_name": "home_team",
            "away_team_name": "away_team",
        }
    )

    # Preserve the real scores; "home_goals"/"away_goals" are overwritten with
    # the predictions inside the loop.
    future_matches_["home_goalsActual"] = future_matches_["home_goals"]

    future_matches_["away_goalsActual"] = future_matches_["away_goals"]

    future_matches_ = future_matches_[
        [
            "id",
            "date",
            "home_team",
            "away_team",
            "home_goals",
            "away_goals",
            "home_goalsActual",
            "away_goalsActual",
        ]
    ]

    for index, row in future_matches_.iterrows():

        prediction = dixon_coles_simulate_match(
            params, row["home_team"], row["away_team"], max_goals=6
        )

        # Index of the largest matrix entry, i.e. (home goals, away goals) of
        # the most probable scoreline.
        predicted_score = np.unravel_index(np.argmax(prediction), prediction.shape)

        future_matches_.loc[index, "home_goals"] = predicted_score[0]

        future_matches_.loc[index, "away_goals"] = predicted_score[1]

        # Predicted outcome: H = home win, A = away win, D = draw
        result = (
            "H"
            if predicted_score[0] > predicted_score[1]
            else "A" if predicted_score[0] < predicted_score[1] else "D"
        )

        # Actual outcome; empty string while the actual home goals are null
        result_actual = (
            ""
            if pd.isnull(row["home_goalsActual"])
            else (
                "H"
                if row["home_goalsActual"] > row["away_goalsActual"]
                else "A" if row["home_goalsActual"] < row["away_goalsActual"] else "D"
            )
        )

        future_matches_.loc[index, "Result"] = result

        future_matches_.loc[index, "ResultActual"] = result_actual

        # Points: 5 for the exact score, 2 for the correct outcome only,
        # 0 otherwise or when no actual score is available.
        performance = 0

        if not pd.isnull(row["home_goalsActual"]):

            if (
                predicted_score[0] == row["home_goalsActual"]
                and predicted_score[1] == row["away_goalsActual"]
            ):

                performance += 5

            elif result == result_actual:

                performance += 2

        future_matches_.loc[index, "ModelPerformance"] = performance

    # Final column order of the returned frame
    future_matches_ = future_matches_[
        [
            "id",
            "date",
            "home_team",
            "away_team",
            "home_goals",
            "away_goals",
            "Result",
            "home_goalsActual",
            "away_goalsActual",
            "ResultActual",
            "ModelPerformance",
        ]
    ]

    return future_matches_

In [34]:
def save_predictions_to_csv(
    predicted_matches: DataFrame,
    sim_year_max: int,
    sim_year_min: int,
    order_list=None,
    output_dir: str = "predictions",
):
    """Write the predictions to a CSV file in a fixed match order.

    Parameters
    ----------
    predicted_matches : DataFrame
        Predictions with an ``id`` column. The frame is not modified.
    sim_year_max, sim_year_min : int
        Season range of the model; both become part of the file name.
    order_list : list of str, optional
        Match ids in the order they should appear in the file. Defaults to
        the built-in list below.
    output_dir : str, optional
        Directory the file is written to; created when missing.

    Notes
    -----
    The file is ``<output_dir>/euros2024_predictions_model<max>-<min>.csv``.
    Matches whose id is not in ``order_list`` keep their id and are written
    after the listed ones.
    """
    if order_list is None:
        order_list = [
            "2036161",
            "2036162",
            "2036163",
            "2036164",
            "2036167",
            "2036165",
            "2036166",
            "2036170",
            "2036169",
            "2036168",
            "2036171",
            "2036172",
            "2036176",
            "2036173",
            "2036174",
            "2036177",
            "2036178",
            "2036175",
            "2036182",
            "2036179",
            "2036180",
            "2036184",
            "2036183",
            "2036181",
            "2036185",
            "2036186",
            "2036187",
            "2036188",
            "2036191",
            "2036192",
            "2036190",
            "2036189",
            "2036194",
            "2036193",
            "2036196",
            "2036195",
        ]

    # Sort through a separate rank array rather than converting "id" into a
    # categorical: that conversion altered the caller's frame and replaced
    # every id missing from the list with NaN.
    rank_of = {str(match_id): rank for rank, match_id in enumerate(order_list)}
    ranks = predicted_matches["id"].astype(str).map(rank_of).fillna(len(rank_of))
    ordered = predicted_matches.iloc[np.argsort(ranks.to_numpy(), kind="stable")]

    # Build the path portably; a literal backslash is only a separator on
    # Windows.
    os.makedirs(output_dir, exist_ok=True)
    file_name = (
        "euros2024_predictions_model"
        + str(sim_year_max)
        + "-"
        + str(sim_year_min)
        + ".csv"
    )
    ordered.to_csv(
        os.path.join(output_dir, file_name),
        index=False,
        encoding="utf-8-sig",
        mode="w",
    )

In [35]:
def sum_column_in_csv(file_path, column_name):
    """Return the sum of one column of a CSV file.

    Any failure (unreadable file, missing column, ...) is reported with a
    printed message and answered with ``None``.
    """
    try:
        return pd.read_csv(file_path)[column_name].sum()
    except Exception as err:
        print(f"Error processing {file_path}: {err}")
        return None


def total_possible_in_csv(file_path, column_name):
    """Count the non-null entries of one CSV column that are not ``""``.

    Any failure (unreadable file, missing column, ...) is reported with a
    printed message and answered with ``None``.
    """
    try:
        frame = pd.read_csv(file_path)
        not_blank = frame[column_name] != ""
        return frame.loc[not_blank, column_name].count()
    except Exception as err:
        print(f"Error processing {file_path}: {err}")
        return None


def display_model_performance(
    folder_path: str = "predictions", max_points_per_match: int = 5
):
    """Print a points summary for every prediction CSV in a folder.

    Parameters
    ----------
    folder_path : str, optional
        Folder that is scanned for ``.csv`` files.
    max_points_per_match : int, optional
        Points awarded for a perfect prediction. It is multiplied by the
        number of matches that have an actual result to get the achievable
        total.

    Notes
    -----
    For each file the sum of ``ModelPerformance``, the achievable total and
    the percentage are printed. A value that could not be computed is shown
    as ``N/A``.
    """
    file_summaries = []

    # Sorted so the report order does not depend on the file system
    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.endswith(".csv"):
            continue
        file_path = os.path.join(folder_path, file_name)

        summed_value = sum_column_in_csv(file_path, "ModelPerformance")
        scored_matches = total_possible_in_csv(file_path, "ResultActual")

        file_summary = {"file_name": file_name}

        if summed_value is not None:
            file_summary["summed_value"] = int(summed_value)

        # Multiply only after the None check; the helper returns None on
        # failure and "None * 5" would raise a TypeError.
        if scored_matches is not None:
            total_possible = scored_matches * max_points_per_match
            file_summary["total_possible"] = total_possible

            # No percentage without points to win (avoids dividing by zero)
            if summed_value is not None and total_possible > 0:
                file_summary["percent"] = round(summed_value / total_possible * 100, 1)

        file_summaries.append(file_summary)

    # Print file summaries
    for summary in file_summaries:
        print(
            f'File: {summary["file_name"]}, Performance: {summary.get("summed_value", "N/A")}, Total: {summary.get("total_possible", "N/A")} ({summary.get("percent", "N/A")}%)'
        )

In [36]:
def get_uefa_config(sim_year_max: int, sim_year_min: int):
    """Assemble the download configuration.

    ``season_years`` counts down from ``sim_year_max`` to ``sim_year_min``
    (inclusive) in steps of four years.
    """
    season_years = range(sim_year_max, sim_year_min - 1, -4)
    return {
        "competition_id": 3,
        "url_competition": "https://comp.uefa.com/v2/competition-structure",
        "url_matches": "https://match.uefa.com/v5/matches",
        "url_teams": "https://comp.uefa.com/v2/teams",
        "season_years": season_years,
    }

In [37]:
def get_debug_start_time():
    """Return the module-level debug start time, setting it on first use.

    The global ``start_time`` is set to the current time when it is missing
    or falsy; later calls return the stored value.
    """
    global start_time
    if not globals().get("start_time"):
        start_time = time.time()
    return start_time


def get_debug_elapsed_time(start_time: float):
    """Format the time elapsed since ``start_time`` as ``HH:MM:SS``."""
    seconds_elapsed = time.time() - start_time
    return time.strftime("%H:%M:%S", time.gmtime(seconds_elapsed))


def print_debug_string(string: str):
    """Print ``string`` prefixed with the elapsed debug time."""
    elapsed = get_debug_elapsed_time(get_debug_start_time())
    print(f"{elapsed}    {string}")


def print_debug(string: str, lines_before: int = 0, lines_after: int = 0):
    """Print a timestamped message, optionally framed by separator lines.

    ``lines_before`` separator lines are printed ahead of the message and
    ``lines_after`` separator lines behind it.
    """
    spacer = "=================================================="
    for _ in range(lines_before):
        print_debug_string(spacer)
    print_debug_string(string)
    for _ in range(lines_after):
        print_debug_string(spacer)


def end_debug():
    """Forget the stored debug start time so the next message restarts at zero.

    Safe to call when no timer is running: a missing ``start_time`` global is
    ignored instead of raising ``NameError``.
    """
    globals().pop("start_time", None)

In [38]:
# Simulation settings: matches on or after sim_date are predicted, and one
# model is fitted per starting year in sim_year_mins, each up to sim_year_max.
sim_date = datetime(2024, 6, 14, tzinfo=pytz.UTC)
sim_year_max = 2024
sim_year_mins = list(range(1996, 2017, 4))

In [39]:
# Download every season from the earliest starting year up to sim_year_max.
uefa = get_uefa_config(sim_year_max, sim_year_min=min(sim_year_mins))
print_debug(f"Downloading data from {uefa['url_matches']}", lines_before=1)
match_data = get_all_match_data(uefa_config=uefa)
print_debug("Downloading complete")

00:00:00: Downloading data from https://match.uefa.com/v5/matches


KeyboardInterrupt: 

In [None]:
# Fit one model per starting year, predict the matches from sim_date on and
# save each set of predictions to its own CSV file.
for sim_year_min in sim_year_mins:
    model = f"(Years {sim_year_max} - {sim_year_min})"
    print_debug(f"Beginning Model Generation {model}", lines_before=1)
    model_start_time = time.time()
    match_data_filtered = get_filtered_match_data(match_data, year_min=sim_year_min)
    params = solve_parameters(match_data_filtered)
    model_duration = f"{round(time.time() - model_start_time, 1)}s"
    print_debug(f"Model Generation Complete ({model_duration})")

    print_debug("Predicting future matches")
    match_data_predicted = get_predicted_match_data(
        source_matches=match_data, params=params, sim_date=sim_date
    )
    print_debug("Predictions complete")

    print_debug("Saving predictions to file")
    save_predictions_to_csv(
        match_data_predicted, sim_year_max=sim_year_max, sim_year_min=sim_year_min
    )

6.1s: Beginning Model Generation (Years 2024 - 1996)
43.9s: Model Generation Complete (37.8s)
43.9s: Predicting future matches
43.9s: Predictions complete
43.9s: Saving predictions to file
43.9s: Beginning Model Generation (Years 2024 - 2000)
79.1s: Model Generation Complete (35.2s)
79.1s: Predicting future matches
79.2s: Predictions complete
79.2s: Saving predictions to file
79.2s: Beginning Model Generation (Years 2024 - 2004)
112.5s: Model Generation Complete (33.4s)
112.5s: Predicting future matches
112.6s: Predictions complete
112.6s: Saving predictions to file
112.6s: Beginning Model Generation (Years 2024 - 2008)
143.0s: Model Generation Complete (30.4s)
143.0s: Predicting future matches
143.0s: Predictions complete
143.0s: Saving predictions to file
143.0s: Beginning Model Generation (Years 2024 - 2012)
169.7s: Model Generation Complete (26.6s)
169.7s: Predicting future matches
169.7s: Predictions complete
169.7s: Saving predictions to file
169.7s: Beginning Model Generation (Y

In [None]:
# Summarise how many points each saved prediction file scored.
print_debug("Displaying Model Performance", lines_before=1, lines_after=1)
display_model_performance()

193.8s: Displaying Model Performance
File: euros2024_predictions_model2024-1996.csv, Performance: 29, Total: 75 (38.7%)
File: euros2024_predictions_model2024-2000.csv, Performance: 32, Total: 75 (42.7%)
File: euros2024_predictions_model2024-2004.csv, Performance: 34, Total: 75 (45.3%)
File: euros2024_predictions_model2024-2008.csv, Performance: 26, Total: 75 (34.7%)
File: euros2024_predictions_model2024-2012.csv, Performance: 32, Total: 75 (42.7%)
File: euros2024_predictions_model2024-2016.csv, Performance: 27, Total: 75 (36.0%)


In [None]:
# Discard the debug start time so the next run's elapsed times start from zero.
end_debug()