In [1]:
### src/pipeline/__init__.py


In [2]:
### src/pipeline/categories.py
from pandas.api.types import is_categorical_dtype
import pandas as pd
from collections import Counter
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import KBinsDiscretizer


class Categorify(BaseEstimator, TransformerMixin):
    """Encode one categorical column as integer codes.

    Parameters
    ----------
    top_n : int, optional
        When given, only the ``top_n`` most frequent non-missing values seen
        during ``fit`` are kept as categories.
    add_nan : bool
        When true (and the fitted data contains missing values) every code is
        shifted by one so that code ``0`` is used for values that are missing
        or not part of the fitted categories.
    dtype : numpy dtype
        Integer dtype of the produced codes.
    """

    def __init__(self, top_n: int = None,
                 add_nan: bool = False,
                 dtype=np.int64):
        self.top_n = top_n
        self.add_nan = add_nan
        self.dtype = dtype

    def fit(self, X: pd.Series, y=None):
        """Learn the ordered list of categories from ``X``.

        ``y`` is accepted (and ignored) so the transformer can be fitted
        through scikit-learn helpers that forward a target.
        """
        if not isinstance(X, pd.Series):
            X = pd.Series(X)
        has_nan = bool(X.isna().any())
        # Kept from the original contract: the flag is switched off when the
        # fitted data holds no missing value, and ``transform`` reads it back.
        self.add_nan = (self.add_nan and has_nan)
        if isinstance(X.dtype, pd.CategoricalDtype):
            # A Series exposes its categories through the ``.cat`` accessor.
            categories = list(X.cat.categories)
        else:
            categories = sorted(value for value in X.drop_duplicates()
                                if not pd.isna(value))
        if self.top_n is not None:
            # Missing values are dropped first so they cannot take one of the
            # ``top_n`` slots away from a real category.
            counter = Counter(X.dropna())
            top_categories = {cat for cat, _ in
                              counter.most_common(self.top_n)}
            categories = [cat for cat in categories
                          if cat in top_categories]
        self.categories = categories
        return self

    def transform(self, X: pd.Series) -> pd.DataFrame:
        """Return a one-column frame (named after ``X``) holding the codes.

        Raises ``AssertionError`` when a negative code would be produced,
        i.e. a value is missing/unknown while ``add_nan`` is off.
        """
        if not isinstance(X, pd.Series):
            X = pd.Series(X)
        Xcat = pd.Categorical(X, categories=self.categories, ordered=True)
        Xcodes = Xcat.codes.astype(self.dtype)
        if self.add_nan:
            Xcodes += 1
        # Only negative codes are invalid: a batch does not have to contain
        # the first category (or a missing value) to be well formed.
        assert len(Xcodes) == 0 or Xcodes.min() >= 0, f'the min is {Xcodes.min()}'
        return pd.DataFrame({X.name: Xcodes})


class PdKBinsDiscretizer(KBinsDiscretizer):
    """``KBinsDiscretizer`` whose ``transform`` returns an int64 DataFrame.

    The output keeps the column names of the input frame; its index is a
    fresh default index.
    """

    def __init__(self, n_bins=5,
                 encode: str = 'ordinal',
                 strategy: str = 'quantile'):
        super().__init__(n_bins=n_bins, encode=encode, strategy=strategy)

    def transform(self, X: pd.DataFrame):
        """Bin ``X`` and wrap the result in a frame with the same columns."""
        column_names = X.columns.tolist()
        binned = super().transform(X)
        as_integers = binned.astype(np.int64)
        return pd.DataFrame(as_integers, columns=column_names)

In [3]:
### src/pipeline/continuous.py
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd
from typing import List
from pandas.api.types import is_float_dtype


class MedianFillNaN(BaseEstimator, TransformerMixin):
    """Fill missing values with per-column medians learned during ``fit``.

    Parameters
    ----------
    features : list of str, optional
        Columns whose medians are learned. When ``None`` every numeric
        column of the fitted frame is used.
    """

    def __init__(self, features: List[str] = None):
        self.features = features

    def fit(self, df: pd.DataFrame, y=None):
        """Store the median of each selected numeric column."""
        selected = df if self.features is None else df.loc[:, self.features]
        # ``numeric_only=True`` keeps the historical behaviour of skipping
        # non-numeric columns; recent pandas raises on them otherwise.
        self.medians = selected.median(numeric_only=True).to_dict()
        return self

    def transform(self, X: pd.DataFrame):
        """Return ``X`` with missing values replaced by the stored medians."""
        return X.fillna(self.medians)



class FilterContinuousFeatures(BaseEstimator, TransformerMixin):
    """Keep only the float-typed columns that are not explicitly ignored."""

    def __init__(self, ignore_features: List[str]):
        self.ignore_features = ignore_features

    def fit(self, df: pd.DataFrame, y=None):
        """Record the float columns of ``df`` that are not in ``ignore_features``."""
        selected = []
        for name, values in df.items():
            if is_float_dtype(values) and name not in self.ignore_features:
                selected.append(name)
        self._features = selected
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the columns selected during ``fit``."""
        return df.loc[:, self._features]



class PdStandardNorm(BaseEstimator, TransformerMixin):
    """Standardise columns with the mean / std learned during ``fit``."""

    # Added to the standard deviation so constant columns do not divide by zero.
    eps = 1e-15

    def fit(self, X: pd.DataFrame, y=None):
        """Store the per-column mean and (eps-shifted) standard deviation."""
        self.means = X.mean()
        self.stds = X.std() + self.eps
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return ``(X - mean) / std`` cast to float32."""
        centered = X - self.means
        scaled = centered / self.stds
        return scaled.astype(np.float32)


