In [1]:
DTYPES = {
    'date_time': 'string',
    'site_name': 'uint8',
    'posa_continent': 'uint8',
    'user_location_country': 'uint8',
    'user_location_region': 'uint16',
    'user_location_city': 'uint16',
    'orig_destination_distance': 'float32',
    'user_id': 'uint32',
    'is_mobile': 'bool',
    'is_package': 'bool',
    'channel': 'uint8',
    'srch_ci': 'string',
    'srch_co': 'string',
    'srch_adults_cnt': 'uint8',
    'srch_children_cnt': 'uint8',
    'srch_rm_cnt': 'uint8',
    'srch_destination_id': 'uint16',
    'srch_destination_type_id': 'uint8',
    'is_booking': 'bool',
    'cnt': 'uint16',
    'hotel_continent': 'uint8',
    'hotel_country': 'uint8',
    'hotel_market': 'uint16',
    'hotel_cluster': 'uint8',
}
DATETIME_COLUMNS = ['date_time', 'srch_ci', 'srch_co']
BOOLEAN_COLUMNS = ['is_booking', 'is_mobile', 'is_package']

In [2]:
import numpy as np
import polars as pl


def map_to_polars(dtype: str):
    conversion = {
        'string': pl.String,
        'uint8': pl.UInt8,
        'uint16': pl.UInt16,
        'uint32': pl.UInt32,
        'int16': pl.Int16,
        'int32': pl.Int32,
        'float32': pl.Float32,
        'bool': pl.UInt8 
    }
    return conversion[dtype]

def map_to_np(dtype):
    conversion = {
        pl.String: 'string',
        pl.UInt8: 'uint8',
        pl.UInt16: 'uint16',
        pl.UInt32: 'uint32',
        pl.Float32: 'float32',
        pl.Int8: 'int8',
        pl.Int16: 'int16',
        pl.Int32: 'int32',
        pl.Boolean: 'bool'
    }
    return conversion.get(dtype, 'datetime')

In [3]:
dtypes = {k: map_to_polars(v) for k, v in DTYPES.items()}
df = pl.read_csv('../data/raw/train.csv', dtypes=dtypes)
df = df.with_columns(
    *[pl.col(col).str.to_datetime() for col in DATETIME_COLUMNS]
)
for col in BOOLEAN_COLUMNS:
    df = df.replace(col, df[col] == 1)

    df = df.with_columns(new_column.alias(column_name))
instead.
  df = df.replace(col, df[col] == 1)


In [4]:
TARGET = 'is_booking'

In [6]:
from collections.abc import Callable
from dataclasses import dataclass, field


@dataclass
class DataPreparationPipelineState:
    schema: dict[str, str] = field(default_factory=dict)
    numerical_columns: list[str] = field(default_factory=list)
    categorical_columns: list[str] = field(default_factory=list)
    numerical_categorical_columns: list[str] = field(default_factory=list)


