# Prepare cross-validation
This notebook prepares the cross-validation datasets for the OTTO competition. It mostly uses code provided by the competition organizers, who used it to produce the test dataset; I use the same code to produce two cross-validation datasets.
The notebook also transforms all the datasets, both test and cross-validation, from JSON to dataframes and saves them as parquet files to be used by all other notebooks of the project.

## Imports and definitions

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import gc
import argparse
import json
import random
from copy import deepcopy
from pathlib import Path

from pandas.io.json._json import JsonReader
from tqdm.auto import tqdm

In [None]:
!pip install beartype
from beartype import beartype

In [None]:
# Code provided by competition organizers. 
# They used it to prepare test dataset, and I use it to prepare the cross-validation dataset.
class setEncoder(json.JSONEncoder):
    """JSON encoder that writes sets (e.g. the carts/orders label sets) as lists.

    Any other object that ``json`` cannot encode natively is also turned into a
    list; a non-iterable object therefore raises ``TypeError``.
    """

    def default(self, obj):
        # json only calls this hook for objects it cannot serialise by itself.
        return [item for item in obj]

    
@beartype
def ground_truth(events: list):
    """Attach "what happens next" labels to every event and drop the last event.

    ``events`` is walked backwards, so for each event the ``labels`` dict holds
    what the user did *after* it:

    * ``clicks`` -- aid of the next click;
    * ``carts`` / ``orders`` -- set of every aid carted / ordered later.

    A key is omitted when no later event of that type exists. ``events`` is
    mutated in place (each event gains a ``labels`` key), which is why
    ``split_events`` passes a deep copy.

    Returns:
        ``events`` without its final element, which has no future to label.
    """
    prev_labels = {"clicks": None, "carts": set(), "orders": set()}

    for event in reversed(events):
        event["labels"] = {}

        # Compare against None explicitly: aid 0 is a valid item id, and a
        # truthiness test would wrongly treat it as "no later click".
        if prev_labels["clicks"] is not None:
            event["labels"]["clicks"] = prev_labels["clicks"]
        for label in ("carts", "orders"):
            if prev_labels[label]:
                # Copy so aids added for earlier events don't leak into this one.
                event["labels"][label] = prev_labels[label].copy()

        # Record this event only after labelling it, so an event never labels itself.
        if event["type"] == "clicks":
            prev_labels["clicks"] = event["aid"]
        elif event["type"] in ("carts", "orders"):
            prev_labels[event["type"]].add(event["aid"])

    return events[:-1]

@beartype
def split_events(events: list, split_idx=None):
    """Truncate a session and return the kept events plus the labels to predict.

    The events are first labelled by ``ground_truth`` (on a deep copy, so the
    caller's list is untouched), which also drops the final event. When
    ``split_idx`` is falsy, a cut point is drawn with ``random.randint`` from
    ``[1, len(events) - 1]``. That range is empty, and ``randint`` raises
    ``ValueError``, for sessions with fewer than two events.

    Returns:
        ``(kept_events, labels)``. ``kept_events`` are the first ``split_idx``
        labelled events with their ``labels`` key removed. ``labels`` is the
        labels dict of the last kept event.
    """
    labelled = ground_truth(deepcopy(events))
    if not split_idx:
        split_idx = random.randint(1, len(labelled))
    kept = labelled[:split_idx]
    target = kept[-1]['labels']
    # Strip the per-event labels; only the last event's labels are returned.
    for ev in kept:
        ev.pop('labels')
    return kept, target


@beartype
def create_kaggle_testset(sessions: pd.DataFrame, sessions_output: Path, labels_output: Path):
    """Randomly truncate every session and write the inputs and labels as JSONL.

    For each row of ``sessions`` (columns ``session`` and ``events``),
    ``split_events`` chooses a cut point. The truncated session is written to
    ``sessions_output``, and ``{"session", "labels"}`` is written to
    ``labels_output``, one JSON object per line. Label sets are serialised as
    lists via ``setEncoder``.
    """
    trimmed = []
    targets = []

    rows = tqdm(sessions.iterrows(), desc="Creating trimmed testset", total=len(sessions))
    for _, row in rows:
        record = row.to_dict()
        session_id = record['session']
        events, labels = split_events(record['events'])
        targets.append({'session': session_id, 'labels': labels})
        trimmed.append({'session': session_id, 'events': events})

    with open(sessions_output, 'w') as out:
        out.writelines(json.dumps(item) + '\n' for item in trimmed)

    with open(labels_output, 'w') as out:
        out.writelines(json.dumps(item, cls=setEncoder) + '\n' for item in targets)