class PdScaleNorm(BaseEstimator, TransformerMixin):
    """Min-max scale columns with the range learned during ``fit``."""

    # Added to the range so constant columns do not divide by zero.
    eps = 1e-15

    def fit(self, X: pd.DataFrame, y=None):
        """Store the per-column minimum, maximum and (eps-shifted) range."""
        lowest, highest = X.min(), X.max()
        self.min = lowest
        self.max = highest
        self.difference = self.max - self.min + self.eps
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return ``(X - min) / range`` cast to float16."""
        shifted = X - self.min
        scaled = shifted / self.difference
        return scaled.astype(np.float16)



In [4]:
### src/pipeline/core.py
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import FunctionTransformer
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
from typing import Union, List, Tuple
from functools import reduce


def pandas_hstack(Xs):
    """Concatenate frames side by side, ignoring their original indexes."""
    positional = []
    for frame in Xs:
        positional.append(frame.reset_index(drop=True))
    return pd.concat(positional, axis=1)


def reduce_fn(left: pd.DataFrame,
              right: pd.DataFrame):
    """Combine two frames: left-merge on shared columns, else concatenate.

    When the frames have no column name in common they are placed side by
    side (aligned on their indexes).
    """
    shared = [name for name in left.columns if name in right.columns]
    if not shared:
        return pd.concat([left, right], axis=1)
    return left.merge(right, on=shared, how='left')



class CoreTransformer(BaseEstimator, TransformerMixin):
    """Identity transformer: ``fit`` learns nothing, ``transform`` returns ``X``."""

    def fit(self, X: pd.DataFrame, y: np.ndarray = None):
        """No-op fit; ``y`` is ignored (it used to default to the ndarray class)."""
        return self

    def transform(self, X: pd.DataFrame):
        """Return the input unchanged."""
        return X


class PdColumnTransformer(ColumnTransformer):
    """``ColumnTransformer`` that combines its outputs as pandas frames."""

    def _hstack(self, Xs, **kwargs):
        """Fold the transformer outputs together with ``reduce_fn``.

        Extra keyword arguments are accepted and ignored: recent scikit-learn
        releases call this hook with additional keywords (e.g. ``n_samples``),
        which the one-argument override would reject.
        """
        return reduce(reduce_fn, Xs)


class PdFeatureUnion(FeatureUnion):
    """``FeatureUnion`` that combines its outputs as pandas frames."""

    def _hstack(self, Xs):
        """Fold the transformer outputs together with ``reduce_fn``."""
        combined = reduce(reduce_fn, Xs)
        return combined


def unpack_json(json_str):
    """Parse a JSON document into a DataFrame; missing input yields ``np.nan``.

    A literal JSON string is wrapped in ``StringIO`` because handing raw JSON
    text to ``pd.read_json`` is deprecated in recent pandas releases. Any
    other non-missing input is passed to ``pd.read_json`` unchanged.
    """
    from io import StringIO

    if pd.isna(json_str):
        return np.nan
    payload = StringIO(json_str) if isinstance(json_str, str) else json_str
    return pd.read_json(payload)


def unpack_column(series: pd.Series) -> pd.DataFrame:
    """Expand a column of JSON documents into one long DataFrame.

    Every non-missing entry is parsed with ``unpack_json`` and tagged with a
    ``date`` column holding the index label of the row it came from. The
    parsed frames are stacked with a fresh index.
    """
    def _unpack_row(index, row):
        return unpack_json(row).assign(date=index)

    # ``Series.items`` replaces ``iteritems``, which was removed in pandas 2.
    return pd.concat([_unpack_row(index, row)
                      for index, row in series.items()
                      if pd.notna(row)], ignore_index=True)


def make_column_tmf(*transformers: Tuple[TransformerMixin, Union[List[str], str]],
                    **kwargs):
    """Build a ``PdColumnTransformer`` from ``(transformer, columns)`` pairs.

    Each step is named after the transformer's class. When the same class
    appears more than once, later occurrences get a ``-2``, ``-3``, ...
    suffix so the step names stay unique. ``kwargs`` are forwarded to
    ``PdColumnTransformer``.
    """
    occurrences = {}
    steps = []
    for tmf, features in transformers:
        base_name = type(tmf).__name__
        occurrences[base_name] = occurrences.get(base_name, 0) + 1
        count = occurrences[base_name]
        name = base_name if count == 1 else f'{base_name}-{count}'
        steps.append((name, tmf, features))
    return PdColumnTransformer(steps, **kwargs)


def make_unpack_tmf(column_name: str):
    """Return a column transformer that applies ``unpack_column`` to ``column_name``."""
    unpacker = FunctionTransformer(unpack_column)
    return make_column_tmf((unpacker, column_name))


def forward_fill(df: pd.DataFrame, features: List[str],
                 on='playerId',
                 suffix: str = None,
                 limit: int = None):
    """Forward-fill ``features`` within each ``on`` group and return a copy.

    When ``suffix`` is given the filled values are written to columns named
    ``suffix + feature`` (the text is placed in front of the feature name);
    otherwise the original columns are overwritten in the returned copy.
    ``limit`` is forwarded to ``ffill``.
    """
    filled = df.groupby(on)[features].ffill(limit=limit)
    if suffix is None:
        output_features = features
    else:
        output_features = [suffix + name for name in features]
    outputX = df.copy()
    outputX.loc[:, output_features] = filled.to_numpy()
    return outputX


def gen_hardcoded_features(df: pd.DataFrame):
    """Add elapsed-time features derived from the date columns (in place).

    ``pstatsTime`` and ``rostersTime`` are expressed in days; ``playerAge``,
    ``playerTSinceDebut`` and ``playerDebutAge`` in 365-day years. The input
    frame is modified and returned.
    """
    seconds_per_day = 60 * 60 * 24
    seconds_per_year = 60 * 60 * 24 * 365

    def _elapsed_seconds(later, earlier):
        return (later - earlier).dt.total_seconds()

    df['pstatsTime'] = _elapsed_seconds(df['date'], pd.to_datetime(df['pstatsDate']))
    df['playerAge'] = _elapsed_seconds(df['date'], df['DOB'])
    df['playerTSinceDebut'] = _elapsed_seconds(df['date'], df['mlbDebutDate'])
    df['playerDebutAge'] = _elapsed_seconds(df['mlbDebutDate'], df['DOB'])
    df['rostersTime'] = _elapsed_seconds(df['date'], pd.to_datetime(df['rosterDate']))

    # seconds -> days
    in_days = ['rostersTime', 'pstatsTime']
    df[in_days] = df[in_days] / seconds_per_day
    # seconds -> years
    in_years = ['playerAge', 'playerTSinceDebut', 'playerDebutAge']
    df[in_years] = df[in_years] / seconds_per_year
    return df

def fillna(df: pd.DataFrame, fill_value=0):
    """Return ``df`` with missing values replaced by ``fill_value``."""
    filled = df.fillna(fill_value)
    return filled


class FilterFeatures(BaseEstimator, TransformerMixin):
    """Keep only the columns whose name ends with one of ``suffixes``."""

    def __init__(self, suffixes: List[str]):
        self.suffixes = suffixes

    def fit(self, X: pd.DataFrame, y=None):
        """Record the columns of ``X`` that end with any configured suffix."""
        endings = tuple(self.suffixes)
        self._features = [name for name in X.columns
                          if name.endswith(endings)]
        return self

    def transform(self, X: pd.DataFrame):
        """Return the columns selected during ``fit``."""
        return X.loc[:, self._features]

In [5]:
### src/pipeline/players.py
import numpy as np
import pandas as pd


def player_preprocessing(players: pd.DataFrame) -> pd.DataFrame:
    """Derive metric body measurements and drop unused columns (in place).

    ``weight`` is scaled by 0.453592 (pounds to kilograms), ``height`` is
    ``heightInches`` converted to metres, and ``playerBMI`` is
    ``weight / height ** 2``. The input frame is modified and returned.
    """
    kg_per_pound = 0.453592
    metres_per_inch = 2.54 / 100

    players['weight'] = players['weight'] * kg_per_pound
    players['height'] = players['heightInches'] * 2.54 / 100
    players['playerBMI'] = players['weight'] / np.power(players['height'], 2)

    # columns that are not used as features
    unused = ['birthCity', 'heightInches',
              'birthStateProvince',
              'primaryPositionCode',
              'playerName',
              'playerForTestSetAndFuturePreds']
    players.drop(columns=unused, inplace=True)
    return players


def join_players_info(df: pd.DataFrame, path_to_players_csv: str):
    """Left-join the preprocessed players table onto ``df`` by ``playerId``."""
    date_columns = ['DOB', 'mlbDebutDate']
    players = player_preprocessing(
        pd.read_csv(path_to_players_csv, parse_dates=date_columns))
    return df.merge(players, on=['playerId'], how='left')


In [6]:
### src/pipeline/season.py
from typing import List
import pandas as pd
import numpy as np


def date_preprocessing(df: pd.DataFrame,
                       path_to_season: str,
                       dt_col: str = 'date',
                       date_attr: List[str] = ['year', 'month']):
    """Attach calendar attributes and season information to each date.

    Parameters
    ----------
    df : frame holding the datetime column ``dt_col``. It is not modified.
    path_to_season : path of the seasons CSV (one row per ``seasonId``).
    dt_col : name of the datetime column.
    date_attr : ``.dt`` attributes to add as columns (case-insensitive);
        must include ``'year'`` because it is the key of the seasons join.

    Returns
    -------
    A new frame with the requested attributes plus ``inSeason`` (date lies
    between the regular-season start and the post-season end, inclusive) and
    ``seasonPart`` (label of the part of the season). Rows whose year has no
    matching ``seasonId`` are dropped by the inner join.
    """
    # Normalise before validating so e.g. 'Year' is accepted consistently.
    date_attr = [attr.lower() for attr in date_attr]
    assert 'year' in date_attr, \
            'year attr must be on the date_attr list'
    date_cols = ['seasonStartDate', 'seasonEndDate', 'preSeasonStartDate',
                 'preSeasonEndDate', 'regularSeasonStartDate', 'regularSeasonEndDate',
                 'lastDate1stHalf', 'allStarDate', 'firstDate2ndHalf',
                 'postSeasonStartDate', 'postSeasonEndDate']
    seasons = pd.read_csv(path_to_season, parse_dates=date_cols)

    # Work on a copy so the caller's frame does not gain columns.
    df = df.copy()
    for attr in date_attr:
        df[attr] = getattr(df[dt_col].dt, attr)

    season_df = pd.merge(df, seasons, left_on='year', right_on='seasonId')
    dates = season_df[dt_col]

    # Explicit inclusive bounds: ``between(inclusive=True)`` is no longer
    # accepted by recent pandas.
    season_df['inSeason'] = ((dates >= season_df['regularSeasonStartDate']) &
                             (dates <= season_df['postSeasonEndDate']))

    # The first matching condition wins, so the order below matters.
    conditions = [
        dates < season_df['preSeasonStartDate'],
        dates < season_df['regularSeasonStartDate'],
        dates <= season_df['lastDate1stHalf'],
        dates < season_df['firstDate2ndHalf'],
        dates <= season_df['regularSeasonEndDate'],
        dates < season_df['postSeasonStartDate'],
        dates <= season_df['postSeasonEndDate'],
        dates > season_df['postSeasonEndDate'],
    ]
    labels = [
        'Offseason',
        'Preseason',
        'Reg Season 1st Half',
        'All-Star Break',
        'Reg Season 2nd Half',
        'Between Reg and Postseason',
        'Postseason',
        'Offseason',
    ]
    # A float default cannot be combined with string choices on NumPy >= 2;
    # older NumPy silently stored the text 'nan', which is kept here.
    season_df['seasonPart'] = np.select(conditions, labels, default='nan')

    return season_df.drop(columns=seasons.columns)
  

def join_season_info(df: pd.DataFrame,
                    path_to_season_csv: str,
                    date_attr: List[str] = ['year', 'weekday'],
                    ):
    """Left-join season information onto ``df`` by its ``date`` column.

    Season features are computed once per distinct date and then merged back.
    """
    unique_dates = df[['date']].drop_duplicates()
    season_info = date_preprocessing(unique_dates, path_to_season_csv,
                                     date_attr=date_attr)
    return df.merge(season_info, on=['date'], how='left')

In [7]:
### src/pipeline/stats.py
from sklearn.base import BaseEstimator, TransformerMixin
from typing import List
import pandas as pd
import numpy as np


class StatisticTransformer(BaseEstimator, TransformerMixin):
    """Base class for per-group feature generators.

    Subclasses are expected to set ``ids``, ``dt_col``, ``index_cols`` and
    ``drop_index`` and to implement ``compute_features(df)``, which receives
    the feature columns of one group indexed by date.
    """

    def fit(self, X: pd.DataFrame, y=None):
        """Record every column of ``X`` that is not an index column."""
        self._features = list(X.columns.drop(self.index_cols))
        return self

    def transform(self, X: pd.DataFrame):
        """Apply ``compute_features`` to each ``ids`` group of ``X``.

        ``X`` is left untouched (the previous implementation re-indexed it in
        place, which changed its index and column order for every other
        consumer of the same frame). Raises ``AssertionError`` when the
        id/date columns of the result do not line up row by row with ``X``.
        """
        dt_col = getattr(self, 'dt_col', 'date')
        indexed = X.set_index(dt_col)
        # group_keys=True makes the group ids part of the result index so
        # they can be restored as columns below.
        grouped = indexed.groupby(self.ids, group_keys=True)[self._features]
        output = grouped.apply(self.compute_features).reset_index()

        # Compare by position: X keeps its own index, the output has a fresh one.
        expected = X[self.index_cols].to_numpy()
        actual = output[self.index_cols].to_numpy()
        assert expected.shape == actual.shape and np.all(expected == actual), \
               'the ids do not match!'
        if self.drop_index:
            output = output.drop(self.index_cols, axis=1)
        return output


class StatisticGen(StatisticTransformer):
    """Rolling-window statistics of every feature, per group.

    For each window ``w`` and statistic ``s`` a column named
    ``<feature>__<s>__<w>d`` is produced (float32). Windows are passed to
    ``DataFrame.rolling`` with ``min_periods=1``.
    """

    def __init__(self, stats: List[str] = ['mean'],
                 windows: List[int] = [10],
                 dt_col: str = 'date',
                 ids: List[str] = ['playerId'],
                 drop_index: bool = True):
        self.stats = stats
        self.windows = windows
        self.dt_col = dt_col
        self.ids = ids
        self.index_cols = self.ids + [self.dt_col]
        self.drop_index = drop_index

    def _compute_features(self, df: pd.DataFrame, window: int) -> pd.DataFrame:
        """Rolling statistics for a single window size."""
        rolled = df.rolling(window, min_periods=1).agg(self.stats)
        window_tag = f'{window}d'
        rolled.columns = ['__'.join([*column, window_tag])
                          for column in rolled.columns]
        return rolled.astype(np.float32)

    def compute_features(self, df: pd.DataFrame):
        """Rolling statistics for every configured window, side by side."""
        per_window = [self._compute_features(df, window)
                      for window in self.windows]
        return pd.concat(per_window, axis=1)
    


class LagGen(StatisticTransformer):
    """Row-shifted (lagged) copies of every feature, per group.

    For each lag ``k`` a column named ``<feature>__<k>lag`` is produced
    (float32) by shifting the group's rows by ``k`` positions.
    """

    def __init__(self, lags: List[int] = [10],
                 dt_col: str = 'date',
                 ids: List[str] = ['playerId'],
                 drop_index: bool = True):
        self.lags = lags
        self.dt_col = dt_col
        self.ids = ids
        self.index_cols = self.ids + [self.dt_col]
        self.drop_index = drop_index

    def _compute_features(self, df: pd.DataFrame, lag: int) -> pd.DataFrame:
        """Shifted features for a single lag."""
        shifted = df.shift(lag)
        shifted.columns = [f'{name}__{lag}lag' for name in shifted.columns]
        return shifted.astype(np.float32)

    def compute_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Shifted features for every configured lag, side by side."""
        per_lag = [self._compute_features(df, lag) for lag in self.lags]
        return pd.concat(per_lag, axis=1)


class FeaturesTable(BaseEstimator, TransformerMixin):
    """Look up lagged rows of a precomputed feature table.

    For every lag the date of each input row is moved back by that many
    days (optionally clipped to the latest date present in ``table``) and
    the matching ``table`` row is fetched through a left merge on ``on``.
    """

    def __init__(self, table: pd.DataFrame,
                 lags: List[int],
                 on: List[str] = ['playerId', 'date'],
                 date_col: str = 'date',
                 clip_max: bool = True):
        self.on = on
        self.lags = lags
        self.table = table
        self.date_col = date_col
        self.clip_max = clip_max
        # latest date available in the table, used as the clipping bound
        self.max_date = table[self.date_col].max().to_numpy()

    def fit(self, X: pd.DataFrame, y=None):
        """Nothing to learn."""
        return self

    def _merge_table(self, X: pd.DataFrame,
                     lag: int) -> pd.DataFrame:
        """Fetch the table rows ``lag`` days before each row of ``X``."""
        keys = X.loc[:, self.on].copy()
        keys[self.date_col] = X[self.date_col] - pd.to_timedelta(lag, unit='d')
        if self.clip_max:
            keys[self.date_col] = np.minimum(keys[self.date_col], self.max_date)
        merged = keys.merge(self.table, on=self.on, how='left')
        outputX = merged.drop(self.on, axis=1)
        assert len(outputX) == len(X), \
               f'the len {len(X)} of the input do not match the output len {len(outputX)}'
        outputX.columns = outputX.columns + f'__{lag}lag'
        return outputX

    def transform(self, X: pd.DataFrame):
        """Lagged table features for every configured lag, side by side."""
        per_lag = [self._merge_table(X, lag) for lag in self.lags]
        return pd.concat(per_lag, axis=1)
    
    

In [8]:
### src/pipeline/target.py
import pandas as pd


def target_preprocessing(target: pd.DataFrame) -> pd.DataFrame:
    """Drop ``engagementMetricsDate`` and sort by player and date (in place).

    The frame is modified, given a fresh default index, and returned.
    """
    target.drop(columns='engagementMetricsDate', inplace=True)
    target.sort_values(['playerId', 'date'], inplace=True, ignore_index=True)
    return target

In [9]:
### src/data/__init__.py


In [10]:
### src/data/dataset.py
from torch.utils.data import DataLoader
import torch
from typing import Dict, List
import numpy as np
import pandas as pd
import gc


def get_timeseries_index(values: np.ndarray,
                         idx: int,
                         bptt: int):
    """Return the ``bptt + 1`` rows of ``values`` that end at row ``idx``.

    When fewer rows are available before ``idx`` the window is left-padded
    with zero rows. The padding specification is two-dimensional, so
    ``values`` has to be a 2-D array.
    """
    first_row = idx - bptt
    if first_row < 0:
        first_row = 0
    window = values[first_row: idx + 1]

    available = len(window)
    if available > bptt:
        return window
    n_missing = bptt - available + 1
    return np.pad(window, pad_width=((n_missing, 0), (0, 0)))



class PlayerDataset:
    """Map-style dataset that serves feature/category windows as tensors.

    Each item is a dict with ``features`` (float32) and ``categories``
    (int64), plus ``target`` and ``weight`` when those arrays were provided.
    With ``bptt > 1`` every item is the window of ``bptt`` rows ending at the
    requested row; otherwise it is the single row.
    """

    def __init__(self, features: np.ndarray,
                 categories: np.ndarray,
                 target: np.ndarray = None,
                 bptt: int = 10,
                 weight: np.ndarray = None,
                 device: torch.device = torch.device('cpu')):
        self.features = features
        self.categories = categories
        self.target = target
        self.weight = weight
        # number of *previous* rows included in a window
        self.bptt = bptt - 1
        self.device = device
        assert len(self) > self.bptt, f'lenght: {len(self)}, bptts: {self.bptt}'

    def __len__(self):
        return len(self.features)

    def _to_torch(self, array):
        """Convert an array-like to a tensor on the configured device."""
        as_array = np.asarray(array)
        return torch.from_numpy(as_array).to(device=self.device)

    def to_device(self, batch_item: Dict[str, np.ndarray]):
        """Convert every entry of ``batch_item`` to a tensor on the device."""
        converted = {}
        for name, values in batch_item.items():
            converted[name] = self._to_torch(values)
        return converted

    def _window(self, array, idx: int):
        """Row ``idx`` of ``array``, or the window ending there when bptt > 0."""
        if self.bptt > 0:
            return get_timeseries_index(array, idx=idx, bptt=self.bptt)
        return array[idx]

    def __getitem__(self, idx: int):
        features = self._window(self.features, idx)
        cats = self._window(self.categories, idx)

        batch_item = {'features': features.astype(np.float32),
                      'categories': cats.astype(np.int64)}
        if self.target is not None:
            batch_item['target'] = self.target[idx]
        if self.weight is not None:
            batch_item['weight'] = self.weight[idx]

        return self.to_device(batch_item)

    @classmethod
    def from_df(cls, df: pd.DataFrame,
                features: List[str],
                categories: List[str],
                target: List[str] = None,
                weight: List[str] = None,
                bptt: int = 30,
                device: torch.device = torch.device('cpu')):
        """Build a dataset from the named columns of ``df``."""
        def _columns_or_none(names):
            return None if names is None else df.loc[:, names].to_numpy()

        target = _columns_or_none(target)
        weight = _columns_or_none(weight)

        return cls(features=df.loc[:, features].to_numpy(),
                   categories=df.loc[:, categories].to_numpy(),
                   target=target,
                   weight=weight,
                   bptt=bptt,
                   device=device)

    def __del__(self):
        # release the arrays and force a collection when the dataset goes away
        del self.features, self.target, self.categories
        gc.collect()

