In [None]:
!pip install polars

In [2]:
import polars as pl

train = pl.read_parquet('../input/otto-train-and-test-data-for-local-validation/train.parquet')
validA = pl.read_parquet('../input/otto-train-and-test-data-for-local-validation/test.parquet')
validB = pl.read_parquet('../input/otto-train-and-test-data-for-local-validation/test_labels.parquet')

In [None]:
id2type = ['clicks', 'carts', 'orders']
type2id = {'clicks': 0, 'carts': 1, 'orders': 2}

In [3]:
import numpy as np

# Get subset of data 
fraction_of_sessions = 0.2

train_sessions = train['session'].sample(fraction=fraction_of_sessions, seed=42)
train = train.filter(pl.col("session").is_in(train_sessions))
train = train.sort("session")

validation_sessions = validA['session'].sample(fraction=fraction_of_sessions, seed=42)
validA = validA.filter(pl.col("session").is_in(validation_sessions))
validA = validA.sort("session")

validB = validB.filter(pl.col("session").is_in(validation_sessions))
validB = validB.sort("session")

print(train.shape[0], validA.shape[0], validB.shape[0])

print(train, validA, validB)

152268691 5763556 1007030
shape: (152_268_691, 4)
┌──────────┬─────────┬────────────┬──────┐
│ session  ┆ aid     ┆ ts         ┆ type │
│ ---      ┆ ---     ┆ ---        ┆ ---  │
│ i32      ┆ i32     ┆ i32        ┆ u8   │
╞══════════╪═════════╪════════════╪══════╡
│ 0        ┆ 1517085 ┆ 1659304800 ┆ 0    │
│ 0        ┆ 1563459 ┆ 1659304904 ┆ 0    │
│ 0        ┆ 1309446 ┆ 1659367439 ┆ 0    │
│ 0        ┆ 16246   ┆ 1659367719 ┆ 0    │
│ …        ┆ …       ┆ …          ┆ …    │
│ 11098496 ┆ 219035  ┆ 1661119183 ┆ 0    │
│ 11098507 ┆ 1195266 ┆ 1661119189 ┆ 0    │
│ 11098512 ┆ 8664    ┆ 1661119192 ┆ 0    │
│ 11098522 ┆ 1524949 ┆ 1661119197 ┆ 0    │
└──────────┴─────────┴────────────┴──────┘ shape: (5_763_556, 4)
┌──────────┬─────────┬────────────┬──────┐
│ session  ┆ aid     ┆ ts         ┆ type │
│ ---      ┆ ---     ┆ ---        ┆ ---  │
│ i32      ┆ i32     ┆ i32        ┆ u8   │
╞══════════╪═════════╪════════════╪══════╡
│ 11098529 ┆ 1105029 ┆ 1661119200 ┆ 0    │
│ 11098531 ┆ 452188  ┆ 16

In [None]:
# Simple candidate generator 
def generate_candidates_simple(df, dict_format=False):
    df = df.groupby(['session', 'aid']).agg(pl.count())
    df = df.sort('session', 'count', descending=[False, True]).groupby('session').head(50)
    if dict_format:
        df = df.groupby('session').agg(pl.col("aid"))
        result = {}
        for session, aid_list in zip(df['session'].to_list(), df['aid'].to_list()):
            result[session] = aid_list 
        return result
    
    return df.select('session', 'aid').sort(['session', 'aid'])

In [None]:
type_weight = {0: 1, 1: 6, 2: 3}

# Memory Issues when merging
def generate_covisit(df, event_type):
    if event_type == 0:
        df = train
        df = df.sort(['session', 'ts'], descending=[False, True])
        # get most recent events for each session
        df = df.groupby('session').tail(30)
        # find pairs of items that were accessed within a day of each other
        df = df.join(df, on='session', suffix="_next")
        df = df.filter((pl.col("aid") != pl.col("aid_next")) & 
                       (pl.col("type") != pl.col("type_next")) &
                       (pl.col("ts") != pl.col("ts_next")))
        df = df.with_columns(((pl.col("ts_next") - pl.col("ts")) / (24 * 60 * 60 * 1000)).alias("days_elapsed"))
        df = df.filter((pl.col("days_elapsed") >= 0) & (pl.col("days_elapsed") <= 1))
        df = df.with_columns(pl.col("type").map_dict(type_weight).alias("weight"))
        df = df.groupby(['aid', 'aid_next']).agg(pl.sum("weight"))
        df = df.sort(['aid', 'aid_next', 'weight'], descending=[False, False, True]).groupby('aid').head(40).select(['aid', 'aid_next'])
        df = df.groupby('aid').agg(pl.col("aid_next"))
        
        covisit = {}
        for aid, aid_list in zip(df["aid"], df["aid_next"]):
            covisit[aid] = aid_list
            
        return covisit
    if event_type == 1:
        return 
    if event_type == 2:
        return 

