In [1]:
import os

# Expose only device 0 to CUDA. This is set here, ahead of the cudf import
# in a later cell.
os.environ["CUDA_VISIBLE_DEVICES"]="0"

In [2]:
!nvidia-smi

Tue Dec 27 18:51:53 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.119.04   Driver Version: 450.119.04   CUDA Version: 11.7     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla V100-SXM2...  On   | 00000000:06:00.0 Off |                    0 |
| N/A   37C    P0    43W / 163W |      0MiB / 32510MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla V100-SXM2...  On   | 00000000:07:00.0 Off |                    0 |
| N/A   39C    P0    44W / 163W |      0MiB / 32510MiB |      0%      Default |
|       

In [3]:
import cudf
import glob
import gc

from tqdm import tqdm

In [4]:
# NOTE(review): `lb` is not read by any cell visible in this notebook —
# purpose unclear, confirm before relying on it.
lb = False
# Root directory of the input data; the training-shard glob is built from it.
path = '../../data/'
# Weight per integer event-type code (codes are defined in df_type below);
# applied to the co-visited event's type in section 1.
type_weight = {0:1, 1:6, 2:3}
# Divisor used to derive the number of training chunks (len(files)//no_files).
no_files = 5
    
# Lookup table from event-type name to integer code; merged into every chunk
# so the string column can be replaced by the code.
df_type = cudf.DataFrame({
    'type': ['clicks', 'carts', 'orders'],
    'type_': [0, 1, 2]
})

def list_in_chunks(files, no_chunks=10):
    """Distribute ``files`` round-robin over ``no_chunks`` lists.

    Item ``i`` goes to chunk ``i % no_chunks``, so chunk sizes differ by at
    most one and the relative order inside each chunk is preserved.

    Parameters
    ----------
    files : iterable
        Items to distribute (consumed once).
    no_chunks : int, default 10
        Number of chunks to produce.

    Returns
    -------
    list of list
        ``no_chunks`` lists; trailing ones are empty when there are fewer
        items than chunks. An empty list when there is nothing to distribute
        and ``no_chunks`` is not positive.

    Raises
    ------
    ValueError
        If ``no_chunks`` is smaller than 1 while there are items to place
        (previously this surfaced as an opaque ZeroDivisionError/IndexError).
    """
    items = list(files)
    if no_chunks < 1:
        if items:
            raise ValueError(
                'no_chunks must be >= 1 to distribute %d item(s), got %r'
                % (len(items), no_chunks)
            )
        return []
    # items[k::no_chunks] picks exactly the indices i with i % no_chunks == k.
    return [items[k::no_chunks] for k in range(no_chunks)]

In [5]:
# !rm -rf ./data/
!mkdir -p ./data/sub/

