### Data Exploration

To make data loading faster, use the [Parquet version](https://www.kaggle.com/c/ubiquant-market-prediction/discussion/301724) of the dataset; an example notebook can be found [here](https://www.kaggle.com/code/robikscube/fast-data-loading-and-low-mem-with-parquet-files/notebook).

In [6]:
import pandas as pd
import numpy as np
import gc
import sys
import os
import torch

In [3]:
# The data was downloaded and extracted into a local_data/archive folder.
dir_train = '/media/user/12TB1/HanLi/GitHub/CMU11785-project/local_data/archive'
train_file = os.path.join(dir_train, 'train_low_mem.parquet')
# Reading the file should take roughly 10~30 seconds.
train = pd.read_parquet(train_file)

In [21]:
# Print the frame summary: index range, column count, dtypes and memory usage.
train.info()
# Count the distinct values of each identifier column.
print(f"Unique investment_id: {len(train['investment_id'].unique())}")
# 1211 distinct values -- probably a daily or monthly interval?
print(f"Unique time_id: {len(train['time_id'].unique())}")
# The recorded output of this cell also reports row_id; print it so the code
# reproduces that output.
print(f"Unique row_id: {len(train['row_id'].unique())}")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3141410 entries, 0 to 3141409
Columns: 304 entries, row_id to f_299
dtypes: float32(301), object(1), uint16(2)
memory usage: 3.6+ GB
Unique investment_id: 3579
Unique time_id: 1211
Unique row_id: 3141410


In [102]:
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
from torch.utils.data import DataLoader, random_split

# Feature column names f_0 ... f_299, defined inline here instead of
# `from constants import FEATURES`.
FEATURES = list(map('f_{}'.format, range(300)))

def collate_fn(datas):
    """Collate samples with differing row counts into stacked tensors.

    Every sample is a sequence of tensors that are indexed along their first
    dimension with the same permutation, so they are expected to share that
    dimension. Each sample is randomly sub-sampled (without replacement) down
    to the smallest row count found in the batch, which makes the samples
    stackable while keeping the fields of a sample row-aligned.

    Args:
        datas: Non-empty sequence of samples. The row count of a sample is
            taken from its first tensor; the number of fields is taken from
            the first sample.

    Returns:
        A list with one tensor per field, each of shape
        ``(len(datas), min_rows, ...)``.

    Raises:
        ValueError: If ``datas`` is empty (``min`` of an empty sequence).
    """
    length = min(data[0].size(0) for data in datas)
    # One permutation per sample, shared by all of its fields. Truncating the
    # permutation first selects the same rows as truncating after the gather,
    # but only gathers the rows that are kept.
    perms = [torch.randperm(data[0].size(0))[:length] for data in datas]
    # Derive the field count from the samples instead of hard-coding three.
    num_fields = len(datas[0])
    return [
        torch.stack([d[i][perm] for d, perm in zip(datas, perms)])
        for i in range(num_fields)
    ]


class MyDataset(torch.utils.data.Dataset):
    """Dataset over several equally long, index-aligned sequences.

    Item ``i`` is the tuple made of element ``i`` of every sequence passed to
    the constructor, in the order they were passed.
    """

    def __init__(self, *tensor_lists) -> None:
        # Every sequence must be as long as the first one.
        for candidate in tensor_lists:
            assert len(tensor_lists[0]) == len(candidate), \
                "Size mismatch between tensor_lists"
        self.tensor_lists = tensor_lists

    def __getitem__(self, index):
        """Return element ``index`` of every sequence as a tuple."""
        picked = []
        for sequence in self.tensor_lists:
            picked.append(sequence[index])
        return tuple(picked)

    def __len__(self):
        """Return the length of the first sequence."""
        first_sequence = self.tensor_lists[0]
        return len(first_sequence)

def df_to_input_id(df):
    """Return the ``investment_id`` column of ``df`` as an int32 tensor.

    The column is first converted to a signed NumPy integer array and then
    handed to ``torch.tensor``. The intermediate dtype is ``np.int32`` rather
    than ``np.int16``: an int16 cast silently wraps every id above 32767 to a
    negative number, whereas int32 holds the full unsigned 16-bit range and
    already matches the ``torch.int`` result dtype.

    Args:
        df: DataFrame with an integer ``investment_id`` column.

    Returns:
        A ``torch.int`` (int32) tensor with one entry per row of ``df``.
    """
    return torch.tensor(df['investment_id'].to_numpy(dtype=np.int32),
                        dtype=torch.int)


def df_to_input_feat(df):
    """Return the ``FEATURES`` columns of ``df`` as a float32 tensor.

    Args:
        df: DataFrame containing every column named in ``FEATURES``.

    Returns:
        A ``torch.float32`` tensor built from the selected columns, with one
        row per row of ``df``.
    """
    feature_values = df[FEATURES].to_numpy()
    return torch.tensor(feature_values, dtype=torch.float32)


def df_to_target(df):
    """Return the ``target`` column of ``df`` as a float32 tensor.

    Args:
        df: DataFrame with a numeric ``target`` column.

    Returns:
        A ``torch.float32`` tensor with one entry per row of ``df``.
    """
    target_values = df['target'].to_numpy()
    return torch.tensor(target_values, dtype=torch.float32)


def load_data(path):
    """Read a parquet file and split it into one DataFrame per ``time_id``.

    Args:
        path: Location of the parquet file, passed to ``pd.read_parquet``.

    Returns:
        A list of DataFrames, one per distinct ``time_id``, ordered by the
        first appearance of each ``time_id`` in the file.
    """
    frame = pd.read_parquet(path)
    by_time = frame.groupby('time_id')
    per_time_frames = []
    for time_id in frame.time_id.unique():
        per_time_frames.append(by_time.get_group(time_id))
    return per_time_frames

def split(df_groupby_time, split_ratios):
    """Convert per-time DataFrames to tensors and randomly split them.

    Each DataFrame becomes one dataset item made of its id, feature and
    target tensors. The items are then partitioned with ``random_split``.

    Args:
        df_groupby_time: Sequence of DataFrames, one per dataset item.
        split_ratios: Fractions of the dataset for each subset. Every ratio
            except the last is turned into a size by truncation; the last
            subset receives whatever items remain, so the last ratio's value
            is not used.

    Returns:
        The subsets produced by ``random_split``, one per entry of
        ``split_ratios``.
    """
    id_tensors = [df_to_input_id(frame) for frame in df_groupby_time]
    feat_tensors = [df_to_input_feat(frame) for frame in df_groupby_time]
    target_tensors = [df_to_target(frame) for frame in df_groupby_time]

    dataset = MyDataset(id_tensors, feat_tensors, target_tensors)
    total = len(dataset)

    # Truncated sizes for the leading subsets; the final one takes the rest
    # so the sizes always sum to the dataset length.
    lengths = [int(total * ratio) for ratio in split_ratios[:-1]]
    lengths.append(total - sum(lengths))

    return random_split(dataset, lengths)


class UMPDataModule(pl.LightningDataModule):
    """Lightning data module serving the train/validation/test subsets.

    The data is loaded and split once, when the module is constructed.

    Args:
        args: Object providing ``input`` (path given to ``load_data``),
            ``split_ratios`` (given to ``split``), ``batch_size`` and
            ``workers`` (used by the data loaders).
    """

    def __init__(self, args):
        super().__init__()
        self.args = args

        per_time_frames = load_data(args.input)
        parts = split(per_time_frames, args.split_ratios)
        if len(parts) != 3:
            # Two-way split: the validation subset doubles as the test subset.
            self.tr, self.val = parts
            self.test = self.val
        else:
            self.tr, self.val, self.test = parts

    def train_dataloader(self):
        """Return the shuffled training loader, batched via ``collate_fn``."""
        loader_options = dict(
            batch_size=self.args.batch_size,
            num_workers=self.args.workers,
            shuffle=True,
            collate_fn=collate_fn,
            drop_last=True,
            pin_memory=True,
        )
        return DataLoader(self.tr, **loader_options)

    def _val_dataloader(self, dataset):
        """Return an evaluation loader that yields one item per batch."""
        worker_count = self.args.workers
        return DataLoader(dataset, batch_size=1, pin_memory=True,
                          num_workers=worker_count)

    def val_dataloader(self):
        """Return the evaluation loader over the validation subset."""
        validation_subset = self.val
        return self._val_dataloader(validation_subset)

    def test_dataloader(self):
        """Return the evaluation loader over the test subset."""
        test_subset = self.test
        return self._val_dataloader(test_subset)