In [1]:
import gc
import glob
import pandas
import numpy as np

from math import ceil

import lightgbm as lgb
from xgboost import XGBRanker
from lightgbm.sklearn import LGBMRanker

from sklearn.model_selection import train_test_split

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        #print(os.path.join(dirname, filename))
        pass

try: import cudf; GPU = True
except: GPU = False
    
from tqdm import tqdm
tqdm.pandas() # progress_map, progress_apply ...

# * Полезные функции *

In [2]:
def groupType(x):
    res = list(set(x))
    return sum(res) if len(res) > 0 else -1
# -----------------------------------------------------------
def groupLen(x):
    return len(list(set(x)))
# -----------------------------------------------------------
def fillEvents(df, clone=True, value=[-1]):
    result = pd.DataFrame()
    _ = df[['session','type']].groupby(['session']).agg(ctype = ('type', groupType), count = ('type', groupLen))
    
    tmp = df[df['session'].isin(_[_['ctype'] == 0].index)].copy()
    if not clone: tmp['aid'] = tmp['aid'].apply(lambda x: value)
    tmp['type'] = 1
    result = pd.concat([result, tmp])
    tmp['type'] = 2
    result = pd.concat([result, tmp])

    tmp = df[df['session'].isin(_[_['ctype'] == 1].index)].copy().drop_duplicates(['session'])
    if not clone: tmp['aid'] = tmp['aid'].apply(lambda x: value)
    tmp['type'] = 2
    result = pd.concat([result, tmp])

    tmp = df[df['session'].isin(_[_['ctype'] == 2].index)].copy().drop_duplicates(['session'])
    if not clone: tmp['aid'] = tmp['aid'].apply(lambda x: value)
    tmp['type'] = 1
    result = pd.concat([result, tmp])
    
    tmp = df[df['session'].isin(_[(_['ctype'].isin([1, 2]))&(_['count'] == 1)].index)].copy()
    if not clone: tmp['aid'] = tmp['aid'].apply(lambda x: value)
    tmp['type'] = 0
    result = pd.concat([result, tmp])

    tmp = df[df['session'].isin(_[(_['ctype'] == 3)&(_['count'] == 2)].index)].copy()\
                .drop_duplicates(['session'])
    if not clone: tmp['aid'].apply(lambda x: [])
    tmp['type'] = 0
    result = pd.concat([result, tmp])

    return pd.concat([df, result]).astype({'session':'uint32', 'type':'uint8'})
# -----------------------------------------------------------
def cloneEvents(df):
    df['type'] = 0
    _ = df.copy()
    _['type'] = 1
    df = pd.concat([df, _])
    _['type'] = 2
    return pd.concat([df, _]).astype({'type': 'uint8'})

In [3]:
# ----------------------------------------------
def removeFiles(mask='*'):
    for file in glob.glob(mask):
        print('remove', '-'*10, file)
        os.remove(file)
# ----------------------------------------------
def explodeLabels(*, df = None, path = None, rename=True):
    if path is not None: df = pd.read_parquet(path)
    if type(df.head(1)['labels'][0]) is str:
        if GPU: df = df.to_pandas()
        df['labels'] = df['labels'].progress_apply(lambda x: [np.uint32(i) for i in x.split(' ')])
        if GPU: df = pd.from_pandas(df)
    df = df.explode('labels').astype({'labels': 'uint32'})
    if rename: df = df.rename(columns={'labels':'aid'})
    return df.reset_index(drop=True)
# ----------------------------------------------
def googleDownload(file_id, filename=None):
    import gdown
    import shutil
    gdown.download(f'https://drive.google.com/u/0/uc?id={file_id}', filename, quiet=False)
    if filename.find('.zip') > -1:
        shutil.unpack_archive(filename, format='zip')
        os.remove(filename)
# ----------------------------------------------
def getHistoryFromCandidates(df):
    df['n'] = df.groupby('session').cumcount()
    df = df.loc[df['n'] < df['hist']].drop('n', axis=1)
    return df
# ------------------------------------------------------
def loadTestLabels(path='/kaggle/input/otto-train-and-test-data-for-local-validation/', p=1.0):
    df = pd.read_parquet(f'{path}test_labels.parquet')
    #if GPU: df = df.to_pandas()
    #df = df.groupby(['session', 'type']).agg(labels=('aid', lambda x: list(dict.fromkeys(x))[:-20]))
    #if GPU: df = pd.from_pandas(df)
    #df = df.reset_index()
    df['type'] = df['type'].map({'clicks':0, 'carts':1, 'orders':2}).astype('uint8')
    df = df.rename(columns={'ground_truth':'labels'})
    if p < 1: df['labels'] = df['labels'].progress_apply(lambda x: x[-ceil(len(x) * p):])
    return df