def generate_candidate(df, event_type):
    if event_type == 0:
        aids = df["aid"].to_list()
        types = df["type"].to_list()
        unique_aids = dict(list(aids[::-1]))
        
        if len(unique_aids) >= 20:
            weights=np.logspace(0.1,1,len(aids),base=2, endpoint=True)-1
            aids_temp=defaultdict(lambda: 0)
            for aid,w,t in zip(AIDs,weights,types): 
                aids_temp[aid]+= w * type_weight_multipliers[t]

            sorted_aids=[k for k, v in sorted(aids_temp.items(), key=lambda item: -item[1])]
            labels.append(sorted_aids[:20])

    else:
        return
    
    generate_covisit(df, 0)
    
# def most_common_values(df):
#     def most_common_values_(l):
#         values, count = zip(*Counter(l).most_common(50))
#         return values
    
#     top_clicks = most_common_values_(df.filter(pl.col("type") == 0)).to_list()
#     top_add2carts = most_common_values_(df.filter(pl.col("type") == 1)).to_list()
#     top_buys = most_common_values_(df.filter(pl.col("type") == 2)).to_list()
    
#     return (top_clicks, top_add2carts, top_buys)



In [8]:
from collections import Counter
type_weight = {0: 1, 1: 6, 2: 3}
def generate_candidates(df, event_type, covisit):
    if event_type == 0:
        aids = df["aid"].to_list()
        types = df["type"].to_list()
        unique_aids = list(dict.fromkeys(aids[::-1]))
        
        time_weights = np.logspace(0.1,1,len(aids),base=2, endpoint=True)-1
        aids_counter = {}
        for aid, w, t in zip(aids, time_weights, types):
            aids_counter[aid] = aids_counter.get(aid, 0) + w * type_weight[t]
            
        aids_counter_sorted = sorted(aids_counter.items(), key=lambda x: x[1])
        candidates = [k for k, v in aids_counter_sorted]
        
        if len(candidates) <= 20:
            secondary_candidates_counter = Counter()
            for candidate in candidates:
                secondary_candidates_counter.update(covisit.get(aid, []))  
            secondary_candidates = [k for k, v in secondary_candidates_counter.most_common(40)]
            return candidates[:40] + secondary_candidates[:(40 - len(candidates))]

        return candidates[:40]
    return

In [5]:
import pandas as pd

DISK_PIECES = 4

# Improved speed for 2X using polars. 
def pqt_to_dict(path):
    return pl.read_parquet(path).groupby('aid_x').agg(pl.col('aid_y')).to_pandas().set_index('aid_x').aid_y.apply(list).to_dict()

# LOAD THREE CO-VISITATION MATRICES
covisit = pqt_to_dict(f'/kaggle/input/otto-covisitation-matrix-parquet-files/top_20_clicks_v7_0.pqt')

for k in range(1,DISK_PIECES): 
    covisit.update(pd.read_parquet(f'/kaggle/input/otto-covisitation-matrix-parquet-files/top_20_clicks_v7_{k}.pqt') ) 

session,aid
i64,i64
11098529,1105029
11098531,624163
11098531,1553691
11098531,1449555
11098531,1309633
11098531,1239060
11098531,1365569
11098531,1557766
11098531,1271998
11098531,1728212


# Re-Ranking