@beartype
def trim_session(session: dict, max_ts: int) -> dict:
    """Keep only the events strictly earlier than ``max_ts``.

    ``session`` is modified in place and also returned for convenience.
    """
    session['events'] = list(filter(lambda ev: ev['ts'] < max_ts, session['events']))
    return session


@beartype
def get_max_ts(sessions_path: Path) -> int:
    """Return the largest final-event timestamp over all sessions in a JSONL file.

    Only the last event of each session is inspected. This assumes events are
    stored in chronological order within a session (NOTE(review): confirm for
    any new input file).

    Raises:
        ValueError: if the file contains no session with at least one event.
            Previously this case returned ``float('-inf')``, which contradicts
            the ``-> int`` annotation.
    """
    max_ts = None
    with open(sessions_path) as f:
        for line in tqdm(f, desc="Finding max timestamp"):
            if not line.strip():
                continue  # tolerate blank lines, e.g. a trailing newline
            events = json.loads(line)['events']
            if not events:
                continue  # an empty session has no timestamp to contribute
            last_ts = events[-1]['ts']
            if max_ts is None or last_ts > max_ts:
                max_ts = last_ts
    if max_ts is None:
        raise ValueError(f"No events found in {sessions_path}")
    return max_ts


@beartype
def filter_unknown_items(session_path: Path, known_items: set):
    """Rewrite a JSONL sessions file in place, keeping only known aids.

    Events whose ``aid`` is not in ``known_items`` are removed. Sessions left
    with fewer than two events are dropped. The whole filtered file is held in
    memory before ``session_path`` is overwritten.
    """
    kept = []
    with open(session_path) as src:
        for raw in tqdm(src, desc="Filtering unknown items"):
            record = json.loads(raw)
            record['events'] = [ev for ev in record['events'] if ev['aid'] in known_items]
            if len(record['events']) < 2:
                continue
            kept.append(record)

    with open(session_path, 'w') as dst:
        dst.writelines(json.dumps(rec) + '\n' for rec in kept)


@beartype
def train_test_split(session_chunks: JsonReader, train_path: Path, test_path: Path, max_ts: int, test_days: int):
    """Split sessions in time into a train JSONL file and a test JSONL file.

    The split timestamp is ``max_ts`` minus ``test_days`` days, with ``ts``
    treated as milliseconds.

    * A session whose first event is after the split is written to
      ``test_path`` unchanged.
    * Every other session is trimmed to its events before the split. It is
      written to ``train_path`` only if at least two events remain.

    Finally, the test file is rewritten by ``filter_unknown_items`` so that it
    only contains aids occurring in the written train sessions.

    Args:
        session_chunks: chunked reader over the full sessions JSONL file.
        train_path: output JSONL file for train sessions (parent dirs created).
        test_path: output JSONL file for test sessions (parent dirs created).
        max_ts: latest timestamp in the data, in the same unit as event ``ts``.
        test_days: length of the test period in days.
    """
    split_millis = test_days * 24 * 60 * 60 * 1000
    split_ts = max_ts - split_millis
    train_items = set()
    train_path.parent.mkdir(parents=True, exist_ok=True)
    test_path.parent.mkdir(parents=True, exist_ok=True)

    # Context managers guarantee both files are closed (and flushed) even if a
    # chunk fails to parse; the previous bare open()/close() pair leaked them.
    with open(train_path, "w") as train_file, open(test_path, "w") as test_file:
        for chunk in tqdm(session_chunks, desc="Splitting sessions"):
            for _, session in chunk.iterrows():
                session = session.to_dict()
                # NOTE(review): a session whose first event is exactly at
                # split_ts goes down the train branch and is trimmed to zero
                # events, so it lands in neither file. Kept as-is to match the
                # organizers' split.
                if session['events'][0]['ts'] > split_ts:
                    test_file.write(json.dumps(session) + "\n")
                else:
                    session = trim_session(session, split_ts)
                    if len(session['events']) >= 2:
                        train_items.update(event['aid'] for event in session['events'])
                        train_file.write(json.dumps(session) + "\n")

    filter_unknown_items(test_path, train_items)


