In [1]:
import pandas as pd, numpy as np, polars as pl
from tqdm.notebook import tqdm
import os, sys, pickle, glob, gc
from collections import Counter
from pandarallel import pandarallel
import cudf, itertools

pandarallel.initialize(nb_workers=28, progress_bar=False)

VER = 1

print('We will use RAPIDS version',cudf.__version__)

INFO: Pandarallel will run on 28 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.
We will use RAPIDS version 22.12.0


### Compute Three Co-visitation Matrices with RAPIDS¶
We will compute 3 co-visitation matrices using RAPIDS cuDF on GPU. This is 30x faster than using Pandas CPU like other public notebooks! For maximum speed, set the variable DISK_PIECES to the smallest number possible based on the GPU you are using without incurring memory errors. If you run this code offline with 32GB GPU ram, then you can use DISK_PIECES = 1 and compute each co-visitation matrix in almost 1 minute! Kaggle's GPU only has 16GB ram, so we use DISK_PIECES = 4 and it takes an amazing 3 minutes each! Below are some of the tricks to speed up computation

Use RAPIDS cuDF GPU instead of Pandas CPU
Read disk once and save in CPU RAM for later GPU multiple use
Process largest amount of data possible on GPU at one time
Merge data in two stages. Multiple small to single medium. Multiple medium to single large.
Write result as parquet instead of dictionary

In [2]:
%%time
# CACHE FUNCTIONS
def read_file(f):
    return cudf.DataFrame( data_cache[f] )
def read_file_to_cache(f):
    df = pd.read_parquet(f)
    df.ts = (df.ts/1000).astype('int32')
    df['type'] = df['type'].map(type_labels).astype('int8')
    return df

# CACHE THE DATA ON CPU BEFORE PROCESSING ON GPU
data_cache = {}
type_labels = {'clicks':0, 'carts':1, 'orders':2}
files = glob.glob('./val_data/*_parquet/*')
for f in files: data_cache[f] = read_file_to_cache(f)

# CHUNK PARAMETERS
READ_CT = 5
CHUNK = int( np.ceil( len(files)/6 ))
print(f'We will process {len(files)} files, in groups of {READ_CT} and chunks of {CHUNK}.')

We will process 120 files, in groups of 5 and chunks of 20.
CPU times: user 10.1 s, sys: 4.62 s, total: 14.7 s
Wall time: 19.4 s


### 1) "Carts Orders" Co-visitation Matrix - Type Weighted

In [4]:
%%time
type_weight = {0:0.5, 1:9, 2:.5}
topn = 40

# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 2
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.sort_values(['session','ts'],ascending=[True,False])
            # USE TAIL OF SESSION
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            # CREATE PAIRS
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = df.type_y.map(type_weight)
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()
    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    # SAVE TOP N
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<topn].drop('n',axis=1)
    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'./val_co_visitation_matrix/top_{topn}_carts_orders_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 19 in groups of 5...
0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Processing files 60 thru 79 in groups of 5...
60 , 65 , 70 , 75 , 
Processing files 80 thru 99 in groups of 5...
80 , 85 , 90 , 95 , 
Processing files 100 thru 119 in groups of 5...
100 , 105 , 110 , 115 , 

### DISK PART 2
Processing files 0 thru 19 in groups of 5...
0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Processing files 60 thru 79 in groups of 5...
60 , 65 , 70 , 75 , 
Processing files 80 thru 99 in groups of 5...
80 , 85 , 90 , 95 , 
Processing files 100 thru 119 in groups of 5...
100 , 105 , 110 , 115 , 
CPU times: user 27.2 s, sys: 18.3 s, total: 45.6 s
Wall time: 46.5 s


### 2) "Buy2Buy" Co-visitation Matrix