# ------------------------------------------------------
def localMetrics(sub, df_true=None, load=loadTestLabels, *, drop=False):
    # -- load ground truth
    if df_true is None: df_true=load()
    
    # -- calculate metrics
    test_labels = pd.merge(df_true, sub, how='left', on=['session', 'type'])
    if drop: test_labels.dropna(inplace=True)
    else: test_labels.fillna('', inplace=True)

    if GPU: test_labels     = test_labels.to_pandas()
    test_labels['hits']     = test_labels.apply(lambda df: len(set(df['labels_x']).intersection(set(df['labels_y']))), axis=1)
    if GPU: test_labels     = pd.from_pandas(test_labels)
    test_labels['gt_count'] = test_labels['labels_x'].str.len().clip(0,20)  
    
    recall_per_type = test_labels.groupby(['type'])['hits'].sum() / test_labels.groupby(['type'])['gt_count'].sum() 

    score = (recall_per_type * pd.Series({0: 0.10, 1: 0.30, 2: 0.60})).fillna(0)

    print(f"{score.sum():.3f} = {score[0]:.3f} + {score[1]:.3f} + {score[2]:.3f}")
    print('score:', score.sum())
    
    return test_labels
# ------------------------------------------------------
def get_groups(df, column='gt'):
    return df.groupby('session')[column].count().sort_index().values
# ----------------------------------------------
def splitDataFrame(df, *, p=0.9, t=None):
    if t is not None:
        train_cutoff = df['ts'].max() - t
        train = df[df['ts'] <= train_cutoff]
        test = df[df['ts'] > train_cutoff].copy()
    else:
        df['n'] = df.groupby('session').cumcount()
        group = df.groupby('session')['n']
        train_cutoff = (group.max()*p).rename('ts_agg').astype('uint8')
        train = pd.merge(df, train_cutoff, how='left', on='session')
        test = train[train['n'] > train['ts_agg']].drop(['n', 'ts_agg'], axis=1)
        train = train[train['n'] <= train['ts_agg']].drop(['n', 'ts_agg'], axis=1)
    return train, test
# ----------------------------------------------
def loadTrainData(path, *, p=1.0):
    if path.find('.csv') > -1:
        df = pd.read_csv(path)
    elif path.find('.parquet') > -1 or path.find('.pqt') > -1:
        df = pd.read_parquet(path)
    if p < 1: df = df.sample(n=int(len(df)*p))
    return df
# ----------------------------------------------
def setGrounTruth(df, labels):
    df = df.merge(labels, on=['session', 'aid', 'type'], how='left')
    df['gt'].fillna(0, inplace=True)
    return df.astype({'gt':'float32'})
# ----------------------------------------------
def prepareTest(*, df=None, path=None, fill=True, clone=True):
    if df is None: df = pd.read_parquet(path)
    if fill: df = fillEvents(df, clone)
    return df
# ------------------------------------------------------
def setSubmission(df):
    df = df.copy()
    df['type'] = df['type'].map({0:'clicks', 1:'orders', 2:'carts'})
    df['session_type'] = df['session'].astype('str') + '_' + df['type']
    if GPU: df = df.to_pandas()
    df['labels'] = df['labels'].fillna('').progress_apply(lambda x: ' '.join(str(i) for i in x))
    if GPU: df = pd.from_pandas(df)
    return df.drop(['session', 'type'], axis=1)
# ------------------------------------------------------
def loadTestDataset(*, path='/kaggle/input/otto-full-optimized-memory-footprint/test.parquet', p=0.1):
    df = pd.read_parquet(path)
    #__, df = splitDataFrame(df, t=7*24*60*60)
    train, test = splitDataFrame(df, p=1-p)
    return train, test
# ------------------------------------------------------
def getHistory(*, df=None, path=None, n=20):
    if path is not None: df = pd.read_parquet(path)
    df = df.sort_values('ts', ascending=True)
    df.drop_duplicates(['session', 'aid'], inplace=True)
    df['n'] = df.groupby(['session', 'type']).cumcount()
    df = df.loc[df['n']<20].drop('n', axis=1)
    if GPU: df = df.to_pandas()
    df = df.groupby(['session', 'type']).agg(labels=('aid', lambda x: list(dict.fromkeys(x))[:n]))
    if GPU: df = pd.from_pandas(df)
    return df.reset_index()
# ------------------------------------------------------
def prepareCandidates(path, *, p=1.0, t=0):
    df = pd.read_csv(path)
    if GPU: df = df.to_pandas()
    df['labels'] = df['labels'].progress_apply(lambda x: [np.uint32(i) for i in x.split(' ')])
    if GPU: df = pd.from_pandas(df)
    df['type'] = t
    return df.astype({'session':'uint32', 'type':'uint8'})

# * Фичи *

In [4]:
# =========================== Basic =================================
def fSession(df):
    return 'session', df
# -----------------------------------------------------
def fAid(df):
    return 'aid', df
# -----------------------------------------------------
def fType(df):
    return 'type', df
# -----------------------------------------------------
def fts(df):
    return 'ts', df

#	covisit_clicks_candidate_num		0.24848206202059703
#	covisit_buys_candidate_num 			0.009287900880354305
#	aid_interacted_with_count 			0.004570131040186471
#	relative_position_in_session 		0.00021918611998695227
#	popularity_0 						0.0
#	popularity_1 						0.0
#	popularity_2 						0.0
# -----------------------------------------------------

In [5]:
# =========================== Basic =================================
# == session features ===============================================
# -----------------------------------------------------
def session_length(df):
    # длина сессии по количеству событий
    if df is not None:
        gr = df.groupby('session')['session'].count().astype('uint16')
        df.set_index('session', inplace=True)
        df['session_length'] = gr
        df.reset_index(inplace=True)
    return 'session_length', df
