In [1]:
import numpy as np
import pandas as pd
# import polars as pl
import cudf
from collections import Counter
import itertools
import gc

from pathlib import Path
from contextlib import contextmanager
import math
import os
import subprocess
import sys
import time
import psutil
import torch


# Ask cuDF to use 32-bit widths for its default integer and float dtypes.
cudf.set_option("default_integer_bitwidth", 32)
cudf.set_option("default_float_bitwidth", 32)

In [2]:
# config
# Directory that contains the `{phase}_parquet/` folders, relative to this notebook.
INPUT_DIR = Path("../../input")
# Experiment id; in this notebook it only appears in the commented-out output file names.
EXP_ID = "007"

In [3]:
# =================================
# Utils
# =================================
def get_input_data(input_dir: Path, phase: str):
    """
    Read every ``{phase}_parquet/*.parquet`` file under ``input_dir`` into one cuDF frame.

    Files are visited in sorted path order. For each file, ``session`` and
    ``aid`` are cast to int32, ``ts`` is divided by 1000 and cast to int32
    (presumably milliseconds -> seconds; confirm against the data source), and
    the ``type`` strings are mapped to int8 codes (clicks=0, carts=1, orders=2).

    Returns
    -------
    cudf.DataFrame
        Row-wise concatenation of all files with a fresh 0..n-1 index.
    """
    type_labels = {"clicks": 0, "carts": 1, "orders": 2}

    def _load(parquet_path):
        # Normalise the dtypes of a single file.
        frame = cudf.read_parquet(parquet_path)
        for column in ("session", "aid"):
            frame[column] = frame[column].astype("int32")
        frame["ts"] = (frame["ts"] / 1000).astype("int32")
        frame["type"] = frame["type"].map(type_labels).astype("int8")
        return frame

    parquet_paths = sorted(input_dir.glob(f"{phase}_parquet/*.parquet"))
    frames = [_load(parquet_path) for parquet_path in parquet_paths]
    return cudf.concat(frames, axis=0, ignore_index=True)


def get_gpu_memory(cmd_path="nvidia-smi",
                   target_properties=("memory.total", "memory.used")):
    """
    Query GPU memory figures by running ``nvidia-smi``.

    ref: https://www.12-technology.com/2022/01/pythongpu.html

    Parameters
    ----------
    cmd_path : str
        Path (or name on PATH) of the ``nvidia-smi`` executable.
    target_properties : sequence of str
        Property names passed to ``--query-gpu``. The first two values of the
        reply are interpreted as total and used memory, in that order.

    Returns
    -------
    gpu_total : numpy float, "memory.total" divided by 1024, rounded to 1 decimal
    gpu_used : numpy float, "memory.used" divided by 1024, rounded to 1 decimal

    Only the first line of the output (the first GPU listed) is parsed.

    Raises
    ------
    subprocess.CalledProcessError
        If the command exits with a non-zero status.
    """
    # Plain numbers, one CSV line per GPU.
    format_option = "--format=csv,noheader,nounits"

    # Argument list instead of a shell string: no shell parsing of cmd_path.
    cmd = [cmd_path, "--query-gpu=" + ",".join(target_properties), format_option]
    cmd_res = subprocess.check_output(cmd)

    first_gpu = cmd_res.decode().split('\n')[0].split(', ')

    # Each figure is rounded from its own reading (total used to be derived
    # from the "used" value by mistake).
    gpu_total = np.round(int(first_gpu[0]) / 1024, 1)
    gpu_used = np.round(int(first_gpu[1]) / 1024, 1)
    return gpu_total, gpu_used


