In [1]:
VER = 5

import pandas as pd, numpy as np
from tqdm.notebook import tqdm
import os, sys, pickle, glob, gc
from collections import Counter
import cudf, itertools
print('We will use RAPIDS version',cudf.__version__)

# multiprocessing 
import psutil
N_CORES = psutil.cpu_count()//2 + 1     # Available CPU cores
print(f"N Cores : {N_CORES}")
from multiprocessing import Pool

We will use RAPIDS version 22.10.01+2.gca9a422da9
N Cores : 9


In [2]:
%%time
# CACHE FUNCTIONS
def read_file(f):
    return cudf.DataFrame( data_cache[f] )
def read_file_to_cache(f):
    df = pd.read_parquet(f)
    df.ts = (df.ts/1000).astype('int32')
    df['type'] = df['type'].map(type_labels).astype('int8')
    return df

# CACHE THE DATA ON CPU BEFORE PROCESSING ON GPU
data_cache = {}
type_labels = {'clicks':0, 'carts':1, 'orders':2}
files = glob.glob('../dataset/otto-recommender-system/otto-chunk-data-inparquet-format/*_parquet/*')
for f in files: data_cache[f] = read_file_to_cache(f)

# CHUNK PARAMETERS
READ_CT = 1
CHUNK = int( np.ceil( len(files)/6 ))
print(f'We will process {len(files)} files, in groups of {READ_CT} and chunks of {CHUNK}.')

We will process 146 files, in groups of 1 and chunks of 25.
CPU times: user 19.4 s, sys: 3.7 s, total: 23.1 s
Wall time: 14.2 s


In [2]:
type_labels = {'clicks':0, 'carts':1, 'orders':2}

In [3]:
%%time
type_weight = {0:1, 1:6, 2:3}

# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 10
CARTS_ORDERS_DISK_PIECES = DISK_PIECES
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.sort_values(['session','ts'],ascending=[True,False])
            # USE TAIL OF SESSION
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            # CREATE PAIRS
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = df.type_y.map(type_weight)
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()
    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    # SAVE TOP 40
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<15].drop('n',axis=1)
    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'top_15_carts_orders_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 24 in groups of 1...
0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 , 20 , 21 , 22 , 23 , 24 , 
Processing files 25 thru 49 in groups of 1...
25 , 26 , 27 , 28 , 29 , 30 , 31 , 32 , 33 , 34 , 35 , 36 , 37 , 38 , 39 , 40 , 41 , 42 , 43 , 44 , 45 , 46 , 47 , 48 , 49 , 
Processing files 50 thru 74 in groups of 1...
50 , 51 , 52 , 53 , 54 , 55 , 56 , 57 , 58 , 59 , 60 , 61 , 62 , 63 , 64 , 65 , 66 , 67 , 68 , 69 , 70 , 71 , 72 , 73 , 74 , 
Processing files 75 thru 99 in groups of 1...
75 , 76 , 77 , 78 , 79 , 80 , 81 , 82 , 83 , 84 , 85 , 86 , 87 , 88 , 89 , 90 , 91 , 92 , 93 , 94 , 95 , 96 , 97 , 98 , 99 , 
Processing files 100 thru 124 in groups of 1...
100 , 101 , 102 , 103 , 104 , 105 , 106 , 107 , 108 , 109 , 110 , 111 , 112 , 113 , 114 , 115 , 116 , 117 , 118 , 119 , 120 , 121 , 122 , 123 , 124 , 
Processing files 125 thru 145 in groups of 1...
125 , 126 , 127 , 128 , 129 , 130 , 131 , 132 , 133 , 134 ,

100 , 101 , 102 , 103 , 104 , 105 , 106 , 107 , 108 , 109 , 110 , 111 , 112 , 113 , 114 , 115 , 116 , 117 , 118 , 119 , 120 , 121 , 122 , 123 , 124 , 
Processing files 125 thru 145 in groups of 1...
125 , 126 , 127 , 128 , 129 , 130 , 131 , 132 , 133 , 134 , 135 , 136 , 137 , 138 , 139 , 140 , 141 , 142 , 143 , 144 , 145 , 

### DISK PART 9
Processing files 0 thru 24 in groups of 1...
0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 , 20 , 21 , 22 , 23 , 24 , 
Processing files 25 thru 49 in groups of 1...
25 , 26 , 27 , 28 , 29 , 30 , 31 , 32 , 33 , 34 , 35 , 36 , 37 , 38 , 39 , 40 , 41 , 42 , 43 , 44 , 45 , 46 , 47 , 48 , 49 , 
Processing files 50 thru 74 in groups of 1...
50 , 51 , 52 , 53 , 54 , 55 , 56 , 57 , 58 , 59 , 60 , 61 , 62 , 63 , 64 , 65 , 66 , 67 , 68 , 69 , 70 , 71 , 72 , 73 , 74 , 
Processing files 75 thru 99 in groups of 1...
75 , 76 , 77 , 78 , 79 , 80 , 81 , 82 , 83 , 84 , 85 , 86 , 87 , 88 , 89 , 90 , 91 , 92 , 93 , 94 , 95 , 9

