In [1]:
import geopy as geopy
from scipy.special import gamma
import numpy as np
import matplotlib.pyplot as plt
from functools import lru_cache
from math import gamma
import datetime
import pandas as pd

## Translate Stadium to latitude and longitude to calculate the distance

In [17]:
pip install geopy

Note: you may need to restart the kernel to use updated packages.


In [27]:
import requests
from geopy.distance import geodesic

def get_coordinates(stadium_name):
    api_key = "AIzaSyCFVZQiqRj_z6MGgCDjEK6imywq6rQj8yk"
    url = f"https://maps.googleapis.com/maps/api/geocode/json?address={stadium_name}&key={api_key}"

    response = requests.get(url)
    data = response.json()

    if data["status"] == "OK":
        # Extracting the latitude and longitude coordinates
        latitude = data["results"][0]["geometry"]["location"]["lat"]
        longitude = data["results"][0]["geometry"]["location"]["lng"]
        return (latitude, longitude)
    else:
        return None

def calculate_distance(home_stadium_name, away_stadium_name):
    # Getting the coordinates of the home stadium
    home_coordinates = get_coordinates(home_stadium_name)
    if home_coordinates is None:
        print(f"Unable to find coordinates for {home_stadium_name}.")
        return None

    # Getting the coordinates of the away stadium
    away_coordinates = get_coordinates(away_stadium_name)
    if away_coordinates is None:
        print(f"Unable to find coordinates for {away_stadium_name}.")
        return None

    # Calculating the distance using the Haversine formula
    distance = geodesic(home_coordinates, away_coordinates).miles

    return distance

In [45]:
#Test code
home_stadium_name = "Craven Cottage"
away_stadium_name = "Selhurst Park"

distance = calculate_distance(home_stadium_name, away_stadium_name)
if distance is not None:
    print(f"The distance between {home_stadium_name} and {away_stadium_name} is approximately {distance:.2f} miles.")

The distance between Craven Cottage and Selhurst Park is approximately 7.91 miles.


## Now let's import Dataset and preprocessing data

In [58]:
df = pd.read_csv('E0-2022.csv')

df["Date"] = df["Date"].str.split("/").str[::-1].apply("/".join)
df.sort_values(by="Date", inplace=True)
df.reset_index(drop=True, inplace=True)
df["fixture"] = df.index // 10

df["HomeStadium"] = df["HomeTeam"].replace({
    'Crystal Palace' : 'Selhurst Park',
    'Fulham': 'Craven Cottage',
    'Bournemouth': 'Vitality Stadium',
    'Leeds': 'Elland Road',
    'Newcastle': "St. James's Park",
    'Tottenham': 'Tottenham Hotspur Stadium',
    'Everton': 'Goodison Park',
    'Leicester': 'King Power Stadium',
    'Man United': 'Old Trafford',
    'West Ham': 'London Stadium',
    'Brentford': 'Gtech Community Stadium',
    'Wolves': 'Molineux Stadium',
    'Southampton': "St. Mary's Stadium",
    'Arsenal': 'Emirates Stadium',
    'Brighton': 'Amex Stadium',
    'Aston Villa': 'Villa Park Reception',
    'Man City': 'Etihad Stadium',
    "Nott'm Forest": 'The Bridgford Stand',
    'Chelsea': 'Stamford Bridge',
    'Liverpool': 'Anfield'
})

