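# Candidate generation

For every session, build up to `TOP_OUTPUT` candidate aids per target (clicks, carts, orders) from the session's own history plus co-visitation neighbours, save them to parquet, and measure candidate recall on the validation split.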

In [1]:
!pip install pyarrow fastparquet

Collecting fastparquet
  Downloading fastparquet-0.8.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m39.2 MB/s[0m eta [36m0:00:00[0m00:01[0m
Collecting cramjam>=2.3.0
  Downloading cramjam-2.7.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m57.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: cramjam, fastparquet
Successfully installed cramjam-2.7.0 fastparquet-0.8.1
[0m

In [2]:
VALIDA = True  # True: run on the local validation split; False: run on the competition test split (input paths switch on this flag)

In [3]:
DEBUG = False

# --- OTTO [Build metadata] notebook params
TOP_K = 20        # part of the co-visitation file names (top_{TOP_K}_...); must match the saved files
TOP_OUTPUT = 100  # maximum number of candidates produced per session
LEAK_DATA = True
DELTA_TS = 24  # hours; the co-visitation loading cells set their own window
DISK_PIECES = 4
SIZE = 1.86e6/DISK_PIECES  # sessions per disk piece (~1.86e6 sessions in total)
# ---

# When True, load_covisitation() first tries the pickled dict version of each
# co-visitation table; when False it reads the parquet files directly.
USE_PICKLE_COVI = False

type_labels = {'clicks':0, 'carts':1, 'orders':2}

if VALIDA:
    metadata_path = "/kaggle/input/otto-validation-covisitations"
else:
    metadata_path = "/kaggle/input/otto-test-covisitations"

In [4]:
import pandas as pd
import numpy as np
import copy
from tqdm.notebook import tqdm
import os, sys, pickle, glob, gc
from collections import Counter
import itertools

from multiprocessing import Pool
import psutil
# psutil.cpu_count() returns None when the count cannot be determined;
# fall back to 1 so the worker-count arithmetic never sees None.
N_CPU = psutil.cpu_count() or 1
print("Number of cpu:", N_CPU)

Number of cpu: 4


In [5]:
# Callable executed by each worker process; installed once per worker by _init_worker.
_WORKER_FUNC = None


def _init_worker(func):
    """Pool initializer: store ``func`` in the worker process's globals."""
    global _WORKER_FUNC
    _WORKER_FUNC = func


def _run_worker(item):
    """Pool task: apply the per-worker callable to a single item."""
    return _WORKER_FUNC(item)


def df_parallelize_run(func, t_split):
    """Apply ``func`` to every element of ``t_split`` using a process pool.

    Args:
        func: single-argument callable, e.g. a bound
            ``Candidates.suggest_candidates``. It must be picklable, or
            inherited by fork.
        t_split: sized sequence of inputs; ``func`` is called once per element.

    Returns:
        list of results, in the same order as ``t_split``.
    """
    # At least one process (Pool(0) raises ValueError), never more than items.
    num_cores = max(1, min(N_CPU or 1, len(t_split)))
    # Pool.map pickles its callable together with every task chunk. For a bound
    # method that re-sends the whole instance (including its large co-visitation
    # dicts) each time, so hand func to the workers once via the initializer.
    with Pool(num_cores, initializer=_init_worker, initargs=(func,)) as pool:
        results = pool.map(_run_worker, t_split)
        pool.close()
        pool.join()
    return results

In [6]:
def pqt_to_dict(df):
    """Collapse a co-visitation edge list into ``{aid_x: [aid_y, ...]}``.

    Within each ``aid_x`` the neighbour list keeps the row order of ``df``.
    """
    neighbours = df.groupby('aid_x')['aid_y'].apply(list)
    return neighbours.to_dict()

# Load data

In [7]:
def load_test(path):
    """Read every parquet chunk matching the glob ``path`` into one DataFrame.

    Chunks are read in lexicographic file-name order so the resulting row order
    is reproducible (``glob.glob`` itself returns files in arbitrary order).
    ``ts`` is divided by 1000 and cast to int32 (milliseconds -> seconds), and
    ``type`` is mapped from its string label to the int8 code in ``type_labels``.

    Raises:
        FileNotFoundError: if no file matches ``path``.
    """
    chunk_files = sorted(glob.glob(path))
    if not chunk_files:
        raise FileNotFoundError(f"No parquet chunks match {path!r}")

    dfs = []
    for chunk_file in chunk_files:
        chunk = pd.read_parquet(chunk_file)
        chunk['ts'] = (chunk['ts'] / 1000).astype('int32')
        chunk['type'] = chunk['type'].map(type_labels).astype('int8')
        dfs.append(chunk)
    return pd.concat(dfs).reset_index(drop=True)

# NOTE(review): the validation path is relative ('../input') while the test path
# is absolute; both presumably resolve under /kaggle/input when run from /kaggle/working.
test_df = load_test(
    '../input/otto-validation/test_parquet/*'
    if VALIDA
    else '/kaggle/input/otto-chunk-data-inparquet-format/test_parquet/*'
)

print('Test data has shape', test_df.shape)
test_df.head()

Test data has shape (7683577, 4)


Unnamed: 0,session,aid,ts,type
0,12089221,700554,1661448002,0
1,12089221,619488,1661448024,0
2,12089221,579241,1661449547,0
3,12089221,619488,1661449585,0
4,12089221,619488,1661456661,0


In [8]:
%%time
if VALIDA:
    dir_ = '../input/otto-valid-test-list/valid_group_tolist'
else:
    dir_ = '../input/otto-valid-test-list/test_group_tolist'

PIECES = 5
VER = 1
test_bysession_list = []
for PART in range(PIECES):
    with open(dir_+f'_{PART}_{VER}.pkl', 'rb') as f:
        test_bysession_list.extend(pickle.load(f))
print(len(test_bysession_list))

1801251
CPU times: user 8.32 s, sys: 861 ms, total: 9.18 s
Wall time: 10.1 s


In [9]:
# Most frequent aids per event type in the test data: an optional popularity
# fallback that the candidate cells below do not use. Set to True to compute.
COMPUTE_TOP_POPULAR = False
if COMPUTE_TOP_POPULAR:
    top_clicks, top_carts, top_orders = (
        list(
            test_df.loc[test_df['type'] == type_labels[name], 'aid']
            .value_counts()
            .index.values[:TOP_OUTPUT]
        )
        for name in ('clicks', 'carts', 'orders')
    )

# The raw events are no longer needed; release them before the
# co-visitation tables are loaded.
del test_df
_ = gc.collect()

In [10]:
def load_covisitation(TYPE_COO, DELTA_TS, use_pickle=None):
    """Load one top-K co-visitation table as a dict ``aid -> [aid, ...]``.

    Files are looked up in ``metadata_path`` as
    ``top_{TOP_K}_{TYPE_COO}_{DELTA_TS}hours_full.pickle`` (dict format) or
    ``..._full.pqt`` (edge list, converted with ``pqt_to_dict``).

    Args:
        TYPE_COO: table flavour used in the file name, e.g. "click", "cart",
            "purchase" or "fulltype".
        DELTA_TS: time window in hours, also part of the file name.
        use_pickle: try the pickled dict first. ``None`` (default) uses the
            global ``USE_PICKLE_COVI`` if it is defined, otherwise False.

    Returns:
        dict mapping an aid to its list of co-visited aids.
    """
    if use_pickle is None:
        # USE_PICKLE_COVI is optional; a missing flag means "read parquet".
        use_pickle = globals().get('USE_PICKLE_COVI', False)

    output_name = f"top_{TOP_K}_{TYPE_COO}_{DELTA_TS}hours"
    if use_pickle:
        try:
            # NOTE: pickle.load can execute arbitrary code; only load trusted files.
            with open(metadata_path + f'/{output_name}_full.pickle', 'rb') as file:
                top = pickle.load(file)
            print(f"Load {TYPE_COO} covisitaion | dict format")
            return top
        except (OSError, pickle.UnpicklingError, EOFError) as err:
            # Best effort: fall back to the parquet version of the same table.
            print(f"Pickle load failed ({err!r}); falling back to parquet")

    print(f"Load {TYPE_COO} covisitaion | dataframe format")
    return pqt_to_dict(pd.read_parquet(metadata_path + f'/{output_name}_full.pqt'))

In [11]:
class Candidates:
    """Generate up to ``TOP_OUTPUT`` candidate aids per session for one target.

    Candidates come from two sources, in this order:

    1. History: every distinct aid of the session, scored by a recency weight
       times a per-event-type multiplier (``type_candidate == 1``).
    2. Co-visitation: if the history has fewer distinct aids than the limit,
       the remaining slots are filled with the aids that occur most often in
       the neighbour lists of the history aids. Aids already in the history
       are skipped (``type_candidate == 0``).
    """

    def __init__(self, first_option, second_option, top_output=None):
        """
        Args:
            first_option: mapping ``aid -> iterable of aids``; the
                target-specific co-visitation neighbours.
            second_option: mapping ``aid -> iterable of aids``; a second
                neighbour table counted together with ``first_option``.
            top_output: maximum candidates per session. ``None`` (default)
                uses the global ``TOP_OUTPUT`` at call time.
        """
        self.first_option = first_option
        self.second_option = second_option
        self.top_output = top_output
        # Score multiplier per encoded event type (type_labels: 0=clicks,
        # 1=carts, 2=orders). Previously tried: clicks 1, carts 6, orders 3.
        self.type_weight_multipliers = {0: 0.5, 1: 9, 2: 0.5}

    def suggest_candidates(self, df):
        """Build the candidate list for a single session.

        Args:
            df: a ``(session, aids, types)`` record. ``aids`` and ``types`` are
                parallel sequences, and ``types`` holds the codes 0/1/2. Later
                positions get larger recency weights, so event order is
                presumably chronological (TODO confirm with the data builder).

        Returns:
            ``(session, aids, scores, type_cands)``. The last three are parallel
            lists: history candidates first (weighted score, type 1), then
            co-visitation candidates (neighbour count, type 0).
        """
        session, aids, types = df[0], df[1], df[2]
        limit = TOP_OUTPUT if self.top_output is None else self.top_output

        # Distinct aids, most recent first (walk the history backwards).
        unique_aids = list(dict.fromkeys(aids[::-1]))
        history = set(unique_aids)  # O(1) membership for the exclusion below

        # Recency weights grow along the sequence, ending at 2**1 - 1 = 1 for
        # the last event when the session has at least two events.
        recency = np.logspace(0.1, 1, len(aids), base=2, endpoint=True) - 1
        history_scores = Counter()
        for aid, w, t in zip(aids, recency, types):
            history_scores[aid] += w * self.type_weight_multipliers[t]

        # history_scores has exactly one key per distinct aid, so this returns
        # all of them whenever the history is shorter than `limit`.
        ranked = history_scores.most_common(limit)
        sorted_aids = [aid for aid, _ in ranked]
        scores = [score for _, score in ranked]
        type_cands = [1] * len(ranked)
        if len(unique_aids) >= limit:
            return session, sorted_aids, scores, type_cands

        def neighbours(table):
            # Chain the neighbour lists of the history aids found in `table`.
            return itertools.chain.from_iterable(
                table[aid] for aid in unique_aids if aid in table
            )

        # first_option neighbours are counted before second_option ones; this
        # insertion order decides ties in most_common().
        neighbour_counts = Counter(
            itertools.chain(neighbours(self.first_option), neighbours(self.second_option))
        )
        fresh = [
            (aid, cnt)
            for aid, cnt in neighbour_counts.most_common(limit)
            if aid not in history
        ]
        for aid, cnt in fresh[:limit - len(unique_aids)]:
            sorted_aids.append(aid)
            scores.append(cnt)
            type_cands.append(0)

        return session, sorted_aids, scores, type_cands

In [12]:
def predict_candidates(suggest, test_bysession_list, type_name):
    """Run ``suggest`` on every session in parallel and save a long-format table.

    Writes ``/kaggle/working/{type_name}_candidates.pqt`` with one row per
    (session, candidate) and columns ``session``, ``aid``, ``score`` and
    ``type_candidate``. With ``Candidates.suggest_candidates``, 1 means "from
    history" and 0 means "from co-visitation". Returns nothing.
    """
    per_session = df_parallelize_run(suggest, test_bysession_list)

    columns = {'session': [], 'aid': [], 'score': [], 'type_candidate': []}
    for sess, cand_aids, cand_scores, cand_types in per_session:
        # Repeat the session id once per candidate to get long format.
        columns['session'] += [sess] * len(cand_aids)
        columns['aid'] += cand_aids
        columns['score'] += cand_scores
        columns['type_candidate'] += cand_types

    table = pd.DataFrame(columns)
    table.to_parquet(f'/kaggle/working/{type_name}_candidates.pqt')

    del per_session, table, columns
    _ = gc.collect()

In [13]:
%%time
TYPE_COO = "fulltype"
DELTA_TS = 24
top_fulltype = load_covisitation(TYPE_COO, DELTA_TS)

Load fulltype covisitaion | dataframe format
CPU times: user 41.9 s, sys: 3.4 s, total: 45.3 s
Wall time: 44.9 s


# Predict clicks

In [14]:
%%time
TYPE_COO = "click"
DELTA_TS = 24
top_click = load_covisitation(TYPE_COO, DELTA_TS)

Load click covisitaion | dataframe format
CPU times: user 42.4 s, sys: 3.18 s, total: 45.6 s
Wall time: 45.8 s


In [15]:
%%time
suggest_clicks = Candidates(top_click, top_fulltype)
predict_candidates(suggest_clicks.suggest_candidates, test_bysession_list, 'clicks')
del suggest_clicks, top_click
_ = gc.collect()

CPU times: user 4min 22s, sys: 36.4 s, total: 4min 58s
Wall time: 6min 39s


# Predict carts

In [16]:
%%time
TYPE_COO = "cart"
DELTA_TS = 24*7
top_cart = load_covisitation(TYPE_COO, DELTA_TS)

Load cart covisitaion | dataframe format
CPU times: user 40.1 s, sys: 2.99 s, total: 43.1 s
Wall time: 42.7 s


In [17]:
%%time
suggest_carts = Candidates(top_cart, top_fulltype)
predict_candidates(suggest_carts.suggest_candidates, test_bysession_list, 'carts')
del suggest_carts, top_cart
_ = gc.collect()

CPU times: user 4min 32s, sys: 33.3 s, total: 5min 5s
Wall time: 6min 39s


# Predict purchases

In [18]:
%%time
TYPE_COO = "purchase"
DELTA_TS = 24*7
top_purchase = load_covisitation(TYPE_COO, DELTA_TS)

Load purchase covisitaion | dataframe format
CPU times: user 40.3 s, sys: 3.15 s, total: 43.5 s
Wall time: 43.1 s


In [None]:
%%time
suggest_purchases = Candidates(top_purchase, top_fulltype)
predict_candidates(suggest_purchases.suggest_candidates, test_bysession_list, 'orders')
del suggest_purchases, top_purchase
_ = gc.collect()

# Compute candidate recall (validation split only)

In [None]:
# Validation ground truth; the saved candidates are scored against it below.
full_test_labels = pd.read_parquet('/kaggle/input/otto-validation/test_labels.parquet')

types = ['clicks','carts','orders']
candidates = {}
for _type in types:
    cans = pd.read_parquet(f"/kaggle/working/{_type}_candidates.pqt")
    # One row per session with its candidate aids collected into a list.
    candidates[_type] = (
        cans.groupby('session')['aid']
        .apply(list)
        .rename('labels')
        .reset_index()
    )
    del cans
    gc.collect()

# Number of candidates produced for each session.
for _type in types:
    candidates[_type]['n'] = candidates[_type]['labels'].map(len)

In [None]:
# Distribution of click candidate-list lengths (10 most common lengths).
candidates['clicks']['n'].value_counts().iloc[:10]

In [None]:
# Distribution of cart candidate-list lengths (10 most common lengths).
candidates['carts']['n'].value_counts().iloc[:10]

In [None]:
# Distribution of order candidate-list lengths (10 most common lengths).
candidates['orders']['n'].value_counts().iloc[:10]

In [None]:
# Ground-truth count per session is capped at 20, presumably matching the
# 20-prediction limit of the competition metric.
MAX_GT = 20
score = 0
weights = {'clicks': 0.10, 'carts': 0.30, 'orders': 0.60}

for t in types:
    sub = candidates[t]
    test_labels = full_test_labels.loc[full_test_labels['type'] == t]
    test_labels = test_labels.merge(sub, how='left', on=['session'])
    # Sessions without any candidate row get NaN labels from the left merge;
    # count them as zero hits instead of failing on set(NaN).
    test_labels['hits'] = [
        0 if isinstance(labels, float) else len(set(gt).intersection(labels))
        for gt, labels in zip(test_labels['ground_truth'], test_labels['labels'])
    ]
    test_labels['gt_count'] = test_labels['ground_truth'].str.len().clip(0, MAX_GT)
    # Up to TOP_OUTPUT candidates are checked, so hits can exceed the capped
    # denominator; cap them as well so per-session recall never exceeds 1.
    test_labels['hits'] = test_labels['hits'].clip(upper=MAX_GT)

    recall = test_labels['hits'].sum() / test_labels['gt_count'].sum()
    score += weights[t]*recall
    print(f'{t} recall =',recall)

print('=============')
print('Overall Recall =',score)
print('=============')