In [11]:
### src/data/split.py
from datetime import datetime
from typing import Union
import pandas as pd

class SplitData:
    """Date-based train/validation splitter for time series.

    The validation range is ``[date, date + test_days)``. The train range
    ends ``gap`` days before ``date`` and spans ``train_days`` days; when
    ``train_days`` is ``None`` it starts at the earliest date of the frame
    being split.
    """

    def __init__(self, date: Union[datetime, str],
                 train_days: int = None,
                 test_days: int = 31,
                 gap: int = 0,
                 dt_col: str = 'date'):
        if isinstance(date, str):
            date = pd.to_datetime(date)
        # validation range
        self.start_test_dt = date
        self.end_test_dt = date + pd.to_timedelta(test_days, unit='d')
        # train range
        self.end_train_dt = date - pd.to_timedelta(gap, unit='d')
        if train_days is None:
            self.start_train_dt = None
        else:
            self.start_train_dt = (self.end_train_dt -
                                   pd.to_timedelta(train_days, unit='d'))
        self.dt_col = dt_col

    @staticmethod
    def _get_index(dates: pd.Series,
                start: datetime,
                end: datetime):
        """Boolean mask of the dates inside the half-open range ``[start, end)``."""
        not_before_start = dates >= start
        before_end = dates < end
        return not_before_start & before_end

    def train_idx(self, df: pd.DataFrame) -> pd.Series:
        """Boolean mask of the rows of ``df`` that fall in the train range."""
        start = self.start_train_dt
        if start is None:
            start = df[self.dt_col].min()
        return self._get_index(df[self.dt_col], start, self.end_train_dt)

    def valid_idx(self, df: pd.DataFrame) -> pd.Series:
        """Boolean mask of the rows of ``df`` that fall in the validation range."""
        return self._get_index(df[self.dt_col], self.start_test_dt,
                               self.end_test_dt)

    def train(self, df: pd.DataFrame):
        """Rows of ``df`` in the train range, with a fresh index."""
        mask = self.train_idx(df)
        return self.filter(df, mask)

    def valid(self, df: pd.DataFrame):
        """Rows of ``df`` in the validation range, with a fresh index."""
        mask = self.valid_idx(df)
        return self.filter(df, mask)

    def filter(self, df: pd.DataFrame, index):
        """Select the rows flagged by ``index`` and reset the index."""
        selected = df.loc[index, :]
        return selected.reset_index(drop=True)

    def __repr__(self):
        test_part = f'test_range=({self.start_test_dt}, {self.end_test_dt}), '
        train_part = f'train_range=({self.start_train_dt}, {self.end_train_dt})'
        return test_part + train_part

In [12]:
### src/data/util.py
import pandas as pd
from typing import List
import numpy as np


def filter_by_id(df: pd.DataFrame, ids: List[int]) -> pd.DataFrame:
    """Keep the rows whose ``playerId`` is in ``ids``; the index is reset."""
    wanted = df['playerId'].isin(ids)
    selected = df.loc[wanted, :]
    return selected.reset_index(drop=True)
    

def sample_by_id(df: pd.DataFrame, n: int = 1) -> pd.DataFrame:
    """Return all rows of ``n`` distinct, randomly chosen ``playerId`` values."""
    candidates = df['playerId'].unique()
    picked = np.random.choice(candidates, n, replace=False)
    return filter_by_id(df, picked)


def filter_by_date(df: pd.DataFrame,
                   date: str,
                   dt_col: str = 'date') -> pd.DataFrame:
    """Keep the rows whose ``dt_col`` is on or after ``date``; the index is reset."""
    on_or_after = df.loc[:, dt_col] >= date
    selected = df.loc[on_or_after, :]
    return selected.reset_index(drop=True)

In [13]:
### src/data/ingest_data/__init__.py
import pandas as pd
from pathlib import Path
from typing import List, Dict
import os
import numpy as np


# Raw nested column name -> short name used for the unpacked frame
# (the short names match the keyword arguments of ``ingest_features``).
feature_fields = dict(
    games='games',
    playerBoxScores='pstats',
    rosters='rosters',
    playerTwitterFollowers='ptw_fl',
    teamTwitterFollowers='team_tw_fl',
    awards='awards',
    teamBoxScores='teams',
    standings='standings',
)


def ingest_target(df: pd.DataFrame) -> pd.DataFrame:
    """Unpack the ``nextDayPlayerEngagement`` column and preprocess it."""
    print('preprocessing target')
    unpacked = unpack_dataframe(df, fields={'nextDayPlayerEngagement': 'target'})
    return target_preprocessing(unpacked['target'])


def ingest_stats_features(pstats: pd.DataFrame,
                          games: pd.DataFrame,
                          teams: pd.DataFrame):
    """Build player stats enriched with game and team statistics.

    The ``gamePk`` and ``teamId`` join keys are dropped from the result.
    """
    team_stats = ingest_team_stats(teams)
    game_stats = ingest_games_stats(games)
    player_stats = ingest_player_stats(pstats)
    player_stats = join_games_stats_to_pstats(player_stats, game_stats)
    player_stats = join_team_stats_to_pstats(player_stats, team_stats)
    player_stats.drop(columns=['gamePk', 'teamId'], inplace=True)
    return player_stats


def ingest_rosters(rosters: pd.DataFrame):
    """Rename ``gameDate`` to ``rosterDate`` and drop ``statusCode``.

    A new frame is returned and the input is left untouched, so a frame that
    is kept around and extended with later data is not altered between
    calls. Raises ``AssertionError`` when the result holds duplicated
    player/date keys.
    """
    rosters = (rosters
               .rename(columns={'gameDate': 'rosterDate'})
               .drop(['statusCode'], axis=1))
    assert not has_duplicates(rosters), 'rosters include duplicates'
    return rosters


def ingest_player_twitter_fl(tw_fl: pd.DataFrame):
    """Keep the player id, follower count and date of the player Twitter data."""
    kept_columns = ['playerId', 'numberOfFollowers', 'date']
    followers = tw_fl.loc[:, kept_columns]
    assert not has_duplicates(followers), 'player tw include duplicates'
    return followers


def ingest_team_twitter_fl(tw_fl: pd.DataFrame):
    """Return team id, follower count (as ``teamFollowers``) and date.

    The rename is done on a new frame so the caller's frame keeps its
    original ``numberOfFollowers`` column. Raises ``AssertionError`` when the
    result holds duplicated date/team keys.
    """
    tw_fl = tw_fl.rename(columns={'numberOfFollowers': 'teamFollowers'})
    tw_fl = tw_fl.loc[:, ['teamId', 'teamFollowers', 'date']]
    assert not has_duplicates(tw_fl, on=['date', 'teamId']), 'team tw include duplicates'
    return tw_fl


def ingest_awards(awards: pd.DataFrame):
    """Count the awards each player received on each date (``awardCount``)."""
    award_counts = (awards[['date', 'awardId', 'playerId']]
                    .groupby(['date', 'playerId'])[['awardId']]
                    .count()
                    .rename(columns={'awardId': 'awardCount'})
                    .reset_index())
    # awards['totalAwardCount'] = awards.groupby(['playerId'])[['awardId']].count()
    assert not has_duplicates(award_counts), 'awards tw include duplicates'
    return award_counts



def ingest_features(df: pd.DataFrame,
                    pstats: pd.DataFrame = None,
                    games: pd.DataFrame = None,
                    awards: pd.DataFrame = None,
                    ptw_fl: pd.DataFrame = None,
                    rosters: pd.DataFrame = None,
                    standings: pd.DataFrame = None,
                    team_tw_fl: pd.DataFrame = None,
                    teams: pd.DataFrame = None,
                    path_to_players_csv: str = None,
                    path_to_season_csv: str = None):
    """Join every available feature source onto ``df``.

    Each source is optional. Player-level sources are left-merged on
    ``playerId``/``date``; team-level sources are only joined when ``df``
    has a ``teamId`` column at that point. Season and player CSV data are
    joined when their paths are given. Raises ``AssertionError`` when the
    result holds duplicated player/date keys.
    """
    # --- clean up each raw source -------------------------------------
    if standings is not None:
        standings = ingest_standings(standings)
    if pstats is not None and games is not None:
        pstats = ingest_stats_features(pstats, games, teams)
    if rosters is not None:
        rosters = ingest_rosters(rosters)
    if awards is not None:
        awards = ingest_awards(awards)
    if ptw_fl is not None:
        ptw_fl = ingest_player_twitter_fl(ptw_fl)
    if team_tw_fl is not None:
        team_tw_fl = ingest_team_twitter_fl(team_tw_fl)

    # --- player-level joins -------------------------------------------
    player_level_sources = (pstats, rosters, ptw_fl, awards)
    for feature_ds in player_level_sources:
        if feature_ds is not None:
            df = df.merge(feature_ds, how='left',
                          on=['playerId', 'date'])

    # --- team-level joins ---------------------------------------------
    if team_tw_fl is not None and 'teamId' in df.columns:
        df = df.merge(team_tw_fl, how='left',
                      on=['teamId', 'date'])

    if standings is not None and 'teamId' in df.columns:
        df = join_standings_to(df, standings)

    # --- static tables ------------------------------------------------
    if path_to_season_csv is not None:
        df = join_season_info(df, path_to_season_csv=path_to_season_csv)
    if path_to_players_csv is not None:
        df = join_players_info(df, path_to_players_csv=path_to_players_csv)

    assert not has_duplicates(df), 'output features include duplicates'
    return df


def create_test_template(sample_submission: pd.DataFrame):
    """Turn a sample submission into a sorted player/date template.

    Works on a deep copy: the index is moved into the columns, ``date`` is
    parsed with the ``%Y%m%d`` format, rows are sorted by player and date,
    and the ``date_playerId`` column is removed.
    """
    template = sample_submission.copy(deep=True).reset_index()
    template['date'] = pd.to_datetime(template['date'], format='%Y%m%d')

    template = (template
                .sort_values(by=['playerId', 'date'])
                .reset_index(drop=True))
    return template.drop(columns='date_playerId')


def ingest_features_for_test(test_df: pd.DataFrame,
                             raw_test_df: pd.DataFrame,
                             train_fields: Dict[str, pd.DataFrame],
                             path_to_players_csv: str = None,
                             path_to_season_csv: str = None):
    """Compute the features of a test batch on top of the stored history.

    The raw test columns are unpacked, merged into ``train_fields`` via
    ``update_fields`` and fed to ``ingest_features``. Returns the feature
    frame together with the updated fields.
    """
    unpacked = unpack_dataframe(raw_test_df, fields=feature_fields)
    test_fields = update_fields(train_fields, unpacked)
    features = ingest_features(test_df, **test_fields,
                               path_to_players_csv=path_to_players_csv,
                               path_to_season_csv=path_to_season_csv)
    return features, test_fields


def ingest_test_data(submission_template: pd.DataFrame,
                     raw_test_df: pd.DataFrame,
                     train_fields: Dict[str, pd.DataFrame],
                     path_to_players_csv: str = None,
                     path_to_season_csv: str = None):
    """Prepare one test batch: template, features and updated fields.

    Both input frames are modified: ``submission_template`` gains a
    ``playerId`` column parsed from ``date_playerId`` and ``raw_test_df`` is
    re-indexed by the date of the first template row.
    """
    def _player_id(date_player: str) -> int:
        return int(date_player.split('_')[1])

    submission_template['playerId'] = submission_template['date_playerId'].map(_player_id)
    test_df = create_test_template(submission_template)

    # every raw row of the batch is stamped with the date of the first template row
    raw_test_df['date'] = test_df['date'].iloc[0]
    raw_test_df.set_index('date', inplace=True)

    test_df, test_fields = ingest_features_for_test(
        test_df, raw_test_df, train_fields,
        path_to_players_csv=path_to_players_csv,
        path_to_season_csv=path_to_season_csv)
    return submission_template, test_df, test_fields


def ingest_train_data(path_to_train_csv: str,
                      path_to_players_csv: str,
                      path_to_season_csv: str) -> pd.DataFrame:
    """Read the training CSV and build the target-plus-features frame."""
    print('reading training data..')
    raw_train = pd.read_csv(path_to_train_csv, parse_dates=['date'])
    raw_train = raw_train.set_index('date')

    # the target rows are the base onto which every feature source is joined
    target = ingest_target(raw_train)
    unpacked_fields = unpack_dataframe(raw_train, fields=feature_fields)
    return ingest_features(target, **unpacked_fields,
                           path_to_players_csv=path_to_players_csv,
                           path_to_season_csv=path_to_season_csv)