In [4]:
%%time
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 4
BUY2BUY_DISK_PIECES = DISK_PIECES
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.loc[df['type'].isin([1,2])] # ONLY WANT CARTS AND ORDERS
            df = df.sort_values(['session','ts'],ascending=[True,False])
            # USE TAIL OF SESSION
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            # CREATE PAIRS
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 14 * 24 * 60 * 60) & (df.aid_x != df.aid_y) ] # 14 DAYS
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = 1
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()
    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    # SAVE TOP 40
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<15].drop('n',axis=1)
    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'top_15_buy2buy_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 24 in groups of 1...
0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 , 20 , 21 , 22 , 23 , 24 , 
Processing files 25 thru 49 in groups of 1...
25 , 26 , 27 , 28 , 29 , 30 , 31 , 32 , 33 , 34 , 35 , 36 , 37 , 38 , 39 , 40 , 41 , 42 , 43 , 44 , 45 , 46 , 47 , 48 , 49 , 
Processing files 50 thru 74 in groups of 1...
50 , 51 , 52 , 53 , 54 , 55 , 56 , 57 , 58 , 59 , 60 , 61 , 62 , 63 , 64 , 65 , 66 , 67 , 68 , 69 , 70 , 71 , 72 , 73 , 74 , 
Processing files 75 thru 99 in groups of 1...
75 , 76 , 77 , 78 , 79 , 80 , 81 , 82 , 83 , 84 , 85 , 86 , 87 , 88 , 89 , 90 , 91 , 92 , 93 , 94 , 95 , 96 , 97 , 98 , 99 , 
Processing files 100 thru 124 in groups of 1...
100 , 101 , 102 , 103 , 104 , 105 , 106 , 107 , 108 , 109 , 110 , 111 , 112 , 113 , 114 , 115 , 116 , 117 , 118 , 119 , 120 , 121 , 122 , 123 , 124 , 
Processing files 125 thru 145 in groups of 1...
125 , 126 , 127 , 128 , 129 , 130 , 131 , 132 , 133 , 134 ,

In [5]:
%%time
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 8
CLICKS_DISK_PIECES = DISK_PIECES
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.sort_values(['session','ts'],ascending=[True,False])
            # USE TAIL OF SESSION
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            # CREATE PAIRS
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = 1 + 3*(df.ts_x - 1659304800)/(1662328791-1659304800)
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()
    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    # SAVE TOP 40
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<20].drop('n',axis=1)
    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'top_20_clicks_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 24 in groups of 1...
0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 , 20 , 21 , 22 , 23 , 24 , 
Processing files 25 thru 49 in groups of 1...
25 , 26 , 27 , 28 , 29 , 30 , 31 , 32 , 33 , 34 , 35 , 36 , 37 , 38 , 39 , 40 , 41 , 42 , 43 , 44 , 45 , 46 , 47 , 48 , 49 , 
Processing files 50 thru 74 in groups of 1...
50 , 51 , 52 , 53 , 54 , 55 , 56 , 57 , 58 , 59 , 60 , 61 , 62 , 63 , 64 , 65 , 66 , 67 , 68 , 69 , 70 , 71 , 72 , 73 , 74 , 
Processing files 75 thru 99 in groups of 1...
75 , 76 , 77 , 78 , 79 , 80 , 81 , 82 , 83 , 84 , 85 , 86 , 87 , 88 , 89 , 90 , 91 , 92 , 93 , 94 , 95 , 96 , 97 , 98 , 99 , 
Processing files 100 thru 124 in groups of 1...
100 , 101 , 102 , 103 , 104 , 105 , 106 , 107 , 108 , 109 , 110 , 111 , 112 , 113 , 114 , 115 , 116 , 117 , 118 , 119 , 120 , 121 , 122 , 123 , 124 , 
Processing files 125 thru 145 in groups of 1...
125 , 126 , 127 , 128 , 129 , 130 , 131 , 132 , 133 , 134 ,

100 , 101 , 102 , 103 , 104 , 105 , 106 , 107 , 108 , 109 , 110 , 111 , 112 , 113 , 114 , 115 , 116 , 117 , 118 , 119 , 120 , 121 , 122 , 123 , 124 , 
Processing files 125 thru 145 in groups of 1...
125 , 126 , 127 , 128 , 129 , 130 , 131 , 132 , 133 , 134 , 135 , 136 , 137 , 138 , 139 , 140 , 141 , 142 , 143 , 144 , 145 , 
CPU times: user 3min 47s, sys: 1min 56s, total: 5min 44s
Wall time: 5min 47s