In [None]:
# Generate Session Features 
def generate_session_features(train_df_user):
    df_session_grouped = train_df_user.sort('session', 'ts', descending=[False, True]).groupby('session')
    df_session_grouped_clicks = train_df_user.filter(train_df_user['type'] == 0).sort('session', 'ts', descending=[False, True]).groupby('session')
    df_session_grouped_carts = train_df_user.filter(train_df_user['type'] == 1).sort('session', 'ts', descending=[False, True]).groupby('session')
    df_session_grouped_orders = train_df_user.filter(train_df_user['type'] == 2).sort('session', 'ts', descending=[False, True]).groupby('session')

    # last event type and aid
    last_event_type = df_session_grouped.agg(last_type=pl.col('type').first())

    # length of session
    session_length = df_session_grouped.agg(session_length=pl.col('ts').first() - pl.col('ts').last())

    # duplication rate

    # number of clicks, carts, orders, events that session
    clicks_ratio = df_session_grouped_clicks.agg(clicks_ratio=pl.col('type').count())
    carts_ratio = df_session_grouped_carts.agg(carts_ratio=pl.col('type').count())
    orders_ratio = df_session_grouped_orders.agg(orders_ratio=pl.col('type').count())
    events_ratio = df_session_grouped.agg(events_ratio=pl.col('type').count())

    return [last_event_type, session_length, events_ratio, clicks_ratio, carts_ratio, orders_ratio]

In [None]:
# AID FEATURES
train_df_item = pl.concat([train, validA])

# total interaction for aid
interaction_counts = train_df_item['aid'].value_counts().sort(by='counts').rename({"counts":"interaction_count"})

# click for aid
click_counts = train_df_item.filter(train_df_item['type'] == 0).groupby('aid').agg(click_count=pl.count('type'))

# cart for aid
cart_counts = train_df_item.filter(train_df_item['type'] == 1).groupby('aid').agg(cart_count=pl.count('type'))

# buy for aid
buy_counts = train_df_item.filter(train_df_item['type'] == 2).groupby('aid').agg(buy_count=pl.count('type'))

# last ts for aid
last_ts = train_df_item.groupby('aid').agg(last_ts=pl.col('ts').max())

# click, cart, buy ratios
global_events_ratio = click_counts.join(cart_counts, on='aid').join(buy_counts, on='aid')
global_events_ratio = global_events_ratio.with_columns(click2cart = pl.col('click_count') / pl.col('cart_count'))
global_events_ratio = global_events_ratio.with_columns(cart2buy = pl.col('cart_count') / pl.col('buy_count'))
global_events_ratio = global_events_ratio.with_columns(click2buy = pl.col('click_count') / pl.col('buy_count'))
global_events_ratio = global_events_ratio.drop('click_count').drop('cart_count').drop('buy_count')

# interaction rate in last 7 days
last_ts_int = train_df_item['ts'].max()
last_7_days_count = train_df_item.filter(train_df_item['ts'] >= last_ts_int - (7 * 24 * 60 * 60))['aid'].value_counts().sort(by='counts').rename({"counts":"last7days_count"})
merged = interaction_counts.join(last_7_days_count, on='aid')
merged = merged.with_columns(last7days_interaction_rate = pl.col('last7days_count') / pl.col('interaction_count'))
last7days_interaction_count = merged.drop('interaction_count').drop('last7days_count')

# inclusion rate in all sessions
# sessions_with_aid / total_sessions
total_sessions = train_df_item['aid'].n_unique()
unique_sessions = train_df_item.groupby('aid').agg(unique_sessions=pl.n_unique('session')).sort(by='unique_sessions')
inclusion_rate = unique_sessions.with_columns(inclusion_rate = pl.col('unique_sessions') * 1000 / total_sessions).drop('unique_sessions')

# average interactions per hour over all sessions
num_hours = train_df_item.groupby('aid').agg(num_hours = (pl.max('ts') - pl.min('ts')) / (60 * 60)).sort(by='num_hours')
num_hours = num_hours.with_columns(num_hours = pl.when(num_hours['num_hours']==0).then(1).otherwise(num_hours['num_hours']))
merged = num_hours.join(interaction_counts, on='aid')
average_interactions_ph = merged.with_columns(average_interactions_ph = pl.col('interaction_count') / pl.col('num_hours'))
average_interactions_ph = average_interactions_ph.drop('interaction_count').drop('num_hours')