In [14]:
### src/data/ingest_data/core.py
import pandas as pd
from typing import List, Dict

# Alias for a mapping from a short field name to its unpacked DataFrame
# (``unpack_dataframe`` stores ``None`` for a field that has no data).
fields_type = Dict[str, pd.DataFrame] 

def unpack_dataframe(df: pd.DataFrame,
                     fields: Dict[str, str]) -> Dict[str, Optional[pd.DataFrame]]:
    """Unpack the nested columns of ``df`` into one frame per field.

    Parameters
    ----------
    df : frame whose columns hold the nested data.
    fields : mapping ``column name -> output name``.

    Returns
    -------
    A dict keyed by output name. The value is ``None`` when the column is
    absent from ``df`` or holds no data at all; otherwise it is the result
    of the unpack transformer built by ``make_unpack_tmf``.
    """
    from typing import Optional  # local import: only used by the annotation above

    output = {}
    for field, output_name in fields.items():
        # nothing to unpack: column missing or entirely empty
        if field not in df.columns or df.loc[:, field].isna().all():
            output[output_name] = None
            continue
        tmf = make_unpack_tmf(field)
        output[output_name] = tmf.fit_transform(df)
    return output


def update_fields(train_fields: Dict[str, pd.DataFrame],
                  test_fields: Dict[str, pd.DataFrame],
                  add_field: bool = False,
                  concat: bool = True) -> Dict[str, pd.DataFrame]:
    """Merge the frames of ``test_fields`` into ``train_fields`` (in place).

    For a field present in both dicts the test rows are appended to the
    stored rows (``concat=True``) or replace them (``concat=False``). A
    ``None`` frame on either side is skipped when appending. A field that
    only exists in ``test_fields`` is added when ``add_field`` is true and
    ignored otherwise. ``train_fields`` is updated and returned.
    """
    for field, test_data in test_fields.items():
        if field in train_fields:
            if concat:
                frames = [frame for frame in (train_fields[field], test_data)
                          if frame is not None]
                updated_data = (pd.concat(frames, ignore_index=True)
                                if frames else None)
            else:
                updated_data = test_data
        elif add_field:
            updated_data = test_data
        else:
            # unknown field and not asked to add it: leave the dict untouched
            continue
        train_fields[field] = updated_data
    return train_fields


def split_fields_by_date(fields: Dict[str, pd.DataFrame],
                 start_date: str = None,
                 end_date: str = None,
                 features: List[str] = None,
                 dt_col: str = 'date'):
    """Restrict the frames in ``fields`` to ``[start_date, end_date)``.

    Parameters
    ----------
    fields : mapping name -> frame; updated in place and returned.
    start_date : inclusive lower bound. When no row satisfies it the field
        name is printed and the lower bound is not applied to that frame.
    end_date : exclusive upper bound. At least one row must satisfy it,
        otherwise ``AssertionError`` is raised.
    features : names to process; defaults to every key of ``fields``.
    dt_col : name of the date column.

    Entries whose frame is ``None`` are left as they are. Every processed
    frame is stored back with a fresh default index.
    """
    if features is None:
        features = list(fields.keys())

    for feature in features:
        data = fields[feature]
        if data is None:
            continue
        if start_date is not None:
            index = (data[dt_col] >= start_date)
            if index.sum() == 0:
                print(feature, index.sum())
            else:
                data = data.loc[index, :]
        if end_date is not None:
            # upper bound: compare against end_date (not start_date)
            index = (data[dt_col] < end_date)
            assert index.sum() > 0
            data = data.loc[index, :]
        # reset on a new object instead of mutating a slice in place
        fields[feature] = data.reset_index(drop=True)
    return fields

    
def has_duplicates(X: pd.DataFrame,
                   on: List[str] = ['playerId', 'date']) -> bool:
    """Tell whether any combination of the ``on`` columns occurs more than once."""
    repeated_keys = X.loc[:, on].duplicated()
    return repeated_keys.any()


def compute_rank_features(df: pd.DataFrame, on: List[str],
                  features: List[str]):
    """Add within-group ranks of ``features`` to ``df`` (in place).

    The new columns are named ``<feature>__<on...>__ranked``; values that
    cannot be ranked are stored as 0. The input frame is returned.
    """
    name_tail = '__' + '__'.join(on + ['ranked'])
    output_features = (pd.Series(features) + name_tail).tolist()
    ranks = df.groupby(on)[features].rank()
    df.loc[:, output_features] = ranks.fillna(0).to_numpy()
    return df


def normalize_with_max(df: pd.DataFrame,
                       on: List[str],
                       features: List[str]):
    """Add each feature divided by its group maximum to ``df`` (in place).

    The new columns are named ``<feature>__<on...>__maxNorm``. The input
    frame is returned.
    """
    output_features = []
    for feature in features:
        output_features.append('__'.join([feature] + on + ['maxNorm']))

    group_max = df.groupby(on)[features].transform('max')
    ratios = df.loc[:, features] / group_max.to_numpy()
    df.loc[:, output_features] = ratios.to_numpy()

    return df


def add_suffix(df: pd.DataFrame, features: List[str],
               suffix: str):
    """Return ``df`` with each listed column renamed to ``<name>__<suffix>``."""
    renamed = {}
    for name in features:
        renamed[name] = f'{name}__{suffix}'
    return df.rename(columns=renamed)

In [15]:
### src/data/ingest_data/games.py
import pandas as pd
import numpy as np


def preprocess_games_stats(games: pd.DataFrame):
    """Clean the raw games table.

    * ``gamesInSeries``: 0 and missing values become 1, stored as int64.
    * rows are sorted by ``date`` then ``gamePk`` and re-indexed from 0.
    * scheduling/descriptive columns and the per-team season records are
      removed.

    Note that the ``gamesInSeries`` fix and the sort are applied to the
    frame that is passed in; the returned frame is a new one without the
    dropped columns.
    """
    schedule_cols = ['gameTimeUTC', 'resumeDate', 'resumedFrom',
                     'codedGameState', 'detailedGameState', 'gameNumber',
                     'doubleHeader', 'dayNight', 'scheduledInnings',
                     'gameType', 'gameDate', 'isTie']
    home_cols = ['homeName', 'homeAbbrev',
                 'homeWins', 'homeLosses', 'homeWinPct']
    away_cols = ['awayName', 'awayAbbrev',
                 'awayWins', 'awayLosses', 'awayWinPct']

    series_length = games['gamesInSeries'].replace({0: 1})
    games['gamesInSeries'] = series_length.fillna(1).astype(np.int64)

    games.sort_values(by=['date', 'gamePk'], inplace=True)
    games.reset_index(drop=True, inplace=True)

    return games.drop(schedule_cols + home_cols + away_cols, axis=1)


def _join_home_and_away_games(games: pd.DataFrame):
    # home games
    home_team_games = games.copy()
    home_team_games['home'] = 1
    home_team_games['away'] = 0
    home_team_games.rename(columns={'homeId': 'teamA',
                                    'awayId': 'teamB'}, inplace=True)

    # away games
    away_team_games = games.copy()
    away_team_games['home'] = 0
    away_team_games['away'] = 1
    away_team_games.rename(columns={'homeId': 'teamB',
                                    'awayId': 'teamA'}, inplace=True)
    
    team_games = pd.concat([home_team_games, away_team_games],
                               axis=0, ignore_index=True)
    
    # sort values
    team_games = (team_games.sort_values(by=['teamA', 'teamB', 'date'])
                  .reset_index(drop=True))
    
    return team_games


def compute_current_game_in_series(games: pd.DataFrame):
    """Count, per team pairing, how many games of a series have been played.

    The games are expanded to one row per (team, game) and a running count
    is accumulated inside every
    (``teamA``, ``teamB``, ``season``, ``seriesDescription``) group. When a
    pairing has several rows on the same date only the last (highest) count
    of that date is kept.

    Returns a frame with the group columns, ``date`` and
    ``currentGameInSeries`` (1-based running count).
    """
    series_key = ['teamA', 'teamB', 'season', 'seriesDescription']

    meetings = _join_home_and_away_games(games)
    meetings = meetings.loc[:, ['teamA', 'teamB', 'date',
                                'season', 'seriesDescription']]
    # every row is one game; the expanding sum of ones is the running count
    meetings = meetings.assign(currentGameInSeries=1)

    running_count = (meetings.set_index('date')
                     .groupby(series_key)['currentGameInSeries']
                     .expanding().sum())

    per_day = running_count.reset_index()
    per_day = per_day.drop_duplicates(subset=['teamA', 'teamB', 'date'],
                                      keep='last')
    return per_day.reset_index(drop=True)


def compute_current_game_in_series_and_join(games: pd.DataFrame):
    """Attach ``currentGameInSeries`` to the games table.

    The running count produced by ``compute_current_game_in_series`` is
    joined on (``homeId``, ``awayId``, ``date``) and then converted to a
    0-based position modulo ``gamesInSeries``.
    """
    series_position = (compute_current_game_in_series(games)
                       .drop(columns=['season', 'seriesDescription']))

    joined = games.merge(series_position,
                         left_on=['homeId', 'awayId', 'date'],
                         right_on=['teamA', 'teamB', 'date'],
                         how='left')
    joined = joined.drop(columns=['teamA', 'teamB'])

    zero_based = joined['currentGameInSeries'] - 1
    joined['currentGameInSeries'] = zero_based % joined['gamesInSeries']
    return joined
    

def compute_games_stats(games: pd.DataFrame) -> pd.DataFrame:
    """Build cumulative head-to-head statistics per season.

    For every (``teamA``, ``teamB``, ``season``) pairing and every date the
    result holds, accumulated up to and including that date:

    * ``totalGamesVsoppTeam`` - games played against that opponent,
    * ``WinPctAsHome`` / ``WinPctAsAway`` - share of home / away games won,
    * ``WintPctHist`` - share of all games won.

    Ratios that are undefined (no games of that kind yet) are set to 0.
    Returned columns: ``teamA``, ``teamB``, ``date`` and the four statistics.
    """
    matchup = ['teamA', 'teamB', 'season']
    counters = ['home', 'away', 'homeWinner', 'awayWinner']

    per_team = _join_home_and_away_games(games)
    per_team = per_team.loc[:, ['teamA', 'teamB', 'date', 'season',
                                'awayWinner', 'homeWinner', 'home', 'away']]
    # a win is only credited to teamA on the side it actually played:
    # the home-winner flag is zeroed on away rows and vice versa
    for side in ('home', 'away'):
        per_team[f'{side}Winner'] = per_team[f'{side}Winner'] * per_team[side]

    # several games can share a date, so add them up per day first
    daily = per_team.groupby(matchup + ['date']).sum().reset_index()
    daily = (daily.sort_values(by=['teamA', 'teamB', 'date'])
             .reset_index(drop=True))

    # running totals inside each pairing/season
    running = (daily.set_index('date')
               .groupby(matchup)[counters]
               .expanding().sum())

    games_played = running['away'] + running['home']
    running['totalGamesVsoppTeam'] = games_played
    running['WinPctAsHome'] = running['homeWinner'] / running['home']
    running['WinPctAsAway'] = running['awayWinner'] / running['away']
    running['WintPctHist'] = ((running['homeWinner'] + running['awayWinner'])
                              / games_played)

    # divisions by zero produce inf/NaN; both are mapped to 0
    running = running.replace([np.inf, -np.inf], np.nan).fillna(0)

    running = running.reset_index()
    return running.drop(columns=counters + ['season'])


def compute_games_stats_and_join(games: pd.DataFrame) -> pd.DataFrame:
    """Join the head-to-head statistics onto the games table twice.

    The statistics from ``compute_games_stats`` are merged once from the
    home team's point of view (columns prefixed with ``home``) and once from
    the away team's point of view (columns prefixed with ``away``).

    Raises
    ------
    AssertionError
        If the merges changed the number of rows; the message reports both
        row counts.
    """
    n_input_rows = len(games)
    team_games = compute_games_stats(games)
    features = ['WinPctAsHome', 'WinPctAsAway', 'WintPctHist']

    # statistics as seen by the home team (teamA = home, teamB = away)
    games = games.merge(team_games,
                        left_on=['homeId', 'awayId', 'date'],
                        right_on=['teamA', 'teamB', 'date'],
                        how='left')
    games.drop(['teamA', 'teamB', 'totalGamesVsoppTeam'], inplace=True, axis=1)
    games.rename(columns={f: 'home' + f for f in features}, inplace=True)

    # statistics as seen by the away team (teamA = away, teamB = home);
    # `totalGamesVsoppTeam` from this second merge stays in the result
    games = games.merge(team_games,
                        left_on=['homeId', 'awayId', 'date'],
                        right_on=['teamB', 'teamA', 'date'],
                        how='left')
    games.rename(columns={f: 'away' + f for f in features}, inplace=True)
    games.drop(['teamA', 'teamB'], inplace=True, axis=1)

    # the original message was an f-string without placeholders and printed
    # the variable names instead of their values
    assert len(games) == n_input_rows, \
           f'the input length ({n_input_rows}) != output length ({len(games)})'
    return games