# -----------------------------------------------------
def user_item_count(df):
    # со сколькми уникальными товарами взаимодействовал пользователь
    if df is not None:
        gr = df.groupby('session')['aid'].nunique().astype('uint16')
        df.set_index('session', inplace=True)
        df['user_item_count'] = gr
        df.reset_index(inplace=True)
    return 'user_item_count', df
# -----------------------------------------------------
def user_buy_ratio(df):
    # средний тип события пользователя  (type, mean)
    if df is not None:
        gr = df.groupby('session')['type'].mean().astype('float32')
        df.set_index('session', inplace=True)
        df['user_buy_ratio'] = gr
        df.reset_index(inplace=True)
    return 'user_buy_ratio', df
# -----------------------------------------------------
def user_events(df):
    # количество событий пользователя
    if df is not None:
        gr = df.groupby(['session', 'type'])['type'].count().rename('count').reset_index()
        df = df.merge(gr[gr['type'] == 0].rename(columns={'count':'user_clicks'})[['session', 'user_clicks']], how='left', on=['session'])
        df = df.merge(gr[gr['type'] == 1].rename(columns={'count':'user_carts'})[['session', 'user_carts']], how='left', on=['session'])
        df = df.merge(gr[gr['type'] == 2].rename(columns={'count':'user_orders'})[['session', 'user_orders']], how='left', on=['session'])
        df.fillna(0, inplace=True)
        df = df.astype({'user_clicks':'uint16', 'user_carts':'uint16', 'user_orders':'uint16'})
    return ['user_clicks', 'user_carts', 'user_orders'], df
# -----------------------------------------------------
def user_ratio(df):
    # соотношение кликов/покупок/корзин пользователя (type, mean)
    if df is not None:
        if not ('user_clicks' in df.columns and 'user_carts' in df.columns and 'user_orders' in df.columns):
            __, df = user_events(df)
        df['user_carts_clicks_ratio'] = df['user_carts'] / df['user_clicks'].astype('float32')
        df['user_orders_carts_ratio'] = df['user_orders'] / df['user_carts'].astype('float32')
        df['user_orders_clicks_ratio'] = df['user_orders'] / df['user_clicks'].astype('float32')
        df.fillna(0, inplace=True)
    return ['user_carts_clicks_ratio', 'user_orders_carts_ratio', 'user_orders_clicks_ratio'], df
# -----------------------------------------------------

In [6]:
# =========================== Basic =================================
# == aid features ===================================================
# -----------------------------------------------------
def clicks_user_counter(df):
    # в скольких сессия был кликнут товар
    if df is not None:
        if 'clicks_user_counter' in df.columns: df.drop('clicks_user_counter', axis=1, inplace=True)
        tmp = df[df['type']==0].groupby(['aid', 'type']).agg(clicks_user_counter=('session', 'nunique')).reset_index().drop('type', axis=1)
        df = df.merge(tmp, how='left', on='aid')
        df['clicks_user_counter'].fillna(0, inplace=True)
        df = df.astype({'clicks_user_counter':'uint16'})
    return 'clicks_user_counter', df
# ----------------------------------------------------- 
def carts_user_counter(df):
    # в скольких сессия был положен в корзину товар
    if df is not None:
        if 'carts_user_counter' in df.columns: df.drop('carts_user_counter', axis=1, inplace=True)
        tmp = df[df['type']==1].groupby(['aid', 'type']).agg(carts_user_counter=('session', 'nunique')).reset_index().drop('type', axis=1)
        df = df.merge(tmp, how='left', on='aid')
        df['carts_user_counter'].fillna(0, inplace=True)
        df = df.astype({'carts_user_counter':'uint16'})
    return 'carts_user_counter', df
# -----------------------------------------------------
def orders_user_counter(df):
    # в скольких сессия был куплен товар
    if df is not None:
        if 'orders_user_counter' in df.columns: df.drop('orders_user_counter', axis=1, inplace=True)
        tmp = df[df['type']==2].groupby(['aid', 'type']).agg(orders_user_counter=('session', 'nunique')).reset_index().drop('type', axis=1)
        df = df.merge(tmp, how='left', on='aid')
        df['orders_user_counter'].fillna(0, inplace=True)
        df = df.astype({'orders_user_counter':'uint16'})
    return 'orders_user_counter', df
# -----------------------------------------------------
def clicks_timed_global_counter(df):
    if df is not None:
        df['wgt'] = df.groupby('session').cumcount() + 1
        df['wgt'] = df['wgt'] / df['wgt'].max()
        gr = df[df['type']==0].groupby(['aid'])['wgt'].sum()
        df.set_index('aid', inplace=True)
        df['clicks_timed_global_counter'] = gr
        df.drop('wgt', axis=1, inplace=True)
        df.reset_index(inplace=True)
        df['clicks_timed_global_counter'].fillna(0, inplace=True)
        df['clicks_timed_global_counter'] = df['clicks_timed_global_counter'].astype('float32')
    return 'clicks_timed_global_counter', df