In [5]:
%%time
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 1
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.loc[df['type'].isin([1,2])] # ONLY WANT CARTS AND ORDERS
            df = df.sort_values(['session','ts'],ascending=[True,False])
            # USE TAIL OF SESSION
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            # CREATE PAIRS
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 14 * 24 * 60 * 60) & (df.aid_x != df.aid_y) ] # 14 DAYS
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = 1
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()
    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    # SAVE TOP 40
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<topn].drop('n',axis=1)
    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'./val_co_visitation_matrix/top_{topn}_buy2buy_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 19 in groups of 5...
0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Processing files 60 thru 79 in groups of 5...
60 , 65 , 70 , 75 , 
Processing files 80 thru 99 in groups of 5...
80 , 85 , 90 , 95 , 
Processing files 100 thru 119 in groups of 5...
100 , 105 , 110 , 115 , 
CPU times: user 4.84 s, sys: 4.06 s, total: 8.91 s
Wall time: 9.16 s


### 3) "Clicks" Co-visitation Matrix - Time Weighted

In [6]:
%%time
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 2
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.sort_values(['session','ts'],ascending=[True,False])
            # USE TAIL OF SESSION
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            # CREATE PAIRS
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = 1 + 3*(df.ts_x - 1659304800)/(1662328791-1659304800)
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()
    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    # SAVE TOP 40
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<topn].drop('n',axis=1)
    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'./val_co_visitation_matrix/top_{topn}_clicks_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 19 in groups of 5...
0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Processing files 60 thru 79 in groups of 5...
60 , 65 , 70 , 75 , 
Processing files 80 thru 99 in groups of 5...
80 , 85 , 90 , 95 , 
Processing files 100 thru 119 in groups of 5...
100 , 105 , 110 , 115 , 

### DISK PART 2
Processing files 0 thru 19 in groups of 5...
0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Processing files 60 thru 79 in groups of 5...
60 , 65 , 70 , 75 , 
Processing files 80 thru 99 in groups of 5...
80 , 85 , 90 , 95 , 
Processing files 100 thru 119 in groups of 5...
100 , 105 , 110 , 115 , 
CPU times: user 26.7 s, sys: 17.6 s, total: 44.2 s
Wall time: 45.2 s


In [7]:
# FREE MEMORY
del data_cache, tmp
_ = gc.collect()

### Step 2 - ReRank (choose 20) using handcrafted rules

In [9]:
def load_test():    
    dfs = []
    for e, chunk_file in enumerate(glob.glob('./val_data/test_parquet/*')):
        chunk = pd.read_parquet(chunk_file)
        chunk.ts = (chunk.ts/1000).astype('int32')
        chunk['type'] = chunk['type'].map(type_labels).astype('int8')
        dfs.append(chunk)
    return pd.concat(dfs).reset_index(drop=True)

test_df = load_test()
print('Test data has shape',test_df.shape)
test_df.head()

Test data has shape (7683577, 4)


Unnamed: 0,session,aid,ts,type
0,11098528,11830,1661119200,0
1,11098529,1105029,1661119200,0
2,11098530,264500,1661119200,0
3,11098530,264500,1661119288,0
4,11098530,409236,1661119369,0


In [10]:
%%time
# LOAD THREE CO-VISITATION MATRICES
def pqt_to_dict(path):
    return pl.read_parquet(path).groupby('aid_x').agg(pl.col('aid_y').list()).to_pandas().set_index('aid_x').aid_y.apply(list).to_dict()

# load topn clicks co visitation matrix
topn_clicks = pqt_to_dict(f'./val_co_visitation_matrix/top_{topn}_clicks_v{VER}_0.pqt')
for k in range(1, DISK_PIECES): 
    topn_clicks.update( pqt_to_dict(f'./val_co_visitation_matrix/top_{topn}_clicks_v{VER}_{k}.pqt') )

# load topn buys co visitation matrix
topn_buys = pqt_to_dict(f'./val_co_visitation_matrix/top_{topn}_carts_orders_v{VER}_0.pqt')
for k in range(1, DISK_PIECES): 
    topn_buys.update( pqt_to_dict(f'./val_co_visitation_matrix/top_{topn}_carts_orders_v{VER}_{k}.pqt') )
    