class Trace():
    """Reports wall time and CPU/GPU memory change of a ``with`` block on stderr."""

    # Evaluated once when the class body runs; GPU memory is only sampled when True.
    cuda = torch.cuda.is_available()

    @staticmethod
    def _usage(current, previous):
        """Format ``current`` (GB) followed by its signed change since ``previous``."""
        delta = current - previous
        sign = '+' if delta >= 0 else '-'
        return f'{current:.1f}GB({sign}{math.fabs(delta):.1f}GB)'

    @contextmanager
    def timer(self, title):
        """
        Context manager that prints one summary line to stderr when the block ends.

        The line contains the process RSS (in units of 2**30 bytes) and, when
        CUDA is available, the used GPU memory, each with its change over the
        block, plus the elapsed seconds and ``title``. Nothing is printed if
        the block raises.
        """
        t0 = time.time()
        p = psutil.Process(os.getpid())
        cpu_m0 = p.memory_info().rss / 2. ** 30
        # Index 1 is "memory.used"; index 0 is "memory.total".
        if self.cuda: gpu_m0 = get_gpu_memory()[1]
        yield
        cpu_m1 = p.memory_info().rss / 2. ** 30
        if self.cuda: gpu_m1 = get_gpu_memory()[1]

        cpu_message = self._usage(cpu_m1, cpu_m0)

        if self.cuda:
            gpu_message = self._usage(gpu_m1, gpu_m0)
            message = f"[cpu: {cpu_message}, gpu: {gpu_message}: {time.time() - t0:.1f}sec] {title} "
        else:
            message = f"[cpu: {cpu_message}: {time.time() - t0:.1f}sec] {title} "

        print(message, file=sys.stderr)


# Shared instance used by the cells below.
trace = Trace()

In [4]:
# Train shards plus the batching parameters used by the co-visitation builders:
# the shards are split into 6 outer chunks of CHUNK files, read READ_CT at a time.
INPUT_DIR = Path("../../input")
files = sorted(INPUT_DIR.glob("train_parquet/*.parquet"))
CHUNK = math.ceil(len(files) / 6)
READ_CT = 5
type_labels = dict(clicks=0, carts=1, orders=2)

In [5]:
# co-visitation matrix
def read_file(f):
    """
    Load one parquet file into a cuDF frame with compact dtypes.

    ``session`` and ``aid`` become int32, ``ts`` is divided by 1000 and cast to
    int32, and ``type`` is mapped through the module-level ``type_labels``
    dictionary and stored as int8.
    """
    frame = cudf.read_parquet(f)
    int32_columns = {
        "session": frame["session"],
        "aid": frame["aid"],
        "ts": frame["ts"] / 1000,
    }
    for name, values in int32_columns.items():
        frame[name] = values.astype("int32")
    frame["type"] = frame["type"].map(type_labels).astype("int8")
    return frame