# -----------------------------------------------------
def carts_timed_global_counter(df):
    if df is not None:
        df['wgt'] = df.groupby('session').cumcount() + 1
        df['wgt'] = df['wgt'] / df['wgt'].max()
        gr = df[df['type']==1].groupby(['aid'])['wgt'].sum()
        df.set_index('aid', inplace=True)
        df['carts_timed_global_counter'] = gr
        df.drop('wgt', axis=1, inplace=True)
        df.reset_index(inplace=True)
        df['carts_timed_global_counter'].fillna(0, inplace=True)
        df['carts_timed_global_counter'] = df['carts_timed_global_counter'].astype('float32')
    return 'carts_timed_global_counter', df
# -----------------------------------------------------
def orders_timed_global_counter(df):
    if df is not None:
        df['wgt'] = df.groupby('session').cumcount() + 1
        df['wgt'] = df['wgt'] / df['wgt'].max()
        gr = df[df['type']==2].groupby(['aid'])['wgt'].sum()
        df.set_index('aid', inplace=True)
        df['orders_timed_global_counter'] = gr
        df.drop('wgt', axis=1, inplace=True)
        df.reset_index(inplace=True)
        df['orders_timed_global_counter'].fillna(0, inplace=True)
        df['orders_timed_global_counter'] = df['orders_timed_global_counter'].astype('float32')
    return 'orders_timed_global_counter', df
# -----------------------------------------------------
def aid_clicked_count(df):
    # сколько товар был кликнут
    if df is not None:
        tmp = df.loc[df['type']==0].groupby(['aid', 'type']).agg(aid_clicked_count=('aid','count')).reset_index().drop('type', axis=1)
        df = df.merge(tmp, how='left', on='aid')
        df['aid_clicked_count'] = df['aid_clicked_count'].fillna(0)
        df = df.astype({'aid_clicked_count':'uint16'})
    return 'aid_clicked_count', df
# -----------------------------------------------------
def aid_carted_count(df):
    # сколько товар был положен в корзину
    if df is not None:
        tmp = df.loc[df['type']==1].groupby(['aid', 'type']).agg(aid_carted_count=('aid','count')).reset_index().drop('type', axis=1)
        df = df.merge(tmp, how='left', on='aid')
        df['aid_carted_count'].fillna(0, inplace=True)
        df = df.astype({'aid_carted_count':'uint16'})
    return 'aid_carted_count', df
# -----------------------------------------------------
def aid_ordered_count(df):
    # сколько товар был куплен
    if df is not None:
        tmp = df.loc[df['type']==2].groupby(['aid', 'type']).agg(aid_ordered_count=('aid','count')).reset_index().drop('type', axis=1)
        df = df.merge(tmp, how='left', on='aid')
        df['aid_ordered_count'].fillna(0, inplace=True)
        df = df.astype({'aid_ordered_count':'uint16'})
    return 'aid_ordered_count', df
# -----------------------------------------------------
def item_item_count(df):
    # сколько раз товар встречался во всех сессиях
    if df is not None:
        gr = df.groupby('aid')['aid'].count().astype('uint16')
        df.set_index('aid', inplace=True)
        df['item_item_count'] = gr
        df.reset_index(inplace=True)
    return 'item_item_count', df
# -----------------------------------------------------
def item_user_count(df):
    # в скольких сессиях встречался товар
    if df is not None:
        gr = df.groupby('aid')['session'].nunique().astype('uint16')
        df.set_index('aid', inplace=True)
        df['item_user_count'] = gr
        df.reset_index(inplace=True)
    return 'item_user_count', df
# -----------------------------------------------------
def item_buy_ratio(df):
    # коэфициент покупки товара (type, mean)
    if df is not None:
        gr = df.groupby('aid')['type'].mean().astype('float32')
        df.set_index('aid', inplace=True)
        df['item_buy_ratio'] = gr
        df.reset_index(inplace=True)
    return 'item_buy_ratio', df

In [7]:
# =========================== Basic =================================
# == session aid features ===========================================
# -----------------------------------------------------
def this_aid_clicked_count(df):
    # сколько товар был кликнут в рамках сессии
    if df is not None:
        if 'this_aid_clicked_count' in df.columns: df.drop('this_aid_clicked_count', axis=1, inplace=True)
        tmp = df.loc[df['type']==0].groupby(['session', 'aid', 'type']).agg(this_aid_clicked_count=('ts', 'count')).reset_index().drop('type', axis=1)
        df = df.merge(tmp, how='left', on=['session', 'aid'])
        df['this_aid_clicked_count'].fillna(0, inplace=True)
        df = df.astype({'this_aid_clicked_count':'uint8'})
    return 'this_aid_clicked_count', df
# -----------------------------------------------------
def this_aid_carted_count(df):
    # сколько товар был положен в корзину в рамках сессии
    if df is not None:
        if 'this_aid_carted_count' in df.columns: df.drop('this_aid_carted_count', axis=1, inplace=True)
        tmp = df.loc[df['type']==0].groupby(['session', 'aid', 'type']).agg(this_aid_carted_count=('ts', 'count')).reset_index().drop('type', axis=1)
        df = df.merge(tmp, how='left', on=['session', 'aid'])
        df['this_aid_carted_count'].fillna(0, inplace=True)
        df = df.astype({'this_aid_carted_count':'uint8'})
    return 'this_aid_carted_count', df