# load topn b2b co visitation matrix
topn_b2b = pqt_to_dict(f'./val_co_visitation_matrix/top_{topn}_buy2buy_v{VER}_0.pqt')

# TOP CLICKS AND ORDERS IN TEST
top_clicks = test_df.loc[test_df['type']=='clicks','aid'].value_counts().index.values[:40]
top_orders = test_df.loc[test_df['type']=='orders','aid'].value_counts().index.values[:40]

print('Here are size of our 3 co-visitation matrices:')
print( len( topn_clicks ), len( topn_buys ), len( topn_b2b ) )

Here are size of our 3 co-visitation matrices:
1812132 1812132 1055146
CPU times: user 16.3 s, sys: 1.87 s, total: 18.1 s
Wall time: 11.8 s


In [11]:
type_weight_multipliers = {0: 1, 1: 6, 2: 3}

topn = 100

def suggest_clicks(df):
    # USE USER HISTORY AIDS AND TYPES
    aids=df.aid.tolist()
    types = df.type.tolist()
    unique_aids = list(dict.fromkeys(aids[::-1] ))
    # RERANK CANDIDATES USING WEIGHTS
    if len(unique_aids)>=topn:
        weights=np.logspace(0.1,1,len(aids),base=2, endpoint=True)-1
        aids_temp = Counter() 
        # RERANK BASED ON REPEAT ITEMS AND TYPE OF ITEMS
        for aid,w,t in zip(aids,weights,types): 
            aids_temp[aid] += w * type_weight_multipliers[t]
        sorted_aids = [k for k,v in aids_temp.most_common(topn)]
        return sorted_aids
    # USE "CLICKS" CO-VISITATION MATRIX
    aids2 = list(itertools.chain(*[topn_clicks[aid] for aid in unique_aids if aid in topn_clicks]))
    # RERANK CANDIDATES
    top_aids2 = [aid2 for aid2, cnt in Counter(aids2).most_common(topn) if aid2 not in unique_aids]
    result = unique_aids + top_aids2[:topn - len(unique_aids)]
    # USE TOP20 TEST CLICKS
    return result + list(top_clicks)[:topn-len(result)]

def suggest_buys(df):
    # USE USER HISTORY AIDS AND TYPES
    aids=df.aid.tolist()
    types = df.type.tolist()
    # UNIQUE AIDS AND UNIQUE BUYS
    unique_aids = list(dict.fromkeys(aids[::-1] ))
    df = df.loc[(df['type']==1)|(df['type']==2)]
    unique_buys = list(dict.fromkeys( df.aid.tolist()[::-1] ))
    # RERANK CANDIDATES USING WEIGHTS
    if len(unique_aids)>=topn:
        weights=np.logspace(0.5,1,len(aids),base=2, endpoint=True)-1
        aids_temp = Counter() 
        # RERANK BASED ON REPEAT ITEMS AND TYPE OF ITEMS
        for aid,w,t in zip(aids,weights,types): 
            aids_temp[aid] += w * type_weight_multipliers[t]
        # RERANK CANDIDATES USING "BUY2BUY" CO-VISITATION MATRIX
        aids3 = list(itertools.chain(*[topn_b2b[aid] for aid in unique_buys if aid in topn_b2b]))
        for aid in aids3: aids_temp[aid] += 0.1
        sorted_aids = [k for k,v in aids_temp.most_common(topn)]
        return sorted_aids
    # USE "CART ORDER" CO-VISITATION MATRIX
    aids2 = list(itertools.chain(*[topn_buys[aid] for aid in unique_aids if aid in topn_buys]))
    # USE "BUY2BUY" CO-VISITATION MATRIX
    aids3 = list(itertools.chain(*[topn_b2b[aid] for aid in unique_buys if aid in topn_b2b]))
    # RERANK CANDIDATES
    top_aids2 = [aid2 for aid2, cnt in Counter(aids2+aids3).most_common(topn) if aid2 not in unique_aids] 
    result = unique_aids + top_aids2[:topn - len(unique_aids)]
    # USE TOP20 TEST ORDERS
    return result + list(top_orders)[:topn-len(result)]