In [6]:
# Training shards, sorted so that chunk membership is reproducible.
files = sorted(
    glob.glob(os.path.join(path, 'train', 'interim', '*.parquet'))
)
# Always ask for at least one chunk: with fewer than `no_files` shards the
# integer division is 0, which used to hit a modulo-by-zero in list_in_chunks.
no_chunks = max(1, len(files) // no_files)
# Work units processed by every stage: the test file list first, then the
# training chunks. Empty chunks (only possible when there are no shards at
# all) are dropped so no stage is handed an empty file list.
files_split = [glob.glob(os.path.join(path, 'test.parquet'))] + [
    chunk for chunk in list_in_chunks(files, no_chunks=no_chunks) if chunk
]

In [7]:
len(files_split)

26

### 1) "Carts Orders" Co-visitation Matrix - Type Weighted

In [8]:
!rm -r ./data/tmp
!mkdir -p ./data/tmp/split

In [9]:
out = []  # never filled by this stage; kept because a later cell runs `del df, out`

tail_len = 30              # most recent events kept per session
max_gap_s = 24 * 60 * 60   # two events are paired only if less than one day apart

# Per-chunk "carts/orders" co-visitation sums, weighted by the type of the
# co-visited event. Each chunk's partial result is written to
# ./data/tmp/split/ and the partial results are combined in a later cell.
# tqdm wraps the list itself (not the enumerate object) so it knows the total.
for e, file in enumerate(tqdm(files_split)):
    df = cudf.read_parquet(file)

    # Replace the string event type by its integer code from df_type.
    df = df.merge(df_type, how='left', on='type')
    df['session'] = df['session'].astype('int32')
    df['aid'] = df['aid'].astype('int32')
    # NOTE(review): presumably ts arrives in milliseconds and this yields
    # seconds — confirm against the parquet schema.
    df['ts'] = (df['ts'] / 1000).astype('int32')
    df = df.drop(columns=['type'])
    df = df.rename(columns={'type_': 'type'})

    # USE TAIL OF SESSION: newest events first, keep the first `tail_len`.
    df = df.sort_values(['session', 'ts'], ascending=[True, False])
    df = df.reset_index(drop=True)
    df['n'] = df.groupby('session').cumcount()
    df = df.loc[df.n < tail_len].drop('n', axis=1)

    # Pair every event with every event of the same session (self-join),
    # then keep pairs of different items that are close enough in time.
    df = df.merge(df, how='left', on='session')
    gc.collect()
    df = df.loc[((df.ts_x - df.ts_y).abs() < max_gap_s) & (df.aid_x != df.aid_y)]
    gc.collect()

    # ASSIGN WEIGHTS: each (session, aid_x, aid_y) counts once, weighted by
    # the type of the aid_y event.
    # NOTE(review): when aid_y occurs with several types inside one session,
    # the row that survives drop_duplicates depends on the row order after
    # the merge — confirm this is acceptable.
    df = df[['session', 'aid_x', 'aid_y', 'type_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
    gc.collect()
    df['wgt'] = df.type_y.map(type_weight)
    df = df[['aid_x', 'aid_y', 'wgt']]
    gc.collect()
    df['wgt'] = df['wgt'].astype('float32')

    # Partial sum for this chunk; identical pairs from other chunks are
    # added up later.
    df = df.groupby(['aid_x', 'aid_y']).wgt.sum().reset_index()
    df.to_parquet('./data/tmp/split/split_' + str(e) + '.parquet')
    del df
    gc.collect()

26it [00:45,  1.76s/it]


In [11]:
import dask as dask, dask_cudf
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

import glob

# Per-chunk partial sums written by the loop above.
files = glob.glob('./data/tmp/split/split_*.parquet')
if not files:
    raise FileNotFoundError('no ./data/tmp/split/split_*.parquet files to aggregate')

# A cluster alone is not used by Dask: a Client has to be attached for the
# graph below to be scheduled on the cluster's workers. Both are context
# managers, so the workers (and their GPU memory) are released once the
# result has been computed.
with LocalCUDACluster() as cluster, Client(cluster) as client:
    ddf = dask_cudf.read_parquet(files)
    # The same (aid_x, aid_y) pair can occur in several chunks; add them up.
    ddf = ddf.groupby(['aid_x', 'aid_y']).wgt.sum()
    df = ddf.compute()

2022-12-27 18:53:56,917 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize


In [12]:
# Turn the (aid_x, aid_y)-indexed weight Series into a flat frame.
df = df.reset_index()
# Within each aid_x, heaviest partner first. Ties in wgt are not broken by
# any further key.
df = df.sort_values(['aid_x','wgt'],ascending=[True,False])
df = df.reset_index(drop=True)
gc.collect()
# Rank partners per aid_x (0 = heaviest) and keep the first 15.
df['n'] = df.groupby('aid_x').aid_y.cumcount()
df = df.loc[df.n<15].drop('n',axis=1)
df.to_parquet('./data/sub/top_15_carts_orders_v3.parquet')

In [13]:
# Drop the section 1 result and the `out` list (never filled in section 1),
# then collect, before the next section starts.
del df, out
gc.collect()

48

### 2)  "Buy2Buy" Co-visitation Matrix

In [7]:
out = []  # per-chunk partial sums, copied to host memory (pandas) after each chunk

tail_len = 30                   # most recent cart/order events kept per session
max_gap_s = 14 * 24 * 60 * 60   # two events are paired only if less than 14 days apart

# Per-chunk "buy2buy" co-visitation counts: only events with type code 1 or 2
# are considered and every pair counts 1 per session.
# tqdm wraps the list itself (not the enumerate object) so it knows the total.
for e, file in enumerate(tqdm(files_split)):
    df = cudf.read_parquet(file)

    # Replace the string event type by its integer code from df_type.
    df = df.merge(df_type, how='left', on='type')
    df['session'] = df['session'].astype('int32')
    df['aid'] = df['aid'].astype('int32')
    # NOTE(review): presumably ts arrives in milliseconds and this yields
    # seconds — confirm against the parquet schema.
    df['ts'] = (df['ts'] / 1000).astype('int32')
    df = df.drop(columns=['type'])
    df = df.rename(columns={'type_': 'type'})

    # Keep only type codes 1 and 2 ('carts' and 'orders' in df_type).
    df = df.loc[df['type'].isin([1, 2])]

    # USE TAIL OF SESSION: newest events first, keep the first `tail_len`.
    df = df.sort_values(['session', 'ts'], ascending=[True, False])
    df = df.reset_index(drop=True)
    df['n'] = df.groupby('session').cumcount()
    df = df.loc[df.n < tail_len].drop('n', axis=1)

    # Pair every event with every event of the same session (self-join),
    # then keep pairs of different items that are close enough in time.
    df = df.merge(df, how='left', on='session')
    gc.collect()
    df = df.loc[((df.ts_x - df.ts_y).abs() < max_gap_s) & (df.aid_x != df.aid_y)]
    gc.collect()

    # ASSIGN WEIGHTS: each (session, aid_x, aid_y) counts exactly once.
    # Only the key columns are carried; the event type is not needed here.
    df = df[['session', 'aid_x', 'aid_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
    df['wgt'] = 1
    df = df[['aid_x', 'aid_y', 'wgt']]
    df['wgt'] = df['wgt'].astype('float32')

    # Partial sum for this chunk, moved to the host so GPU memory is free
    # for the next chunk; chunks are combined in the following cell.
    df = df.groupby(['aid_x', 'aid_y']).wgt.sum()
    out.append(df.to_pandas())
    del df
    gc.collect()

26it [00:54,  2.11s/it]


In [8]:
# Move every chunk's partial sums (pandas, collected in `out`) back to the
# GPU and stack them into one Series.
df = cudf.concat([cudf.from_pandas(x) for x in out])
gc.collect()
# The same (aid_x, aid_y) pair can occur in several chunks; add them up.
df = df.reset_index().groupby(['aid_x','aid_y']).wgt.sum()
gc.collect()
df = df.reset_index()
# Within each aid_x, heaviest partner first. Ties in wgt are not broken by
# any further key.
df = df.sort_values(['aid_x','wgt'],ascending=[True,False])
df = df.reset_index(drop=True)
gc.collect()
# Rank partners per aid_x (0 = heaviest) and keep the first 15.
df['n'] = df.groupby('aid_x').aid_y.cumcount()
df = df.loc[df.n<15].drop('n',axis=1)
df.to_parquet('./data/sub/top_15_buy2buy_v3.parquet')

In [9]:
# Drop the host-side partial sums and the section 2 result, then collect,
# before the next section starts.
del out, df
gc.collect()

48

### 3) "Clicks" Co-visitation Matrix - Time Weighted

In [8]:
!mkdir -p ./data/tmp/split/

In [9]:
out = []
for e, file in tqdm(enumerate(files_split)):
    df = cudf.read_parquet(file)
    df = df.merge(
        df_type,
        how='left',
        on='type'
    )
    df['session'] = df['session'].astype('int32')
    df['aid'] = df['aid'].astype('int32')
    df.ts = (df.ts/1000).astype('int32')
    df.drop(['type'], axis=1, inplace=True)
    df = df.rename(columns={'type_': 'type'})
    df = df.sort_values(['session','ts'],ascending=[True,False])
    # USE TAIL OF SESSION
    df = df.reset_index(drop=True)
    df['n'] = df.groupby('session').cumcount()
    df = df.loc[df.n<30].drop('n',axis=1)
    df = df.merge(
        df, 
        how='left',
        on='session'
    )
    gc.collect()
    df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]
    gc.collect()
    # ASSIGN WEIGHTS
    df = df[['session', 'aid_x', 'aid_y','type_y','ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
    gc.collect()
    df['wgt'] = 1 + 3*(df.ts_x - 1659304800)/(1662328791-1659304800)
    df = df[['aid_x','aid_y','wgt']]
    gc.collect()
    df.wgt = df.wgt.astype('float32')
    df = df.groupby(['aid_x','aid_y']).wgt.sum().reset_index()
    df.to_parquet('./data/tmp/split/split_' + str(e) + '.parquet')
    del df
    gc.collect()

26it [00:45,  1.75s/it]


In [1]:
import dask as dask, dask_cudf
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

import glob

# Per-chunk partial sums written by the split loop of this section.
files = glob.glob('./data/tmp/split/split_*.parquet')
if not files:
    raise FileNotFoundError('no ./data/tmp/split/split_*.parquet files to aggregate')

# A cluster alone is not used by Dask: a Client has to be attached for the
# graph below to be scheduled on the cluster's workers. Both are context
# managers, so the workers (and their GPU memory) are released once the
# result has been computed.
with LocalCUDACluster() as cluster, Client(cluster) as client:
    ddf = dask_cudf.read_parquet(files)
    # The same (aid_x, aid_y) pair can occur in several chunks; add them up.
    ddf = ddf.groupby(['aid_x', 'aid_y']).wgt.sum()
    df = ddf.compute()

In [11]:
df.head()

aid_x    aid_y  
789261   1264588    75.119576
1445014  1255942     2.433980
593721   1710383     2.274707
1305442  517499      2.019257
372644   1191428     2.342825
Name: wgt, dtype: float32

In [12]:
# Turn the (aid_x, aid_y)-indexed weight Series into a flat frame.
df = df.reset_index()
# Within each aid_x, heaviest partner first. Ties in wgt are not broken by
# any further key.
df = df.sort_values(['aid_x','wgt'],ascending=[True,False])
df = df.reset_index(drop=True)

NameError: name 'gc' is not defined

In [13]:
# gc is not among the imports of the Dask cell above, so it is imported here
# before being used.
import gc
gc.collect()

685

In [14]:
# Rank partners per aid_x in the current row order (sorted by descending wgt
# in an earlier cell; 0 = heaviest) and keep the first 20.
df['n'] = df.groupby('aid_x').aid_y.cumcount()
df = df.loc[df.n<20].drop('n',axis=1)
df.to_parquet('./data/sub/top_20_clicks_v3.parquet')