In [4]:
%load_ext autoreload 
%autoreload 2

In [5]:
cd ../src

/tmp/kaggle/kaggle_otto_rs/src


### Initialization

In [6]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from pathlib import Path
from tqdm import tqdm
from collections import Counter
from joblib import Parallel, delayed

In [7]:
def process_data(chunk, save_folder=""):
    """Convert one chunk of sessions into per-session integer arrays.

    Args:
        chunk: DataFrame with a 'session' column and an 'events' column, where
            each 'events' entry is a list of dicts with 'aid', 'ts' and 'type' keys.
        save_folder: Folder (str or path-like) where each session array is saved
            as ``session_<id>.npy``. When empty (the default) nothing is written,
            but the would-be file names are still reported in the returned frame.

    Returns:
        A tuple ``(arrays, df)``: ``arrays`` is an object array holding one
        ``(n_events, 3)`` int array per session with columns
        ``[aid, ts, type_index]``; ``df`` has one row per session with the
        columns 'path', 'start_time' and 'end_time'.

    Raises:
        ValueError: if an event type is not listed in ``CLASSES``.
        IndexError: if a session has no events (no first/last timestamp).
    """
    def _events_to_array(events):
        # The event type is encoded as its position in the module-level CLASSES list.
        rows = [[e['aid'], e['ts'], CLASSES.index(e['type'])] for e in events]
        return np.array(rows).astype(int)

    # Read from the `chunk` argument; the previous code read a global named `c`.
    arrays = chunk['events'].apply(_events_to_array).values

    paths = []
    for session, array in zip(chunk['session'], arrays):
        # os.path.join accepts both str and Path folders; for "" or a folder
        # ending in "/" it yields the same string as plain concatenation did.
        path = os.path.join(save_folder, f"session_{session}.npy")
        paths.append(path)
        if save_folder:
            np.save(path, array)

    # Column 1 holds the timestamps; first and last rows bound the session.
    start_times = [a[0, 1] for a in arrays]
    end_times = [a[-1, 1] for a in arrays]
    df = pd.DataFrame({"path": paths, "start_time": start_times, "end_time": end_times})

    return arrays, df

### Val Split

In [8]:
from recsys.testset import *

@beartype
def val_split(train_set: Path, output_path: Path, days: int, seed: int):
    """Split a sessions .jsonl file into train and validation files.

    Seeds the `random` module, looks up the maximum timestamp of `train_set`
    with `get_max_ts`, and hands the chunked reader over to `train_test_split`,
    which is given `output_path / 'train_sessions.jsonl'` and
    `output_path / 'val_sessions.jsonl'` as targets.

    Args:
        train_set: Path to the source .jsonl file (one JSON object per line).
        output_path: Folder in which the two output files are placed.
        days: Forwarded to `train_test_split`; presumably the length of the
            validation window in days (helper not visible here, to be confirmed).
        seed: Seed for the `random` module.
    """
    random.seed(seed)
    last_ts = get_max_ts(train_set)

    # Lazy reader: the file is consumed 100000 lines at a time.
    reader = pd.read_json(train_set, lines=True, chunksize=100000)
    train_test_split(
        reader,
        output_path / 'train_sessions.jsonl',
        output_path / 'val_sessions.jsonl',
        last_ts,
        days,
    )

In [9]:
# val_split(
#     Path('../input/train.jsonl'),
#     Path("../output/"),
#     7,
#     42
# )

### Data

In [10]:
# Folder the .jsonl inputs are read from (relative to the working directory).
DATA_PATH = Path('../input/')
# Folder for generated artifacts; used by the commented-out CSV export cells.
OUT_DIR = Path("../output/")

In [11]:
# Paths to the un-split train/test .jsonl files.
TRAIN_PATH = DATA_PATH / 'train.jsonl'
TEST_PATH = DATA_PATH / 'test.jsonl'

# TRAIN_PATH is rebound here: from this point on it refers to
# train_sessions.jsonl and the 'train.jsonl' value above is discarded.
# These file names match the ones val_split writes into its output folder.
# NOTE(review): the commented-out val_split call targets "../output/", while
# these paths point into DATA_PATH ("../input/") - confirm the files were moved.
TRAIN_PATH = DATA_PATH / 'train_sessions.jsonl'
VAL_PATH = DATA_PATH / 'val_sessions.jsonl'

# Event types; an event's integer code is its position in this list
# (looked up with CLASSES.index).
CLASSES = ['clicks', 'carts', 'orders']

## NPY

In [12]:
# SAVE_DIR = "../input/processed/"
# os.makedirs(SAVE_DIR, exist_ok=True)