# Carts Matrix

In [None]:
%%time
type_weight = {0:1, 1:6, 2:3}

# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 10
CARTS_DISK_PIECES = DISK_PIECES
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.sort_values(['session','ts'],ascending=[True,False])
            # USE TAIL OF SESSION
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            # CREATE PAIRS
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = df.type_y.map(type_weight)
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()
    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    # SAVE TOP 40
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<15].drop('n',axis=1)
    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'top_15_carts_orders_v{VER}_{PART}.pqt')

In [6]:
# FREE MEMORY
del data_cache, tmp
_ = gc.collect()

In [3]:
CARTS_ORDERS_DISK_PIECES = 10
BUY2BUY_DISK_PIECES = 4
CLICKS_DISK_PIECES = 8

In [4]:
%%time
def pqt_to_dict(df):
    return df.groupby('aid_x').aid_y.apply(list).to_dict()
# LOAD THREE CO-VISITATION MATRICES
top_20_clicks = pqt_to_dict( pd.read_parquet(f'top_20_clicks_v{VER}_0.pqt') )
for k in range(1,CLICKS_DISK_PIECES): 
    top_20_clicks.update( pqt_to_dict( pd.read_parquet(f'top_20_clicks_v{VER}_{k}.pqt') ) )
top_20_buys = pqt_to_dict( pd.read_parquet(f'top_15_carts_orders_v{VER}_0.pqt') )
for k in range(1,CARTS_ORDERS_DISK_PIECES): 
    top_20_buys.update( pqt_to_dict( pd.read_parquet(f'top_15_carts_orders_v{VER}_{k}.pqt') ) )
top_20_buy2buy = pqt_to_dict( pd.read_parquet(f'top_15_buy2buy_v{VER}_0.pqt') )
for k in range(1,BUY2BUY_DISK_PIECES): 
    top_20_buy2buy.update( pqt_to_dict( pd.read_parquet(f'top_15_buy2buy_v{VER}_{k}.pqt') ) )

# TOP CLICKS AND ORDERS IN TEST
# top_clicks = test_df.loc[test_df['type']=='clicks','aid'].value_counts().index.values[:20]
# top_orders = test_df.loc[test_df['type']=='orders','aid'].value_counts().index.values[:20]

print('Here are size of our 3 co-visitation matrices:')
print( len( top_20_clicks ), len( top_20_buy2buy ), len( top_20_buys ) )

Here are size of our 3 co-visitation matrices:
1837166 1168768 1837166
CPU times: user 1min 23s, sys: 2.69 s, total: 1min 26s
Wall time: 1min 24s


# Fast Inference

In [25]:
def df_parallelize_run(func, t_split):
    
    num_cores = np.min([N_CORES, len(t_split)])
    pool = Pool(num_cores)
    df = pool.map(func, t_split)
    pool.close()
    pool.join()
    
    return df

In [26]:
test = load_test('../dataset/otto-recommender-system/otto-chunk-data-inparquet-format/test_parquet/*')
print('Test data has shape',test.shape)
test.head()
top_clicks = test.loc[test['type']=='clicks','aid'].value_counts().index.values[:20]
top_orders = test.loc[test['type']=='orders','aid'].value_counts().index.values[:20]

Test data has shape (6928123, 4)


In [27]:
%%time
PIECES = 5
test_bysession_list = []
for PART in range(PIECES):
    with open(f'../dataset/otto-recommender-system/otto-fast-group/test_group_tolist_{PART}_1.pkl', 'rb') as f:
        test_bysession_list.extend(pickle.load(f))
print(len(test_bysession_list))

1671803
CPU times: user 7.03 s, sys: 135 ms, total: 7.17 s
Wall time: 7.26 s


In [28]:
#type_weight_multipliers = {'clicks': 1, 'carts': 6, 'orders': 3}
type_weight_multipliers = {0: 1, 1: 5, 2: 4}

def suggest_clicks(df):
    
    session = df[0]
    aids = df[1]
    types = df[2]
    unique_aids = list(dict.fromkeys(aids[::-1] ))
    # RERANK CANDIDATES USING WEIGHTS
    if len(unique_aids)>=1:
        weights=np.logspace(0.1,1,len(aids),base=2, endpoint=True)-1
        aids_temp = Counter() 
        # RERANK BASED ON REPEAT ITEMS AND TYPE OF ITEMS
        for aid,w,t in zip(aids,weights,types): 
            aids_temp[aid] += w * type_weight_multipliers[t]
        sorted_aids = [k for k,v in aids_temp.most_common(20)]
        return session, sorted_aids
    
    # USE "CLICKS" CO-VISITATION MATRIX
    aids2 = list(itertools.chain(*[top_20_clicks[aid] for aid in unique_aids if aid in top_20_clicks]))
    # RERANK CANDIDATES
    top_aids2 = [aid2 for aid2, cnt in Counter(aids2).most_common(20) if aid2 not in unique_aids]    
    result = unique_aids + top_aids2[:20 - len(unique_aids)]
    
    # USE TOP20 TEST CLICKS
    return session, result + list(top_clicks)[:20-len(result)]