df["AwayStadium"] = df["AwayTeam"].replace({
    'Crystal Palace' : 'Selhurst Park',
    'Fulham': 'Craven Cottage',
    'Bournemouth': 'Vitality Stadium',
    'Leeds': 'Elland Road',
    'Newcastle': "St. James's Park",
    'Tottenham': 'Tottenham Hotspur Stadium',
    'Everton': 'Goodison Park',
    'Leicester': 'King Power Stadium',
    'Man United': 'Old Trafford',
    'West Ham': 'London Stadium',
    'Brentford': 'Gtech Community Stadium',
    'Wolves': 'Molineux Stadium',
    'Southampton': "St. Mary's Stadium",
    'Arsenal': 'Emirates Stadium',
    'Brighton': 'Amex Stadium',
    'Aston Villa': 'Villa Park Reception',
    'Man City': 'Etihad Stadium',
    "Nott'm Forest": 'The Bridgford Stand',
    'Chelsea': 'Stamford Bridge',
    'Liverpool': 'Anfield'
})

df["Distance"] = df.apply(lambda row: calculate_distance(row["HomeStadium"], row["AwayStadium"]), axis=1)

df = df[["fixture", "HomeTeam","HomeStadium","AwayTeam","AwayStadium", "FTHG", "FTAG","Distance"]].copy()


In [59]:
print(df)

     fixture        HomeTeam              HomeStadium       AwayTeam  \
0          0  Crystal Palace            Selhurst Park        Arsenal   
1          0          Fulham           Craven Cottage      Liverpool   
2          0     Bournemouth         Vitality Stadium    Aston Villa   
3          0           Leeds              Elland Road         Wolves   
4          0       Newcastle         St. James's Park  Nott'm Forest   
..       ...             ...                      ...            ...   
375       37       Brentford  Gtech Community Stadium       Man City   
376       37     Aston Villa     Villa Park Reception       Brighton   
377       37         Arsenal         Emirates Stadium         Wolves   
378       37         Chelsea          Stamford Bridge      Newcastle   
379       37     Southampton       St. Mary's Stadium      Liverpool   

              AwayStadium  FTHG  FTAG    Distance  
0        Emirates Stadium     0     2   10.890755  
1                 Anfield     2

## Get teams and matchs

In [3]:
import pandas as pd
from typing import List


def get_all_matches_by_team(df: pd.DataFrame, team: str) -> pd.DataFrame:
    df = df.copy()

    df = df[(df["HomeTeam"] + df["AwayTeam"]).apply(lambda x: team in x)].copy()
    return df


def get_all_teams(df: pd.DataFrame) -> List:
    df = df.copy()

    all_teams = list(set(list(df["HomeTeam"].unique()) + list(df["AwayTeam"].unique())))

    return all_teams

### Prepare the parameters

In [4]:
config = {
'KAPPA': -0.4561,
'C1': 1.050,
'C2': 0.9831,
'GAMMA': 0.2948,
'XI': 0.002,
'GOAL_RANGE': range(5)
}

# Import a Bivariate Weibull Model Step by Step

In [5]:
def alphas(x: int, j: int, c: float) -> float:  #
    if x == 0:
        return gamma(c * j + 1) / gamma(j + 1)
    elif j < x:
        raise ValueError(f"{x, j}")
    else:
        return sum(
            [
                alphas(x - 1, m, c) * gamma(c * j - c * m + 1) / gamma(j - m + 1)
                for m in range(x - 1, j)
            ]
        )

### Calculate the first layer

In [6]:
def _weibull(x: int, c: float, l: float, t: float = 1, j: int = 0) -> float:
    return (-1) ** (x + j) * (l * t**c) ** j * alphas(x, j, c) / gamma(c * j + 1)

### Sum together

In [7]:
def weibull(x: int, c: float, l: float, t: float = 1) -> float:
    return sum(
        [_weibull(x, c, l, t, j) for j in range(x, x + 50)]
    )  # 50 terms is an approximation of inf, but apparently good enough

In [8]:
def cumulative_weibull(x: int, c: float, l: float, t: float = 1) -> float:
    return sum([weibull(i, c, l, t) for i in range(0, x + 1)])

### Using Frank Copula to generate a bivariate model

In [9]:
def frank_copula(u: float, v: float, k: float = config['KAPPA']) -> float:
    return (
        -1
        / k
        * np.log(1 + (np.exp(-k * u) - 1) * (np.exp(-k * v) - 1) / (np.exp(-k) - 1))
    )

