In [None]:
# default_exp __init__

# Core

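> Core building blocks of the Dixon-Coles model (weights, parameter keys, data adapters, the model itself and its model blocks), all kept in one notebook to start with.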

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
import datetime as dt
import pprint

In [None]:
#export
import abc
import collections
import dataclasses
import itertools
import typing
import functools
import json

import numpy as np
import scipy.stats
import scipy.optimize

## Weights

In [None]:
#export

class UniformWeight:
    """Weighting scheme that gives every row the same weight."""

    @staticmethod
    def __call__(row):
        """Return the constant weight 1.0; `row` is accepted but not inspected."""
        weight = 1.0
        return weight


class ExponentialWeight:
    """
    Weighting scheme returning ``exp(epsilon * key(row))`` for each row.

    `key` is a callable mapping a row to a number. Note the sign convention:
    the product ``epsilon * key(row)`` is exponentiated as-is, so it has to be
    negative for a row to receive a weight below 1.
    """

    def __init__(self, epsilon, key):
        self.key = key
        self.epsilon = epsilon

    def __call__(self, row):
        """Return the weight for a single row."""
        exponent = self.epsilon * self.key(row)
        return np.exp(exponent)

## Parameter keys

In [None]:
#export

@dataclasses.dataclass(frozen=True)
class DixonColesParameterKey:
    """
    Immutable, hashable identifier for a single model parameter.

    Instances are used as dictionary keys in the fitted parameter map.
    """
    # Hashable value that distinguishes this parameter from the others
    label: typing.Hashable


# Model constants
RHO_KEY = DixonColesParameterKey(label='Rho')
HFA_KEY = DixonColesParameterKey(label='Home-field advantage')
AVG_KEY = DixonColesParameterKey(label='Average rate')

## Data Adapters

In [None]:
#export


class KeyAdapter:
    """
    Adapter for rows accessed by subscripting (``row[key]``), e.g. dicts.

    Each constructor argument is either a single key, or a list of keys that
    is followed one step at a time to reach a nested value.
    """

    def __init__(self, home_team, away_team, home_goals, away_goals):
        self._home_team, self._away_team = home_team, away_team
        self._home_goals, self._away_goals = home_goals, away_goals

    def _get_in(self, row, item):
        """Look up `item` in `row`; a list is treated as a path of nested keys."""
        if not isinstance(item, list):
            return row[item]
        value = row
        for step in item:
            value = value[step]
        return value

    def home_team(self, row):
        """Return the home team of `row`."""
        return self._get_in(row, self._home_team)

    def away_team(self, row):
        """Return the away team of `row`."""
        return self._get_in(row, self._away_team)

    def home_goals(self, row):
        """Return the home goals of `row`."""
        return self._get_in(row, self._home_goals)

    def away_goals(self, row):
        """Return the away goals of `row`."""
        return self._get_in(row, self._away_goals)

In [None]:
#export


class AttributeAdapter:
    """
    Adapter for rows whose fields are read with ``getattr``.

    Each constructor argument is the attribute name holding that field.
    """

    def __init__(self, home_team, away_team, home_goals, away_goals):
        self._home_team, self._away_team = home_team, away_team
        self._home_goals, self._away_goals = home_goals, away_goals

    def home_team(self, row):
        """Return the home team attribute of `row`."""
        attr_name = self._home_team
        return getattr(row, attr_name)

    def away_team(self, row):
        """Return the away team attribute of `row`."""
        attr_name = self._away_team
        return getattr(row, attr_name)

    def home_goals(self, row):
        """Return the home goals attribute of `row`."""
        attr_name = self._home_goals
        return getattr(row, attr_name)

    def away_goals(self, row):
        """Return the away goals attribute of `row`."""
        attr_name = self._away_goals
        return getattr(row, attr_name)

In [None]:
#export


class LumpedAdapter:
    """
    Lump teams who appear in at most `min_matches` matches (default 10) into one team.

    Wraps another adapter: goals are passed straight through, while any team
    whose total number of appearances (home plus away) in the fitted data is
    `min_matches` or fewer is replaced by `placeholder`. Teams that were never
    seen during fitting have a count of zero and are therefore lumped as well.
    """

    def __init__(self, base_adapter, data, min_matches=10, placeholder=DixonColesParameterKey('Other team')):
        self.base_adapter = base_adapter
        self.min_matches = min_matches
        self.placeholder = placeholder

        self.match_count = None
        # Count appearances up front so the team lookups work straight away
        self.fit(data)

    def _lump(self, team):
        """Return `team`, or the placeholder if it has too few matches."""
        if self.match_count[team] <= self.min_matches:
            return self.placeholder
        return team

    def home_team(self, row):
        """Return the (possibly lumped) home team of `row`."""
        return self._lump(self.base_adapter.home_team(row))

    def away_team(self, row):
        """Return the (possibly lumped) away team of `row`."""
        return self._lump(self.base_adapter.away_team(row))

    def home_goals(self, row):
        """Return the home goals of `row`, as reported by the base adapter."""
        return self.base_adapter.home_goals(row)

    def away_goals(self, row):
        """Return the away goals of `row`, as reported by the base adapter."""
        return self.base_adapter.away_goals(row)

    def fit(self, data):
        """Count how many matches (home or away) each team has in `data`."""
        # A single pass, so one-shot iterables (e.g. generators) are counted correctly
        match_count = collections.Counter()
        for row in data:
            match_count[self.base_adapter.home_team(row)] += 1
            match_count[self.base_adapter.away_team(row)] += 1
        self.match_count = match_count

## Modelling

In [None]:
#export


class DixonColes:
    """
    Dixon-Coles style scoreline model assembled from model blocks.

    The home and away goal-scoring rates are the exponential of the sum of the
    parameters that the blocks select for a row. Scoreline likelihoods are two
    independent Poisson terms multiplied by the low-score adjustment `tau`,
    which is controlled by the `RHO_KEY` parameter.

    Arguments:
        adapter: object exposing `home_team`, `away_team`, `home_goals` and
            `away_goals` accessors for a row.
        blocks: model blocks providing parameter keys, constraints and terms.
        weight: callable returning the weight of a row in the objective.
        params: optional mapping of parameter key -> value, used as starting
            values by `fit` and as the fitted values by the predict methods.
    """

    def __init__(self, adapter, blocks, weight=UniformWeight(), params=None):
        self.params = params
        self.adapter = adapter
        self.weight = weight
        self._blocks = blocks

    @property
    def blocks(self):
        # Make sure blocks are always in the correct order
        # (highest PRIORITY first; the sort is stable for equal priorities)
        return sorted(self._blocks, key=lambda x: -x.PRIORITY)

    def home_goals(self, row):
        """ Returns home goals scored """
        return self.adapter.home_goals(row)

    def away_goals(self, row):
        """ Returns away goals scored """
        return self.adapter.away_goals(row)

    def parse_params(self, data):
        """ Returns a tuple of (parameter_names, [constraints]) """
        base_params = [RHO_KEY]
        block_params = list(itertools.chain(*[b.param_keys(self.adapter, data) for b in self.blocks]))
        return (
            # Block parameters come first, in block order; rho is always last
            block_params + base_params,
            list(itertools.chain(*[b.constraints(self.adapter, data) for b in self.blocks]))
        )

    def home_rate(self, params, row):
        """ Returns home goalscoring rate """
        terms = itertools.chain(*[b.home_terms(self.adapter, row) for b in self.blocks])
        return np.exp(sum(params[t] for t in terms))

    def away_rate(self, params, row):
        """ Returns away goalscoring rate """
        terms = itertools.chain(*[b.away_terms(self.adapter, row) for b in self.blocks])
        return np.exp(sum(params[t] for t in terms))

    # Core methods

    @staticmethod
    def _assign_params(param_keys, param_values):
        """ Returns a dict pairing each parameter key with its value """
        return dict(zip(param_keys, param_values))

    @staticmethod
    def _tau(home_goals, away_goals, home_rate, away_rate, rho):
        """
        Returns the low-score adjustment factor for each scoreline.

        The factor is 1 everywhere except for the 0-0, 0-1, 1-0 and 1-1
        scorelines. Goals may be given as any array-like.
        """
        # Coerce to arrays: comparing a plain list with `== 0` gives a single
        # `False` rather than an elementwise mask, which would silently skip
        # every adjustment below.
        home_goals = np.asarray(home_goals)
        away_goals = np.asarray(away_goals)

        tau = np.ones(home_goals.shape)
        tau = np.where((home_goals == 0) & (away_goals == 0), 1 - home_rate*away_rate*rho, tau)
        tau = np.where((home_goals == 0) & (away_goals == 1), 1 + home_rate*rho, tau)
        tau = np.where((home_goals == 1) & (away_goals == 0), 1 + away_rate*rho, tau)
        tau = np.where((home_goals == 1) & (away_goals == 1), 1 - rho, tau)
        return tau

    def _log_like(self, home_goals, away_goals, home_rate, away_rate, params):
        """ Returns the log-likelihood of each scoreline given the rates """
        rho = params[RHO_KEY]
        return (
            scipy.stats.poisson.logpmf(home_goals, home_rate) +
            scipy.stats.poisson.logpmf(away_goals, away_rate) +
            np.log(self._tau(home_goals, away_goals, home_rate, away_rate, rho))
        )

    def objective_fn(self, data, param_keys, xs):
        """
        Returns the negative weighted log-likelihood of `data`.

        `xs` holds the parameter values in the same order as `param_keys`.
        `data` must support `len` and iteration.
        """
        params = self._assign_params(param_keys, xs)

        home_goals, away_goals = np.empty(len(data)), np.empty(len(data))
        home_rate, away_rate = np.empty(len(data)), np.empty(len(data))
        weights = np.empty(len(data))

        # NOTE: Should data adapter define the iteration?
        # E.g. dataframe adapter?
        for i, row in enumerate(data):
            home_goals[i] = self.home_goals(row)
            away_goals[i] = self.away_goals(row)
            home_rate[i] = self.home_rate(params, row)
            away_rate[i] = self.away_rate(params, row)
            weights[i] = self.weight(row)

        log_like = self._log_like(home_goals, away_goals, home_rate, away_rate, params)

        pseudo_log_like = log_like * weights
        return -np.sum(pseudo_log_like)

    def fit(self, data, **kwargs):
        """
        Estimates the parameters from `data` and stores them in `self.params`.

        Existing `self.params` (if any) are used as starting values, with 0
        for any missing key; otherwise every parameter starts at 0. Extra
        keyword arguments are forwarded to `scipy.optimize.minimize`.
        Returns `self`.
        """
        param_keys, constraints = self.parse_params(data)

        init_params = (
            np.asarray([self.params.get(p, 0) for p in param_keys])
            if self.params
            else np.zeros(len(param_keys))
        )

        # Optimise!
        estimate = scipy.optimize.minimize(
            lambda xs: self.objective_fn(data, param_keys, xs),
            x0=init_params,
            constraints=constraints,
            **kwargs
        )

        # Parse the estimates into parameter map
        self.params = self._assign_params(param_keys, estimate.x)

        return self

    def predict_one(self, row, up_to=26):
        """
        Returns the probability of every scoreline for a single match.

        All combinations of 0..`up_to`-1 home and away goals are evaluated.
        The result is a list of dicts with keys 'home_goals', 'away_goals'
        and 'probability'.
        """
        scorelines = list(itertools.product(range(up_to), repeat=2))

        home_goals = [h for h, _ in scorelines]
        away_goals = [a for _, a in scorelines]
        home_rate = self.home_rate(self.params, row)
        away_rate = self.away_rate(self.params, row)

        # Pass arrays (not lists) so the low-score adjustment is applied
        # elementwise; the plain ints are kept for the returned dicts.
        probs = np.exp(self._log_like(
            np.asarray(home_goals), np.asarray(away_goals),
            home_rate, away_rate, self.params
        ))

        # TODO: add a mixin/adapter to customise the indexing in the results dicts?
        # OR just use a custom dataclass for these...
        return [dict(zip(['home_goals', 'away_goals', 'probability'], vals))
                for vals in zip(home_goals, away_goals, probs)]

    def predict(self, data, up_to=26):
        """ Returns the `predict_one` result for every row in `data` """
        scorelines = [self.predict_one(row, up_to=up_to) for row in data]
        return scorelines

## Model blocks

In [None]:
#exporti


class ModelBlockABC(abc.ABC):
    """
    Base class for model blocks.

    Every hook returns a fresh empty list by default, so a subclass only
    overrides the ones it contributes to. `PRIORITY` controls block ordering
    in the model: blocks with a higher value are placed first.
    """
    PRIORITY = 0

    def param_keys(self, adapter, data):
        """Parameter keys this block adds to the model (none by default)."""
        return list()

    def constraints(self, adapter, data):
        """Optimiser constraints this block adds (none by default)."""
        return list()

    def home_terms(self, adapter, data):
        """Parameter keys contributing to the home rate (none by default)."""
        return list()

    def away_terms(self, adapter, data):
        """Parameter keys contributing to the away rate (none by default)."""
        return list()

In [None]:
#export


class BaseRate(ModelBlockABC):
    """Model block adding the single `AVG_KEY` parameter to both the home and away rate."""

    def __init__(self):
        """No configuration is needed."""

    def param_keys(self, adapter, data):
        """Return the one parameter this block owns."""
        keys = [AVG_KEY]
        return keys

    def home_terms(self, adapter, row):
        """Return the terms added to the home rate."""
        terms = [AVG_KEY]
        return terms

    def away_terms(self, adapter, row):
        """Return the terms added to the away rate."""
        terms = [AVG_KEY]
        return terms

In [None]:
#export


class HomeAdvantage(ModelBlockABC):
    """Model block adding the `HFA_KEY` parameter to the home rate only."""

    def __init__(self):
        """No configuration is needed."""
        # TODO: allow HFA on/off depending on the data?

    def param_keys(self, adapter, data):
        """Return the one parameter this block owns."""
        keys = [HFA_KEY]
        return keys

    def home_terms(self, adapter, row):
        """Return the terms added to the home rate; the away rate gets none."""
        terms = [HFA_KEY]
        return terms

In [None]:
#export


class TeamStrength(ModelBlockABC):
    """
    Model block giving every team an offence and a defence parameter.

    A team's offence and its opponent's defence are added to that team's
    scoring rate. An equality constraint forces the mean of the exponentiated
    offence parameters to be 1.
    """
    # This is a gross hack so that we know that the
    # team strength parameters come first, and thus can
    # do the constraints (which are positionally indexed)
    PRIORITY = 1

    def __init__(self):
        """No configuration is needed."""

    def _teams(self, adapter, data):
        """Return the set of all teams appearing as home or away team in `data`."""
        home = {adapter.home_team(r) for r in data}
        away = {adapter.away_team(r) for r in data}
        return home | away

    def offence_key(self, label):
        """Return the parameter key of a team's offence."""
        return DixonColesParameterKey(('Offence', label))

    def defence_key(self, label):
        """Return the parameter key of a team's defence."""
        return DixonColesParameterKey(('Defence', label))

    def param_keys(self, adapter, data):
        """Return all offence keys followed by all defence keys."""
        teams = self._teams(adapter, data)
        return [self.offence_key(t) for t in teams] + [self.defence_key(t) for t in teams]

    def constraints(self, adapter, data):
        """Return the constraint on the offence parameters."""
        n_teams = len(self._teams(adapter, data))

        def offence_mean_gap(x):
            # The first `n_teams` entries of the parameter vector are the offence parameters
            return 1 - np.mean(np.exp(x[0:n_teams]))

        # Force team offence parameters to average to 1
        return [{'fun': offence_mean_gap, 'type': 'eq'}]

    def home_terms(self, adapter, row):
        """Return the home offence and away defence keys."""
        attacking = adapter.home_team(row)
        defending = adapter.away_team(row)
        return [self.offence_key(attacking), self.defence_key(defending)]

    def away_terms(self, adapter, row):
        """Return the away offence and home defence keys."""
        attacking = adapter.away_team(row)
        defending = adapter.home_team(row)
        return [self.offence_key(attacking), self.defence_key(defending)]

In [None]:
#export


class KeyBlock(ModelBlockABC):
    """
    Generic model block for adding arbitrary model terms to both home and away team.

    `key` is a callable mapping a row to a parameter key; the same key is
    added to the home rate and to the away rate of that row.
    """
    def __init__(self, key):
        self.key = key

    def param_keys(self, adapter, data):
        """Return the distinct keys produced over all rows of `data`."""
        distinct_keys = {self.key(r) for r in data}
        return list(distinct_keys)

    def home_terms(self, adapter, row):
        """Return the row's key as the only home term."""
        term = self.key(row)
        return [term]

    def away_terms(self, adapter, row):
        """Return the row's key as the only away term."""
        term = self.key(row)
        return [term]

Example model fit

In [None]:
# Load the matches, parsing each ISO date string into a datetime as we go
with open('../data/premier-league-1516.json', 'r') as f:
    pl_1516 = [
        {**match, 'date': dt.datetime.strptime(match['date'], '%Y-%m-%d')}
        for match in json.load(f)
    ]

pl_1516[0:3]

[{'date': datetime.datetime(2015, 8, 8, 0, 0),
  'team1': 'Manchester United FC',
  'team2': 'Tottenham Hotspur FC',
  'score': {'ft': [1, 0]}},
 {'date': datetime.datetime(2015, 8, 8, 0, 0),
  'team1': 'AFC Bournemouth',
  'team2': 'Aston Villa FC',
  'score': {'ft': [0, 1]}},
 {'date': datetime.datetime(2015, 8, 8, 0, 0),
  'team1': 'Leicester City FC',
  'team2': 'Sunderland AFC',
  'score': {'ft': [4, 2]}}]

In [None]:
# Build a model from the three standard blocks and fit it to the season's matches
model = DixonColes(
    adapter=KeyAdapter(
        home_team='team1',
        away_team='team2',
        # Goals are nested inside each row, so they are addressed by a path of keys
        home_goals=['score', 'ft', 0],
        away_goals=['score', 'ft', 1],
    ),
    blocks=[TeamStrength(), BaseRate(), HomeAdvantage()]
)
model.fit(pl_1516)

# All estimates should be valid numbers
assert all(not np.isnan(x) for x in model.params.values())

# Home advantage should be positive: on the multiplicative scale it should
# sit above 1, and below 2 as a sanity bound
assert 1.0 < np.exp(model.params[HFA_KEY]) < 2.0



In [None]:
# `param_keys` and `param_key_len` are reused by the comparison cell further down
param_keys = model.params.keys()
# Width of the longest label, used to align the printed columns
param_key_len = max(len(str(k.label)) for k in param_keys)

for k in param_keys:
    key_str = str(k.label).ljust(param_key_len + 1)
    # Rates are exp(sum of parameters), so exponentiate to show multiplicative effects
    print(f'{key_str}: {np.exp(model.params[k]):0.2f}')

('Offence', 'Chelsea FC')              : 1.15
('Offence', 'Newcastle United FC')     : 0.87
('Offence', 'Crystal Palace FC')       : 0.76
('Offence', 'Liverpool FC')            : 1.23
('Offence', 'Manchester City FC')      : 1.38
('Offence', 'Everton FC')              : 1.16
('Offence', 'West Ham United FC')      : 1.27
('Offence', 'AFC Bournemouth')         : 0.89
('Offence', 'Swansea City FC')         : 0.82
('Offence', 'Aston Villa FC')          : 0.54
('Offence', 'Southampton FC')          : 1.14
('Offence', 'Stoke City FC')           : 0.81
('Offence', 'Arsenal FC')              : 1.25
('Offence', 'Manchester United FC')    : 0.94
('Offence', 'Sunderland AFC')          : 0.94
('Offence', 'Watford FC')              : 0.78
('Offence', 'Leicester City FC')       : 1.31
('Offence', 'Tottenham Hotspur FC')    : 1.33
('Offence', 'Norwich City FC')         : 0.77
('Offence', 'West Bromwich Albion FC') : 0.66
('Defence', 'Chelsea FC')              : 1.08
('Defence', 'Newcastle United FC')

Making predictions for a single match

In [None]:
# Only the team fields are needed to predict; the row carries no score
scorelines = model.predict_one({
    'team1': 'Manchester City FC',
    'team2': 'Swansea City FC',
})

# Collapse the per-scoreline probabilities into home win / draw / away win
home_win = sum(s['probability'] for s in scorelines if s['home_goals'] > s['away_goals'])
draw = sum(s['probability'] for s in scorelines if s['home_goals'] == s['away_goals'])
away_win = sum(s['probability'] for s in scorelines if s['home_goals'] < s['away_goals'])

home_win, draw, away_win

(0.6645794578795163, 0.19833741652954348, 0.13708312559093988)

Or for multiple matches

In [None]:
# `predict` returns one list of scoreline probabilities per input row
many_scorelines = model.predict([
    {'team1': 'Manchester City FC',
     'team2': 'Swansea City FC'},
    {'team1': 'Manchester City FC',
     'team2': 'West Ham United FC'}
])

What about a model with a different weighting method?

In [None]:
# Weight matches relative to the last match day of the season
season_end_date = max(match['date'] for match in pl_1516)

model_exp = DixonColes(
    adapter=KeyAdapter(
        home_team='team1',
        away_team='team2',
        home_goals=['score', 'ft', 0],
        away_goals=['score', 'ft', 1],
    ),
    blocks=[TeamStrength(), BaseRate(), HomeAdvantage()],
    weight=ExponentialWeight(
        epsilon=0.0065,  # Value taken from the original paper
        # ExponentialWeight computes exp(epsilon * key(row)), so the key must be
        # minus the match's age in days for older matches to be down-weighted
        # (the most recent matches get weight 1).
        key=lambda x: -(season_end_date - x['date']).days
    )
)
model_exp.fit(pl_1516)

  Pk = special.xlogy(k, mu) - gamln(k + 1) - mu


<__main__.DixonColes at 0x127f2b550>

How much does that change the ratings at season-end?

In [None]:
# For every parameter print: uniform-weight estimate -> exponentially-weighted
# estimate (ratio of the two), all on the multiplicative (exponentiated) scale
for k in param_keys:
    key_str = str(k.label).ljust(param_key_len + 1)
    model_param = np.exp(model.params[k])
    model_exp_param = np.exp(model_exp.params[k])
    print(f'{key_str}: {model_param:0.2f} -> {model_exp_param:0.2f} ({model_exp_param/model_param:0.2f})')

('Offence', 'Chelsea FC')              : 1.15 -> 1.12 (0.97)
('Offence', 'Newcastle United FC')     : 0.87 -> 0.82 (0.94)
('Offence', 'Crystal Palace FC')       : 0.76 -> 0.85 (1.12)
('Offence', 'Liverpool FC')            : 1.23 -> 1.06 (0.86)
('Offence', 'Manchester City FC')      : 1.38 -> 1.46 (1.06)
('Offence', 'Everton FC')              : 1.16 -> 1.28 (1.10)
('Offence', 'West Ham United FC')      : 1.27 -> 1.27 (1.00)
('Offence', 'AFC Bournemouth')         : 0.89 -> 0.88 (0.98)
('Offence', 'Swansea City FC')         : 0.82 -> 0.79 (0.97)
('Offence', 'Aston Villa FC')          : 0.54 -> 0.59 (1.10)
('Offence', 'Southampton FC')          : 1.14 -> 1.06 (0.93)
('Offence', 'Stoke City FC')           : 0.81 -> 0.76 (0.94)
('Offence', 'Arsenal FC')              : 1.25 -> 1.24 (0.98)
('Offence', 'Manchester United FC')    : 0.94 -> 0.96 (1.02)
('Offence', 'Sunderland AFC')          : 0.94 -> 0.89 (0.95)
('Offence', 'Watford FC')              : 0.78 -> 0.77 (0.99)
('Offence', 'Leicester C

**TODO:** demo the model blocks in more detail.