# average num clicks before buy
# this bit too hard
xd = train_df_item.sort(by=['session','ts'])
aid_features = [interaction_counts, click_counts, buy_counts, last_ts, global_events_ratio, last7days_interaction_count, inclusion_rate, average_interactions_ph]

In [None]:
# add candidates + features to df

# # Using simple candidate generator
# candidates_df = generate_candidates_simple(pl.concat([train, validA]))
# train_df = validA.unique(subset=['session'])
# train_df = candidates_df.join(train_df, on="session", how="inner").select(['session', 'aid'])

# Using advanced candidate generator
train_df = validA.to_pandas()
train_df = train_df.sort_values(['session', 'ts'], ascending=[True, False]).groupby('session').apply(lambda x: generate_candidates(x, 0, covisit)).reset_index()
train_df = pl.from_pandas(train_df)
train_df.columns = ['session', 'aid']
train_df = train_df.explode('aid')

session_features = generate_session_features(validA)
for session in session_features:
    train_df = train_df.join(session, on='session', how='left')

for aid in aid_features:
    train_df = train_df.join(aid, on='aid', how='left')

In [None]:
# add gts
type2id = {"clicks": 0, "carts": 1, "orders": 2}
validB_long = validB.explode("ground_truth").select([
    pl.col("session").cast(pl.datatypes.Int32),
    pl.col("ground_truth").cast(pl.datatypes.Int32).alias("aid"),
    pl.col("type").map_dict(type2id).cast(pl.datatypes.Int8),
]).with_columns(pl.lit(1).alias('gt'))

validB_click = validB_long.filter(pl.col("type") == 0)
validB_cart = validB_long.filter(pl.col("type") == 1)
validB_order = validB_long.filter(pl.col("type") == 2)

train_df_click = train_df.join(validB_click, on=["session", "aid"], how="left").with_columns(pl.all().fill_null(0)).drop("type")
train_df_cart = train_df.join(validB_cart, on=["session", "aid"], how="left").with_columns(pl.all().fill_null(0)).drop("type")
train_df_order = train_df.join(validB_order, on=["session", "aid"], how="left").with_columns(pl.all().fill_null(0)).drop("type")

print(train_df_click, train_df_cart, train_df_order)

# Load Test Data

In [None]:
# Get test data
import numpy as np
import pandas as pd

from pathlib import Path

data_path = Path('/kaggle/input/recsys-dataset/')

test_sessions = pd.DataFrame()
chunks = pd.read_json(data_path / 'otto-recsys-test.jsonl', lines=True, chunksize=100_000)

for e, chunk in enumerate(chunks):
    event_dict = {
        'session': [],
        'aid': [],
        'ts': [],
        'type': [],
    }
    if e < 2:
        for session, events in zip(chunk['session'].tolist(), chunk['events'].tolist()):
            for event in events:
                event_dict['session'].append(session)
                event_dict['aid'].append(event['aid'])
                event_dict['ts'].append(event['ts'])
                event_dict['type'].append(event['type'])
        chunk_session = pd.DataFrame(event_dict)
        test_sessions = pd.concat([test_sessions, chunk_session])
    else:
        break
        

test_sessions = pl.from_pandas(test_sessions.reset_index(drop=True))
test_sessions = test_sessions.groupby('session').agg(pl.all()).sort(by='session')

In [None]:
# Split test data into testA (session up to certain point) and testB (prediction)
dictA = {'session': [], 'aid': [], 'ts': [], 'type': []}
dictB = {'session': [], 'aid': [], 'ts': [], 'type': []}

for row in test_sessions.iter_rows():
    split_idx = np.random.randint(1,len(row[1]))
    dictA['session'].append(row[0])
    dictA['aid'].append(row[1][:split_idx])
    dictA['ts'].append(row[2][:split_idx])
    dictA['type'].append(row[3][:split_idx])

    dictB['session'].append(row[0])
    dictB['aid'].append(row[1][split_idx:])
    dictB['ts'].append(row[2][split_idx:])
    dictB['type'].append(row[3][split_idx:])
    
testA = pl.DataFrame(data=dictA).explode(['aid', 'ts', 'type'])
testB = pl.DataFrame(data=dictB)