class DataPreparationPipeline:
    '''
    Lazy pipeline for data preprocessing
    '''
    Task = Callable[[tuple[pl.DataFrame, DataPreparationPipelineState]], tuple[pl.DataFrame, DataPreparationPipelineState]]
    def __init__(self) -> None:
        self.pipeline = []

    def _add_task(self, task: Task) -> None:
        self.pipeline.append(task)

    def add_columns(self, expressions: dict[pl.expr, str]) -> 'FeatureBuilder':
        '''
        Add column to a DataFrame
        :param expressions: dict with keys as column names, values are tuples of polars expressions and result dtype
        '''

        def task(df: pl.DataFrame, state: DataPreparationPipelineState) -> tuple[pl.DataFrame, DataPreparationPipelineState]:
            df = df.with_columns(**{
                col: expr.cast(map_to_polars(dtype)) for col, (expr, dtype) in expressions.items() 
            })
            for col, (_, dtype) in expressions.items():
                state.schema[col] = dtype
            return df, state
        
        self._add_task(task)
        return self
    
    def map_columns(self, expressions: dict[pl.expr, str]) -> 'FeatureBuilder':
        '''
        Maps column in a DataFrame
        :param expressions: dict with keys as column names, values are tuples of polars expressions and result dtype
        '''

        return self.add_columns(expressions)

    def drop_columns(self, columns: list[str]) -> 'FeatureBuilder':
        '''
        Drops columns from dataframe
        :param columns: columns to drop
        '''

        def task(df: pl.DataFrame, state: DataPreparationPipelineState) -> tuple[pl.DataFrame, DataPreparationPipelineState]:
            df = df.drop(*columns)
            state.schema = {k: v for k, v in state.schema.items() if k not in columns}
            return df, state
        
        self._add_task(task)
        return self
    
    def cast(self, to: dict[str, str]) -> 'FeatureBuilder':
        '''
        Casts given columns to given dtype
        :param to: dict[column, dtype]
        '''

        def task(df: pl.DataFrame, state: DataPreparationPipelineState) -> tuple[pl.DataFrame, DataPreparationPipelineState]:
            df = df.with_columns(**{
                col: pl.col(col).cast(map_to_polars(dtype)) for col, dtype in to.items()   
            })
            for col, (_, dtype) in to.items():
                state.schema[col] = dtype
            return df, state
        
        self._add_task(task)
        return self
    
    def make_column_split(self, num_cat_threshold: int = 0) -> 'FeatureBuilder':
        '''
        Splits columns into 3 categories:
        1. Categorical columns
        2. "Numerical categorical" columns (numerical columns where number of unique values less than threshold)
        3. Numerical columns

        :param num_cat_threshold: number of unique values, below which numerical feature is considered as numerical categorical
        by default it is 0, so there are no numerical categorical columns
        '''

        def task(df: pl.DataFrame, state: DataPreparationPipelineState) -> tuple[pl.DataFrame, DataPreparationPipelineState]:
            state.numerical_categorical_columns = [
                col for col in df.columns
                if df[col].dtype not in [pl.Boolean, pl.String, pl.Datetime] and df[col].n_unique() < num_cat_threshold
            ]
            state.numerical_columns = [
                col for col in df.columns
                if df[col].dtype not in [pl.Boolean, pl.String, pl.Datetime] and col not in state.numerical_categorical_columns
            ]
            state.categorical_columns = [
                col for col in df.columns
                if col not in state.numerical_columns and col not in state.numerical_categorical_columns
            ]
            return df, state

        self._add_task(task)
        return self
    
    def __call__(self, df: pl.DataFrame) -> tuple[pl.DataFrame, DataPreparationPipelineState]:
        '''
        Processes the DataFrame
        :param df: pl.DataFrame to process
        :returns: tuple of processed DataFrame and last pipeline state 
        '''
        
        state = DataPreparationPipelineState(
            schema={
                k: map_to_np(v) for k, v in df.schema.items()
            }
        )
        for task in self.pipeline:
            df, state = task(df, state)
        return df, state

In [7]:
feature_extractor = DataPreparationPipeline() \
    .add_columns({
        'co_ci_diff': ((pl.col('srch_co') - pl.col('srch_ci')).dt.total_days(), 'int16'),
        'ci_dt_diff': ((pl.col('srch_ci') - pl.col('date_time')).dt.total_days(), 'int32'),
        'ci_weekday': (pl.col('srch_ci').dt.weekday(), 'uint8'),
        'co_weekday': (pl.col('srch_co').dt.weekday(), 'uint8'),
        'date_time_weekday': (pl.col('date_time').dt.weekday(), 'uint8')
    }) \
    .drop_columns(['orig_destination_distance', 'srch_ci', 'srch_co']) \
    .make_column_split(num_cat_threshold=250)

In [8]:
data, state = feature_extractor(df)

In [297]:
x = data.drop(TARGET)
y = data[TARGET]

In [298]:
from collections.abc import Callable

from numpy.typing import NDArray
from sklearn.base import BaseEstimator


Metric = Callable[[pl.DataFrame, pl.DataFrame, BaseEstimator], float]

In [299]:
from datetime import datetime, timedelta
from math import floor

import pandas as pd

from sklearn.base import BaseEstimator, clone
from tqdm.notebook import tqdm


def get_interval(start: datetime, end: datetime, dt_column: pl.Series) -> pl.Series:
    return (dt_column >= start) & (dt_column < end)

def handle_nans(df: pd.DataFrame) -> pd.DataFrame:
    df[df.isna()] = np.nan
    return df