@beartype
def main(train_set: Path, output_path: Path, days: int, seed: int):
    """Build a time-based train/test split plus a truncated, labelled test set.

    Files written under ``output_path``:

    * ``train_sessions.jsonl``
    * ``test_sessions_full.jsonl`` (untruncated test sessions)
    * ``test_sessions.jsonl`` (truncated inputs)
    * ``test_labels.jsonl``

    ``seed`` seeds Python's ``random`` module, which ``split_events`` uses to
    pick the truncation points.
    """
    random.seed(seed)
    max_ts = get_max_ts(train_set)

    full_test = output_path / 'test_sessions_full.jsonl'
    chunks = pd.read_json(train_set, lines=True, chunksize=100000)
    train_test_split(chunks, output_path / 'train_sessions.jsonl', full_test, max_ts, days)

    create_kaggle_testset(
        pd.read_json(full_test, lines=True),
        output_path / 'test_sessions.jsonl',
        output_path / 'test_labels.jsonl',
    )

# Command-line entry point from the organizers' original script. It is wrapped
# in a bare string literal, so it never executes; the notebook cells below call
# main(...) directly instead.
# NOTE(review): presumably disabled because, inside a notebook kernel, argparse
# would parse the kernel's own argv rather than these flags.
'''
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-set', type=Path, required=True)
    parser.add_argument('--output-path', type=Path, required=True)
    parser.add_argument('--days', type=int, default=2)
    parser.add_argument('--seed', type=int, default=42)
    args = parser.parse_args()
    main(args.train_set, args.output_path, args.days, args.seed)
'''

In [None]:
# Transfrorm cross-validation labels from json to dataframe.
def save_labels(data_path):
    chunks = pd.read_json(data_path, lines=True, chunksize=100_000)
    labels_df =  pd.DataFrame()
    
    for chunk in tqdm(chunks):
        #df_labels = pd.DataFrame(chunk)
        label_dict = {
            'session': [],
            'clicks': [],
            'carts': [],
            'orders': [],
            }
        for session, labels in zip(chunk['session'].tolist(), chunk['labels'].tolist()):
            label_dict['session'].append(session)
            if 'clicks' in labels:
                label_dict['clicks'].append(labels['clicks'])
            else:
                label_dict['clicks'].append(-1)
            if 'carts' in labels:
                label_dict['carts'].append(labels['carts'])
            else:
                label_dict['carts'].append([])
            if 'orders' in labels:
                label_dict['orders'].append(labels['orders'])
            else:
                label_dict['orders'].append([])
        chunk_labels = pd.DataFrame(label_dict)
        chunk_labels['session'] = chunk_labels['session'].astype(np.int32)
        chunk_labels['clicks'] = chunk_labels['clicks'].astype(np.int32)
        labels_df = pd.concat([labels_df, chunk_labels])
    labels_df = labels_df.reset_index(drop=True)
    return labels_df


In [None]:
# Transform all other JSON files to dataframes and reduce memory usage.