def suggest_buys(df):
    # USE USER HISTORY AIDS AND TYPES
    session = df[0]
    aids = df[1]
    types = df[2]

    unique_aids = list(dict.fromkeys(aids[::-1] ))
    unique_buys = list(dict.fromkeys( [f for i, f in enumerate(aids) if types[i] in [1, 2]][::-1] ))

    # RERANK CANDIDATES USING WEIGHTS
    if len(unique_aids)>=20:
        
        weights=np.logspace(0.5,1,len(aids),base=2, endpoint=True)-1
        aids_temp = Counter() 
        # RERANK BASED ON REPEAT ITEMS AND TYPE OF ITEMS
        for aid,w,t in zip(aids,weights,types): 
            aids_temp[aid] += w * type_weight_multipliers[t]
        # RERANK CANDIDATES USING "BUY2BUY" CO-VISITATION MATRIX
        aids3 = list(itertools.chain(*[top_20_buy2buy[aid] for aid in unique_buys if aid in top_20_buy2buy]))
        for aid in aids3: aids_temp[aid] += 0.1
        sorted_aids = [k for k,v in aids_temp.most_common(20)]
        return session, sorted_aids
            
    # USE "CART ORDER" CO-VISITATION MATRIX
    aids2 = list(itertools.chain(*[top_20_buys[aid] for aid in unique_aids if aid in top_20_buys]))
    # USE "BUY2BUY" CO-VISITATION MATRIX
    aids3 = list(itertools.chain(*[top_20_buy2buy[aid] for aid in unique_buys if aid in top_20_buy2buy]))
    # RERANK CANDIDATES
    top_aids2 = [aid2 for aid2, cnt in Counter(aids2 + aids3).most_common(20) if aid2 not in unique_aids] 
    result = unique_aids + top_aids2[:20 - len(unique_aids)]
    # USE TOP20 TEST ORDERS
    return session, result + list(top_orders)[:20-len(result)]

In [29]:
%%time

temp = df_parallelize_run(suggest_clicks, test_bysession_list)
clicks_pred_df = pd.Series([f[1] for f in temp], index=[f[0] for f in temp])
clicks_pred_df = clicks_pred_df.add_suffix("_clicks")
clicks_pred_df.head()

temp = df_parallelize_run(suggest_buys, test_bysession_list)
buys_pred_df = pd.Series([f[1] for f in temp], index=[f[0] for f in temp])
orders_pred_df = buys_pred_df.add_suffix("_orders")
carts_pred_df = buys_pred_df.add_suffix("_carts")

CPU times: user 26.4 s, sys: 3.33 s, total: 29.7 s
Wall time: 51.4 s


In [30]:
pred_df = pd.concat([clicks_pred_df, orders_pred_df, carts_pred_df]).reset_index()
pred_df.columns = ["session_type", "labels"]
pred_df["labels"] = pred_df.labels.apply(lambda x: " ".join(map(str,x)))
pred_df.to_csv("submission.csv", index=False)
pred_df.head()

Unnamed: 0,session_type,labels
0,12899779_clicks,59625
1,12899780_clicks,1142000 736515 973453 582732
2,12899781_clicks,199008 918667 194067 57315 141736
3,12899782_clicks,834354 595994 740494 889671 987399 779477 8291...
4,12899783_clicks,1817895 607638 1754419 300127 1216820 1729553 ...


# Submission must have 5015409 rows

In [31]:
pred_df.shape

(5015409, 2)

# Fast Validation

In [22]:
type_labels = {'clicks':0, 'carts':1, 'orders':2}

def load_test(files):    
    dfs = []
    for e, chunk_file in enumerate(glob.glob(files)):
        chunk = pd.read_parquet(chunk_file)
        chunk.ts = (chunk.ts/1000).astype('int32')
        chunk['type'] = chunk['type'].map(type_labels).astype('int8')
        dfs.append(chunk)
    return pd.concat(dfs).reset_index(drop=True) #.astype({"ts": "datetime64[ms]"})

valid = load_test('../dataset/otto-recommender-system/otto-validation/test_parquet/*')
print('Valid data has shape',valid.shape)
valid.head()
top_clicks = valid.loc[valid['type']=='clicks','aid'].value_counts().index.values[:20]
top_orders = valid.loc[valid['type']=='orders','aid'].value_counts().index.values[:20]

