In [48]:
# reset test입니다.

In [1]:
import os
import gc
import heapq
import pickle
import numba as nb
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

# Pair-generation / inference hyperparameters
tail = 30          # only the last `tail` events of each session are paired
parallel = 1024    # sessions handed to one numba batch call
topn = 20          # co-visited aids kept per source aid
ops_weights = np.array([1.0, 6.0, 3.0])       # pair weight indexed by event type (ops value) in OP_WEIGHT mode
OP_WEIGHT = 0                                 # pair-weighting mode: weight by event type
TIME_WEIGHT = 1                               # pair-weighting mode: weight by event time
test_ops_weights = np.array([1.0, 6.0, 3.0])  # per-event-type weight used when reranking test sessions

In [2]:
# Location of the preprocessed train/test data; override with the OTTO_574_DATA env var.
path = os.environ.get(
    "OTTO_574_DATA",
    'C:/Users/ghtyu/OneDrive/Desktop/OTTO/otto_data/LB_574_Dataset_Tony')

# Per-session metadata (session, start_time, length); train rows followed by test rows.
df = pd.read_csv(os.path.join(path,"train.csv"))
df_test = pd.read_csv(os.path.join(path,"test.csv"))
df = pd.concat([df, df_test]).reset_index(drop = True)

# Flat event arrays, train followed by test. Assumed to be in the same session order as `df`.
# Using `with` closes the .npz file handles once the arrays have been copied out.
train_npz_path = os.path.join(path, "train.npz")
test_npz_path = os.path.join(path, "test.npz")
with np.load(train_npz_path) as npz, np.load(test_npz_path) as npz_test:
    aids = np.concatenate([npz['aids'], npz_test['aids']])
    ts = np.concatenate([npz['ts'], npz_test['ts']])
    ops = np.concatenate([npz['ops'], npz_test['ops']])

# idx: offset of each session's first event in the flat arrays (exclusive cumulative sum of lengths).
df["idx"] = np.cumsum(df.length) - df.length
# end_time: start_time plus the ts of the session's last event (ts appears to be relative to start_time).
df["end_time"] = df.start_time + ts[df.idx + df.length - 1]

In [3]:
df.head(50)  # session / start_time / length / idx / end_time

Unnamed: 0,session,start_time,length,idx,end_time
0,0,1659304800,276,0,1661684983
1,1,1659304800,32,276,1661714854
2,2,1659304800,33,308,1661714215
3,3,1659304800,226,341,1661109666
4,4,1659304800,19,567,1661586681
5,5,1659304800,15,586,1660348787
6,6,1659304800,204,601,1661549531
7,7,1659304800,23,805,1660538518
8,8,1659304800,4,828,1659304839
9,9,1659304800,7,832,1659648132


In [4]:
# Fill a per-session pair dict {(aid_a, aid_b): weight}.
# Two events are paired only when they are less than one day (24 * 60 * 60 s) apart.
@nb.jit(nopython = True, cache = True)
def get_single_pairs(pairs, aids, ts, ops, idx, length, start_time, ops_weights, mode):
    """Write weighted aid pairs from the tail of one session into `pairs`.

    Only the last `tail` events of the slice [idx, idx + length) are used.
    For every event pair i < j that is less than one day apart and has
    different aids, both directions are written; later writes for the same
    key overwrite earlier ones.
      * OP_WEIGHT:   (aid_i, aid_j) <- ops_weights[ops[j]], (aid_j, aid_i) <- ops_weights[ops[i]]
      * TIME_WEIGHT: each direction <- 1 + 3 * (absolute time of its first aid's event,
                     normalised over [1659304800, 1662328791])
    """
    one_day = 24 * 60 * 60
    t_first = 1659304800
    t_span = 1662328791 - 1659304800
    stop = idx + length
    first = max(stop - tail, idx)
    for i in range(first, stop):
        for j in range(i + 1, stop):
            if ts[j] - ts[i] >= one_day:
                # later j are presumably even further away in time (events ordered by ts)
                break
            if aids[i] == aids[j]:
                continue
            if mode == OP_WEIGHT:
                w_fwd = ops_weights[ops[j]]
                w_bwd = ops_weights[ops[i]]
            elif mode == TIME_WEIGHT:
                w_fwd = 1 + 3 * (ts[i] + start_time - t_first) / t_span
                w_bwd = 1 + 3 * (ts[j] + start_time - t_first) / t_span
            pairs[(aids[i], aids[j])] = w_fwd
            pairs[(aids[j], aids[i])] = w_bwd

# get pair dict of each session in parallel
# merge pairs into a nested dict format (cnt)
@nb.jit(nopython = True, parallel = True, cache = True)
def get_pairs(aids, ts, ops, row, cnts, ops_weights, mode):
    """Build co-visitation pairs for a batch of sessions and add them into `cnts`.

    row  : 2-D array with one row per session: (session, idx, length, start_time).
    cnts : nested typed dict {aid1: {aid2: summed weight}}, updated in place.

    Pair extraction runs under prange with one private dict per session.
    Merging into `cnts` happens afterwards in a plain serial loop.
    """
    par_n = len(row)
    # Empty-comprehension idiom: gives numba a typed, empty {(int, int): float} dict
    # for each session, so prange iterations never write to a shared dict.
    pairs = [{(0, 0): 0.0 for _ in range(0)} for _ in range(par_n)]
    for par_i in nb.prange(par_n):
        _, idx, length, start_time = row[par_i]
        get_single_pairs(pairs[par_i], aids, ts, ops, idx, length, start_time, ops_weights, mode)
    # Serial merge: each session adds its (per-session deduplicated) pair weights once.
    for par_i in range(par_n):
        for (aid1, aid2), w in pairs[par_i].items():
            if aid1 not in cnts: cnts[aid1] = {0: 0.0 for _ in range(0)}
            cnt = cnts[aid1]
            if aid2 not in cnt: cnt[aid2] = 0.0
            cnt[aid2] += w
    
# Min-heap top-k over a {key: weight} counter; the result is ordered from highest to lowest weight.
# Ties on weight: overwrite == 1 prefers keys inserted later, any other value prefers earlier keys.
@nb.jit(nopython = True, cache = True)
def heap_topk(cnt, overwrite, cap):
    """Return the keys of the `cap` largest values in `cnt`, largest first.

    Heap entries are (weight, tie_breaker, key). tie_breaker is the insertion
    position (overwrite == 1) or its negation (otherwise). Among equal weights,
    the heap therefore evicts the less preferred key first.
    """
    tie_sign = 1 if overwrite == 1 else -1
    heap = [(0.0, 0, 0) for _ in range(0)]  # typed empty list so numba knows the element type
    for pos, (key, weight) in enumerate(cnt.items()):
        heapq.heappush(heap, (weight, tie_sign * pos, key))
        if len(heap) > cap:
            heapq.heappop(heap)  # discard the current smallest entry
    ascending = [heapq.heappop(heap)[2] for _ in range(len(heap))]
    return ascending[::-1]
   
# For each aid1, keep its k highest-weighted aid2 partners.
@nb.jit(nopython = True, cache = True)
def get_topk(cnts, topk, k):
    """Fill `topk[aid1]` with an array of the `k` best partners from `cnts[aid1]`.

    Partners are in descending weight order; ties go to the partner inserted
    later (heap_topk with overwrite=1). `topk` is modified in place.
    """
    for src_aid in cnts:
        partners = heap_topk(cnts[src_aid], 1, k)
        topk[src_aid] = np.array(partners)

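Numba JIT-compiles the decorated functions to machine code, which speeds up plain-Python loops and a supported subset of NumPy.
In `nopython` mode the compiled function runs without going back through the Python interpreter, which is where the speed-up comes from.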

In [5]:
topks = {}

# for two modes
for mode in [OP_WEIGHT, TIME_WEIGHT]:
    # get nested counter
    cnts = nb.typed.Dict.empty(
        key_type = nb.types.int64,
        value_type = nb.typeof(nb.typed.Dict.empty(key_type = nb.types.int64, value_type = nb.types.float64)))
    max_idx = len(df)
    for idx in tqdm(range(0, max_idx, parallel)):
        row = df.iloc[idx:min(idx + parallel, max_idx)][['session', 'idx', 'length', 'start_time']].values
        get_pairs(aids, ts, ops, row, cnts, ops_weights, mode)

    # get topk from counter
    topk = nb.typed.Dict.empty(
            key_type = nb.types.int64,
            value_type = nb.types.int64[:])
    get_topk(cnts, topk, topn)

    del cnts; gc.collect()
    topks[mode] = topk

  0%|          | 0/14231 [00:00<?, ?it/s]

  0%|          | 0/14231 [00:00<?, ?it/s]

In [8]:
@nb.jit(nopython = True, cache = True)
def inference_(aids, ops, row, result, topk, test_ops_weights, seq_weight):
    """Predict up to 20 aids for every session in `row`; results go into `result`.

    row        : 2-D array with rows (session, idx, length); a session's events are
                 aids[idx:idx + length] and ops[idx:idx + length].
    result     : typed dict {session: int64 array of predicted aids}, filled in place.
    topk       : typed dict {aid: int64 array of co-visited aids}.
    seq_weight : start exponent of the recency weights 2**linspace(seq_weight, 1) - 1.

    Sessions with >= 20 distinct aids are reranked using only their own events
    (recency weight * test_ops_weights[op]). Shorter sessions keep all their
    distinct aids, last-seen first. They are then extended with the aids that
    occur most often in `topk` of those aids. No other padding is applied, so a
    session can end up with fewer than 20 predictions.
    """
    for session, idx, length in row:
        # unique_aids is used as an insertion-ordered set (values are ignored)
        unique_aids = nb.typed.Dict.empty(key_type = nb.types.int64, value_type = nb.types.float64)
        cnt = nb.typed.Dict.empty(key_type = nb.types.int64, value_type = nb.types.float64)
        
        # reversed view: the session's last event comes first
        candidates = aids[idx:idx + length][::-1]
        candidates_ops = ops[idx:idx + length][::-1]
        for a in candidates:
            unique_aids[a] = 0
                
        if len(unique_aids) >= 20:
            # weight 1 for the last event, decreasing to 2**seq_weight - 1 for the first one
            sequence_weight = np.power(2, np.linspace(seq_weight, 1, len(candidates)))[::-1] - 1
            for a, op, w in zip(candidates, candidates_ops, sequence_weight):
                if a not in cnt: cnt[a] = 0
                cnt[a] += w * test_ops_weights[op]
            # overwrite=0: on equal scores the aid inserted first (seen later in the session) wins
            result_candidates = heap_topk(cnt, 0, 20)
        else:
            result_candidates = list(unique_aids)
            # count how many of the session's aids list each co-visited aid, skipping history aids
            for a in result_candidates:
                if a not in topk: continue
                for b in topk[a]:
                    if b in unique_aids: continue
                    if b not in cnt: cnt[b] = 0
                    cnt[b] += 1
            result_candidates.extend(heap_topk(cnt, 0, 20 - len(result_candidates)))
        result[session] = np.array(result_candidates)
        
@nb.jit(nopython = True)
def inference(aids, ops, row, 
              result_clicks, result_buy,
              topk_clicks, topk_buy,
              test_ops_weights):
    """Run inference_ twice on one batch of sessions: once for clicks, once for buys.

    The clicks pass uses seq_weight=0.1, giving recency weights from about 0.07 up to 1.
    The buy pass uses seq_weight=0.5, giving weights from about 0.41 up to 1.
    The clicks ranking is therefore more strongly recency-driven.
    Each pass uses its own co-visitation dict and result dict.
    """
    inference_(aids, ops, row, result_clicks, topk_clicks, test_ops_weights, 0.1)
    inference_(aids, ops, row, result_buy, topk_buy, test_ops_weights, 0.5)

In [10]:
# result place holder
result_clicks = nb.typed.Dict.empty(
    key_type = nb.types.int64,
    value_type = nb.types.int64[:])
result_buy = nb.typed.Dict.empty(
    key_type = nb.types.int64,
    value_type = nb.types.int64[:])
for idx in tqdm(range(len(df) - len(df_test), len(df), parallel)):
    row = df.iloc[idx:min(idx + parallel, len(df))][['session', 'idx', 'length']].values
    inference(aids, ops, row, result_clicks, result_buy, topks[TIME_WEIGHT], topks[OP_WEIGHT], test_ops_weights)

  0%|          | 0/1633 [00:00<?, ?it/s]

In [46]:
print(type(aids))  # product code
print(aids.shape)
aids[:50]

<class 'numpy.ndarray'>
(223644219,)


array([1517085, 1563459, 1309446,   16246, 1781822, 1152674, 1649869,
        461689,  305831,  461689,  362233, 1649869, 1649869,  984597,
       1649869,  803544, 1110941, 1190046, 1760685,  631008,  461689,
       1190046, 1650637,  313546, 1650637,  979517,  351157, 1062149,
       1157384, 1841388, 1469630,  305831, 1110548, 1110548,  305831,
       1650114, 1604396, 1009750, 1800933,  495779,  394655,  495779,
        789245,  789245,  366890,  361317, 1700164, 1755597,  789245,
        784978], dtype=int64)

In [47]:
print(type(ops))
print(ops.shape)
ops[:15]

<class 'numpy.ndarray'>
(223644219,)


array([0, 0, 0, 0, 0, 0, 1, 1, 2, 2, 0, 0, 0, 0, 0], dtype=uint8)

In [44]:
print(type(row))  # session, idx, length
print(row.shape)
row[:20]

<class 'numpy.ndarray'>
(635, 3)


array([[ 14570947, 223643030,         1],
       [ 14570948, 223643031,         5],
       [ 14570949, 223643036,         1],
       [ 14570950, 223643037,         2],
       [ 14570951, 223643039,         1],
       [ 14570952, 223643040,         2],
       [ 14570953, 223643042,         1],
       [ 14570954, 223643043,         6],
       [ 14570955, 223643049,         1],
       [ 14570956, 223643050,         3],
       [ 14570957, 223643053,         1],
       [ 14570958, 223643054,         1],
       [ 14570959, 223643055,         2],
       [ 14570960, 223643057,         1],
       [ 14570961, 223643058,         1],
       [ 14570962, 223643059,         3],
       [ 14570963, 223643062,         9],
       [ 14570964, 223643071,         4],
       [ 14570965, 223643075,         1],
       [ 14570966, 223643076,         1]], dtype=int64)

In [36]:
print(type(result_buy))
print(len(result_buy))
buy_df = pd.DataFrame(result_buy.items())
buy_df.head()

<class 'numba.typed.typeddict.Dict'>
1671803


Unnamed: 0,0,1
0,12899779,"[59625, 397451, 469285, 1253524, 737445, 17907..."
1,12899780,"[1142000, 736515, 973453, 582732, 1502122, 889..."
2,12899781,"[918667, 199008, 194067, 57315, 141736, 146057..."
3,12899782,"[834354, 779477, 595994, 740494, 889671, 98739..."
4,12899783,"[1817895, 607638, 1754419, 1216820, 1729553, 3..."


In [38]:
print(type(result_clicks))
print(len(result_clicks))
clicks_df = pd.DataFrame(result_clicks.items())
clicks_df.head()

<class 'numba.typed.typeddict.Dict'>
1671803


Unnamed: 0,0,1
0,12899779,"[59625, 1253524, 737445, 438191, 731692, 17907..."
1,12899780,"[1142000, 736515, 973453, 582732, 1502122, 889..."
2,12899781,"[918667, 199008, 194067, 57315, 141736, 146057..."
3,12899782,"[834354, 595994, 740494, 889671, 987399, 77947..."
4,12899783,"[1817895, 607638, 1754419, 1216820, 1729553, 3..."


In [39]:
print(type(topks[TIME_WEIGHT]))
print(len(topks[TIME_WEIGHT]))
time_weights_df = pd.DataFrame(topks[TIME_WEIGHT].items())
time_weights_df.head()

<class 'numba.typed.typeddict.Dict'>
1837169


Unnamed: 0,0,1
0,543308,"[723612, 423558, 589213, 138431, 798763, 75512..."
1,961113,"[216668, 1570779, 1024163, 1543589, 1569761, 2..."
2,883849,"[818923, 760363, 412500, 192094, 959954, 16820..."
3,701766,"[552456, 230088, 768033, 25475, 1095492, 17914..."
4,924751,"[1283527, 226042, 780500, 667322, 168206, 1059..."


In [40]:
print(type(topks[OP_WEIGHT]))
print(len(topks[OP_WEIGHT]))
op_weight_df = pd.DataFrame(topks[OP_WEIGHT].items())
op_weight_df.head()

<class 'numba.typed.typeddict.Dict'>
1837169


Unnamed: 0,0,1
0,543308,"[723612, 423558, 589213, 138431, 798763, 53730..."
1,961113,"[216668, 1543589, 1570779, 234245, 1024163, 18..."
2,883849,"[818923, 412500, 760363, 192094, 959954, 79432..."
3,701766,"[552456, 1600159, 230088, 768033, 1095492, 179..."
4,924751,"[780500, 226042, 1283527, 667322, 1059726, 168..."


In [42]:
print(type(result))
print(len(result))
result_df = pd.DataFrame(result.items())
result_df.head()

<class 'numba.typed.typeddict.Dict'>
1671803


Unnamed: 0,0,1
0,12899779,"[59625, 397451, 469285, 1253524, 737445, 17907..."
1,12899780,"[1142000, 736515, 973453, 582732, 1502122, 889..."
2,12899781,"[918667, 199008, 194067, 57315, 141736, 146057..."
3,12899782,"[834354, 779477, 595994, 740494, 889671, 98739..."
4,12899783,"[1817895, 607638, 1754419, 1216820, 1729553, 3..."


In [11]:
subs = []
op_names = ["clicks", "carts", "orders"]
for result, op in zip([result_clicks, result_buy, result_buy], op_names):

    sub = pd.DataFrame({"session_type": result.keys(), "labels": result.values()})
    sub.session_type = sub.session_type.astype(str) + f"_{op}"
    sub.labels = sub.labels.apply(lambda x: " ".join(x.astype(str)))
    subs.append(sub)
    


In [12]:
sub.head()

Unnamed: 0,session_type,labels
0,12899779_orders,59625 397451 469285 1253524 737445 1790770 731...
1,12899780_orders,1142000 736515 973453 582732 1502122 889686 48...
2,12899781_orders,918667 199008 194067 57315 141736 1460571 7594...
3,12899782_orders,834354 779477 595994 740494 889671 987399 4760...
4,12899783_orders,1817895 607638 1754419 1216820 1729553 300127 ...


In [75]:
print(sub.shape)
type(sub)

(1671803, 2)


pandas.core.frame.DataFrame

In [76]:
sub = pd.concat(subs).reset_index(drop = True)
sub.to_csv('submission_574.csv', index = False)
sub.head()

TypeError: __init__() got an unexpected keyword argument 'line_terminator'

## LB 0.576 pipeline: pandas co-visitation matrices + rule-based reranking

In [1]:
VER = 6

import pandas as pd, numpy as np
from tqdm.notebook import tqdm
import os, sys, pickle, glob, gc
from collections import Counter
import itertools
#print('We will use RAPIDS version',cudf.__version__)

In [21]:
import numba as nb

In [2]:
%%time
# CACHE FUNCTIONS
def read_file(f):
    """Return a DataFrame built from the in-memory cache entry for file path `f`."""
    return pd.DataFrame( data_cache[f] )

def read_file_to_cache(f):
    """Read one parquet chunk, dividing ts by 1000 into int32 and encoding `type` as int8 codes."""
    df = pd.read_parquet(f)
    df.ts = (df.ts/1000).astype('int32')
    df['type'] = df['type'].map(type_labels).astype('int8')
    return df

# Event-type encoding used by every cell below.
type_labels = {'clicks':0, 'carts':1, 'orders':2}

# CACHE ALL CHUNK FILES IN CPU MEMORY.
# Sorted so the file -> chunk grouping is the same on every machine (glob order is arbitrary).
data_cache = {}
files = sorted(glob.glob('input/otto-chunk-data-inparquet-format/*_parquet/*'))
for f in files: data_cache[f] = read_file_to_cache(f)

# CHUNK PARAMETERS
READ_CT = 5          # files concatenated per inner chunk
N_OUTER_CHUNKS = 6   # number of outer chunks; the co-visitation cells below loop over range(6)
CHUNK = int( np.ceil( len(files)/N_OUTER_CHUNKS ))
print(f'We will process {len(files)} files, in groups of {READ_CT} and chunks of {CHUNK}.')


We will process 17 files, in groups of 5 and chunks of 3.
CPU times: total: 938 ms
Wall time: 711 ms


In [3]:
%%time
# "Carts-orders" co-visitation matrix: pairs of aids within 24h in the last 30 events of a session,
# weighted by the event type of aid_y. Keeps the top 15 aid_y per aid_x.
# Relies on files, CHUNK, READ_CT, read_file and VER from earlier cells.
type_weight = {0:1, 1:5, 2:4}  # weight per aid_y event type (0=clicks, 1=carts, 2=orders)

# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 4
SIZE = 1.86e6/DISK_PIECES  # aid_x range per part; 1.86e6 presumably exceeds the largest aid — TODO confirm

# COMPUTE IN PARTS FOR MEMORY MANAGEMENT: part PART handles aid_x in [PART*SIZE, (PART+1)*SIZE)
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS (6 of them; CHUNK was computed as ceil(len(files)/6))
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS of up to READ_CT files
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = pd.concat(df,ignore_index=True,axis=0)
            df = df.sort_values(['session','ts'],ascending=[True,False])
            
            # USE TAIL OF SESSION: keep the 30 most recent events (ts sorted descending above)
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            
            # CREATE PAIRS: self-join on session, keep different aids less than 24h apart
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]
            
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            
            # ASSIGN WEIGHTS: each (session, aid_x, aid_y, type_y) counted once, weighted by type_y
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y', 'type_y'])
            df['wgt'] = df.type_y.map(type_weight)
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        
        print()
        
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()

    # FLATTEN THE (aid_x, aid_y) -> wgt SERIES, HIGHEST wgt FIRST WITHIN EACH aid_x
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    
    # KEEP TOP 15 aid_y PER aid_x
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<15].drop('n',axis=1)
    
    # SAVE PART TO DISK
    # (RAPIDS/cuDF variant of the line below, unused here:)
    #tmp.to_pandas().to_parquet(f'top_15_carts_orders_v{VER}_{PART}.pqt')
    tmp.to_parquet(f'top_15_carts_orders_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 2 in groups of 5...
0 , 
Processing files 3 thru 5 in groups of 5...
3 , 
Processing files 6 thru 8 in groups of 5...
6 , 
Processing files 9 thru 11 in groups of 5...
9 , 
Processing files 12 thru 14 in groups of 5...
12 , 
Processing files 15 thru 16 in groups of 5...
15 , 

### DISK PART 2
Processing files 0 thru 2 in groups of 5...
0 , 
Processing files 3 thru 5 in groups of 5...
3 , 
Processing files 6 thru 8 in groups of 5...
6 , 
Processing files 9 thru 11 in groups of 5...
9 , 
Processing files 12 thru 14 in groups of 5...
12 , 
Processing files 15 thru 16 in groups of 5...
15 , 

### DISK PART 3
Processing files 0 thru 2 in groups of 5...
0 , 
Processing files 3 thru 5 in groups of 5...
3 , 
Processing files 6 thru 8 in groups of 5...
6 , 
Processing files 9 thru 11 in groups of 5...
9 , 
Processing files 12 thru 14 in groups of 5...
12 , 
Processing files 15 thru 16 in groups of 5...
15 , 

### DISK PART 4
Processing files 0 thru 2 in 

In [4]:
%%time
# "Buy2buy" co-visitation matrix: only cart/order events, pairs less than 14 days apart,
# weight 1 per (session, aid_x, aid_y, type_y). Keeps the top 15 aid_y per aid_x.
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 1
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANAGEMENT: part PART handles aid_x in [PART*SIZE, (PART+1)*SIZE)
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS (6 of them; CHUNK was computed as ceil(len(files)/6))
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS of up to READ_CT files
        for k in range(a,b,READ_CT):
            
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = pd.concat(df,ignore_index=True,axis=0)
            df = df.loc[df['type'].isin([1,2])] # ONLY WANT CARTS AND ORDERS
            df = df.sort_values(['session','ts'],ascending=[True,False])
            
            # USE TAIL OF SESSION: keep the 30 most recent cart/order events
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            
            # CREATE PAIRS: self-join on session, keep different aids less than 14 days apart
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 14 * 24 * 60 * 60) & (df.aid_x != df.aid_y) ] # 14 DAYS
            
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            
            # ASSIGN WEIGHTS: 1 per distinct (session, aid_x, aid_y, type_y), so a session with both
            # a cart and an order of aid_y contributes 2
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y', 'type_y'])
            df['wgt'] = 1
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')

        print()
        
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()

    # FLATTEN THE (aid_x, aid_y) -> wgt SERIES, HIGHEST wgt FIRST WITHIN EACH aid_x
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    
    # KEEP TOP 15 aid_y PER aid_x
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<15].drop('n',axis=1)
    
    # SAVE PART TO DISK
    tmp.to_parquet(f'top_15_buy2buy_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 2 in groups of 5...
0 , 
Processing files 3 thru 5 in groups of 5...
3 , 
Processing files 6 thru 8 in groups of 5...
6 , 
Processing files 9 thru 11 in groups of 5...
9 , 
Processing files 12 thru 14 in groups of 5...
12 , 
Processing files 15 thru 16 in groups of 5...
15 , 
CPU times: total: 6.64 s
Wall time: 6.63 s


In [5]:
%%time
# "Clicks" co-visitation matrix: pairs of aids within 24h in the last 30 events of a session,
# time-weighted by ts_x (later events weigh more, from 1 up to 4). Keeps the top 20 aid_y per aid_x.
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 4
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANAGEMENT: part PART handles aid_x in [PART*SIZE, (PART+1)*SIZE)
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS (6 of them; CHUNK was computed as ceil(len(files)/6))
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS of up to READ_CT files
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = pd.concat(df,ignore_index=True,axis=0)
            df = df.sort_values(['session','ts'],ascending=[True,False])
            
            # USE TAIL OF SESSION: keep the 30 most recent events
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            
            # CREATE PAIRS: self-join on session, keep different aids less than 24h apart
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]
            
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            
            # ASSIGN WEIGHTS: one ts_x kept per (session, aid_x, aid_y); weight grows linearly with
            # ts_x from 1 (first timestamp) to 4 (last timestamp)
            df = df[['session', 'aid_x', 'aid_y','ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = 1 + 3*(df.ts_x - 1659304800)/(1662328791-1659304800)
            # 1659304800 : minimum timestamp (seconds), presumably of the whole dataset
            # 1662328791 : maximum timestamp (seconds), presumably of the whole dataset
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()
        
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()

    # FLATTEN THE (aid_x, aid_y) -> wgt SERIES, HIGHEST wgt FIRST WITHIN EACH aid_x
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    
    # KEEP TOP 20 aid_y PER aid_x
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<20].drop('n',axis=1)
    
    # SAVE PART TO DISK
    tmp.to_parquet(f'top_20_clicks_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 2 in groups of 5...
0 , 
Processing files 3 thru 5 in groups of 5...
3 , 
Processing files 6 thru 8 in groups of 5...
6 , 
Processing files 9 thru 11 in groups of 5...
9 , 
Processing files 12 thru 14 in groups of 5...
12 , 
Processing files 15 thru 16 in groups of 5...
15 , 

### DISK PART 2
Processing files 0 thru 2 in groups of 5...
0 , 
Processing files 3 thru 5 in groups of 5...
3 , 
Processing files 6 thru 8 in groups of 5...
6 , 
Processing files 9 thru 11 in groups of 5...
9 , 
Processing files 12 thru 14 in groups of 5...
12 , 
Processing files 15 thru 16 in groups of 5...
15 , 

### DISK PART 3
Processing files 0 thru 2 in groups of 5...
0 , 
Processing files 3 thru 5 in groups of 5...
3 , 
Processing files 6 thru 8 in groups of 5...
6 , 
Processing files 9 thru 11 in groups of 5...
9 , 
Processing files 12 thru 14 in groups of 5...
12 , 
Processing files 15 thru 16 in groups of 5...
15 , 

### DISK PART 4
Processing files 0 thru 2 in 

In [6]:
# FREE MEMORY
del data_cache, tmp
_ = gc.collect()

In [7]:
def load_test(path='C:/Users/ghtyu/OneDrive/Desktop/OTTO/otto_data/LB_576_Dataset_Tony/test_parquet'):
    """Load and concatenate every test parquet chunk found in `path`.

    Each chunk's ts is divided by 1000 and cast to int32. Each chunk's `type`
    strings are mapped to int8 codes via the global `type_labels`.
    Chunks are read in sorted filename order, so the result is reproducible.

    Raises
    ------
    FileNotFoundError
        If `path` contains no files. Without this check, pd.concat would fail
        with an unhelpful "No objects to concatenate" error.
    """
    chunk_files = sorted(glob.glob(os.path.join(path, '*')))
    if not chunk_files:
        raise FileNotFoundError(f'No test parquet chunks found in {path!r}')
    dfs = []
    for chunk_file in chunk_files:
        chunk = pd.read_parquet(chunk_file)
        chunk.ts = (chunk.ts/1000).astype('int32')
        chunk['type'] = chunk['type'].map(type_labels).astype('int8')
        dfs.append(chunk)
    return pd.concat(dfs).reset_index(drop=True)

test_df = load_test()
print('Test data has shape',test_df.shape)
test_df.head()

Test data has shape (6928123, 4)


Unnamed: 0,session,aid,ts,type
0,12899779,59625,1661724000,0
1,12899780,1142000,1661724000,0
2,12899780,582732,1661724058,0
3,12899780,973453,1661724109,0
4,12899780,736515,1661724136,0


In [8]:
%%time
def pqt_to_dict(df):
    """Turn an (aid_x, aid_y, ...) frame into {aid_x: [aid_y, ...]}, keeping row order within each aid_x."""
    return df.groupby('aid_x').aid_y.apply(list).to_dict()

def load_covisitation(prefix):
    """Merge every `{prefix}_v{VER}_*.pqt` part written above into one {aid_x: [aid_y, ...]} dict.

    The parts are discovered on disk instead of via DISK_PIECES. DISK_PIECES is
    overwritten by each co-visitation cell, so its final value need not match
    the part count of every matrix. Each cell writes its parts over disjoint
    aid_x ranges, so dict.update does not clobber entries. Stale parts left over
    from a run with a different DISK_PIECES would also be picked up; delete old
    files when changing it.

    Raises
    ------
    FileNotFoundError
        If no part file matches the prefix.
    """
    part_files = sorted(glob.glob(f'{prefix}_v{VER}_*.pqt'))
    if not part_files:
        raise FileNotFoundError(f'No co-visitation parts found for {prefix!r} (VER={VER})')
    merged = {}
    for part_file in part_files:
        merged.update(pqt_to_dict(pd.read_parquet(part_file)))
    return merged

# LOAD THREE CO-VISITATION MATRICES
top_20_clicks = load_covisitation('top_20_clicks')
top_20_buys = load_covisitation('top_15_carts_orders')
top_20_buy2buy = load_covisitation('top_15_buy2buy')

print('Here are size of our 3 co-visitation matrices:')
print( len( top_20_clicks ), len( top_20_buy2buy ), len( top_20_buys ) )

Here are size of our 3 co-visitation matrices:
691799 168826 691799
CPU times: total: 24.2 s
Wall time: 23.9 s


In [9]:
top_clicks = test_df.loc[test_df['type']== 0,'aid'].value_counts().index.values[:20] 
top_carts = test_df.loc[test_df['type']== 1,'aid'].value_counts().index.values[:20]
top_orders = test_df.loc[test_df['type']== 2,'aid'].value_counts().index.values[:20]

In [10]:
# Reranking weight per event type code (0=clicks, 1=carts, 2=orders).
type_weight_multipliers = {0: 1, 1: 5, 2: 4}

def suggest_clicks(df, covisit=None, fallback=None):
    """Return up to 20 predicted aids for the "clicks" target of one session.

    Parameters
    ----------
    df : pd.DataFrame
        One session's events with ``aid`` and integer ``type`` columns, expected
        in ascending ts order (the caller sorts by session, ts).
    covisit : dict, optional
        aid -> list of co-visited aids. Defaults to the global ``top_20_clicks``.
    fallback : sequence, optional
        Popular aids used to pad short lists. Defaults to the global ``top_clicks``.

    Sessions with >= 20 distinct aids are reranked by recency * event-type weight.
    Otherwise the session's distinct aids (last seen first) are extended with the
    most frequent co-visited aids, then padded with fallback aids that are not
    already predicted (the original padding could repeat an aid).
    """
    if covisit is None:
        covisit = top_20_clicks
    if fallback is None:
        fallback = top_clicks
    # USER HISTORY AIDS AND TYPES
    aids = df.aid.tolist()
    types = df.type.tolist()
    unique_aids = list(dict.fromkeys(aids[::-1]))
    # RERANK CANDIDATES USING WEIGHTS
    if len(unique_aids) >= 20:
        weights = np.logspace(0.1, 1, len(aids), base=2, endpoint=True) - 1
        aids_temp = Counter()
        # RERANK BASED ON REPEAT ITEMS AND TYPE OF ITEMS
        for aid, w, t in zip(aids, weights, types):
            aids_temp[aid] += w * type_weight_multipliers[t]
        return [k for k, v in aids_temp.most_common(20)]
    # USE "CLICKS" CO-VISITATION MATRIX
    history = set(unique_aids)
    aids2 = list(itertools.chain(*[covisit[aid] for aid in unique_aids if aid in covisit]))
    # RERANK CANDIDATES (history aids excluded)
    top_aids2 = [aid2 for aid2, cnt in Counter(aids2).most_common(20) if aid2 not in history]
    result = unique_aids + top_aids2[:20 - len(unique_aids)]
    # PAD WITH TOP TEST CLICKS THAT ARE NOT ALREADY PREDICTED
    taken = set(result)
    return result + [aid for aid in fallback if aid not in taken][:20 - len(result)]

In [11]:
def suggest_carts(df, clicks_covisit=None, buys_covisit=None, fallback=None, type_weights=None):
    """Return up to 20 predicted aids for the "carts" target of one session.

    Parameters
    ----------
    df : pd.DataFrame
        One session's events with ``aid`` and integer ``type`` columns
        (0=clicks, 1=carts, 2=orders), expected in ascending ts order.
    clicks_covisit, buys_covisit : dict, optional
        aid -> list of co-visited aids. Default to the globals ``top_20_clicks``
        and ``top_20_buys``.
    fallback : sequence, optional
        Popular aids used to pad short lists. Defaults to the global ``top_carts``.
    type_weights : dict, optional
        Event type -> reranking weight. Defaults to the global ``type_weight_multipliers``.

    Fallback padding skips aids already predicted (the original could repeat an aid).
    """
    if clicks_covisit is None:
        clicks_covisit = top_20_clicks
    if buys_covisit is None:
        buys_covisit = top_20_buys
    if fallback is None:
        fallback = top_carts
    if type_weights is None:
        type_weights = type_weight_multipliers

    # User history aids and types
    aids = df.aid.tolist()
    types = df.type.tolist()

    # UNIQUE AIDS (last seen first) AND UNIQUE "BUYS" (here: clicks and carts, types 0 and 1)
    unique_aids = list(dict.fromkeys(aids[::-1]))
    buys_df = df.loc[(df['type'] == 0) | (df['type'] == 1)]
    unique_buys = list(dict.fromkeys(buys_df.aid.tolist()[::-1]))

    # Rerank candidates using weights
    if len(unique_aids) >= 20:
        weights = np.logspace(0.5, 1, len(aids), base=2, endpoint=True) - 1
        aids_temp = Counter()
        # Rerank based on repeat items and types of items
        for aid, w, t in zip(aids, weights, types):
            aids_temp[aid] += w * type_weights[t]
        # Small bonus for aids co-visited ("carts-orders" matrix) with the session's clicks/carts
        aids2 = list(itertools.chain(*[buys_covisit[aid] for aid in unique_buys if aid in buys_covisit]))
        for aid in aids2:
            aids_temp[aid] += 0.1
        return [k for k, v in aids_temp.most_common(20)]

    # Use "carts-orders" and "clicks" co-visitation matrices
    history = set(unique_aids)
    aids1 = list(itertools.chain(*[clicks_covisit[aid] for aid in unique_aids if aid in clicks_covisit]))
    aids2 = list(itertools.chain(*[buys_covisit[aid] for aid in unique_aids if aid in buys_covisit]))

    # RERANK CANDIDATES (history aids excluded)
    top_aids2 = [aid2 for aid2, cnt in Counter(aids1 + aids2).most_common(20) if aid2 not in history]
    result = unique_aids + top_aids2[:20 - len(unique_aids)]

    # PAD WITH TOP TEST CARTS THAT ARE NOT ALREADY PREDICTED
    taken = set(result)
    return result + [aid for aid in fallback if aid not in taken][:20 - len(result)]

In [12]:
def suggest_buys(df, buys_covisit=None, buy2buy_covisit=None, fallback=None, type_weights=None):
    """Return up to 20 predicted aids for the "orders" target of one session.

    Parameters
    ----------
    df : pd.DataFrame
        One session's events with ``aid`` and integer ``type`` columns
        (0=clicks, 1=carts, 2=orders), expected in ascending ts order.
    buys_covisit, buy2buy_covisit : dict, optional
        aid -> list of co-visited aids. Default to the globals ``top_20_buys``
        and ``top_20_buy2buy``.
    fallback : sequence, optional
        Popular aids used to pad short lists. Defaults to the global ``top_orders``.
    type_weights : dict, optional
        Event type -> reranking weight. Defaults to the global ``type_weight_multipliers``.

    Fallback padding skips aids already predicted (the original could repeat an aid).
    """
    if buys_covisit is None:
        buys_covisit = top_20_buys
    if buy2buy_covisit is None:
        buy2buy_covisit = top_20_buy2buy
    if fallback is None:
        fallback = top_orders
    if type_weights is None:
        type_weights = type_weight_multipliers
    # USER HISTORY AIDS AND TYPES
    aids = df.aid.tolist()
    types = df.type.tolist()
    # UNIQUE AIDS (last seen first) AND UNIQUE BUYS (carts and orders, types 1 and 2)
    unique_aids = list(dict.fromkeys(aids[::-1]))
    buys_df = df.loc[(df['type'] == 1) | (df['type'] == 2)]
    unique_buys = list(dict.fromkeys(buys_df.aid.tolist()[::-1]))
    # RERANK CANDIDATES USING WEIGHTS
    if len(unique_aids) >= 20:
        weights = np.logspace(0.5, 1, len(aids), base=2, endpoint=True) - 1
        aids_temp = Counter()
        # RERANK BASED ON REPEAT ITEMS AND TYPE OF ITEMS
        for aid, w, t in zip(aids, weights, types):
            aids_temp[aid] += w * type_weights[t]
        # RERANK CANDIDATES USING "BUY2BUY" CO-VISITATION MATRIX
        aids3 = list(itertools.chain(*[buy2buy_covisit[aid] for aid in unique_buys if aid in buy2buy_covisit]))
        for aid in aids3:
            aids_temp[aid] += 0.1
        return [k for k, v in aids_temp.most_common(20)]
    history = set(unique_aids)
    # USE "CART ORDER" CO-VISITATION MATRIX
    aids2 = list(itertools.chain(*[buys_covisit[aid] for aid in unique_aids if aid in buys_covisit]))
    # USE "BUY2BUY" CO-VISITATION MATRIX
    aids3 = list(itertools.chain(*[buy2buy_covisit[aid] for aid in unique_buys if aid in buy2buy_covisit]))
    # RERANK CANDIDATES (history aids excluded)
    top_aids2 = [aid2 for aid2, cnt in Counter(aids2 + aids3).most_common(20) if aid2 not in history]
    result = unique_aids + top_aids2[:20 - len(unique_aids)]
    # PAD WITH TOP TEST ORDERS THAT ARE NOT ALREADY PREDICTED
    taken = set(result)
    return result + [aid for aid in fallback if aid not in taken][:20 - len(result)]

In [13]:
%%time

# Sort and group once, then reuse the grouping for all three targets
# (previously test_df was re-sorted and re-grouped for each target).
# Each suggest_* function receives one session's events in ascending ts order.
sessions = test_df.sort_values(["session", "ts"]).groupby(["session"])

pred_df_clicks = sessions.apply(suggest_clicks)
pred_df_carts = sessions.apply(suggest_carts)
pred_df_buys = sessions.apply(suggest_buys)

CPU times: total: 22min 38s
Wall time: 22min 38s


In [14]:
clicks_pred_df = pd.DataFrame(pred_df_clicks.add_suffix("_clicks"), columns=["labels"]).reset_index()
orders_pred_df = pd.DataFrame(pred_df_buys.add_suffix("_orders"), columns=["labels"]).reset_index()
carts_pred_df = pd.DataFrame(pred_df_carts.add_suffix("_carts"), columns=["labels"]).reset_index()

In [15]:
pred_df = pd.concat([clicks_pred_df, orders_pred_df, carts_pred_df])
pred_df.columns = ["session_type", "labels"]
pred_df["labels"] = pred_df.labels.apply(lambda x: " ".join(map(str,x)))


In [16]:
pred_df.head()

Unnamed: 0,session_type,labels
0,12899779_clicks,59625 1460571 485256 108125 986164 1551213 754...
1,12899780_clicks,1142000 736515 973453 582732 889686 1758603 12...
2,12899781_clicks,918667 199008 194067 57315 141736 754412 14605...
3,12899782_clicks,834354 595994 740494 889671 987399 779477 8291...
4,12899783_clicks,1817895 607638 1754419 1216820 1729553 300127 ...


In [17]:
print(pred_df.shape)
type(pred_df)

(5015409, 2)


pandas.core.frame.DataFrame

In [18]:
pred_df.to_csv("submission_576.csv", index=False)
pred_df.head()

Unnamed: 0,session_type,labels
0,12899779_clicks,59625 1460571 485256 108125 986164 1551213 754...
1,12899780_clicks,1142000 736515 973453 582732 889686 1758603 12...
2,12899781_clicks,918667 199008 194067 57315 141736 754412 14605...
3,12899782_clicks,834354 595994 740494 889671 987399 779477 8291...
4,12899783_clicks,1817895 607638 1754419 1216820 1729553 300127 ...