def blocked_cross_validation(
    start: datetime,
    end: datetime,
    training_interval_len: timedelta,
    test_interval_len: timedelta,
    estimator: BaseEstimator,
    x: pl.DataFrame,
    y: pl.DataFrame,
    dt_column: pl.Series,
    metrics: dict[str, Metric]
) -> tuple[dict[str, list[float]], list[BaseEstimator]]:
    result_metrics = {key: [] for key in metrics.keys()}
    estimators = []
    n_intervals = floor((end - test_interval_len - start) / training_interval_len)
    for i in tqdm(range(n_intervals - 1)):
        est = clone(estimator)
        training_dt_interval = start + i * training_interval_len, start + (i + 1) * training_interval_len
        test_dt_interval = training_dt_interval[-1], training_dt_interval[-1] + test_interval_len
        training_interval = get_interval(*training_dt_interval, dt_column)
        test_interval = get_interval(*test_dt_interval, dt_column)
        est = est.fit(
            handle_nans(x.filter(training_interval).to_pandas()), 
            y.filter(training_interval).to_pandas()
        )
        x_test = x.filter(test_interval)
        y_test = y.filter(test_interval)

        for key, metric in metrics.items():
            result_metrics[key].append(metric(x_test, y_test, est))
        
        estimators.append(est)

    return result_metrics, estimators        

In [300]:
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler


cat_columns = state.categorical_columns.copy()
cat_columns.remove('is_booking')
train_data = x.drop('date_time')

numerical_transform = Pipeline([
    ('NumericalImputer', SimpleImputer(strategy='median', missing_values=np.nan)),
    ('Scaler', StandardScaler()),
])

cat_transform = Pipeline([
    ('CategoricalImputer', SimpleImputer(strategy='most_frequent')),
    ('OneHot', OneHotEncoder(handle_unknown='ignore'))
])

numerical_oh_transform = Pipeline([
    ('Imputer', SimpleImputer(strategy='most_frequent')),
    ('OneHot', OneHotEncoder(handle_unknown='ignore'))
])


base_pipeline = Pipeline([
    (
        'TransformingColumns',
        ColumnTransformer([
            ('Numerical', numerical_transform, numerical_columns),
            ('Categorical', cat_transform, str_columns),
            ('NumCategorical', numerical_oh_transform, one_hot_numerical)
        ])
    ),
    ('Logreg', LogisticRegression(max_iter=1_000))
])

In [301]:
from collections import defaultdict


cluster_country = x.select('hotel_country', 'hotel_market', 'hotel_cluster')
unique_values = cluster_country.unique()
cluster_to_hotel_market = defaultdict(list)
cluster_to_hotel_country = defaultdict(list)

for row in unique_values.iter_rows():
    cluster_to_hotel_country[row[2]].append(row[0])
    cluster_to_hotel_market[row[2]].append(row[1])

hotel_data = {
    key: (np.array(cluster_to_hotel_market[key]), np.array(cluster_to_hotel_country[key]))
    for key in cluster_to_hotel_country.keys()
}

In [302]:
cluster_len = np.zeros((100,), dtype=np.uint16)
for idx, values in cluster_to_hotel_country.items():
    cluster_len[idx] = len(values)
max_cluster_len = cluster_len.max()

In [303]:
from collections.abc import Generator

from sklearn.metrics import accuracy_score


def accuracy(x_test: NDArray, y_test: NDArray, estimator: BaseEstimator) -> float:
    return accuracy_score(y_test, estimator.predict(x_test))


def iterate_batched(data: pl.DataFrame, batch_size: int) -> Generator[pl.DataFrame, None, None]:
    i = 0
    j = min(batch_size, len(data))
    yield data[i:j]
    while j < len(data):
        i += batch_size
        j = min(j + batch_size, len(data))
        yield data[i:j]


def sample_rows(row, columns: list[str], n: int) -> np.ndarray:
    country_idx = columns.index('hotel_country')
    market_idx = columns.index('hotel_market')
    cluster_idx = columns.index('hotel_cluster')
    cluster = row[cluster_idx]
    markets, countries = hotel_data[cluster]
    indices = np.random.choice(len(markets), size=n)
    row = np.array(row)[None, :].repeat(n + 1, 0)
    row[1:, market_idx] = markets[indices]
    row[1:, country_idx] = countries[indices]
    return row


def batch_mrr(x_test: pl.DataFrame, estimator: BaseEstimator, n: int = 5) -> float:
    rows = np.row_stack([
        sample_rows(row, x_test.columns, n)
        for row in x_test.iter_rows()
    ])
    rows = pl.DataFrame(
        data={
            col: rows[:, idx].astype(map_to_np(x_test[col].dtype)) for idx, col in enumerate(x_test.columns)
        },
        schema=x_test.schema
    )
    predictions = estimator.predict_proba(rows.to_pandas())[:, 1]
    predictions = predictions.reshape(len(x_test), n + 1)
    real_values = predictions[:, 0]
    predictions = np.sort(predictions, axis=1)[:, ::-1]
    indices = (predictions == real_values.reshape(-1, 1)).argmax(axis=1)
    return np.sum(1 / (1 + indices))