In [None]:
# Generate test candidates
test_df = generate_candidates_simple(testA).select([pl.all().cast(pl.datatypes.Int32)])    
session_features = generate_session_features(testA.select([
    pl.col('session').cast(pl.datatypes.Int32),
    pl.col('aid').cast(pl.datatypes.Int32),
    pl.col('ts').apply(lambda x: x / 1000).cast(pl.datatypes.Int32),
    pl.col("type").map_dict(type2id).cast(pl.datatypes.Int8)
]))

for session in session_features:
    test_df = test_df.join(session, on='session', how='left')

for aid in aid_features:
    test_df = test_df.join(aid, on='aid', how='left')
    
test_df = test_df.with_columns(pl.all().fill_null(0))
print(test_df)

# Model Training

In [None]:
import pandas as pd
from sklearn.model_selection import GroupShuffleSplit

train_click = train_df_click.to_pandas()
train_cart = train_df_cart.to_pandas()
train_order = train_df_order.to_pandas()

In [None]:
# XG Boost with 5-fold CV
# import xgboost as xgb
# from sklearn.model_selection import GroupKFold

# features = data.columns[: -1]

# candidates = data
# skf = GroupKFold(n_splits=5)
# for fold,(train_idx, valid_idx) in enumerate(skf.split(candidates, candidates['gt'], groups=candidates['session'] )):

#     X_train = candidates.loc[train_idx, features]
#     y_train = candidates.loc[train_idx, 'gt']
#     X_valid = candidates.loc[valid_idx, features]
#     y_valid = candidates.loc[valid_idx, 'gt']

#     groups_train = X_train.groupby('session').size().to_frame('size')['size'].to_numpy()
#     groups_valid = X_valid.groupby('session').size().to_frame('size')['size'].to_numpy()
#     dtrain = xgb.DMatrix(X_train, y_train, group=groups_train) 
#     dvalid = xgb.DMatrix(X_valid, y_valid, group=groups_valid) 

#     xgb_parms = {'objective':'rank:pairwise', 'tree_method':'hist'}
#     model = xgb.train(xgb_parms, 
#         dtrain=dtrain,
#         evals=[(dtrain,'train'),(dvalid,'valid')],
#         num_boost_round=1000,
#         verbose_eval=100)

In [None]:
# Pairwise Methods
import xgboost as xgb
import lightgbm

# XGBoost
def train_XGB(data):
    X_train = data.iloc[:,:-1]
    y_train = data.iloc[:, -1]
    
    groups_train = X_train.groupby('session').size().to_frame('size')['size'].to_numpy()

    model = xgb.XGBRanker(  
        tree_method='hist',
        booster='gbtree',
        objective='rank:pairwise',
        random_state=42, 
        learning_rate=0.1,
        n_estimators=110
    )

    model.fit(
        X_train, 
        y_train, 
        group=groups_train,
        verbose=True
    )
    
    return model

# LightGBM
def train_LGBM(data):
    X_train = data.iloc[:,:-1]
    y_train = data.iloc[:, -1]
    
    groups_train = X_train.groupby('session').size().to_frame('size')['size'].to_numpy()
    
    model = lightgbm.LGBMRanker(
        objective="lambdarank",
        boosting_type = "gbdt",
        n_estimators = 5,
        importance_type = "gain",
        metric= "ndcg",
        num_leaves = 10,
        learning_rate = 0.05,
        max_depth = -1
    )
    
    model.fit(
        X=X_train,
        y=y_train,
        group=groups_train
    )
    
    return model

In [None]:
def train_model(model_name):
    if model_name == 'XGB':
        return [train_XGB(train_click), train_XGB(train_cart), train_XGB(train_order)]
    elif model_name == 'LR':
        return [train_LR(train_click), train_LR(train_cart), train_LR(train_order)]
    elif model_name == 'LGBM':
        return [train_LGBM(train_click), train_LGBM(train_cart), train_LGBM(train_order)]
    else:
        return None

# Model Evaluation

In [None]:
actual_events = testB.explode(['aid', 'ts', 'type']).select([
    pl.col('session').cast(pl.datatypes.Int32),
    pl.col('aid').cast(pl.datatypes.Int32),
    pl.col('ts').apply(lambda x: x / 1000).cast(pl.datatypes.Int32),
    pl.col('type').map_dict(type2id).cast(pl.datatypes.Int8)
]).to_pandas()