def ingest_games_stats(games: pd.DataFrame):
    """Run the whole games pipeline: clean the table, add the position of
    each game inside its series, add the head-to-head statistics, and drop
    the helper columns that were only needed to compute them."""
    cleaned = preprocess_games_stats(games)
    with_series = compute_current_game_in_series_and_join(cleaned)
    with_history = compute_games_stats_and_join(with_series)
    return with_history.drop(columns=['season', 'gamesInSeries',
                                      'seriesDescription'])

def join_games_stats_to_pstats(pstats: pd.DataFrame,
                               games: pd.DataFrame):
    """Join game-level team stats onto player rows, relative to the player.

    ``games`` is merged on (``gamePk``, ``date``). Its ``home*`` / ``away*``
    columns are then re-expressed as ``playerTeam*`` / ``opponentTeam*``
    columns, choosing the home or the away value row by row according to the
    ``home`` flag of ``pstats``. The original ``home*`` / ``away*`` columns
    and three redundant derived columns are dropped from the result.
    """
    team_fields = ['Id',
                   'Winner',
                   'Score',
                   'WinPctAsHome',
                   'WinPctAsAway',
                   'WintPctHist']

    def _prefixed(prefix):
        return [prefix + field for field in team_fields]

    home_cols = _prefixed('home')
    away_cols = _prefixed('away')
    own_cols = _prefixed('playerTeam')
    rival_cols = _prefixed('opponentTeam')

    pstats = pstats.merge(games, on=['gamePk', 'date'], how='left')

    # the single-column frame broadcasts the home flag over all six fields
    pstats.loc[:, own_cols] = np.where(pstats[['home']],
                                       pstats[home_cols], pstats[away_cols])
    pstats.loc[:, rival_cols] = np.where(pstats[['home']],
                                         pstats[away_cols], pstats[home_cols])

    unwanted = (home_cols + away_cols +
                ['playerTeamId', 'opponentTeamWinner',
                 'opponentTeamWintPctHist'])
    return pstats.drop(columns=unwanted)

In [16]:
### src/data/ingest_data/player_stats.py
import pandas as pd
from typing import List

# Per-game player box-score columns. `preprocess_player_stats` sums these
# per (playerId, date).
player_stats_features = ['battingOrder', 'gamesPlayedBatting', 'flyOuts',
       'groundOuts', 'runsScored', 'doubles', 'triples', 'homeRuns',
       'strikeOuts', 'baseOnBalls', 'intentionalWalks', 'hits', 'hitByPitch',
       'atBats', 'caughtStealing', 'stolenBases', 'groundIntoDoublePlay',
       'groundIntoTriplePlay', 'plateAppearances', 'totalBases', 'rbi',
       'leftOnBase', 'sacBunts', 'sacFlies', 'catchersInterference',
       'pickoffs', 'gamesPlayedPitching', 'gamesStartedPitching',
       'completeGamesPitching', 'shutoutsPitching', 'winsPitching',
       'lossesPitching', 'flyOutsPitching', 'airOutsPitching',
       'groundOutsPitching', 'runsPitching', 'doublesPitching',
       'triplesPitching', 'homeRunsPitching', 'strikeOutsPitching',
       'baseOnBallsPitching', 'intentionalWalksPitching', 'hitsPitching',
       'hitByPitchPitching', 'atBatsPitching', 'caughtStealingPitching',
       'stolenBasesPitching', 'inningsPitched', 'saveOpportunities',
       'earnedRuns', 'battersFaced', 'outsPitching', 'pitchesThrown', 'balls',
       'strikes', 'hitBatsmen', 'balks', 'wildPitches', 'pickoffsPitching',
       'rbiPitching', 'gamesFinishedPitching', 'inheritedRunners',
       'inheritedRunnersScored', 'catchersInterferencePitching',
       'sacBuntsPitching', 'sacFliesPitching', 'saves', 'holds', 'blownSaves',
       'assists', 'putOuts', 'errors', 'chances']


# Subset of the columns above that `ingest_player_stats` removes after the
# derived metrics have been computed.
players_features_to_drop = ['gamesPlayedBatting', 'flyOuts', 'doubles', 'triples', 'atBats',
       'caughtStealing', 'groundIntoDoublePlay', 'leftOnBase', 'sacBunts',
       'sacFlies', 'shutoutsPitching', 'flyOutsPitching',
       'airOutsPitching', 'doublesPitching',
       'triplesPitching', 'homeRunsPitching', 'baseOnBallsPitching',
       'intentionalWalksPitching', 'hitsPitching', 'hitByPitchPitching',
       'stolenBasesPitching', 'earnedRuns', 'pitchesThrown', 'balls',
       'strikes', 'hitBatsmen', 'wildPitches', 'rbiPitching',
       'inheritedRunnersScored', 'sacFliesPitching', 'errors']


# Player stats selected for scaling by the per-date maximum (see
# `ingest_player_stats`).
# NOTE(review): the team-stats cell further down assigns a shorter list to
# this same top-level name; when all cells run in one kernel, whichever
# assignment executed last is the one any later global lookup sees.
rank_features = ['runsScored', 'homeRuns', 'hits', 'SLG', 'rbi', 'runsPitching']

def compute_player_metrics(pstats: pd.DataFrame):
    """Add the ``SLG`` column to ``pstats`` (in place) and return the frame.

    ``SLG`` is ``(hits + 2*doubles + 3*triples + 4*homeRuns) / atBats``;
    rows where the ratio is NaN (for example 0 / 0) get -1.

    NOTE(review): ``hits`` is used where the textbook slugging formula uses
    singles - confirm this weighting is the intended feature.
    """
    weighted_hits = (1 * pstats['hits'] +
                     2 * pstats['doubles'] +
                     3 * pstats['triples'] +
                     4 * pstats['homeRuns'])
    slugging = weighted_hits / pstats['atBats']
    pstats['SLG'] = slugging.fillna(-1).to_numpy()
    return pstats
    
# Identifier/context columns of the player stats: they are taken from the
# last row of each (playerId, date) instead of being summed, and they are
# excluded from the '__ptvf' renaming.
to_keep = ['home', 'gamePk', 'playerId', 'date', 'teamId']

def preprocess_player_stats(pstats: pd.DataFrame):
    """Collapse the player stats to one row per (``playerId``, ``date``).

    The numeric stats in ``player_stats_features`` are summed over all rows
    of the same player and date. The identifier columns in ``to_keep`` are
    taken from the last of those rows, and ``pstatsDate`` is added as a copy
    of ``date``.

    Raises
    ------
    AssertionError
        If the result still contains a repeated (``playerId``, ``date``).
    """
    key = ['playerId', 'date']

    daily_totals = (pstats.groupby(key)[player_stats_features]
                    .sum().reset_index())

    last_rows = (pstats.drop_duplicates(subset=key, keep='last')
                 .loc[:, to_keep])
    last_rows['pstatsDate'] = last_rows['date'].copy(deep=True)

    merged = last_rows.merge(daily_totals, on=key, how='left')
    assert not has_duplicates(merged), 'player stats include duplicates'
    return merged


def ingest_player_stats(pstats: pd.DataFrame):
    """Run the whole player-stats pipeline.

    Steps: aggregate to one row per player and date, add ``SLG``, add the
    per-date max-normalised version of the selected stats, drop the raw
    columns that are no longer needed, and suffix every stat column with
    ``__ptvf`` (identifier columns and ``pstatsDate`` keep their names).
    """
    # The list of stats to normalise is pinned locally. The top-level name
    # `rank_features` is assigned again by the team-stats code with only
    # ['runsScored', 'homeRuns']; in a shared kernel namespace a global
    # lookup here would silently pick up that shorter list.
    player_rank_features = ['runsScored', 'homeRuns', 'hits',
                            'SLG', 'rbi', 'runsPitching']

    pstats = preprocess_player_stats(pstats)
    pstats = compute_player_metrics(pstats)
    pstats = normalize_with_max(pstats, on=['date'],
                                features=player_rank_features)
    pstats = pstats.drop(players_features_to_drop, axis=1)
    stats_features = pstats.columns.drop(to_keep + ['pstatsDate'])
    pstats = add_suffix(pstats, features=stats_features, suffix='ptvf')
    return pstats

In [17]:
### src/data/ingest_data/standings.py
import pandas as pd
import numpy as np
from typing import List


def preprocess_standings(standings: pd.DataFrame):
    """Clean the raw standings table and return a new frame.

    * descriptive columns, rank columns, the win/loss splits and
      ``streakCode`` are removed;
    * ``wildCardLeader``: the string ``'None'`` and missing values become
      False, stored as bool;
    * ``divisionChamp`` / ``divisionLeader`` are cast to float;
    * ``homeWinPct`` / ``awayWinPct`` are computed from the home/away win and
      loss counts, which are then dropped;
    * ``pct`` is renamed to ``winPct``;
    * ``lastTenWins`` / ``lastTenLosses`` are divided by 10.
    """
    split_records = [f'{split}{result}'
                     for split in ('extraInning', 'oneRun', 'day', 'night',
                                   'grass', 'turf', 'div', 'al', 'nl')
                     for result in ('Wins', 'Losses')]
    other_unused = ['gameDate', 'teamName',
                    'leagueGamesBack', 'sportGamesBack', 'divisionGamesBack',
                    'runsAllowed', 'runsScored', 'season',
                    'wins', 'losses',
                    'wildCardEliminationNumber', 'eliminationNumber',
                    'divisionRank', 'leagueRank', 'wildCardRank',
                    'divisionId', 'streakCode']
    standings = standings.drop(columns=other_unused + split_records)

    wild_card = standings['wildCardLeader'].replace({'None': False})
    standings['wildCardLeader'] = wild_card.fillna(False).astype(np.bool_)

    flag_columns = ['divisionChamp', 'divisionLeader']
    standings[flag_columns] = standings[flag_columns].astype('float')

    for venue in ('home', 'away'):
        won = standings[f'{venue}Wins']
        lost = standings[f'{venue}Losses']
        standings[f'{venue}WinPct'] = won / (won + lost)

    standings = standings.rename(columns={'pct': 'winPct'})
    standings = standings.drop(columns=['homeWins', 'homeLosses',
                                        'awayWins', 'awayLosses'])

    # counts over the last ten games -> fractions in [0, 1]
    last_ten = ['lastTenWins', 'lastTenLosses']
    standings[last_ten] = standings[last_ten] / 10
    return standings
    

def compute_standings_features(standings: pd.DataFrame, features: List[str]):
    """Add, for each name in ``features``, its rank among all rows sharing
    the same ``date`` (delegates to ``compute_rank_features``)."""
    ranked = compute_rank_features(standings, on=['date'], features=features)
    return ranked


def ingest_standings(standings: pd.DataFrame):
    """Clean the standings table and add the per-date ranks of the three
    win-percentage columns."""
    pct_columns = ['homeWinPct', 'awayWinPct', 'winPct']
    cleaned = preprocess_standings(standings)
    return compute_standings_features(cleaned, features=pct_columns)


# def join_standings_to(df: pd.DataFrame,
#                       standings: pd.DataFrame,
#                       as_away: bool = False):
#     team_col = 'teamId' if as_away else 'opponentTeamId'
#     output_name = 'playerTeam' if as_away else 'opponentTeam'
#     _standings = standings.rename(columns={'teamId': team_col})

#     df = df.merge(_standings, how='left', on=['date', team_col])
#     df.rename(columns={f: f'{output_name}{f.title()}St'
#                            for f in _standings.columns.drop([team_col, 'date'])}, inplace=True)
#     return df


def join_standings_to(df: pd.DataFrame,
                      standings: pd.DataFrame):
    """Left-join the standings of the player's team onto ``df``.

    The join keys are ``date`` and ``teamId``. Every other standings column
    is renamed to ``playerTeam`` followed by ``str.title()`` of its name;
    ``title()`` lower-cases everything after the first letter of each word,
    so ``winPct`` becomes ``playerTeamWinpct``.
    """
    team_col = 'teamId'
    prefix = 'playerTeam'

    value_columns = standings.columns.drop([team_col, 'date'])
    new_names = {column: f'{prefix}{column.title()}'
                 for column in value_columns}

    merged = df.merge(standings, how='left', on=['date', team_col])
    return merged.rename(columns=new_names)


In [18]:
### src/data/ingest_data/team_stats.py
import pandas as pd
from typing import List

# Team box-score columns that `preprocess_teams` sums per (teamId, date).
team_stats_features = ['runsScored', 'homeRuns', 'strikeOuts', 'hits', 'runsPitching',
                       'homeRunsPitching', 'outsPitching','rbiPitching']

# Team stats that `ingest_team_stats` scales by the per-date maximum.
# NOTE(review): the player-stats code assigns a longer list to this same
# top-level name; when all cells run in one kernel this assignment replaces
# it for every later global lookup of `rank_features`.
rank_features = ['runsScored', 'homeRuns']