### Calculate The log-likelihood

In [10]:
def weibull_likelihood(
    y1: int, y2: int, l1: float, l2: float, c1=config['C1'], c2=config['C2']
) -> float:
    x1 = cumulative_weibull(y1, c1, l1)
    x2 = cumulative_weibull(y2, c2, l2)
    x3 = cumulative_weibull(y1 - 1, c1, l1)
    x4 = cumulative_weibull(y2 - 1, c2, l2)

    return (
        frank_copula(x1, x2)
        - frank_copula(x1, x4)
        - frank_copula(x3, x2)
        + frank_copula(x3, x4)
    )


## WeibullCountFitter

In [19]:
class WeibullCountFitter:
    def __init__(self):
        self.gamma = config['GAMMA']

    def _get_all_teams(self, df: pd.DataFrame):
        self.teams = get_all_teams(df)
        return self

    def _calculate_lambda(self, a: float, b: float, home: bool):
        _lambda = a + b
        if home:
            _lambda += self.gamma
        return np.exp(_lambda)

    @property
    def team_mapping(self):
        return {team: n for n, team in enumerate(self.teams)}

    @property
    def inverse_team_mapping(self):
        return {v: k for k, v in self.team_mapping.items()}

    @property
    def C_team(self):
        X = pd.DataFrame(self.C, columns=["alpha", "beta"])
        X.reset_index(inplace=True)
        X.rename(columns={"index": "team"}, inplace=True)
        X["team"] = X["team"].map(self.inverse_team_mapping)
        return X

    def _initialise_coefficients(self):
        self.C = 0.1 * (np.random.normal(0, 1, (len(self.teams), 2)))

    def _get_likelihood(self, df: pd.DataFrame, C: np.array) -> float:
        log_l = 0

        max_fixture = df["fixture"].max()

        import time

        for i, row in df.iterrows():
            home = row["HomeTeam"]
            away = row["AwayTeam"]
            i, j = self.team_mapping[home], self.team_mapping[away]
            l1 = self._calculate_lambda(C[i][0], C[j][1], home=True)
            l2 = self._calculate_lambda(C[j][0], C[i][1], home=False)
            #   print(l1, l2, row["FTHG"], row["FTAG"])
            #   print(
            #       np.exp(-config.XI * (max_fixture - row["fixture"])),
            #       weibull_likelihood(y1=row["FTHG"], y2=row["FTAG"], l1=l1, l2=l2),
            #   )
            #   print(
            #       np.log(
            #           np.exp(-config.XI * (max_fixture - row["fixture"]))
            #           * weibull_likelihood(y1=row["FTHG"], y2=row["FTAG"], l1=l1, l2=l2)
            #       )
            #   )
            log_l += np.log(
                np.exp(-config['XI'] * (max_fixture - row["fixture"]))
                * weibull_likelihood(y1=row["FTHG"], y2=row["FTAG"], l1=l1, l2=l2)
            )

        return log_l

    def _get_grads(self, df: pd.DataFrame, C: np.array):
        C = C.copy()
        df = df.copy()
        eps = 1e-6

        grads = np.zeros_like(C)

        for i in range(C.shape[0]):
            for j in range(C.shape[1]):
                C_plus = C.copy()
                C_minus = C.copy()
                C_plus[i, j] += eps
                C_minus[i, j] -= eps
                grads[i, j] = (
                    self._get_likelihood(df, C_plus) - self._get_likelihood(df, C_minus)
                ) / (2 * eps)

        return grads

    def fit(
        self,
        train: pd.DataFrame,
        n_iter: int = 100,
        learning_rate: float = 0.001,
        verbose: bool = True,
        test: pd.DataFrame = None,
    ):
        train = train.copy()
        self._get_all_teams(train)
        self._initialise_coefficients()

        train_log_l = self._get_likelihood(train, self.C)
        if test is not None:
            test_log_l = self._get_likelihood(test, self.C)

        if verbose:
            print(f"Starting train likelihood: {train_log_l}")
            if test is not None:
                print(f"Starting test likelihood: {test_log_l}")

        self.train_likelihoods = [train_log_l]
        if test is not None:
            self.test_likelihoods = [test_log_l]

        C = self.C.copy()
        for n in range(n_iter):
            # C += learning_rate * self._get_grads(train, C)
            C += learning_rate * self._get_grads(train, C)
            self.C = C.copy()
            train_log_l = self._get_likelihood(train, self.C)
            self.train_likelihoods.append(train_log_l)
            if test is not None:
                test_log_l = self._get_likelihood(test, self.C)
                self.test_likelihoods.append(test_log_l)

            if verbose:
                print(f"Step {n + 1}, current likelihood: {train_log_l}")
                if test is not None:
                    print(f"Step {n + 1}, current test likelihood: {test_log_l}")

            if self.train_likelihoods[-1] - self.train_likelihoods[-2] < 10e-4:
                print("Early stop. Algorithm has converged")
                break



    def predict_all_scores_single_match(self, home: str, away: str):
        results = []

        C = self.C_team.set_index("team").to_dict(orient="index")
        for i in config['GOAL_RANGE']:
            for j in config['GOAL_RANGE']:
                l1 = self._calculate_lambda(
                    C[home]["alpha"], C[away]["beta"], home=True
                )
                l2 = self._calculate_lambda(
                    C[home]["beta"], C[away]["alpha"], home=False
                )
                p = weibull_likelihood(i, j, l1, l2)
                results.append([i, j, p])

        X = pd.DataFrame(results, columns=["H", "A", "p"])

        return X

    def predict_under_over_single_match(self, home: str, away: str):
        X = self.predict_all_scores_single_match(home, away)

        X["over"] = X["H"] + X["A"] > 2.5

        under = X[~X["over"]]["p"].sum()
        over = 1 - under

        return (under, over)

    def predict_under_over(
        self,
        df: pd.DataFrame,
    ) -> pd.DataFrame:
        df = df.copy()

        res = df.apply(
            lambda row: self.predict_under_over_single_match(
                row["HomeTeam"],
                row["AwayTeam"],
            ),
            axis=1,
        ).apply(pd.Series)

        res.rename(columns={0: "under", 1: "over"}, inplace=True)

        return res

    def predict_1x2_single_match(self, home: str, away: str):
        X = self.predict_all_scores_single_match(home, away)

        X["pred"] = "0"
        X.loc[X["H"] > X["A"], "pred"] = "1"
        X.loc[X["H"] < X["A"], "pred"] = "2"
        X.loc[X["H"] == X["A"], "pred"] = "X"

        scores = X.groupby("pred")["p"].sum()
        scores /= (
            scores.sum()
        )  # Renormalising to 1 as we are not calculating p for all possible scores

        return scores.T

    def predict_1x2(
        self,
        df: pd.DataFrame,
    ) -> pd.DataFrame:
        df = df.copy()

        res = df.apply(
            lambda row: self.predict_1x2_single_match(
                row["HomeTeam"],
                row["AwayTeam"],
            ),
            axis=1,
        )

        return res

    def get_results(self):
        return pd.DataFrame(self.C).T

In [20]:
train = df[df["fixture"] < df["fixture"].max() - 4].copy()
test = df[df["fixture"] >= df["fixture"].max() - 4].copy()

In [21]:
w = WeibullCountFitter()

In [22]:
w.fit(train, learning_rate=0.01, test=test)

KeyboardInterrupt: 

In [None]:
# Likelihood per observation
plt.plot(-np.array(w.train_likelihoods)/len(train))
plt.plot(-np.array(w.test_likelihoods)/len(test))