In [None]:
# Use Kaggle evaluation metric
def kaggle_evaluate(name, preds):
    submission = preds
    valid = actual_events
    
    submission['session'] = submission.session_type.apply(lambda x: int(x.split('_')[0]))
    submission['type'] = submission.session_type.apply(lambda x: x.split('_')[1])
    submission.labels = submission.labels.apply(lambda x: [int(i) for i in x.split(' ')][:20])

    valid.type = valid.type.map(lambda idx: id2type[idx])
    ground_truth = valid.groupby(['session', 'type'])['aid'].apply(list)
    ground_truth = ground_truth.reset_index().rename(columns={'aid': 'labels'})
    ground_truth.loc[ground_truth.type == 'clicks', 'labels'] = ground_truth.loc[ground_truth.type == 'clicks', 'labels'].str[:1]

    submission_with_gt = submission.merge(ground_truth[['session', 'type', 'labels']], how='left', on=['session', 'type'])
    submission_with_gt = submission_with_gt[~submission_with_gt.labels_y.isna()]
    submission_with_gt['hits'] = submission_with_gt.apply(lambda df: len(set(df.labels_x).intersection(set(df.labels_y))), axis=1)
    submission_with_gt['gt_count'] = submission_with_gt.labels_y.str.len().clip(0,20)

    recall_per_type = submission_with_gt.groupby(['type'])['hits'].sum() / submission_with_gt.groupby(['type'])['gt_count'].sum() 
    local_validation_score = (recall_per_type * pd.Series({'clicks': 0.10, 'carts': 0.30, 'orders': 0.60})).sum()
    print(f'{name}: {local_validation_score}')

In [None]:
import pandas as pd

def get_in_correct_format(pred_df):
    click_preds = pred_df.groupby('session').agg(pl.col('aid').sort_by('click_scores', descending=True).slice(0,20))
    cart_preds = pred_df.groupby('session').agg(pl.col('aid').sort_by('cart_scores', descending=True).slice(0,20))
    order_preds = pred_df.groupby('session').agg(pl.col('aid').sort_by('order_scores', descending=True).slice(0,20))

    click_new = click_preds.with_columns(
        (pl.col('session').cast(pl.Utf8) + '_clicks').alias('session_type'), 
        pl.lit('clicks').alias('type'),
        pl.col('aid').cast(pl.List(pl.Utf8))
    ).rename({'aid':'labels'})
    cart_new = cart_preds.with_columns(
        (pl.col('session').cast(pl.Utf8) + '_carts').alias('session_type'), 
        pl.lit('carts').alias('type'),
        pl.col('aid').cast(pl.List(pl.Utf8))
    ).rename({'aid':'labels'})
    order_new = order_preds.with_columns(
        (pl.col('session').cast(pl.Utf8) + '_orders').alias('session_type'), 
        pl.lit('orders').alias('type'),
        pl.col('aid').cast(pl.List(pl.Utf8))
    ).rename({'aid':'labels'})

    preds = pl.concat([click_new, cart_new, order_new]).to_pandas()
    preds['labels'] = preds['labels'].apply(lambda x: ' '.join(x))
    return preds

In [None]:
def evaluate_model(name, model):
    model_click = model[0]
    model_cart = model[1]
    model_order = model[2]
    
    click_scores = model_click.predict(test_df.to_pandas())
    cart_scores = model_cart.predict(test_df.to_pandas())
    order_scores = model_order.predict(test_df.to_pandas())

    pred_df = test_df.with_columns(click_scores = pl.lit(click_scores), cart_scores = pl.lit(cart_scores), order_scores = pl.lit(order_scores))
    # print(pred_df)
    
    preds = get_in_correct_format(pred_df)
    
    kaggle_evaluate(name, preds)

# Train Models and Evaluate

In [None]:
models = {'XGB' : train_model('XGB'), 'LGBM' : train_model('LGBM')}

for name, model in models:
    evaluate_model(name, model)

# model_click, model_cart, model_order = train_model('LR')