def preprocess_teams(teams: pd.DataFrame):
    """Collapse the team stats to one row per (``teamId``, ``date``), summing
    the columns listed in ``team_stats_features``.

    Raises
    ------
    AssertionError
        If the result still contains a repeated (``teamId``, ``date``).
    """
    key = ['teamId', 'date']

    daily_totals = (teams.groupby(key)[team_stats_features]
                    .sum().reset_index())
    unique_keys = (teams.drop_duplicates(subset=key, keep='last')
                   .loc[:, key])

    merged = unique_keys.merge(daily_totals, on=key, how='left')
    assert not has_duplicates(merged, on=key), 'team stats include duplicates'
    return merged

def ingest_team_stats(teams: pd.DataFrame):
    """Aggregate the team stats per team and date, then add the per-date
    max-normalised version of the stats named in ``rank_features``."""
    daily = preprocess_teams(teams)
    return normalize_with_max(daily, on=['date'], features=rank_features)

def join_team_stats_to_pstats(pstats: pd.DataFrame,
                              teams: pd.DataFrame):
    """Attach team stats to player rows for both teams of the game.

    ``teams`` is joined twice on ``date``: once through ``teamId`` (columns
    prefixed with ``playerTeam``) and once through ``opponentTeamId``
    (columns prefixed with ``opponentTeam``).

    ``teams`` is left untouched. The previous version renamed its ``teamId``
    column in place, which changed the caller's frame and made a second call
    with the same frame fail.
    """
    features = list(teams.columns.drop(['date', 'teamId']))
    own_names = {name: 'playerTeam' + name for name in features}
    rival_names = {name: 'opponentTeam' + name for name in features}

    # stats of the player's own team
    pstats = (pstats.merge(teams, on=['teamId', 'date'], how='left')
              .rename(columns=own_names))

    # stats of the opponent: same table, keyed by the opponent's id
    rival_teams = teams.rename(columns={'teamId': 'opponentTeamId'})
    pstats = (pstats.merge(rival_teams, on=['opponentTeamId', 'date'],
                           how='left')
              .rename(columns=rival_names))

    return pstats


In [19]:
### src/models/cont_emb_stack.py
import pytorch_lightning as pl
from typing import List, Tuple, Callable, Dict
from torch import nn, optim
from torch.nn import functional as F
import torch
from collections import OrderedDict
import numpy as np

def mae(yhat, y, weight=None):
    """Mean absolute error, optionally weighted.

    Parameters
    ----------
    yhat, y : tensors of predictions and targets.
    weight : optional tensor of non-normalised weights; it is normalised to
        sum to 1 before being applied.

    Returns
    -------
    A scalar tensor: the plain mean of ``|yhat - y|`` without weights,
    otherwise the weighted sum.

    ``weight`` is not modified (the previous in-place ``/=`` rescaled the
    caller's tensor).
    """
    error = torch.abs(yhat - y)
    if weight is None:
        return error.mean()
    normalised = weight / weight.sum()
    return (error * normalised).sum()


def get_emb_size(cardinality: int, maximum: int = 20):
    """Return ``(number of rows, embedding width)`` for a categorical feature.

    The width is the square root of the cardinality rounded up, capped at
    ``maximum``; the number of rows equals the cardinality.
    """
    rounded_root = int(np.ceil(cardinality ** 0.5))
    width = min(rounded_root, maximum)
    return (cardinality, width)


class EmbeddingLayer(nn.Module):
    """One embedding table per categorical feature, outputs combined.

    Parameters
    ----------
    categories : mapping from feature name to its cardinality; the iteration
        order of the mapping defines which column of the input belongs to
        which table.
    dropout : dropout probability applied to the combined embeddings.
    max_emb_sz : upper bound for the width of each table.
    stack_fn : callable used to combine the per-feature embeddings; it is
        called with the list of embeddings and ``dim=-1``.

    ``out_features`` holds the sum of the widths of all tables.
    """

    def __init__(self, categories: Dict[str, int],
                 dropout: float = 0.,
                 max_emb_sz: int = 20,
                 stack_fn: Callable = torch.cat):
        super().__init__()

        table_shapes = OrderedDict(
            (name, get_emb_size(cardinality, max_emb_sz))
            for name, cardinality in categories.items())

        self.dropout = nn.Dropout(dropout)

        tables = OrderedDict()
        for name, (n_rows, width) in table_shapes.items():
            tables[name] = nn.Embedding(n_rows, width)
        self.emb = nn.ModuleDict(tables)

        self.stack_fn = stack_fn
        self.out_features = sum(width for _, width in table_shapes.values())

    def forward(self, x):
        # column `position` of x holds the codes of the position-th table
        looked_up = []
        for position, table in enumerate(self.emb.values()):
            looked_up.append(table(x[:, position]))
        combined = self.stack_fn(looked_up, dim=-1)
        return self.dropout(combined)


def build_feed_forward(layers: List[int],
                       use_bn: bool = False,
                       input_bn: bool = False,
                       act_fn: Callable = nn.ReLU,
                       dropout: List[float] = None):
    """Build a fully connected network as an ``nn.Sequential``.

    Parameters
    ----------
    layers : sizes of the consecutive layers, input size first and output
        size last; one ``nn.Linear`` is created per consecutive pair.
    use_bn : add ``BatchNorm1d`` after the activation of each hidden layer.
    input_bn : add ``BatchNorm1d`` on the raw input.
    act_fn : zero-argument callable returning the activation module.
    dropout : ``None`` (no dropout), a single number used for every hidden
        layer, or a sequence with one probability per hidden layer.

    Every linear layer except the last one is followed by activation,
    optional batch norm and optional dropout, in that order.

    A plain ``int`` such as ``dropout=0`` is now accepted; previously only
    ``float`` scalars were broadcast and an int failed when indexed.
    """
    modules = []

    if input_bn:
        modules.append(nn.BatchNorm1d(layers[0]))

    if isinstance(dropout, (int, float)):
        dropout = [dropout] * len(layers)

    n_linear = len(layers) - 1
    for idx in range(n_linear):
        input_dim, output_dim = layers[idx], layers[idx + 1]
        modules.append(nn.Linear(input_dim, output_dim))

        is_output_layer = (idx == n_linear - 1)
        if is_output_layer:
            continue

        modules.append(act_fn())
        if use_bn:
            modules.append(nn.BatchNorm1d(output_dim))
        if dropout is not None:
            modules.append(nn.Dropout(dropout[idx]))

    return nn.Sequential(*modules)


def compute_factor_layers(input_dim,
                          depth: int = 1,
                          factor: float = 1.,
                          dtype=np.int64):
    """Return ``depth - 1`` geometrically scaled values.

    The k-th value (k = 1 .. depth-1) is ``input_dim * factor**k`` computed
    as a running product and cast to ``dtype``. With ``depth=1`` the result
    is an empty list.
    """
    n_values = depth - 1
    base = np.array([input_dim] * n_values)
    cumulative_factor = np.cumprod(np.array([factor] * n_values))
    scaled = (base * cumulative_factor).astype(dtype)
    return list(scaled)


def scale_to_100(x):
    """Squash ``x`` with a sigmoid and stretch the result to (0, 100)."""
    return torch.sigmoid(x).mul(100)

def log_softmax(x):
    """Log-softmax of ``x`` over its last dimension."""
    return F.log_softmax(x, dim=-1)


class EmbModelModule(pl.LightningModule):
    """Feed-forward network over continuous features plus embedded categories.

    The categorical codes go through an ``EmbeddingLayer``; the embeddings
    are concatenated after the continuous features and fed to a stack of
    linear layers with LeakyReLU(0.2) activations.

    Parameters
    ----------
    cont_features : number of continuous input features.
    categories : mapping feature name -> cardinality for the embeddings.
    encoder_dim : width of the first hidden layer.
    depth : number of hidden layers.
    decrease_factor : each further hidden layer is this factor times the
        width of the previous one.
    drop_decrease_factor : same scheme, applied to the dropout probability.
    emb_dropout : dropout applied to the embeddings.
    max_emb_sz : maximum width of an embedding table.
    dropout : dropout probability after the first hidden layer.
    lr, wd : learning rate and weight decay, stored for the optimizer.
    out_features : size of the output.
    output_act_fn : optional callable applied to the network output.
    """

    def __init__(self, 
                 cont_features: int,
                 categories: Dict[str, int],
                 encoder_dim: int,
                 depth: int,
                 decrease_factor: float = 1.,
                 drop_decrease_factor: float = 1.,
                 emb_dropout: float = 0.,
                 max_emb_sz: int = 20,
                 dropout: float = 0.,
                 lr: float = 0.01,
                 wd: float = 0.,
                 out_features: int = 1,
                 output_act_fn: Callable = None):
        super().__init__()
        self.lr = lr
        self.wd = wd
        self.output_act_fn = output_act_fn

        self.emb = EmbeddingLayer(categories, dropout=emb_dropout,
                                  max_emb_sz=max_emb_sz)

        # widths / dropout rates of the hidden layers after the first one
        later_widths = compute_factor_layers(encoder_dim, depth=depth,
                                             factor=decrease_factor)
        later_dropouts = compute_factor_layers(dropout, depth=depth,
                                               factor=drop_decrease_factor,
                                               dtype=float)

        n_inputs = self.emb.out_features + cont_features
        layer_sizes = [n_inputs, encoder_dim] + later_widths + [out_features]
        self.output_layer = build_feed_forward(layer_sizes, use_bn=False,
                                               dropout=[dropout] + later_dropouts,
                                               act_fn=self._act_fn)

    def _act_fn(self):
        """Factory for the hidden-layer activation."""
        return nn.LeakyReLU(0.2)

    def forward(self, features,
                categories,
                target=None,
                weight=None):
        """Return the prediction for a batch.

        ``target`` and ``weight`` are accepted so that a whole batch dict can
        be passed with ``**batch``; they are not used here.
        """
        embedded = self.emb(categories)
        net_input = torch.cat((features, embedded), dim=-1)

        prediction = self.output_layer(net_input)
        if self.output_act_fn is None:
            return prediction
        return self.output_act_fn(prediction)


