In [1]:
import os, numpy as np
os.environ["CUDA_VISIBLE_DEVICES"]="0"
import cudf, glob, gc, pickle

VER = 232
#PART = 'a'

In [2]:
files = glob.glob('../../data/train_data/*_parquet/*')
len( files )

120

In [3]:
files[:4]

['/raid/Kaggle/otto/valid/test_parquet/004.parquet',
 '/raid/Kaggle/otto/valid/test_parquet/019.parquet',
 '/raid/Kaggle/otto/valid/test_parquet/008.parquet',
 '/raid/Kaggle/otto/valid/test_parquet/011.parquet']

In [4]:
type_weight = {0:1, 1:1, 2:1}

In [5]:
def read_file(f):
    df = cudf.read_parquet(f)
    #df['d'] = cudf.to_datetime(df.ts*1e9).dt.day.astype('int8')
    #df.ts = (df.ts/1000).astype('int32')
    #df['type'] = df['type'].map(type_labels).astype('int8')
    return df

In [6]:
%%time
PIECES = 1
SIZE = 1.86e6/PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(PIECES):
    print()
    print('### PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for a,b in [(0,20),(20,40),(40,60),(60,80),(80,100),(100,120)]:
        print(f'Processing {b-a} files...')
        
        # => INNER CHUNKS
        READ_CT = 1
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.sort_values(['session','ts'],ascending=[True,True])
            df['k'] = np.arange(len(df))
            # USE TAIL OF SESSION
            #df = df.reset_index(drop=True)
            #df['n'] = df.groupby('session').cumcount()
            #df = df.loc[df.n<100].drop('n',axis=1)
            # CREATE PAIRS
            df = df.merge(df.drop_duplicates(['session','aid']),on=['session'])
            df = df.loc[ ((df.k_y - df.k_x).abs()>=1) & ((df.k_y - df.k_x).abs()<=3) & (df.aid_x != df.aid_y) ]
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            #df = df.sort_values('ts_x',ascending=False)
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','ts_x','ts_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            w = (1/2)**( (df.ts_x - df.ts_y).abs() /60/60)
            df['wgt'] = w #df.type_y.map(type_weight)            
            #df['wgt'] = 1 + 3*(df.ts_x - 1659304800)/(1662328791-1659304800)
            
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()
    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    # SAVE TOP 40
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<80].drop('n',axis=1)
    # SAVE PART TO DISK
    df = tmp.to_pandas().groupby('aid_x').aid_y.apply(list)
    with open(f'../../data/covisit_matrices/top_40_aids_v{VER}_{PART}.pkl', 'wb') as f:
        pickle.dump(df.to_dict(), f)
        
# Wall time: 1min 48s
# Wall time: 1min 39s


### PART 1
Processing 20 files...
0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 , 
Processing 20 files...
20 , 21 , 22 , 23 , 24 , 25 , 26 , 27 , 28 , 29 , 30 , 31 , 32 , 33 , 34 , 35 , 36 , 37 , 38 , 39 , 
Processing 20 files...
40 , 41 , 42 , 43 , 44 , 45 , 46 , 47 , 48 , 49 , 50 , 51 , 52 , 53 , 54 , 55 , 56 , 57 , 58 , 59 , 
Processing 20 files...
60 , 61 , 62 , 63 , 64 , 65 , 66 , 67 , 68 , 69 , 70 , 71 , 72 , 73 , 74 , 75 , 76 , 77 , 78 , 79 , 
Processing 20 files...
80 , 81 , 82 , 83 , 84 , 85 , 86 , 87 , 88 , 89 , 90 , 91 , 92 , 93 , 94 , 95 , 96 , 97 , 98 , 99 , 
Processing 20 files...
100 , 101 , 102 , 103 , 104 , 105 , 106 , 107 , 108 , 109 , 110 , 111 , 112 , 113 , 114 , 115 , 116 , 117 , 118 , 119 , 
CPU times: user 1min 28s, sys: 42.7 s, total: 2min 11s
Wall time: 2min 14s