# -----------------------------------------------------
def this_aid_ordered_count(df):
    # сколько товар был куплен в рамках сессии
    if df is not None:
        if 'this_aid_ordered_count' in df.columns: df.drop('this_aid_ordered_count', axis=1, inplace=True)
        tmp = df.loc[df['type']==0].groupby(['session', 'aid', 'type']).agg(this_aid_ordered_count=('ts', 'count')).reset_index().drop('type', axis=1)
        df = df.merge(tmp, how='left', on=['session', 'aid'])
        df['this_aid_ordered_count'].fillna(0, inplace=True)
        df = df.astype({'this_aid_ordered_count':'uint8'})
    return 'this_aid_ordered_count', df
# -----------------------------------------------------
def sec_to_session_end(df):
    # секунд до конца сессии
    if df is not None:
        gr = df.groupby('session')['ts'].max()
        df.set_index('session', inplace=True)
        df['ts_max'] = gr
        df['sec_to_session_end'] = df['ts_max'] - df['ts']
        df.drop('ts_max', axis=1, inplace=True)
        df.reset_index(inplace=True)
        df['sec_to_session_end'] = df['sec_to_session_end'].astype('uint32')
    return 'sec_to_session_end', df
# -----------------------------------------------------
def sec_since_session_start(df):
    # секунд с начала сессии
    if df is not None:
        gr = df.groupby('session')['ts'].min()
        df.set_index('session', inplace=True)
        df['ts_min'] = gr
        df['sec_since_session_start'] = df['ts'] - df['ts_min']
        df.drop('ts_min', axis=1, inplace=True)
        df.reset_index(inplace=True)
        df['sec_since_session_start'] = df['sec_since_session_start'].astype('uint32')
    return 'sec_since_session_start', df
# -----------------------------------------------------
def action_num_reverse_chrono(df):
    # реверсивный порядок событий
    if df is not None:
        df['action_num_reverse_chrono'] = df.groupby('session').cumcount(ascending=True).astype('uint16')
    return 'action_num_reverse_chrono', df
# -----------------------------------------------------
def log_recency_score(df):
    # логарифмический порядок событий 
    if df is not None:
        linear_interpolation = 0.1 + ((1-0.1) / (df['session_length']-1)) * (df['session_length']-df['action_num_reverse_chrono']-1)
        df['log_recency_score'] = (2 ** linear_interpolation - 1).fillna(1.0).astype('float32')
    return 'log_recency_score', df
# -----------------------------------------------------
def type_weighted_log_recency_score(df):
    # логарифмический порядок событий с весами по типам
    if df is not None:
        df['type_weighted_log_recency_score'] = (df['log_recency_score'] / df['type'].map({0:1, 1:6, 2:3})).astype('float32')
    return 'type_weighted_log_recency_score', df
# -----------------------------------------------------
def bought_in_session(df):
    # был ли товар куплен в рамках сессии
    if df is not None:
        if 'bought_in_session' in df.columns: df.drop('bought_in_session', axis=1, inplace=True)
        _ = df[df['type'] == 2][['session', 'aid']].drop_duplicates()
        _['bought_in_session'] = 1
        df = df.merge(_, how='left', on=['session', 'aid']).fillna(0).astype({'bought_in_session':'uint8'})
    return 'bought_in_session', df
# -----------------------------------------------------
def onehot_clicks(df):
    if df is not None:
        df['onehot_clicks'] = 0
        df.loc[df['type']==0, 'onehot_clicks'] = 1
        df = df.astype({'onehot_clicks':'uint8'})
    return 'onehot_clicks', df
# -----------------------------------------------------
def onehot_carts(df):
    if df is not None:
        df['onehot_carts'] = 0
        df.loc[df['type']==1, 'onehot_carts'] = 1
        df = df.astype({'onehot_carts':'uint8'})
    return 'onehot_carts', df
# -----------------------------------------------------
def onehot_orders(df):
    if df is not None:
        df['onehot_orders'] = 0
        df.loc[df['type']==2, 'onehot_orders'] = 1
        df = df.astype({'onehot_orders':'uint8'})
    return 'onehot_orders', df

# * Пайплайн *

In [8]:
def callPipeline(df, pipeline):
    ft = []
    if type(pipeline) is list:
        for func in pipeline:
            result, df = func(df)
            if result is not None:
                ft.extend(result if type(result) is list else [result])
        return ft, df
    else: return pipeline(df)
# -----------------------------------------------------   
def Example(df):
    pipeline = [fSession, fAid]
    ft, df = runPipeline(df, pipeline)
    return ft, df
# -----------------------------------------------------
def sessionPipline(df):
    pipeline = [session_length, user_item_count, user_buy_ratio, user_events, user_ratio]
    ft, df = callPipeline(df, pipeline)
    return ft, df
# -----------------------------------------------------
def aidPipline(df):
    pipeline = [clicks_user_counter, carts_user_counter, orders_user_counter, clicks_timed_global_counter, carts_timed_global_counter, 
                orders_timed_global_counter, aid_clicked_count, aid_carted_count, aid_ordered_count, item_item_count, item_user_count, item_buy_ratio]
    ft, df = callPipeline(df, pipeline)
    return ft, df
# -----------------------------------------------------
def session_aidPipline(df):
    pipeline = [this_aid_clicked_count, this_aid_carted_count, this_aid_ordered_count, sec_to_session_end, sec_since_session_start, 
               action_num_reverse_chrono, bought_in_session, onehot_clicks, onehot_carts, onehot_orders]
    ft, df = callPipeline(df, pipeline)
    return ft, df