# sample_size = 10000
# n_chunks = 12899779 // sample_size + 1

# chunks = pd.read_json(TRAIN_PATH, lines=True, chunksize=sample_size)

In [13]:
# %%time

# ids = set()
# for c in chunks:
#     arrays = process_data(c, "")

#     ids_ = np.concatenate([a[:, 0] for a in arrays])
#     ids.update(ids_.tolist())
#     break

In [14]:
# chunks = pd.read_json(TRAIN_PATH, lines=True, chunksize=sample_size)

# ids = set()
# dfs = []

# for c in tqdm(chunks, total=1110):
#     arrays, df = process_data(c, SAVE_DIR)
#     dfs.append(df)

# df = pd.concat(dfs, ignore_index=True)
# df.to_csv(OUT_DIR / "train.csv", index=False)

### Val

In [15]:
# SAVE_DIR = "../input/processed_val/"
# os.makedirs(SAVE_DIR, exist_ok=True)

# sample_size = 10000
# n_chunks = 12899779 // sample_size + 1

# chunks = pd.read_json(TRAIN_PATH, lines=True, chunksize=sample_size)

In [16]:
# ids = set()
# dfs = []

# for c in tqdm(chunks, total=180):
#     arrays, df = process_data(c, SAVE_DIR)
#     dfs.append(df)

# df = pd.concat(dfs, ignore_index=True)
# df.to_csv(OUT_DIR / "val.csv", index=False)

### Test

In [17]:
# SAVE_DIR = "../input/processed_test/"
# os.makedirs(SAVE_DIR, exist_ok=True)

# sample_size = 10000
# n_chunks = 1671803 // sample_size + 1

# chunks = pd.read_json(TEST_PATH, lines=True, chunksize=sample_size)

In [18]:
# ids = set()
# dfs = []

# for c in tqdm(chunks, total=n_chunks):
#     arrays, df = process_data(c, SAVE_DIR)
#     dfs.append(df)
# #     ids_ = np.concatenate([a[:, 0] for a in arrays])
# #     ids.update(ids_.tolist())
# #     break

# df = pd.concat(dfs, ignore_index=True)
# df.to_csv(OUT_DIR / "test.csv", index=False)

In [19]:
# chunks = pd.read_json(TRAIN_PATH, lines=True, chunksize=sample_size)

# _ = Parallel(n_jobs=os.cpu_count())(
#     delayed(process_data)(c, OUT_DIR)
#     for c in tqdm(chunks, total=n_chunks)
# )

## Parquet

In [20]:
from recsys.labels import ground_truth

In [21]:
def jsonl_to_df(fn, total=1290, test=False):
    """Load a sessions .jsonl file into a single per-session DataFrame.

    The file is read in chunks of 10000 lines. Every session becomes one row
    whose 'aid', 'ts' and 'type' cells are lists with one entry per event.
    For sessions with more than one event (and when `test` is False) the
    events are first passed through `ground_truth`, and the 'labels' dict of
    each returned event fills the three label columns; otherwise the label
    columns hold empty lists.

    Args:
        fn: Path of the .jsonl file.
        total: Expected number of chunks; only used for the progress bar.
        test: When True, `ground_truth` is never called.

    Returns:
        DataFrame with columns 'session', 'aid', 'ts', 'type',
        'labels_clicks', 'labels_carts', 'labels_orders' and 'target', where
        'target' is 'type' encoded as indices into CLASSES.
    """
    event_fields = ('aid', 'ts', 'type')
    label_fields = ('labels_clicks', 'labels_carts', 'labels_orders')
    columns = {name: [] for name in ('session',) + event_fields + label_fields}

    reader = pd.read_json(fn, lines=True, chunksize=10000)
    for chunk in tqdm(reader, total=total):
        for _, row in chunk.iterrows():
            record = {name: [] for name in event_fields + label_fields}

            if len(row['events']) > 1 and not test:
                # Labelled path: each event is expected to carry a 'labels' dict.
                for ev in ground_truth(row.events):
                    for key in event_fields:
                        record[key].append(ev[key])
                    ev_labels = ev['labels']
                    record['labels_clicks'].append(ev_labels.get("clicks"))
                    record['labels_carts'].append(list(ev_labels.get("carts", [])))
                    record['labels_orders'].append(list(ev_labels.get("orders", [])))
            else:
                # Unlabelled path: copy the raw events, leave label lists empty.
                for ev in row.events:
                    for key in event_fields:
                        record[key].append(ev[key])

            columns['session'].append(row.session)
            for name, values in record.items():
                columns[name].append(values)

    frame = pd.DataFrame(data=columns)
    frame['target'] = frame['type'].apply(
        lambda kinds: [CLASSES.index(kind) for kind in kinds]
    )

    return frame