def load_transform_json(sessions_df, file_path, chunksize=100_000):
    """Flatten a sessions JSONL file into one row per event and append it to ``sessions_df``.

    Each input line is ``{"session": ..., "events": [{"aid", "ts", "type"}, ...]}``.
    Columns are downcast to save memory:

    * ``session`` and ``aid`` become int32.
    * ``ts`` is divided by 1000 (milliseconds to whole seconds) and becomes int32.
    * ``type`` becomes int8 with clicks=0, carts=1, orders=2.

    Args:
        sessions_df: frame to append to (callers pass an empty ``pd.DataFrame()``).
        file_path: path of the ``.jsonl`` file to read.
        chunksize: number of lines parsed per chunk.

    Returns:
        ``sessions_df`` followed by all flattened events, with a fresh RangeIndex.
    """
    type_codes = {'clicks': 0, 'carts': 1, 'orders': 2}
    # An empty, column-less seed frame contributes nothing. Leaving it out of
    # the concat keeps the int32/int8 dtypes (newer pandas may otherwise take
    # the empty entry into account and upcast).
    parts = [] if sessions_df.shape == (0, 0) else [sessions_df]

    chunks = pd.read_json(file_path, lines=True, chunksize=chunksize)
    for chunk_no, chunk in enumerate(chunks):
        print(chunk_no)  # lightweight progress indicator
        event_dict = {'session': [], 'aid': [], 'ts': [], 'type': []}

        for session, events in zip(chunk['session'].tolist(), chunk['events'].tolist()):
            for event in events:
                event_dict['session'].append(session)
                event_dict['aid'].append(event['aid'])
                event_dict['ts'].append(event['ts'])
                event_dict['type'].append(event['type'])

        chunk_session = pd.DataFrame(event_dict)
        chunk_session['session'] = chunk_session['session'].astype(np.int32)
        chunk_session['aid'] = chunk_session['aid'].astype(np.int32)
        # Integer division converts ms to s without a float round trip.
        chunk_session['ts'] = (chunk_session['ts'] // 1000).astype(np.int32)
        chunk_session['type'] = chunk_session['type'].map(type_codes).astype(np.int8)
        parts.append(chunk_session)
        del event_dict
        gc.collect()

    if not parts:
        return sessions_df.reset_index(drop=True)
    # A single concat at the end avoids re-copying the accumulated frame for
    # every chunk (the per-chunk concat was quadratic in the number of chunks).
    return pd.concat(parts).reset_index(drop=True)

In [None]:
# A simple procedure to load a json file, convert it to tabular data and save as a parquet file.
def load_save(input_path, output_name):
    """Flatten a sessions JSONL file with ``load_transform_json`` and write it to parquet."""
    events_table = load_transform_json(pd.DataFrame(), input_path)
    events_table.to_parquet(output_name)


## Create cross-validation datasets and convert all the files from json to parquet

In [None]:
# Use the organizers' code to produce the first cross-validation dataset.
# Sessions starting within the last 7 days of train.jsonl form the CV test set;
# seed 367 fixes the random truncation points.
train = Path('/kaggle/input/otto-recommender-system/train.jsonl')
output = Path('../')
main(train_set=train, output_path=output, days=7, seed=367)

In [None]:
# Convert the cv1 labels from JSONL into a table and save it as a parquet file.
export_file_cv_labels = 'cv_labels.parquet'
cv_labels_path = Path('../test_labels.jsonl')
df_cv_labels = save_labels(cv_labels_path)
df_cv_labels.to_parquet(export_file_cv_labels)

# Drop the frame and let the collector reclaim its memory before the next cell.
del df_cv_labels
gc.collect()

In [None]:
# Convert the cv1 train sessions, the cv1 truncated test sessions, and the full
# competition train/test sessions into tables saved as parquet files.
parquet_conversions = [
    (Path('../train_sessions.jsonl'), 'cv_train.parquet'),
    (Path('../test_sessions.jsonl'), 'cv_inputs.parquet'),
    (Path('/kaggle/input/otto-recommender-system/train.jsonl'), 'train_full.parquet'),
    (Path('/kaggle/input/otto-recommender-system/test.jsonl'), 'test.parquet'),
]
for json_source, parquet_target in parquet_conversions:
    load_save(json_source, parquet_target)


In [None]:
# Build the second cross-validation set: same time split, different random
# truncation points.
# The train/test split written by main() does not depend on the seed; the seed
# only drives the truncation done in split_events. So instead of rerunning
# main() (which re-reads the full train.jsonl and rewrites identical split
# files), reuse the test_sessions_full.jsonl written by the first main() call
# and redo only the truncation. Seeding right before create_kaggle_testset
# reproduces exactly the random state main(train, output, 7, 203) would reach.
random.seed(203)
test_sessions_full = pd.read_json(output / 'test_sessions_full.jsonl', lines=True)
create_kaggle_testset(test_sessions_full, output / 'test_sessions.jsonl', output / 'test_labels.jsonl')
del test_sessions_full
gc.collect()
load_save(Path('../test_sessions.jsonl'), 'cv_inputs2.parquet')

In [None]:
# Convert the labels of the second cross-validation set (cv2) from JSONL into a
# table and save it as a parquet file.
export_file_cv_labels = 'cv_labels2.parquet'
cv_labels_path = Path('../test_labels.jsonl')
df_cv_labels = save_labels(cv_labels_path)
df_cv_labels.to_parquet(export_file_cv_labels)

# Drop the frame and let the collector reclaim its memory.
del df_cv_labels
gc.collect()