def mrr(x_test: pl.DataFrame, y_test: pl.DataFrame, 
        estimator: BaseEstimator, n: int = 5, batch_size: int = 1024) -> float:
    x_test = x_test.filter(y_test)
    mrr_ = 0
    for batch in iterate_batched(x_test, batch_size):
        mrr_ += batch_mrr(batch, estimator, n)
    return mrr_ / len(x_test)


In [306]:
from functools import partial


metrics, estimators = blocked_cross_validation(
    x['date_time'].min(),
    x['date_time'].max(),
    timedelta(days=1),
    timedelta(days=1),
    base_pipeline,
    x=x.drop('date_time'),
    y=y,
    dt_column=x['date_time'],
    metrics={
        'mrr': partial(mrr, n=20, batch_size=1024)
    }
)

  0%|          | 0/721 [00:00<?, ?it/s]

['site_name', 'posa_continent', 'user_location_country', 'user_location_region', 'user_location_city', 'user_id', 'is_mobile', 'is_package', 'channel', 'srch_adults_cnt', 'srch_children_cnt', 'srch_rm_cnt', 'srch_destination_id', 'srch_destination_type_id', 'cnt', 'hotel_continent', 'hotel_country', 'hotel_market', 'hotel_cluster', 'co_ci_diff', 'ci_dt_diff', 'ci_weekday', 'co_weekday', 'date_time_weekday']
OrderedDict([('site_name', UInt8), ('posa_continent', UInt8), ('user_location_country', UInt8), ('user_location_region', UInt16), ('user_location_city', UInt16), ('user_id', UInt32), ('is_mobile', Boolean), ('is_package', Boolean), ('channel', UInt8), ('srch_adults_cnt', UInt8), ('srch_children_cnt', UInt8), ('srch_rm_cnt', UInt8), ('srch_destination_id', UInt16), ('srch_destination_type_id', UInt8), ('cnt', UInt16), ('hotel_continent', UInt8), ('hotel_country', UInt8), ('hotel_market', UInt16), ('hotel_cluster', UInt8), ('co_ci_diff', Int16), ('ci_dt_diff', Int32), ('ci_weekday', U

KeyboardInterrupt: 

In [275]:
x

date_time,site_name,posa_continent,user_location_country,user_location_region,user_location_city,user_id,is_mobile,is_package,channel,srch_adults_cnt,srch_children_cnt,srch_rm_cnt,srch_destination_id,srch_destination_type_id,cnt,hotel_continent,hotel_country,hotel_market,hotel_cluster,co_ci_diff,ci_dt_diff,ci_weekday,co_weekday,date_time_weekday
datetime[μs],u8,u8,u8,u16,u16,u32,bool,bool,u8,u8,u8,u8,u16,u8,u16,u8,u8,u16,u8,i16,i32,u8,u8,u8
2014-08-11 07:46:59,2,3,66,348,48862,12,false,true,9,2,0,1,8250,1,3,2,50,628,1,4,15,3,7,1
2014-08-11 08:22:12,2,3,66,348,48862,12,false,true,9,2,0,1,8250,1,1,2,50,628,1,4,17,5,2,1
2014-08-11 08:24:33,2,3,66,348,48862,12,false,false,9,2,0,1,8250,1,1,2,50,628,1,4,17,5,2,1
2014-08-09 18:05:16,2,3,66,442,35390,93,false,false,3,2,0,1,14984,1,1,2,50,1457,80,5,105,7,5,6
2014-08-09 18:08:18,2,3,66,442,35390,93,false,false,3,2,0,1,14984,1,1,2,50,1457,21,5,105,7,5,6
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
2014-09-02 08:08:28,2,3,66,174,26232,1198182,false,true,2,2,0,1,8855,1,1,2,50,213,26,6,74,7,6,2
2014-09-08 14:52:52,2,3,66,174,26232,1198182,false,false,0,1,0,1,8281,1,1,2,50,663,9,5,41,1,6,1
2014-09-15 06:56:51,2,3,66,174,26232,1198182,false,false,0,1,0,1,5620,3,1,2,50,663,94,1,0,1,2,1
2014-09-18 08:49:33,2,3,66,462,12565,1198182,false,false,0,1,0,1,18811,1,1,2,50,592,42,1,0,4,5,4