# * Конфиг *

In [9]:
# ==================================================================
# ----- Конфигурация сессии для экспериментов-----------------------
# ==================================================================
LOCAL = False # Локально ли работает (чтобы не удалить все файлы)
SHOW  = True # Выводить ли таблицы в процессе
# ----------------------------------------------------
params = {
    'dataset': [
        {'path':'/kaggle/input/otto-full-optimized-memory-footprint/train.parquet'},
    ],
    'dataset_sample': 1.0,
    'prepare_dataset': True,
    
    'prepare_session': True,
    'prepare_aid': True,
    'prepare_session_aid': True,
    
    'merge_features': True,
    'chunks': 10,
    'pipelines': {
        'session': [fSession, sessionPipline],
        'aid': [fAid, aidPipline],
        'session_aid': [fSession, fAid, fType, session_aidPipline]
    }
}
# ----------------------------------------------------
pd = cudf if GPU else pandas
print('We will use RAPIDS version',cudf.__version__) if GPU else print('We will use CPU', '...')

We will use RAPIDS version 21.10.01


# * Подготовка датасета и фич *

In [10]:
%%time
# -- разбиваем все датасеты на файлы по сессиям (сессиий в файлах не пересекаются) --
# ~ 1m CPU
# ~ 30s GPU
if params.get('prepare_dataset') != False:
    for j, dataset in enumerate(params['dataset']):
        print("process '", dataset['path'], "' ...")
        train = loadTrainData(dataset['path'], p=params.get('dataset_sample') or 1.0)
        start, end = train['session'].min(), train['session'].max()
        ln = ceil((end - start) / params['chunks'])
        for i in range(0, params['chunks']):
            s = start + i*ln
            e = min(start + (i+1)*ln, end + 1)
            if e == end: e += 1
            __ = train[(train['session']>=s)&(train['session']<e)]
            if j == 0: __.to_parquet(f'train_{i}.parquet', index=False)
            else:
                tmp = pd.read_parquet(f'train_{i}.parquet')
                __ = pd.concat([tmp, __])
                __.to_parquet(f'train_{i}.parquet', index=False)
            print('\r', f'end chunk {i+1} of {params["chunks"]} ({s} to {e-1})')#, end='   ')

        if SHOW and i == params['chunks'] -1: display(train.head(4))
        del train; gc.collect()

process ' /kaggle/input/otto-full-optimized-memory-footprint/train.parquet ' ...
 end chunk 1 of 10 (0 to 1289977)
 end chunk 2 of 10 (1289978 to 2579955)
 end chunk 3 of 10 (2579956 to 3869933)
 end chunk 4 of 10 (3869934 to 5159911)
 end chunk 5 of 10 (5159912 to 6449889)
 end chunk 6 of 10 (6449890 to 7739867)
 end chunk 7 of 10 (7739868 to 9029845)
 end chunk 8 of 10 (9029846 to 10319823)
 end chunk 9 of 10 (10319824 to 11609801)
 end chunk 10 of 10 (11609802 to 12899778)


Unnamed: 0,session,aid,ts,type
0,0,1517085,1659304800,0
1,0,1563459,1659304904,0
2,0,1309446,1659367439,0
3,0,16246,1659367719,0


CPU times: user 4 s, sys: 4.53 s, total: 8.53 s
Wall time: 34.8 s


# * Вычисляем фичи *

In [11]:
%%time
# -- вычисляем фичи для сессий --
# ~ 6m 30s CPU
# ~ 5m 16s GPU
if params.get('prepare_session') != False:
    print('--- start', '-'*50)
    key = 'session'
    for i in range(0, params['chunks']):
        train = loadTrainData(path=f'train_{i}.parquet')
        s, e = train['session'].min(), train['session'].max()
        features, train = callPipeline(train.to_pandas() if GPU else train, params['pipelines'].get(key))
        train.drop_duplicates(features, inplace=True)
        train = train[features]
        train.to_parquet(f'train_{i}f_{key}.parquet', index=False)
        print('\r', f'end chunk {i+1} of {params["chunks"]} ({s} to {e})')#, end = '   ')
    if SHOW and i == params['chunks'] -1: display(train.head(4))
    del train; gc.collect()

--- start --------------------------------------------------
 end chunk 1 of 10 (0 to 1289977)
 end chunk 2 of 10 (1289978 to 2579955)
 end chunk 3 of 10 (2579956 to 3869933)
 end chunk 4 of 10 (3869934 to 5159911)
 end chunk 5 of 10 (5159912 to 6449889)
 end chunk 6 of 10 (6449890 to 7739867)
 end chunk 7 of 10 (7739868 to 9029845)
 end chunk 8 of 10 (9029846 to 10319823)
 end chunk 9 of 10 (10319824 to 11609801)
 end chunk 10 of 10 (11609802 to 12899778)


Unnamed: 0,session,session_length,user_item_count,user_buy_ratio,user_clicks,user_carts,user_orders,user_carts_clicks_ratio,user_orders_carts_ratio,user_orders_clicks_ratio
0,11609802,20,11,0.9,8,6,6,0.75,1.0,0.75
20,11609803,4,3,0.0,4,0,0,0.0,0.0,0.0
24,11609804,2,2,0.0,2,0,0,0.0,0.0,0.0
26,11609805,19,13,0.0,19,0,0,0.0,0.0,0.0