def get_type_weighted_co_visitation_matrix():
    """
    Build the type-weighted co-visitation table on the GPU.

    For each batch of files, the 30 most recent rows of every session are
    self-joined on ``session`` to form (aid_x, aid_y) pairs whose timestamps
    differ by less than 24 hours and whose aids differ. Each distinct
    (session, aid_x, aid_y) triple is kept once and weighted by the type of
    ``aid_y`` (0 -> 1, 1 -> 6, 2 -> 3). Weights are summed over all files and
    the 15 heaviest ``aid_y`` per ``aid_x`` are kept.

    The ``aid_x`` range is processed in ``DISK_PIECES`` slices to limit GPU
    memory. Relies on the module-level ``files``, ``CHUNK``, ``READ_CT`` and
    ``read_file``.

    Returns
    -------
    cudf.DataFrame
        Columns ``aid_x``, ``aid_y``, ``wgt`` for all slices, ordered by
        ``aid_x`` and descending ``wgt``. (Previously the result was computed
        and then dropped; existing callers that ignore the return value are
        unaffected.)

    Raises
    ------
    FileNotFoundError
        If ``files`` is empty.
    """
    # ref: https://www.kaggle.com/code/cdeotte/compute-validation-score-cv-565
    if not files:
        raise FileNotFoundError("no train parquet files to process: `files` is empty")

    type_weight = {0: 1, 1: 6, 2: 3}
    # USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
    # The matrix is computed slice by slice to avoid running out of memory.
    DISK_PIECES = 4
    # Width of one aid_x slice; 1.86e6 is presumably an upper bound on aid values - TODO confirm.
    SIZE = 1.86e6 / DISK_PIECES

    parts = []
    # COMPUTE IN PARTS FOR MEMORY MANAGEMENT
    for part in range(DISK_PIECES):
        print('### DISK PART', part+1)

        part_wgt = None
        # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
        # Two levels of chunking: OUTER chunks of CHUNK files ...
        for j in range(6):
            a = j * CHUNK
            b = min((j+1) * CHUNK, len(files))
            if a >= b:
                # With few files the trailing outer chunks are empty; nothing left to read.
                break
            print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')

            chunk_wgt = None
            # ... and INNER batches of up to READ_CT files read together.
            for k in range(a, b, READ_CT):
                # READ FILES
                df = cudf.concat(
                    [read_file(path) for path in files[k:min(k + READ_CT, b)]],
                    ignore_index=True, axis=0,
                )
                # Newest rows first inside each session so cumcount() < 30 keeps the latest 30.
                df = df.sort_values(by=["session", "ts"], ascending=[True, False])
                # USE TAIL OF SESSION
                df = df.reset_index(drop=True)
                df['n'] = df.groupby('session').cumcount()
                df = df.loc[df.n<30].drop('n',axis=1)
                # CREATE PAIRS
                df = df.merge(df, on='session')
                df = df.loc[((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y)]
                # MEMORY MANAGEMENT COMPUTE IN PARTS
                df = df.loc[(df.aid_x >= part*SIZE) & (df.aid_x < (part+1)*SIZE)]
                # ASSIGN WEIGHTS
                df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
                df['wgt'] = df.type_y.map(type_weight)
                df = df[['aid_x','aid_y','wgt']]
                df.wgt = df.wgt.astype('float32')
                df = df.groupby(['aid_x','aid_y']).wgt.sum()
                # COMBINE INNER CHUNKS
                chunk_wgt = df if chunk_wgt is None else chunk_wgt.add(df, fill_value=0)

            # COMBINE OUTER CHUNKS
            part_wgt = chunk_wgt if part_wgt is None else part_wgt.add(chunk_wgt, fill_value=0)

            del chunk_wgt, df
            gc.collect()

        # CONVERT MATRIX TO A FLAT TABLE
        top = part_wgt.reset_index()
        top = top.sort_values(['aid_x','wgt'],ascending=[True,False])
        # KEEP TOP 15 PER aid_x
        top = top.reset_index(drop=True)
        top['n'] = top.groupby('aid_x').aid_y.cumcount()
        top = top.loc[top.n<15].drop('n',axis=1)
        # SAVE PART TO DISK (convert to pandas first uses less memory)
        # top.to_pandas().to_parquet(f'top_15_carts_orders_exp{EXP_ID}_{part}.pqt')
        parts.append(top)

        # Only the small top-15 table is kept; release the full pair weights.
        del part_wgt
        gc.collect()

    return parts[0] if len(parts) == 1 else cudf.concat(parts, ignore_index=True)

In [16]:
# Build the type-weighted co-visitation matrix while reporting time and memory use.
with trace.timer("cart_co_matirx"):
    get_type_weighted_co_visitation_matrix()

### DISK PART 1
Processing files 0 thru 21 in groups of 5...


KeyboardInterrupt: 

In [6]:
def get_buy2buy_co_visitation_matrix():
    """
    Build the "buy2buy" co-visitation table (type 1 and 2 rows only) on the GPU.

    For each batch of files, rows whose ``type`` is 1 or 2 are kept, the 30
    most recent of them per session are self-joined on ``session``, and pairs
    (aid_x, aid_y) with different aids and timestamps less than 14 days apart
    are retained. Each distinct (session, aid_x, aid_y) triple contributes a
    weight of 1. Weights are summed over all files and the 15 heaviest
    ``aid_y`` per ``aid_x`` are kept.

    Relies on the module-level ``files``, ``CHUNK``, ``READ_CT`` and
    ``read_file``.

    Returns
    -------
    cudf.DataFrame
        Columns ``aid_x``, ``aid_y``, ``wgt``, covering every disk piece. With
        a single piece this is exactly the frame for that piece.

    Raises
    ------
    FileNotFoundError
        If ``files`` is empty.
    """
    # ref: https://www.kaggle.com/code/cdeotte/compute-validation-score-cv-565
    if not files:
        raise FileNotFoundError("no train parquet files to process: `files` is empty")

    # USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
    # The matrix is computed slice by slice to avoid running out of memory.
    DISK_PIECES = 1
    # Width of one aid_x slice; 1.86e6 is presumably an upper bound on aid values - TODO confirm.
    SIZE = 1.86e6 / DISK_PIECES

    parts = []
    # COMPUTE IN PARTS FOR MEMORY MANAGEMENT
    for part in range(DISK_PIECES):
        print('### DISK PART', part+1)

        part_wgt = None
        # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
        # Two levels of chunking: OUTER chunks of CHUNK files ...
        for j in range(6):
            a = j * CHUNK
            b = min((j+1) * CHUNK, len(files))
            if a >= b:
                # With few files the trailing outer chunks are empty; nothing left to read.
                break
            print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')

            chunk_wgt = None
            # ... and INNER batches of up to READ_CT files read together.
            for k in range(a, b, READ_CT):
                # READ FILES
                df = cudf.concat(
                    [read_file(path) for path in files[k:min(k + READ_CT, b)]],
                    ignore_index=True, axis=0,
                )
                df = df.loc[df["type"].isin([1, 2])] # ONLY WANT CARTS AND ORDERS
                # Newest rows first inside each session so cumcount() < 30 keeps the latest 30.
                df = df.sort_values(by=["session", "ts"], ascending=[True, False])
                # USE TAIL OF SESSION
                df = df.reset_index(drop=True)
                df['n'] = df.groupby('session').cumcount()
                df = df.loc[df.n<30].drop('n',axis=1)
                # CREATE PAIRS
                df = df.merge(df, on='session')
                df = df.loc[((df.ts_x - df.ts_y).abs()< 14 * 24 * 60 * 60) & (df.aid_x != df.aid_y)] # 14days
                # MEMORY MANAGEMENT COMPUTE IN PARTS
                df = df.loc[(df.aid_x >= part*SIZE) & (df.aid_x < (part+1)*SIZE)]
                # ASSIGN WEIGHTS
                df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
                df['wgt'] = 1
                df = df[['aid_x','aid_y','wgt']]
                df.wgt = df.wgt.astype('float32')
                df = df.groupby(['aid_x','aid_y']).wgt.sum()
                # COMBINE INNER CHUNKS
                chunk_wgt = df if chunk_wgt is None else chunk_wgt.add(df, fill_value=0)

            # COMBINE OUTER CHUNKS
            part_wgt = chunk_wgt if part_wgt is None else part_wgt.add(chunk_wgt, fill_value=0)

            del chunk_wgt, df
            gc.collect()

        # CONVERT MATRIX TO A FLAT TABLE
        top = part_wgt.reset_index()
        top = top.sort_values(['aid_x','wgt'],ascending=[True,False])
        # KEEP TOP 15 PER aid_x
        top = top.reset_index(drop=True)
        top['n'] = top.groupby('aid_x').aid_y.cumcount()
        top = top.loc[top.n<15].drop('n',axis=1)
        # SAVE PART TO DISK (convert to pandas first uses less memory)
        # top.to_pandas().to_parquet(f'top_15_buy2buy_exp{EXP_ID}_{part}.pqt')
        parts.append(top)

        # Only the small top-15 table is kept; release the full pair weights.
        del part_wgt
        gc.collect()

    # Return after ALL pieces are done (the return used to sit inside the loop
    # and would have stopped after the first piece if DISK_PIECES were raised).
    return parts[0] if len(parts) == 1 else cudf.concat(parts, ignore_index=True)

In [7]:
# Build the buy2buy co-visitation table; `mat` is consumed by a later cell.
with trace.timer("buy_co_matirx"):
    mat = get_buy2buy_co_visitation_matrix()

### DISK PART 1
Processing files 0 thru 21 in groups of 5...
Processing files 22 thru 43 in groups of 5...
Processing files 44 thru 65 in groups of 5...
Processing files 66 thru 87 in groups of 5...
Processing files 88 thru 109 in groups of 5...
Processing files 110 thru 128 in groups of 5...


[cpu: 1.4GB(+0.9GB), gpu: 2.4GB(+2.1GB): 47.4sec] buy_co_matirx 


In [9]:
def get_clicks_co_visitation_matrix():
    """
    Build the time-weighted "clicks" co-visitation table on the GPU.

    For each batch of files, the 30 most recent rows of every session are
    self-joined on ``session`` to form (aid_x, aid_y) pairs whose timestamps
    differ by less than 24 hours and whose aids differ. Each distinct
    (session, aid_x, aid_y) triple is kept once and weighted by
    ``1 + 3 * (ts_x - 1659304800) / (1662328791 - 1659304800)``, so later
    events weigh more. Weights are summed over all files and the 15 heaviest
    ``aid_y`` per ``aid_x`` are kept.

    The ``aid_x`` range is processed in ``DISK_PIECES`` slices to limit GPU
    memory. Relies on the module-level ``files``, ``CHUNK``, ``READ_CT`` and
    ``read_file``.

    Returns
    -------
    cudf.DataFrame
        Columns ``aid_x``, ``aid_y``, ``wgt`` for all slices, ordered by
        ``aid_x`` and descending ``wgt``. (Previously nothing was returned;
        callers that ignore the return value are unaffected.)

    Raises
    ------
    FileNotFoundError
        If ``files`` is empty.
    """
    # ref: https://www.kaggle.com/code/cdeotte/compute-validation-score-cv-565
    if not files:
        raise FileNotFoundError("no train parquet files to process: `files` is empty")

    # USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
    # The matrix is computed slice by slice to avoid running out of memory.
    DISK_PIECES = 4
    # Width of one aid_x slice; 1.86e6 is presumably an upper bound on aid values - TODO confirm.
    SIZE = 1.86e6 / DISK_PIECES

    parts = []
    # COMPUTE IN PARTS FOR MEMORY MANAGEMENT
    for part in range(DISK_PIECES):
        print('### DISK PART', part+1)

        part_wgt = None
        # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
        # Two levels of chunking: OUTER chunks of CHUNK files ...
        for j in range(6):
            a = j * CHUNK
            b = min((j+1) * CHUNK, len(files))
            if a >= b:
                # With few files the trailing outer chunks are empty; nothing left to read.
                break
            print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')

            chunk_wgt = None
            # ... and INNER batches of up to READ_CT files read together.
            for k in range(a, b, READ_CT):
                # READ FILES
                df = cudf.concat(
                    [read_file(path) for path in files[k:min(k + READ_CT, b)]],
                    ignore_index=True, axis=0,
                )
                # Newest rows first inside each session so cumcount() < 30 keeps the latest 30.
                df = df.sort_values(by=["session", "ts"], ascending=[True, False])
                # USE TAIL OF SESSION
                df = df.reset_index(drop=True)
                df['n'] = df.groupby('session').cumcount()
                df = df.loc[df.n<30].drop('n',axis=1)
                # CREATE PAIRS
                df = df.merge(df, on='session')
                df = df.loc[((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y)]
                # MEMORY MANAGEMENT COMPUTE IN PARTS
                df = df.loc[(df.aid_x >= part*SIZE) & (df.aid_x < (part+1)*SIZE)]
                # ASSIGN WEIGHTS
                # Keep ts_x (not type_y): the time-based weight below reads it.
                df = df[['session', 'aid_x', 'aid_y','ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
                # Linear ramp from 1 to 4 between the two epoch-second constants
                # (presumably the first and last timestamps of the data - TODO confirm).
                df["wgt"] = 1 + 3 * (df["ts_x"] - 1659304800) / (1662328791 - 1659304800)
                df = df[['aid_x','aid_y','wgt']]
                df.wgt = df.wgt.astype('float32')
                df = df.groupby(['aid_x','aid_y']).wgt.sum()
                # COMBINE INNER CHUNKS
                chunk_wgt = df if chunk_wgt is None else chunk_wgt.add(df, fill_value=0)

            # COMBINE OUTER CHUNKS
            part_wgt = chunk_wgt if part_wgt is None else part_wgt.add(chunk_wgt, fill_value=0)

            del chunk_wgt, df
            gc.collect()

        # CONVERT MATRIX TO A FLAT TABLE
        top = part_wgt.reset_index()
        top = top.sort_values(['aid_x','wgt'],ascending=[True,False])
        # KEEP TOP 15 PER aid_x
        top = top.reset_index(drop=True)
        top['n'] = top.groupby('aid_x').aid_y.cumcount()
        top = top.loc[top.n<15].drop('n',axis=1)
        # SAVE PART TO DISK (convert to pandas first uses less memory)
        # top.to_pandas().to_parquet(f'top_15_clicks_exp{EXP_ID}_{part}.pqt')
        parts.append(top)

        # Only the small top-15 table is kept; release the full pair weights.
        del part_wgt
        gc.collect()

    return parts[0] if len(parts) == 1 else cudf.concat(parts, ignore_index=True)

In [20]:
# NOTE(review): the recorded output of this cell is a NameError, i.e. the cell that
# defines get_clicks_co_visitation_matrix had not been executed in that kernel session.
with trace.timer("click_co_matirx"):
    get_clicks_co_visitation_matrix()

NameError: name 'get_clicks_co_visitation_matrix' is not defined

### Rerank

In [8]:
def suggest_clicks(df):
    """
    Produce up to 20 candidate aids for the events of a single session.

    ``df`` must provide ``aid`` and ``type`` columns; rows are assumed to be in
    chronological order (the calling cell sorts by session and ts). Reads the
    module-level ``type_weight_multipliers``, ``top_20_clicks`` and
    ``top_clicks``.

    * With 20 or more distinct aids in the history, aids are scored by a
      position weight (growing towards the end of the history) times the
      event-type multiplier, and the 20 best-scoring aids are returned.
    * Otherwise the distinct history aids (most recent first) come first,
      followed by the most frequent co-visitation neighbours not already in
      the history, and finally by entries of ``top_clicks`` as padding.
    """
    history = df["aid"].to_list()
    event_types = df["type"].to_list()
    # Distinct aids, last occurrence first.
    recent_unique = list(dict.fromkeys(reversed(history)))

    # Long history: rerank the session's own items.
    if len(recent_unique) >= 20:
        position_weights = np.logspace(0.1, 1, len(history), base=2, endpoint=True) - 1
        scores = Counter()
        for aid, weight, kind in zip(history, position_weights, event_types):
            scores[aid] += weight * type_weight_multipliers[kind]
        return [aid for aid, _ in scores.most_common(20)]

    # Short history: gather neighbours from the co-visitation lookup.
    neighbours = [
        candidate
        for aid in recent_unique
        if aid in top_20_clicks
        for candidate in top_20_clicks[aid]
    ]
    ranked = Counter(neighbours).most_common(20)
    fresh = [candidate for candidate, _ in ranked if candidate not in recent_unique]
    result = recent_unique + fresh[:20 - len(recent_unique)]
    # Pad with the globally popular clicks.
    return result + list(top_clicks)[:20 - len(result)]

In [9]:
def get_polars_input_data(input_dir: Path, phase: str):
    """
    Polars variant of the parquet loader: read every ``{phase}_parquet/*.parquet``
    file under ``input_dir`` (in sorted path order) and concatenate the frames.

    Per file, ``session`` and ``aid`` are cast to Int32, ``ts`` is divided by
    1000 and cast to Int32, and each ``type`` string is looked up in
    ``type_labels`` and cast to Int8.

    NOTE(review): this function references ``pl``, but ``import polars as pl``
    is commented out in the import cell, so calling it as-is raises NameError.
    Its only call site in this notebook is commented out as well.
    """
    type_labels = {"clicks": 0, "carts": 1, "orders": 2}
    
    dfs = []
    for path in sorted(list(input_dir.glob(f"{phase}_parquet/*.parquet"))):
        df = (
            pl.read_parquet(path)
            .with_columns([
                pl.col("session").cast(pl.Int32), 
                pl.col("aid").cast(pl.Int32), 
                (pl.col("ts") / 1000).cast(pl.Int32),
                # Python-level lookup per value; raises KeyError for a label missing from type_labels.
                pl.col("type").apply(lambda x: type_labels[x]).cast(pl.Int8)
            ]
            )
        )
        dfs.append(df)
    
    return pl.concat(dfs)

In [9]:
# Load the test split with cuDF, then copy it to host memory as a pandas frame;
# the following cells operate on it with pandas.
with trace.timer("load test set"):
    test_df = get_input_data(INPUT_DIR, "test")
    test_df = test_df.to_pandas()
    # test_df = get_polars_input_data(INPUT_DIR, "test")

[cpu: 1.5GB(+0.1GB), gpu: 0.8GB(+0.0GB): 2.1sec] load test set 


In [10]:
# Lookup tables read as globals by suggest_clicks.
# aid_x -> list of its co-visited aid_y.
# NOTE(review): `mat` is the frame returned by get_buy2buy_co_visitation_matrix, not a
# clicks matrix, and that builder keeps at most 15 neighbours per aid despite the "top_20" name.
top_20_clicks = mat.to_pandas().groupby("aid_x")["aid_y"].apply(list).to_dict()
# The 20 aids with the most type-0 (clicks) rows in the test set, most frequent first.
top_clicks = test_df.loc[test_df["type"] == 0, "aid"].value_counts().index.to_numpy()[:20]
# Score multiplier per event type code, used when reranking long sessions.
type_weight_multipliers = {0: 1, 1: 6, 2: 3}

In [11]:
# pandarallel supplies the `parallel_apply` used in the next cell.
# NOTE(review): this import lives outside the import cell at the top of the notebook.
from pandarallel import pandarallel
pandarallel.initialize(progress_bar=True)

INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


In [12]:
# Order events by time within each session, then compute one candidate list per
# session with suggest_clicks across the pandarallel workers.
with trace.timer("parallel apply"):
    pred = test_df.sort_values(["session", "ts"]).groupby(["session"]).parallel_apply(suggest_clicks)

  iterator = iter(dataframe_groupby)


VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=208976), Label(value='0 / 208976')…

[cpu: 4.2GB(+1.9GB), gpu: 0.8GB(+0.0GB): 141.0sec] parallel apply 


In [13]:
# Append "_clicks" to every session id in the index and display the predictions
# as a table with the candidate lists in a `labels` column.
pd.DataFrame(pred.add_suffix("_clicks"), columns=["labels"]).reset_index()

Unnamed: 0,session,labels
0,12899779_clicks,"[59625, 1460571, 485256, 108125, 986164, 15512..."
1,12899780_clicks,"[1142000, 736515, 973453, 582732, 487136, 7605..."
2,12899781_clicks,"[918667, 199008, 194067, 57315, 141736, 295859..."
3,12899782_clicks,"[834354, 595994, 740494, 889671, 987399, 77947..."
4,12899783_clicks,"[1817895, 607638, 1754419, 1216820, 1729553, 3..."
...,...,...
1671798,14571577_clicks,"[1141710, 1276792, 43517, 60347, 140522, 24273..."
1671799,14571578_clicks,"[519105, 6523, 70712, 131884, 184410, 188503, ..."
1671800,14571579_clicks,"[739876, 1550479, 1209992, 785544, 1750859, 50..."
1671801,14571580_clicks,"[202353, 14540, 32322, 74627, 81251, 154774, 2..."