class RegressionEmbModel(EmbModelModule):
    """``EmbModelModule`` trained with (optionally weighted) mean absolute error.

    With ``scale_output=True`` the network output is passed through
    ``scale_to_100``; otherwise it is returned unchanged. All other
    parameters are forwarded to ``EmbModelModule``.
    """

    def __init__(self, 
                 cont_features: int,
                 categories: Dict[str, int],
                 encoder_dim: int,
                 depth: int,
                 decrease_factor: float = 1.,
                 drop_decrease_factor: float = 1.,
                 emb_dropout: float = 0.,
                 max_emb_sz: int = 20,
                 dropout: float = 0.,
                 lr: float = 0.01,
                 wd: float = 0.,
                 out_features: int = 1,
                 scale_output: bool = True):

        if scale_output:
            output_act_fn = scale_to_100
        else:
            output_act_fn = None

        super().__init__(cont_features=cont_features,
                         categories=categories,
                         encoder_dim=encoder_dim,
                         depth=depth,
                         decrease_factor=decrease_factor,
                         drop_decrease_factor=drop_decrease_factor,
                         emb_dropout=emb_dropout,
                         max_emb_sz=max_emb_sz,
                         dropout=dropout,
                         lr=lr,
                         wd=wd,
                         out_features=out_features,
                         output_act_fn=output_act_fn)

    def training_step(self, batch, batch_idx):
        """Weighted MAE on the batch (weights are used when the batch has a
        ``'weight'`` entry); logged as ``train_loss``."""
        prediction = self(**batch)
        weight = None
        if 'weight' in batch:
            weight = batch['weight']
        loss = mae(prediction, batch['target'], weight)
        self.log('train_loss', loss, on_epoch=True,
                 prog_bar=True, sync_dist=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Unweighted MAE on the batch; logged as ``valid_loss``."""
        prediction = self(**batch)
        loss = mae(prediction, batch['target'])
        self.log('valid_loss', loss, on_epoch=True,
                 prog_bar=True, sync_dist=True)
        return loss

    def configure_optimizers(self):
        """Adam over all parameters with the stored ``lr`` and ``wd``."""
        return optim.Adam(self.parameters(), lr=self.lr,
                          weight_decay=self.wd)

    
class ClassificationEmbModel(EmbModelModule):
    """``EmbModelModule`` with a log-softmax output, trained with NLL loss.

    Parameters
    ----------
    pos_weight : optional 1-D tensor with one weight per class, passed as
        ``weight`` to ``F.nll_loss``. ``None`` means unweighted.

    All other parameters are forwarded to ``EmbModelModule``.
    """

    def __init__(self, 
                 cont_features: int,
                 categories: Dict[str, int],
                 encoder_dim: int,
                 depth: int,
                 decrease_factor: float = 1.,
                 drop_decrease_factor: float = 1.,
                 emb_dropout: float = 0.,
                 max_emb_sz: int = 20,
                 dropout: float = 0.,
                 lr: float = 0.01,
                 wd: float = 0.,
                 out_features: int = 1,
                 pos_weight: torch.Tensor = None):
        output_act_fn = log_softmax
        super().__init__(cont_features=cont_features, categories=categories,
                         max_emb_sz=max_emb_sz, encoder_dim=encoder_dim, out_features=out_features,
                         dropout=dropout, emb_dropout=emb_dropout, depth=depth, lr=lr,
                         decrease_factor=decrease_factor, drop_decrease_factor=drop_decrease_factor,
                         wd=wd, output_act_fn=output_act_fn)
        # Registered after the module is initialised, as a buffer, so that the
        # class weights follow the model when it is moved to another device.
        # `persistent=False` keeps them out of the state dict, so checkpoint
        # keys are the same as before.
        self.register_buffer('pos_weight', pos_weight, persistent=False)

    def training_step(self, batch, batch_idx):
        """Class-weighted NLL loss on the batch; logged as ``train_loss``."""
        y_hat = self(**batch)
        loss = F.nll_loss(y_hat, batch['target'], weight=self.pos_weight)
        self.log('train_loss', loss, on_epoch=True,
                 prog_bar=True, sync_dist=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Class-weighted NLL loss on the batch; logged as ``valid_loss``."""
        y_hat = self(**batch)
        loss = F.nll_loss(y_hat, batch['target'], weight=self.pos_weight)
        self.log('valid_loss', loss, on_epoch=True,
                 prog_bar=True, sync_dist=True)
        return loss

    def configure_optimizers(self):
        """Adam over all parameters with the stored ``lr`` and ``wd``."""
        optimizer = optim.Adam(self.parameters(), lr=self.lr, weight_decay=self.wd)
        return optimizer


  rank_zero_deprecation(


In [20]:
### src/models/lstm.py
import pytorch_lightning as pl
from typing import List, Tuple, Callable, Dict
from torch import nn, optim
from torch.nn import functional as F
import torch
from collections import OrderedDict

def mae(yhat, y, weight=None):
    """Mean absolute error, optionally weighted.

    Without ``weight`` this is the plain mean of ``|yhat - y|``. With
    ``weight`` the errors are multiplied by the weights normalised to sum to
    1 and added up; ``weight`` itself is not modified.

    The optional third argument keeps this definition call-compatible with
    the weighted ``mae`` of the cont_emb_stack code: both are bound to the
    same top-level name, and ``RegressionEmbModel.training_step`` calls
    ``mae`` with three arguments.
    """
    error = torch.abs(yhat - y)
    if weight is None:
        return error.mean()
    normalised = weight / weight.sum()
    return (error * normalised).sum()

def get_emb_size(cardinality: int, maximum: int = 20):
    """Return ``(number of rows, embedding width)`` for a categorical feature.

    One extra row is reserved on top of the cardinality; the width is the
    square root of the cardinality truncated to an int, capped at
    ``maximum``.

    NOTE(review): the cont_emb_stack code binds the same top-level name to a
    different formula (no extra row, square root rounded up); in a single
    kernel the definition executed last is the one every caller gets.
    """
    n_rows = cardinality + 1
    width = min(int(cardinality ** 0.5), maximum)
    return (n_rows, width)


class EmbeddingLayer(nn.Module):
    """Embeds each categorical column with its own table and combines them.

    ``categories`` maps feature names to cardinalities; its iteration order
    must match the column order of the tensor given to ``forward``. The
    per-feature embeddings are combined with ``stack_fn(..., dim=-1)`` and
    passed through dropout. ``out_features`` is the total embedding width.
    """

    def __init__(self, categories: Dict[str, int],
                 dropout: float = 0.,
                 max_emb_sz: int = 20,
                 stack_fn: Callable = torch.cat):
        super().__init__()

        shapes = OrderedDict()
        for feature, cardinality in categories.items():
            shapes[feature] = get_emb_size(cardinality, max_emb_sz)

        self.dropout = nn.Dropout(dropout)
        self.emb = nn.ModuleDict(OrderedDict(
            (feature, nn.Embedding(n_rows, width))
            for feature, (n_rows, width) in shapes.items()))
        self.stack_fn = stack_fn

        widths = [width for (_, width) in shapes.values()]
        self.out_features = sum(widths)

    def forward(self, x):
        tables = list(self.emb.values())
        # the i-th column of x is looked up in the i-th table
        pieces = [tables[i](x[:, i]) for i in range(len(tables))]
        combined = self.stack_fn(pieces, dim=-1)
        return self.dropout(combined)


class LstmModel(pl.LightningModule):
    """LSTM over a time series combined with static and categorical inputs.

    The final hidden state of a single-layer, batch-first LSTM is
    concatenated with the static features and the category embeddings and
    decoded by ``Linear -> ReLU -> BatchNorm1d -> Dropout -> Linear``. The
    output is squashed with a sigmoid and multiplied by 100.

    Parameters
    ----------
    static_features : number of static continuous features.
    time_features : number of features per time step.
    categories : mapping feature name -> cardinality for the embeddings.
    max_emb_sz : maximum width of an embedding table.
    hidden_dim : size of the LSTM hidden state.
    encoder_dim : width of the hidden decoder layer.
    emb_dropout, dropout : dropout on the embeddings / inside the decoder.
    lr, wd : learning rate and weight decay, stored for the optimizer.
    out_features : size of the output.
    n_layers : accepted but not used; the LSTM is built with its default
        number of layers and the initial state is created for one layer.
    """

    def __init__(self, 
                 static_features: int,
                 time_features: int,
                 categories: Dict[str, int],
                 max_emb_sz: int,
                 hidden_dim: int,
                 encoder_dim: int,
                 emb_dropout: float = 0.,
                 dropout: float = 0.,
                 lr: float = 0.01,
                 wd: float = 0.,
                 out_features: int = 1,
                 n_layers: int = 2):
        super().__init__()

        self.lr = lr
        self.wd = wd

        self.emb = EmbeddingLayer(categories, dropout=emb_dropout,
                                  max_emb_sz=max_emb_sz)
        self.hidden_dim = hidden_dim

        # sequence encoder
        self.net = nn.LSTM(time_features, hidden_dim, batch_first=True)

        # decoder over [static features | embeddings | last hidden state]
        decoder_inputs = static_features + hidden_dim + self.emb.out_features
        decoder = [nn.Linear(decoder_inputs, encoder_dim),
                   nn.ReLU(),
                   nn.BatchNorm1d(encoder_dim),
                   nn.Dropout(dropout),
                   nn.Linear(encoder_dim, out_features)]
        self.output_layer = nn.Sequential(*decoder)

    def _init_hidden(self, bs):
        """Zero tensor of shape (1, bs, hidden_dim) with the dtype and device
        of the model parameters."""
        reference = next(self.parameters())
        state = reference.new(1, bs, self.hidden_dim)
        return state.zero_()

    def init_hidden_state(self, bs):
        """Initial (hidden, cell) state pair for a batch of size ``bs``."""
        hidden = self._init_hidden(bs)
        cell = self._init_hidden(bs)
        return (hidden, cell)

    def forward(self, features,
                timeft,
                categories,
                target=None):
        """Return the prediction for a batch; ``target`` is accepted so that
        a batch dict can be passed with ``**batch`` and is not used."""
        batch_size, _, _ = timeft.size()
        initial_state = self.init_hidden_state(batch_size)

        _, (last_hidden, _) = self.net(timeft, initial_state)
        # (1, batch, hidden) -> (batch, hidden)
        last_hidden = last_hidden.squeeze(dim=0)

        embedded = self.emb(categories)
        decoder_input = torch.cat((features, embedded, last_hidden), dim=1)
        raw = self.output_layer(decoder_input)

        # scale the prediction to the range 0..100
        return torch.sigmoid(raw) * 100

    def training_step(self, batch, batch_idx):
        """MAE on the batch; logged as ``train_mae``."""
        prediction = self(**batch)
        loss = mae(prediction, batch['target'])
        self.log('train_mae', loss, on_epoch=True,
                 prog_bar=True, sync_dist=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """MAE on the batch; logged as ``valid_mae``."""
        prediction = self(**batch)
        loss = mae(prediction, batch['target'])
        self.log('valid_mae', loss, on_epoch=True,
                 prog_bar=True, sync_dist=True)
        return loss

    def configure_optimizers(self):
        """Adam over all parameters with the stored ``lr`` and ``wd``."""
        return optim.Adam(self.parameters(), lr=self.lr,
                          weight_decay=self.wd)

In [21]:
### src/train/cont_emb_stack.py

from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.callbacks import ModelCheckpoint
import pytorch_lightning as pl
from typing import Dict, Any
import torch
from torch.utils.data import DataLoader
from torch import nn
import pandas as pd
import numpy as np
import gc


def predict_dl(model: nn.Module, valid_dl: DataLoader):
    """Run ``model`` over every batch of ``valid_dl`` and return the
    concatenated predictions as a NumPy array.

    The model is switched to eval mode (and left in it) and gradients are
    disabled while predicting. Each batch is passed as keyword arguments.
    """
    model.eval()
    batch_outputs = []
    with torch.no_grad():
        for batch in valid_dl:
            batch_outputs.append(model(**batch))
        stacked = torch.cat(batch_outputs)
    return stacked.numpy()


def load_best_state(model, checkpoint_callback):
    """Load into ``model`` the weights of the best checkpoint.

    The checkpoint file at ``checkpoint_callback.best_model_path`` is read
    and its ``'state_dict'`` entry is loaded into ``model``, which is
    returned. The best score is printed for reference.
    """
    print('loading model to best score')
    print(f'best score = {checkpoint_callback.best_model_score}')
    checkpoint = torch.load(checkpoint_callback.best_model_path)
    model.load_state_dict(checkpoint['state_dict'])
    return model
    

def run_fn(config: Dict[str, Any],
           train_data: pd.DataFrame,
           valid_data: pd.DataFrame):
    """Train a ``RegressionEmbModel`` and return it with its predictions.

    Steps: seed torch (when ``config.seed`` is set), build the train and
    validation loaders, fit the model with early stopping and checkpointing
    on ``valid_loss``, reload the best checkpoint and predict the validation
    set.

    ``config`` is read through attributes (``seed``, ``hp``, ``features``,
    ``categories``, ``target_cols``, ``weight``).

    Returns a ``ModelOutput`` holding the model, a ``predict_fn`` that takes
    a frame of test features, and the validation predictions.
    """
    if config.seed is not None:
        torch.manual_seed(config.seed)

    hp = config.hp

    # data loaders
    train_ds = PlayerDataset.from_df(train_data,
                                     features=config.features,
                                     categories=config.categories,
                                     target=config.target_cols,
                                     weight=config.weight,
                                     bptt=0)
    train_dl = DataLoader(train_ds, batch_size=hp.batch_size,
                          shuffle=True, num_workers=4)

    valid_ds = PlayerDataset.from_df(valid_data,
                                     features=config.features,
                                     categories=config.categories,
                                     target=config.target_cols,
                                     bptt=0)
    valid_dl = DataLoader(valid_ds, batch_size=hp.batch_size,
                          shuffle=False, num_workers=4)

    # embedding sizes come from the number of distinct training values
    cardinalities = train_data.loc[:, config.categories].nunique().to_dict()

    model = RegressionEmbModel(cont_features=len(config.features),
                               categories=cardinalities,
                               max_emb_sz=hp.max_emb_sz,
                               encoder_dim=hp.encoder_dim,
                               out_features=len(config.target_cols),
                               dropout=hp.dropout,
                               emb_dropout=hp.emb_dropout,
                               depth=hp.depth,
                               lr=hp.lr,
                               decrease_factor=hp.decrease_factor,
                               drop_decrease_factor=hp.drop_decrease_factor,
                               scale_output=hp.scale_output,
                               wd=hp.wd)
    print(model)

    # training with early stopping; the three best checkpoints are kept
    if hp.early_stop_patience is None:
        patience = 3
    else:
        patience = hp.early_stop_patience
    early_stopping = EarlyStopping('valid_loss', patience=patience)
    checkpoint_callback = ModelCheckpoint(monitor='valid_loss',
                                          save_top_k=3,
                                          save_weights_only=True)
    trainer = pl.Trainer(max_epochs=hp.epochs,
                         callbacks=[early_stopping, checkpoint_callback])
    trainer.fit(model, train_dl, valid_dl)

    # continue with the best weights seen during training
    model = load_best_state(model, checkpoint_callback)

    def predict_fn(test_features: pd.DataFrame):
        """Predict a frame of test features with the trained model."""
        test_ds = PlayerDataset.from_df(test_features,
                                        features=config.features,
                                        categories=config.categories,
                                        bptt=1)
        test_dl = DataLoader(test_ds, batch_size=hp.batch_size,
                             shuffle=False)
        return predict_dl(model, test_dl)

    valid_prediction = predict_dl(model, valid_dl)
    gc.collect()
    return ModelOutput(model, predict_fn, valid_prediction)


def run_classification_fn(config: Dict[str, Any],
                          train_data: pd.DataFrame,
                          valid_data: pd.DataFrame):
    """Train a `ClassificationEmbModel` and return it with its predictions.

    Parameters
    ----------
    config
        Object read by attribute access: `seed`, `hp`, `features`,
        `categories` and `target_name`. `hp` supplies `batch_size`,
        `max_emb_sz`, `encoder_dim`, `dropout`, `emb_dropout`, `depth`,
        `lr`, `decrease_factor`, `drop_decrease_factor`, `wd`,
        `early_stop_patience` and `epochs`.
    train_data, valid_data
        Frames holding the feature, category and target columns named in
        `config`.

    Returns
    -------
    ModelOutput
        The model restored through `load_best_state`, a `predict_fn`
        closure for new feature frames, and the prediction on `valid_data`.
    """
    if config.seed is not None:
        torch.manual_seed(config.seed)

    # hyperparameters
    hp = config.hp

    # create dls
    train_ds = PlayerDataset.from_df(train_data,
                                      features=config.features,
                                      categories=config.categories,
                                      target=config.target_name,
                                      bptt=0)

    train_dl = DataLoader(train_ds, batch_size=hp.batch_size, shuffle=True,
                          num_workers=4)

    valid_ds = PlayerDataset.from_df(valid_data,
                                      features=config.features,
                                      categories=config.categories,
                                      target=config.target_name,
                                      bptt=0)

    valid_dl = DataLoader(valid_ds, batch_size=hp.batch_size,
                          shuffle=False, num_workers=4)

    # number of distinct values per categorical column / number of classes,
    # both measured on the training data only
    categories = train_data.loc[:, config.categories].nunique().to_dict()
    output_features = train_data[config.target_name].nunique()

    # Per-class weight: rarest class gets 1.0, more frequent classes get less.
    # `value_counts()` sorts by descending frequency, so without
    # `sort_index()` entry i would belong to the i-th most frequent class
    # instead of the i-th class label.
    # NOTE(review): assumes class labels are coded 0..n_classes-1 and that
    # the model indexes the weights by class label - confirm.
    class_counts = train_data[config.target_name].value_counts().sort_index()
    pos_weight = torch.from_numpy((class_counts.min() / class_counts)
                                   .to_numpy().astype(np.float32))

    model = ClassificationEmbModel(cont_features=len(config.features),
                                   categories=categories,
                                   max_emb_sz=hp.max_emb_sz,
                                   encoder_dim=hp.encoder_dim,
                                   out_features=output_features,
                                   dropout=hp.dropout,
                                   emb_dropout=hp.emb_dropout,
                                   depth=hp.depth,
                                   lr=hp.lr,
                                   decrease_factor=hp.decrease_factor,
                                   drop_decrease_factor=hp.drop_decrease_factor,
                                   wd=hp.wd,
                                   pos_weight=pos_weight)
    print(model)

    # fall back to a patience of 3 epochs when none is configured
    patience = hp.early_stop_patience if hp.early_stop_patience is not None else 3
    early_stopping = EarlyStopping('valid_loss', patience=patience)
    checkpoint_callback  = ModelCheckpoint(monitor='valid_loss',
                                           save_top_k=3,
                                           save_weights_only=True)
    trainer = pl.Trainer(max_epochs=hp.epochs,
                         callbacks=[early_stopping, checkpoint_callback])

    trainer.fit(model, train_dl, valid_dl)

    # loading best model so far
    model = load_best_state(model, checkpoint_callback)

    def predict_fn(test_features: pd.DataFrame):
        """Predict on a feature frame with the trained model."""
        test_ds = PlayerDataset.from_df(test_features,
                                        features=config.features,
                                        categories=config.categories,
                                        bptt=1)
        test_dl = DataLoader(test_ds, batch_size=hp.batch_size, shuffle=False)
        prediction = predict_dl(model, test_dl)
        del test_dl, test_ds
        return prediction

    valid_prediction = predict_dl(model, valid_dl)
    gc.collect()
    return ModelOutput(model, predict_fn, valid_prediction)

In [22]:
### src/train/core.py
from typing import Callable, Any
from dataclasses import dataclass
import numpy as np
from typing import List, Dict, Callable
from sklearn.pipeline import Pipeline
import gc
import pandas as pd


@dataclass
class ModelOutput:
    """Result bundle returned by the training functions in this file."""
    # trained model object (`run_lgbm` stores a list with one model per target)
    model: Any
    # callable that takes a feature DataFrame and returns the predictions
    predict_fn: Callable
    # predictions computed on the validation data at the end of training
    prediction: np.ndarray


def ensemble_pred(preds: List[np.ndarray]):
    """Average equally-shaped prediction arrays element-wise.

    The arrays are stacked along a new axis 1 and that axis is reduced with
    the mean, so the result has the shape of a single input array.
    """
    stacked = np.stack(preds, axis=1)
    return stacked.mean(axis=1)

class Ensemble():
    """Callable that averages the predictions of several trained models.

    Raw data is first passed through `pipeline.transform`; every model's
    `predict_fn` is then applied to the transformed features and the
    results are combined with `ensemble_pred`.
    """

    def __init__(self, models: List[ModelOutput],
                 pipeline: Pipeline):
        self.pipeline = pipeline
        self.models = models

    def __call__(self, raw_df: pd.DataFrame):
        features = self.pipeline.transform(raw_df)
        # collect one prediction per ensemble member, in member order
        member_preds = []
        for member in self.models:
            member_preds.append(member.predict_fn(features))
        return ensemble_pred(member_preds)


def predict_recursive(test_df: pd.DataFrame,
                      raw_df: pd.DataFrame,
                      predict_fn: Callable,
                      n_days: int,
                      target_cols: List[str]):
    """Predict a single new day and write the predictions into the history.

    Parameters
    ----------
    test_df
        Rows to predict; all must share one value in the `date` column.
    raw_df
        History with at least `date` and `playerId` columns. Only rows from
        the last `n_days` days (relative to its latest date) are kept.
    predict_fn
        Callable applied to the combined frame; its result is indexed with a
        boolean row mask and must line up with the frame's rows.
    n_days
        Length of the history window, in days.
    target_cols
        Column names given to the predictions and overwritten in the history.

    Returns
    -------
    (prediction_df, raw_df)
        `prediction_df` has the `target_cols`, `date` and `playerId` of the
        predicted rows. `raw_df` is the windowed history plus the test rows,
        sorted by `playerId` and `date`, with the predictions filled in.

    Raises
    ------
    AssertionError
        If `test_df` spans several dates, if the test date is not exactly
        one day after the last retained history date, or if the number of
        rows at the test date differs from `len(test_df)`.
    """
    assert test_df['date'].nunique() == 1, \
           'the test set has more than one date'

    test_date = test_df['date'].iloc[0]
    last_date = raw_df['date'].max()

    # the window is anchored on the latest history date, before any rows at
    # or after the test date are dropped below
    raw_df = raw_df[raw_df['date'] >= (last_date - pd.to_timedelta(n_days, unit='d'))]

    if test_date <= last_date:
        print('test date in training data')
        raw_df = raw_df[raw_df['date'] < test_date]
        last_date = raw_df['date'].max()
    assert test_date - last_date == pd.to_timedelta(1, unit='d'), \
        f'the test date ({test_date}) must be one day after the last_date ({last_date})'

    # append the test rows; `DataFrame.append` was removed in pandas 2.0,
    # `pd.concat` is the equivalent that works on old and new versions
    raw_df = pd.concat([raw_df, test_df], ignore_index=True)
    # sort by player and date, then renumber the rows
    raw_df = (raw_df.sort_values(by=['playerId', 'date'])
                    .reset_index(drop=True))
    # boolean mask locating the test rows inside the combined frame
    index = (raw_df['date'] == test_date)

    assert index.sum() == len(test_df)
    # predict on the whole window, then keep only the test rows
    prediction = predict_fn(raw_df)
    prediction = prediction[index]
    # create a dataframe with the prediction
    prediction_df = pd.DataFrame(prediction, columns=target_cols)
    prediction_df['date'] = test_date
    prediction_df['playerId'] = raw_df.loc[index, 'playerId'].to_numpy()
    print(prediction_df.head())
    # add the prediction to the dataset
    raw_df.loc[index, target_cols] = prediction

    del prediction, index
    gc.collect()
    return prediction_df, raw_df
    

In [23]:
### src/train/lgbm.py
from lightgbm import LGBMRegressor
import pandas as pd
from typing import Dict, Any
import numpy as np
import gc


def run_lgbm(config: Dict[str, Any],
             train_data: pd.DataFrame,
             valid_data:  pd.DataFrame, verbose=100,
             early_stopping_rounds=None):
    """Fit one `LGBMRegressor` per target column.

    Parameters
    ----------
    config
        Object read by attribute access: `features`, `categories`,
        `target_cols` and `hp` (keyword arguments for `LGBMRegressor`).
    train_data, valid_data
        Frames holding the feature and target columns named in `config`;
        `valid_data` is used as the evaluation set.
    verbose
        Passed to `fit` as the `verbose` argument.
    early_stopping_rounds
        Passed to `fit` as `early_stopping_rounds`. When `None` (the
        default) the value of `verbose` is used, which is what this function
        always did before the parameter existed; passing it explicitly
        lets logging and early stopping be tuned independently.

    Returns
    -------
    ModelOutput
        The list of fitted models, a `predict_fn` closure and the
        prediction on `valid_data`. Prediction columns follow
        `sorted(config.target_cols)`, one column per model.
    """
    if early_stopping_rounds is None:
        early_stopping_rounds = verbose

    models = []
    train_features = train_data.loc[:, config.features]
    valid_features = valid_data.loc[:, config.features]

    # targets are fitted in sorted name order; the stacked prediction
    # columns therefore follow that order, not the order of config.target_cols
    for target_name in sorted(config.target_cols):
        print(target_name)
        _model = LGBMRegressor(**config.hp)
        _model.fit(train_features,
                   train_data.loc[:, target_name],
                   eval_set=[(valid_features, valid_data.loc[:, target_name])],
                   early_stopping_rounds=early_stopping_rounds,
                   verbose=verbose,
                   categorical_feature=config.categories)
        models.append(_model)

    def predict_fn(test_features: pd.DataFrame):
        """Stack the per-target predictions as columns of one array."""
        return np.stack([_model.predict(test_features.loc[:, config.features])
                         for _model in models], axis=1)

    valid_prediction = predict_fn(valid_data)

    del train_features, valid_features
    gc.collect()
    return ModelOutput(models, predict_fn, valid_prediction)



In [24]:
### src/train/lstm.py
from pytorch_lightning.callbacks import EarlyStopping
import pytorch_lightning as pl
from typing import Dict, Any
import torch
from torch.utils.data import DataLoader
from torch import nn
import pandas as pd
import gc


def predict_dl(model: nn.Module, valid_dl: DataLoader):
    """Run `model` over every batch of `valid_dl` and return a numpy array.

    The model is switched to eval mode (and left in it). Each batch is
    unpacked as keyword arguments to the model, gradients are disabled, and
    the per-batch outputs are concatenated along the first dimension.
    """
    model.eval()
    batch_outputs = []
    with torch.no_grad():
        for batch in valid_dl:
            batch_outputs.append(model(**batch))
        stacked = torch.cat(batch_outputs)
    return stacked.numpy()


def run_lstm(config: Dict[str, Any],
             train_data: pd.DataFrame,
             valid_data: pd.DataFrame):
    """Train an `LstmModel` and return it with its validation predictions.

    `config` is read by attribute access (`seed`, `hp`, `static_features`,
    `time_features`, `categories`, `target_cols`). Training stops early on
    the `valid_mae` metric with a patience of 3. The returned `ModelOutput`
    carries the model, a `predict_fn` closure for new feature frames, and
    the prediction on `valid_data`.
    """
    seed = config.seed
    if seed is not None:
        torch.manual_seed(seed)

    # hyperparameters
    hp = config.hp

    # dataset options shared by the training and validation splits
    dataset_args = dict(features=config.static_features,
                        time_features=config.time_features,
                        categories=config.categories,
                        target=config.target_cols,
                        bptt=hp.bptt)

    train_ds = PlayerDataset.from_df(train_data, **dataset_args)
    train_dl = DataLoader(train_ds, batch_size=hp.batch_size, shuffle=True,
                          num_workers=4)

    valid_ds = PlayerDataset.from_df(valid_data, **dataset_args)
    valid_dl = DataLoader(valid_ds, batch_size=hp.batch_size,
                          shuffle=False, num_workers=4)

    # distinct value count of every categorical column, from training data
    cardinalities = train_data.loc[:, config.categories].nunique().to_dict()

    model = LstmModel(static_features=len(config.static_features),
                      time_features=len(config.time_features),
                      categories=cardinalities,
                      max_emb_sz=hp.max_emb_sz,
                      hidden_dim=hp.hidden_dim,
                      encoder_dim=hp.encoder_dim,
                      out_features=len(config.target_cols),
                      dropout=hp.dropout,
                      emb_dropout=hp.emb_dropout,
                      lr=hp.lr,
                      wd=hp.wd)
    print(model)

    stopper = EarlyStopping('valid_mae', patience=3)
    trainer = pl.Trainer(max_epochs=hp.epochs, callbacks=[stopper])
    trainer.fit(model, train_dl, valid_dl)

    def predict_fn(test_features: pd.DataFrame):
        """Predict on a feature frame with the trained model."""
        inference_ds = PlayerDataset.from_df(test_features,
                                             features=config.static_features,
                                             time_features=config.time_features,
                                             categories=config.categories,
                                             bptt=hp.bptt)
        inference_dl = DataLoader(inference_ds, batch_size=hp.batch_size,
                                  shuffle=False)
        result = predict_dl(model, inference_dl)
        del inference_dl, inference_ds
        return result

    valid_prediction = predict_dl(model, valid_dl)

    # drop the loaders, datasets and trainer before returning
    del valid_dl, train_dl, train_ds, valid_ds, trainer
    gc.collect()
    return ModelOutput(model, predict_fn, valid_prediction)