CPU times: user 4min 8s, sys: 1min 5s, total: 5min 14s
Wall time: 5min 16s


In [12]:
%%time
# -- вычисляем фичи для товаров --
# ~ 20m CPU
# ~ 1m 30s GPU
if params.get('prepare_aid') != False:
    print('--- start', '-'*50)
    key = 'aid'
    for i in range(0, params['chunks']):
        train = loadTrainData(path=f'train_{i}.parquet')
        s, e = train['session'].min(), train['session'].max()
        features, tmp = callPipeline(train.to_pandas() if GPU else train, params['pipelines'].get(key))
        tmp.drop_duplicates(features, inplace=True)
        tmp = tmp[features]
        tmp.to_parquet(f'train_{i}f_{key}.parquet', index=False)
        print('\r', f'end chunk {i+1} of {params["chunks"]} ({s} to {e})')#, end = '   ')
    if SHOW and i == params['chunks'] -1: display(tmp.head(4))
    del train; gc.collect()

--- start --------------------------------------------------
 end chunk 1 of 10 (0 to 1289977)
 end chunk 2 of 10 (1289978 to 2579955)
 end chunk 3 of 10 (2579956 to 3869933)
 end chunk 4 of 10 (3869934 to 5159911)
 end chunk 5 of 10 (5159912 to 6449889)
 end chunk 6 of 10 (6449890 to 7739867)
 end chunk 7 of 10 (7739868 to 9029845)
 end chunk 8 of 10 (9029846 to 10319823)
 end chunk 9 of 10 (10319824 to 11609801)
 end chunk 10 of 10 (11609802 to 12899778)


Unnamed: 0,aid,clicks_user_counter,carts_user_counter,orders_user_counter,clicks_timed_global_counter,carts_timed_global_counter,orders_timed_global_counter,aid_clicked_count,aid_carted_count,aid_ordered_count,item_item_count,item_user_count,item_buy_ratio
0,1418554,289,71,21,10.923237,2.340249,1.377593,347,74,22,443,292,0.266366
2,1778816,6,1,0,0.107884,0.029046,0.0,6,1,0,7,7,0.142857
3,652818,4,2,1,0.128631,0.029046,0.033195,6,2,1,9,5,0.444444
4,1453830,28,4,1,3.03112,0.076763,0.041494,41,5,1,47,28,0.148936


CPU times: user 11min 53s, sys: 2min 36s, total: 14min 30s
Wall time: 14min 34s


In [13]:
%%time
# -- вычисляем фичи для сессий/товаров --
# ~ 25m CPU
# ~ 13m 30s GPU
if params.get('prepare_session_aid') != False:
    print('--- start', '-'*50)
    key = 'session_aid'
    for i in range(0, params['chunks']):
        train = loadTrainData(path=f'train_{i}.parquet')
        s, e = train['session'].min(), train['session'].max()
        features, tmp = callPipeline(train.to_pandas() if GPU else train.copy(), params['pipelines'].get(key))
        if GPU: tmp = pd.from_pandas(tmp)
        tmp.drop_duplicates(features, inplace=True)
        tmp = tmp[features]
        for val, n in { 'orders':2 }.items(): # 'clicks', 'carts', 
            tmp[tmp['type'] == n].to_parquet(f'train_{i}f_{key}_{val}.parquet', index=False)
        print('\r', f'end chunk {i+1} of {params["chunks"]} ({s} to {e})')#, end = '   ')
    if SHOW and i == params['chunks'] -1: display(tmp.head(4))
    del train; gc.collect()

--- start --------------------------------------------------
 end chunk 1 of 10 (0 to 1289977)
 end chunk 2 of 10 (1289978 to 2579955)
 end chunk 3 of 10 (2579956 to 3869933)
 end chunk 4 of 10 (3869934 to 5159911)
 end chunk 5 of 10 (5159912 to 6449889)
 end chunk 6 of 10 (6449890 to 7739867)
 end chunk 7 of 10 (7739868 to 9029845)
 end chunk 8 of 10 (9029846 to 10319823)
 end chunk 9 of 10 (10319824 to 11609801)
 end chunk 10 of 10 (11609802 to 12899778)


Unnamed: 0,session,aid,type,this_aid_clicked_count,this_aid_carted_count,this_aid_ordered_count,sec_to_session_end,sec_since_session_start,action_num_reverse_chrono,bought_in_session,onehot_clicks,onehot_carts,onehot_orders
9,11609802,394571,0,1,1,1,203,445,9,1,1,0,0
10,11609802,394571,1,1,1,1,188,460,10,1,0,1,0
14,11609802,394571,2,1,1,1,0,648,14,1,0,0,1
7,11609802,557439,0,1,1,1,338,310,7,0,1,0,0


CPU times: user 10min 46s, sys: 2min 46s, total: 13min 32s
Wall time: 13min 36s


# * Собираем чанки в файлы *