Valid data has shape (7683577, 4)


In [23]:
def df_parallelize_run(func, t_split):
    
    num_cores = np.min([N_CORES, len(t_split)])
    pool = Pool(num_cores)
    df = pool.map(func, t_split)
    pool.close()
    pool.join()
    
    return df

In [24]:
%%time
PIECES = 5
valid_bysession_list = []
for PART in range(PIECES):
    with open(f'../dataset/otto-recommender-system/otto-fast-group/valid_group_tolist_{PART}_1.pkl', 'rb') as f:
        valid_bysession_list.extend(pickle.load(f))
print(len(valid_bysession_list))

1801251
CPU times: user 6.33 s, sys: 164 ms, total: 6.5 s
Wall time: 6.49 s


In [18]:
valid_bysession_list

[[11098528, [11830], [0]],
 [11098529, [1105029], [0]],
 [11098530,
  [264500, 264500, 409236, 409236, 409236, 409236],
  [0, 0, 0, 0, 0, 1]],
 [11098531,
  [452188,
   1239060,
   1557766,
   452188,
   396199,
   1309633,
   1449555,
   1557766,
   1728212,
   1365569,
   1728212,
   1271998,
   396199,
   1271998,
   1553691,
   624163,
   1271998,
   1728212,
   1365569,
   1365569,
   1728212,
   452188,
   1271998,
   396199],
  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2]],
 [11098532, [7651, 876469], [0, 0]],
 [11098533,
  [1309900,
   1074173,
   1074173,
   1016140,
   86580,
   1309900,
   1074173,
   935297,
   508665,
   125394,
   1622419,
   833149,
   765030,
   1074173,
   978918,
   385390,
   1165015],
  [1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]],
 [11098534,
  [1449202, 223062, 1607945, 1342293, 223062, 223062, 908024],
  [0, 0, 0, 0, 0, 0, 0]],
 [11098535,
  [745365,
   745365,
   896972,
   1750442,
   803918,
   803918,
   17

In [19]:
#type_weight_multipliers = {'clicks': 1, 'carts': 6, 'orders': 3}
type_weight_multipliers = {0: 1, 1: 5, 2: 4}

def suggest_clicks(df):
    
    session = df[0]
    aids = df[1]
    types = df[2]
    unique_aids = list(dict.fromkeys(aids[::-1] ))
    # RERANK CANDIDATES USING WEIGHTS
    if len(unique_aids)>=1:
        weights=np.logspace(0.1,1,len(aids),base=2, endpoint=True)-1
        aids_temp = Counter() 
        # RERANK BASED ON REPEAT ITEMS AND TYPE OF ITEMS
        for aid,w,t in zip(aids,weights,types): 
            aids_temp[aid] += w * type_weight_multipliers[t]
        sorted_aids = [k for k,v in aids_temp.most_common(20)]
        return session, sorted_aids
    
    # USE "CLICKS" CO-VISITATION MATRIX
    aids2 = list(itertools.chain(*[top_20_clicks[aid] for aid in unique_aids if aid in top_20_clicks]))
    # RERANK CANDIDATES
    top_aids2 = [aid2 for aid2, cnt in Counter(aids2).most_common(20) if aid2 not in unique_aids]    
    result = unique_aids + top_aids2[:20 - len(unique_aids)]
    
    # USE TOP20 TEST CLICKS
    return session, result + list(top_clicks)[:20-len(result)]

def suggest_buys(df):
    # USE USER HISTORY AIDS AND TYPES
    session = df[0]
    aids = df[1]
    types = df[2]

    unique_aids = list(dict.fromkeys(aids[::-1] ))
    unique_buys = list(dict.fromkeys( [f for i, f in enumerate(aids) if types[i] in [1, 2]][::-1] ))

    # RERANK CANDIDATES USING WEIGHTS
    if len(unique_aids)>=20:
        
        weights=np.logspace(0.5,1,len(aids),base=2, endpoint=True)-1
        aids_temp = Counter() 
        # RERANK BASED ON REPEAT ITEMS AND TYPE OF ITEMS
        for aid,w,t in zip(aids,weights,types): 
            aids_temp[aid] += w * type_weight_multipliers[t]
        # RERANK CANDIDATES USING "BUY2BUY" CO-VISITATION MATRIX
        aids3 = list(itertools.chain(*[top_20_buy2buy[aid] for aid in unique_buys if aid in top_20_buy2buy]))
        for aid in aids3: aids_temp[aid] += 0.1
        sorted_aids = [k for k,v in aids_temp.most_common(20)]
        return session, sorted_aids
            
    # USE "CART ORDER" CO-VISITATION MATRIX
    aids2 = list(itertools.chain(*[top_20_buys[aid] for aid in unique_aids if aid in top_20_buys]))
    # USE "BUY2BUY" CO-VISITATION MATRIX
    aids3 = list(itertools.chain(*[top_20_buy2buy[aid] for aid in unique_buys if aid in top_20_buy2buy]))
#     aids4 = list(itertools.chain(*[top_20_clicks[aid] for aid in unique_aids if aid in top_20_clicks]))
    # RERANK CANDIDATES
    top_aids2 = [aid2 for aid2, cnt in Counter(aids2 + aids3).most_common(20) if aid2 not in unique_aids] 
    result = unique_aids + top_aids2[:20 - len(unique_aids)]
    # USE TOP20 TEST ORDERS
    return session, result + list(top_orders)[:20-len(result)]

In [16]:
valid_bysession_list

[[11098528, [11830], [0]],
 [11098529, [1105029], [0]],
 [11098530,
  [264500, 264500, 409236, 409236, 409236, 409236],
  [0, 0, 0, 0, 0, 1]],
 [11098531,
  [452188,
   1239060,
   1557766,
   452188,
   396199,
   1309633,
   1449555,
   1557766,
   1728212,
   1365569,
   1728212,
   1271998,
   396199,
   1271998,
   1553691,
   624163,
   1271998,
   1728212,
   1365569,
   1365569,
   1728212,
   452188,
   1271998,
   396199],
  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2]],
 [11098532, [7651, 876469], [0, 0]],
 [11098533,
  [1309900,
   1074173,
   1074173,
   1016140,
   86580,
   1309900,
   1074173,
   935297,
   508665,
   125394,
   1622419,
   833149,
   765030,
   1074173,
   978918,
   385390,
   1165015],
  [1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]],
 [11098534,
  [1449202, 223062, 1607945, 1342293, 223062, 223062, 908024],
  [0, 0, 0, 0, 0, 0, 0]],
 [11098535,
  [745365,
   745365,
   896972,
   1750442,
   803918,
   803918,
   17

In [20]:
%%time

temp = df_parallelize_run(suggest_clicks, valid_bysession_list)
val_clicks = pd.Series([f[1]  for f in temp], index=[f[0] for f in temp])

temp = df_parallelize_run(suggest_buys, valid_bysession_list)
val_buys = pd.Series([f[1]  for f in temp], index=[f[0] for f in temp])

CPU times: user 30.3 s, sys: 3.94 s, total: 34.3 s
Wall time: 37.4 s


In [14]:
benchmark = {"clicks":0.59122, "carts":0.49840, "orders":0.69900, "all":0.62804}
weights = {'clicks': 0.10, 'carts': 0.30, 'orders': 0.60}

valid_labels = pd.read_parquet('../dataset/otto-recommender-system/otto-validation/test_labels.parquet')


def hits(b):
    # b[0] : session id
    # b[1] : ground truth
    # b[2] : aids prediction 
    return b[0], len(set(b[1]).intersection(set(b[2]))), np.clip(len(b[1]), 0, 20)

def otto_metric_piece(values, typ, verbose=True):
    
    c1 = pd.DataFrame(values, columns=["labels"]).reset_index().rename({"index":"session"}, axis=1)
    a = valid_labels.loc[valid_labels['type']==typ].merge(c1, how='left', on=['session'])

    b=[[a0, a1, a2] for a0, a1, a2 in zip(a["session"], a["ground_truth"], a["labels"])]
    c = df_parallelize_run(hits, b)
    c = np.array(c)
    
    recall = c[:,1].sum() / c[:,2].sum()
    
    print('{} recall = {:.5f} (vs {:.5f} in benchmark)'.format(typ ,recall, benchmark[typ]))
    
    return recall

def otto_metric(clicks, carts, orders, verbose = True):
    
    score = 0
    score += weights["clicks"] * otto_metric_piece(clicks, "clicks", verbose = verbose)
    score += weights["carts"] * otto_metric_piece(carts, "carts", verbose = verbose)
    score += weights["orders"] * otto_metric_piece(orders, "orders", verbose = verbose)
    
    if verbose:
        print('=============')
        print('Overall Recall = {:.5f} (vs {:.5f} in benchmark)'.format(score, benchmark["all"]))
        print('=============')
    
    return score

In [21]:
%%time
# _ = otto_metric_piece(val_buys, "orders")
# _ = otto_metric_piece(val_buys, "carts")
_ = otto_metric(val_clicks, val_buys, val_buys)

clicks recall = 0.32349 (vs 0.59122 in benchmark)
carts recall = 0.49840 (vs 0.49840 in benchmark)
orders recall = 0.69888 (vs 0.69900 in benchmark)
Overall Recall = 0.60120 (vs 0.62804 in benchmark)
CPU times: user 11.3 s, sys: 3.57 s, total: 14.8 s
Wall time: 16.1 s


# Inference

In [22]:
def load_test():    
    dfs = []
    for e, chunk_file in enumerate(glob.glob('../dataset/otto-recommender-system/otto-validation/test_parquet/*')):
        chunk = pd.read_parquet(chunk_file)
        chunk.ts = (chunk.ts/1000).astype('int32')
        chunk['type'] = chunk['type'].map(type_labels).astype('int8')
        dfs.append(chunk)
    return pd.concat(dfs).reset_index(drop=True) #.astype({"ts": "datetime64[ms]"})

test_df = load_test()
print('Test data has shape',test_df.shape)
test_df.head()

Test data has shape (7683577, 4)


Unnamed: 0,session,aid,ts,type
0,11909095,36607,1661374181,0
1,11909095,721498,1661374243,0
2,11909096,1551315,1661374181,0
3,11909096,1551315,1661374224,0
4,11909096,1551315,1661374533,0


In [28]:
#type_weight_multipliers = {'clicks': 1, 'carts': 6, 'orders': 3}
type_weight_multipliers = {0: 1, 1: 6, 2: 3}

def suggest_clicks(df):
    # USER HISTORY AIDS AND TYPES
    aids=df.aid.tolist()
    types = df.type.tolist()
    unique_aids = list(dict.fromkeys(aids[::-1] ))
    # RERANK CANDIDATES USING WEIGHTS
    if len(unique_aids)>=20:
        weights=np.logspace(0.1,1,len(aids),base=2, endpoint=True)-1
        aids_temp = Counter() 
        # RERANK BASED ON REPEAT ITEMS AND TYPE OF ITEMS
        for aid,w,t in zip(aids,weights,types): 
            aids_temp[aid] += w * type_weight_multipliers[t]
        sorted_aids = [k for k,v in aids_temp.most_common(20)]
        return sorted_aids
    # USE "CLICKS" CO-VISITATION MATRIX
    aids2 = list(itertools.chain(*[top_20_clicks[aid] for aid in unique_aids if aid in top_20_clicks]))
    # RERANK CANDIDATES
    top_aids2 = [aid2 for aid2, cnt in Counter(aids2).most_common(20) if aid2 not in unique_aids]    
    result = unique_aids + top_aids2[:20 - len(unique_aids)]
    # USE TOP20 TEST CLICKS
    return result + list(top_clicks)[:20-len(result)]

def suggest_buys(df):
    # USER HISTORY AIDS AND TYPES
    aids=df.aid.tolist()
    types = df.type.tolist()
    # UNIQUE AIDS AND UNIQUE BUYS
    unique_aids = list(dict.fromkeys(aids[::-1] ))
    df = df.loc[(df['type']==1)|(df['type']==2)]
    unique_buys = list(dict.fromkeys( df.aid.tolist()[::-1] ))
    # RERANK CANDIDATES USING WEIGHTS
    if len(unique_aids)>=20:
        weights=np.logspace(0.5,1,len(aids),base=2, endpoint=True)-1
        aids_temp = Counter() 
        # RERANK BASED ON REPEAT ITEMS AND TYPE OF ITEMS
        for aid,w,t in zip(aids,weights,types): 
            aids_temp[aid] += w * type_weight_multipliers[t]
        # RERANK CANDIDATES USING "BUY2BUY" CO-VISITATION MATRIX
        aids3 = list(itertools.chain(*[top_20_buy2buy[aid] for aid in unique_buys if aid in top_20_buy2buy]))
        for aid in aids3: aids_temp[aid] += 0.1
        sorted_aids = [k for k,v in aids_temp.most_common(20)]
        return sorted_aids
    # USE "CART ORDER" CO-VISITATION MATRIX
    aids2 = list(itertools.chain(*[top_20_buys[aid] for aid in unique_aids if aid in top_20_buys]))
    # USE "BUY2BUY" CO-VISITATION MATRIX
    aids3 = list(itertools.chain(*[top_20_buy2buy[aid] for aid in unique_buys if aid in top_20_buy2buy]))
    # RERANK CANDIDATES
    top_aids2 = [aid2 for aid2, cnt in Counter(aids2+aids3).most_common(20) if aid2 not in unique_aids] 
    result = unique_aids + top_aids2[:20 - len(unique_aids)]
    # USE TOP20 TEST ORDERS
    return result + list(top_orders)[:20-len(result)]

In [None]:
def load_test():    
    dfs = []
    for e, chunk_file in enumerate(glob.glob('../dataset/otto-recommender-system/otto-chunk-data-inparquet-format/test_parquet/*')):
        chunk = pd.read_parquet(chunk_file)
        chunk.ts = (chunk.ts/1000).astype('int32')
        chunk['type'] = chunk['type'].map(type_labels).astype('int8')
        dfs.append(chunk)
    return pd.concat(dfs).reset_index(drop=True) #.astype({"ts": "datetime64[ms]"})

test_df = load_test()
print('Test data has shape',test_df.shape)
test_df.head()

In [None]:
%%time
pred_df_clicks = test_df.sort_values(["session", "ts"]).groupby(["session"]).apply(
    lambda x: suggest_clicks(x)
)

pred_df_buys = test_df.sort_values(["session", "ts"]).groupby(["session"]).apply(
    lambda x: suggest_buys(x)
)

In [None]:
clicks_pred_df = pd.DataFrame(pred_df_clicks.add_suffix("_clicks"), columns=["labels"]).reset_index()
orders_pred_df = pd.DataFrame(pred_df_buys.add_suffix("_orders"), columns=["labels"]).reset_index()
carts_pred_df = pd.DataFrame(pred_df_buys.add_suffix("_carts"), columns=["labels"]).reset_index()

In [None]:
pred_df = pd.concat([clicks_pred_df, orders_pred_df, carts_pred_df])
pred_df.columns = ["session_type", "labels"]
pred_df["labels"] = pred_df.labels.apply(lambda x: " ".join(map(str,x)))
pred_df.to_csv("submission.csv", index=False)
pred_df.head()

# Validation

In [None]:
def load_test():    
    dfs = []
    for e, chunk_file in enumerate(glob.glob('../dataset/otto-recommender-system/otto-validation/test_parquet/*')):
        chunk = pd.read_parquet(chunk_file)
        chunk.ts = (chunk.ts/1000).astype('int32')
        chunk['type'] = chunk['type'].map(type_labels).astype('int8')
        dfs.append(chunk)
    return pd.concat(dfs).reset_index(drop=True) #.astype({"ts": "datetime64[ms]"})

test_df = load_test()
print('Test data has shape',test_df.shape)
test_df.head()

In [None]:
%%time
pred_df_clicks = test_df.sort_values(["session", "ts"]).groupby(["session"]).apply(
    lambda x: suggest_clicks(x)
)

pred_df_buys = test_df.sort_values(["session", "ts"]).groupby(["session"]).apply(
    lambda x: suggest_buys(x)
)

In [None]:
clicks_pred_df = pd.DataFrame(pred_df_clicks.add_suffix("_clicks"), columns=["labels"]).reset_index()
orders_pred_df = pd.DataFrame(pred_df_buys.add_suffix("_orders"), columns=["labels"]).reset_index()
carts_pred_df = pd.DataFrame(pred_df_buys.add_suffix("_carts"), columns=["labels"]).reset_index()

In [None]:
pred_df = pd.concat([clicks_pred_df, orders_pred_df, carts_pred_df])
pred_df.columns = ["session_type", "labels"]
pred_df["labels"] = pred_df.labels.apply(lambda x: " ".join(map(str,x)))
pred_df.to_csv("submission.csv", index=False)
pred_df.head()

In [18]:
# FREE MEMORY
del pred_df_clicks, pred_df_buys, clicks_pred_df, orders_pred_df, carts_pred_df
del top_20_clicks, top_20_buy2buy, top_20_buys, top_clicks, top_orders, test_df
_ = gc.collect()

In [33]:
# %%time
# COMPUTE METRIC
score = 0
weights = {'clicks': 0.10, 'carts': 0.30, 'orders': 0.60}
for t in ['clicks','carts','orders']:
    sub = pred_df.loc[pred_df.session_type.str.contains(t)].copy()
    sub['session'] = sub.session_type.apply(lambda x: int(x.split('_')[0]))
    sub.labels = sub.labels.apply(lambda x: [int(i) for i in x.split(' ')[:20]])
    test_labels = pd.read_parquet('../dataset/otto-recommender-system/otto-validation/test_labels.parquet')
    test_labels = test_labels.loc[test_labels['type']==t]
    test_labels = test_labels.merge(sub, how='left', on=['session'])
    test_labels['hits'] = test_labels.apply(lambda df: len(set(df.ground_truth).intersection(set(df.labels))), axis=1)
    test_labels['gt_count'] = test_labels.ground_truth.str.len().clip(0,20)
    recall = test_labels['hits'].sum() / test_labels['gt_count'].sum()
    score += weights[t]*recall
    print(f'{t} recall =',recall)
    
print('=============')
print('Overall Recall =',score)
print('=============')

clicks recall = 0.5912166896226447
carts recall = 0.4983971745865439
orders recall = 0.6989974561367112
Overall Recall = 0.6280392950202544