### Create submission csv to validate cv score
- Inferring test data with Pandas groupby is slow. We use pandarallel to accelerate process

In [12]:
%%time
pred_df_clicks = test_df.sort_values(["session", "ts"]).groupby(["session"]).parallel_apply(
    lambda x: suggest_clicks(x)
)

  iterator = iter(dataframe_groupby)


CPU times: user 49 s, sys: 5.11 s, total: 54.2 s
Wall time: 1min 51s


In [13]:
%%time
pred_df_buys = test_df.sort_values(["session", "ts"]).groupby(["session"]).parallel_apply(
    lambda x: suggest_buys(x)
)

  iterator = iter(dataframe_groupby)


CPU times: user 59.4 s, sys: 7.83 s, total: 1min 7s
Wall time: 2min 31s


In [14]:
# generate submission dataframe
clicks_pred_df = pd.DataFrame(pred_df_clicks.add_suffix("_clicks"), columns=["labels"]).reset_index()
orders_pred_df = pd.DataFrame(pred_df_buys.add_suffix("_orders"), columns=["labels"]).reset_index()
carts_pred_df = pd.DataFrame(pred_df_buys.add_suffix("_carts"), columns=["labels"]).reset_index()

pred_df = pd.concat([clicks_pred_df, orders_pred_df, carts_pred_df])
pred_df.columns = ["session_type", "labels"]
pred_df["labels"] = pred_df.labels.apply(lambda x: " ".join(map(str,x)))
# pred_df.to_csv("validation_preds.csv", index=False)
pred_df.head()

Unnamed: 0,session_type,labels
0,11098528_clicks,11830 588923 1732105 571762 884502 1157882 876...
1,11098529_clicks,1105029 459126 1339838 1544564 217742 1694360 ...
2,11098530_clicks,409236 264500 1603001 963957 254154 583026 167...
3,11098531_clicks,396199 1271998 452188 1728212 1365569 624163 1...
4,11098532_clicks,876469 7651 108125 612920 1202618 1159379 7790...


### Compute Validation Score
This code is from Radek here. It has been modified to use less memory.

In [17]:
%%time
# COMPUTE METRIC
score = 0
weights = {'clicks': 0.10, 'carts': 0.30, 'orders': 0.60}
for t in ['clicks','carts','orders']:
    sub = pred_df.loc[pred_df.session_type.str.contains(t)].copy()
    sub['session'] = sub.session_type.apply(lambda x: int(x.split('_')[0]))
    sub.labels = sub.labels.apply(lambda x: [int(i) for i in x.split(' ')[:topn]])
    test_labels = pd.read_parquet('./val_data/test_labels.parquet')
    test_labels = test_labels.loc[test_labels['type']==t]
    test_labels = test_labels.merge(sub, how='left', on=['session'])
    test_labels['hits'] = test_labels.apply(lambda df: len(set(df.ground_truth).intersection(set(df.labels))), axis=1)
    test_labels['gt_count'] = test_labels.ground_truth.str.len().clip(0,20)
    recall = test_labels['hits'].sum() / test_labels['gt_count'].sum()
    score += weights[t]*recall
    print(f'{t} recall =',recall)
    
print('=============')
print('Overall Recall =',score)
print('=============')

clicks recall = 0.620077423735456
carts recall = 0.49115323635430075
orders recall = 0.6999135022645809
Overall Recall = 0.6293018146385844
CPU times: user 49.6 s, sys: 2.16 s, total: 51.7 s
Wall time: 51.9 s


In [18]:
%%time
# write candidates to local disk for stage 2 model
pred_df.to_parquet('./val_data/cv_candidates.pgt')

CPU times: user 6.54 s, sys: 1.74 s, total: 8.28 s
Wall time: 11.1 s