In [14]:
%%time
# -- собираем session --
# ~ 7s
if params.get('merge_features') != False:
    key = 'session'
    result = pd.DataFrame()
    for file in glob.glob(f'train_*f_{key}.parquet'):
        train = loadTrainData(path=file)
        print('\r', f"file '{file}'")#, end = '   ')
        result = pd.concat([result, train])
    result.to_parquet(f'features_{key}.parquet', index=False)
    if SHOW: display(result.head(4))
    del train, result; gc.collect()

 file 'train_9f_session.parquet'
 file 'train_7f_session.parquet'
 file 'train_1f_session.parquet'
 file 'train_4f_session.parquet'
 file 'train_0f_session.parquet'
 file 'train_8f_session.parquet'
 file 'train_2f_session.parquet'
 file 'train_3f_session.parquet'
 file 'train_6f_session.parquet'
 file 'train_5f_session.parquet'


Unnamed: 0,session,session_length,user_item_count,user_buy_ratio,user_clicks,user_carts,user_orders,user_carts_clicks_ratio,user_orders_carts_ratio,user_orders_clicks_ratio
0,11609802,20,11,0.9,8,6,6,0.75,1.0,0.75
1,11609803,4,3,0.0,4,0,0,0.0,0.0,0.0
2,11609804,2,2,0.0,2,0,0,0.0,0.0,0.0
3,11609805,19,13,0.0,19,0,0,0.0,0.0,0.0


CPU times: user 2.13 s, sys: 464 ms, total: 2.59 s
Wall time: 2.58 s


In [15]:
%%time
# -- собираем aid --
# ~ 7s
if params.get('merge_features') != False:
    key = 'aid'
    result = pd.DataFrame()
    for file in glob.glob(f'train_*f_{key}.parquet'):
        train = loadTrainData(path=file)
        print('\r', f"file '{file}'")#, end = '   ')
        result = pd.concat([result, train])
    result.to_parquet(f'features_{key}.parquet', index=False)
    if SHOW: display(result.head(4))
    del train, result; gc.collect()

 file 'train_0f_aid.parquet'
 file 'train_4f_aid.parquet'
 file 'train_2f_aid.parquet'
 file 'train_9f_aid.parquet'
 file 'train_3f_aid.parquet'
 file 'train_6f_aid.parquet'
 file 'train_7f_aid.parquet'
 file 'train_1f_aid.parquet'
 file 'train_8f_aid.parquet'
 file 'train_5f_aid.parquet'


Unnamed: 0,aid,clicks_user_counter,carts_user_counter,orders_user_counter,clicks_timed_global_counter,carts_timed_global_counter,orders_timed_global_counter,aid_clicked_count,aid_carted_count,aid_ordered_count,item_item_count,item_user_count,item_buy_ratio
0,1517085,21,1,1,2.987952,0.240964,0.246988,34,2,2,38,21,0.157895
1,1563459,19,0,0,3.401606,0.0,0.0,20,0,0,20,19,0.0
2,1309446,592,90,25,142.367477,12.843373,3.369478,1218,121,29,1368,594,0.130848
3,16246,199,34,20,53.483936,8.405622,3.634538,395,52,20,467,199,0.197002


CPU times: user 1.78 s, sys: 573 ms, total: 2.35 s
Wall time: 2.34 s


In [16]:
%%time
# -- собираем session aid --
# ~ 7s CPU
if params.get('merge_features') != False:
    key = 'session_aid'
    result = pd.DataFrame()
    for val in ['orders']: # 'clicks', 'carts', 
        for file in glob.glob(f'train_*f_{key}_{val}.parquet'):
            train = loadTrainData(path=file)
            print('\r', f"file '{file}'")#, end = '   ')
            result = pd.concat([result, train])
        result.to_parquet(f'features_{key}_{val}.parquet', index=False)
    if SHOW: display(result.head(4))
    del train, result; gc.collect()

 file 'train_8f_session_aid_orders.parquet'
 file 'train_9f_session_aid_orders.parquet'
 file 'train_7f_session_aid_orders.parquet'
 file 'train_0f_session_aid_orders.parquet'
 file 'train_6f_session_aid_orders.parquet'
 file 'train_4f_session_aid_orders.parquet'
 file 'train_3f_session_aid_orders.parquet'
 file 'train_1f_session_aid_orders.parquet'
 file 'train_5f_session_aid_orders.parquet'
 file 'train_2f_session_aid_orders.parquet'


Unnamed: 0,session,aid,type,this_aid_clicked_count,this_aid_carted_count,this_aid_ordered_count,sec_to_session_end,sec_since_session_start,action_num_reverse_chrono,bought_in_session,onehot_clicks,onehot_carts,onehot_orders
0,10319832,1060928,2,1,1,1,0,591,11,1,0,0,1
1,10319832,1690705,2,2,2,2,0,591,10,1,0,0,1
2,10319835,145948,2,2,2,2,671409,996,14,1,0,0,1
3,10319835,326481,2,5,5,5,671409,996,11,1,0,0,1


CPU times: user 784 ms, sys: 248 ms, total: 1.03 s
Wall time: 1.05 s


In [17]:
#pd.DataFrame().to_csv('free_df.csv')

In [None]:
if not LOCAL: 
    removeFiles('train_*.parquet')
    #for file in ['features_session_aid_carts', 'features_session_aid_clicks']:
    #    os.remove(file + '.parquet')