In [22]:
def _sessions_to_frame(chunk, test=False):
    """Flatten one chunk of session rows into a per-session DataFrame.

    Each output row holds list-valued 'aid', 'ts' and 'type' cells (one entry
    per event). Sessions with more than one event are passed through
    `ground_truth` unless `test` is True, and the 'labels' dict of each
    returned event fills the label columns; otherwise those stay empty lists.
    """
    sessions, aids, tss, types = [], [], [], []
    labels_clicks, labels_carts, labels_orders = [], [], []

    for _, session_data in chunk.iterrows():
        aids_, tss_, types_ = [], [], []
        labels_clicks_, labels_carts_, labels_orders_ = [], [], []

        labelled = len(session_data['events']) > 1 and not test
        events = ground_truth(session_data.events) if labelled else session_data.events
        for event in events:
            aids_.append(event['aid'])
            tss_.append(event['ts'])
            types_.append(event['type'])
            if labelled:
                labels = event['labels']
                labels_clicks_.append(labels.get("clicks", None))
                labels_carts_.append(list(labels.get("carts", [])))
                labels_orders_.append(list(labels.get("orders", [])))

        sessions.append(session_data.session)
        aids.append(aids_)
        tss.append(tss_)
        types.append(types_)
        labels_clicks.append(labels_clicks_)
        labels_carts.append(labels_carts_)
        labels_orders.append(labels_orders_)

    df = pd.DataFrame(data={
        'session': sessions,
        'aid': aids,
        'ts': tss,
        'type': types,
        "labels_clicks": labels_clicks,
        "labels_carts": labels_carts,
        "labels_orders": labels_orders,
    })
    # Encode each event type as its position in the module-level CLASSES list.
    df['target'] = df['type'].apply(lambda x: [CLASSES.index(c) for c in x])
    return df


def jsonl_to_df_train(fn, total=1290, test=False, out_dir="../output/", chunksize=200000):
    """Convert a sessions .jsonl file into one parquet file per chunk.

    Unlike building a single frame, only one chunk is held in memory at a
    time; chunk ``i`` is written to ``<out_dir>/train_<i>.parquet``.

    Args:
        fn: Path of the .jsonl file (one JSON object per line).
        total: Expected number of chunks; only used for the progress bar.
        test: When True, `ground_truth` is never called and the label columns
            hold empty lists.
        out_dir: Destination folder; created if missing. The default keeps
            the previously hard-coded location.
        chunksize: Number of lines read per chunk. The default keeps the
            previously hard-coded value.

    Returns:
        None. The result is the set of parquet files written to `out_dir`.
    """
    out_dir = Path(out_dir)
    # Create the folder up front so the first to_parquet call cannot fail on it.
    out_dir.mkdir(parents=True, exist_ok=True)

    chunks = pd.read_json(fn, lines=True, chunksize=chunksize)

    for i, chunk in tqdm(enumerate(chunks), total=total):
        df = _sessions_to_frame(chunk, test=test)
        df.to_parquet(out_dir / f'train_{i}.parquet', index=False)

In [23]:
%%time
# Writes one parquet file per chunk instead of returning a frame;
# `total` only sizes the tqdm progress bar.
jsonl_to_df_train(TRAIN_PATH, total=56)

  5%|▌         | 56/1110 [1:14:39<23:25:03, 79.98s/it]


CPU times: user 1h 9min 43s, sys: 4min 40s, total: 1h 14min 23s
Wall time: 1h 14min 40s


In [23]:
%%time
# Validation sessions fit in a single in-memory frame; labels are attached
# for sessions with more than one event because `test` defaults to False.
df_val = jsonl_to_df(VAL_PATH, total=181)

 96%|█████████▌| 181/189 [06:54<00:18,  2.29s/it]


CPU times: user 6min 39s, sys: 10.1 s, total: 6min 50s
Wall time: 7min 2s


In [24]:
# Persist the validation frame; the row index is not written.
df_val.to_parquet('../output/val.parquet', index=False)

In [29]:
%%time
# test=True skips ground_truth, so the three label columns hold empty lists.
df_test = jsonl_to_df(TEST_PATH, total=168, test=True)

168it [03:53,  1.39s/it]                         


CPU times: user 3min 39s, sys: 4.34 s, total: 3min 44s
Wall time: 3min 59s


In [30]:
# Persist the test frame; the row index is not written.
df_test.to_parquet('../output/test.parquet', index=False)