# Streaming - Data Preparation 

To train Faraday on streamed training data (necessary when your dataset is too large to fit in memory), you first need to convert the dataset into a streamable format.

- Streaming is implemented with [litdata](https://github.com/Lightning-AI/litdata); refer to its docs for more information.
- The litdata dataset transformation documentation is in the [same repository](https://github.com/Lightning-AI/litdata).

## Pre-requisites

1. If you haven't already, download the LCL dataset from [data.london.gov.uk](https://data.london.gov.uk/dataset/smartmeter-energy-use-data-in-london-households), or
2. Use the CLI app to download and prepare the data (see the README).


In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from litdata import optimize
import pandas as pd
from pathlib import Path
import torch
from tqdm.notebook import tqdm  # tqdm_notebook is deprecated in favour of tqdm.notebook

from opensynth.data_modules.lcl_data_module import TrainingData

RANDOM_STATE = 0
OUTPUT_DIR = Path("../../data/processed/historical/stream")
SIZE_LIMIT = "100mb"   # size of each chunk pre-compression
COMPRESSION = "zstd"   # compression algorithm
NUM_WORKERS = 4        # number of workers for parallel processing

## 💿 Loading LCL Data

In [3]:
data_path = Path("../../data/processed/historical/train/lcl_data.csv")
stats_path = Path("../../data/processed/historical/train/mean_std.csv")
outlier_path = Path("../../data/processed/historical/train/outliers.csv")

data = pd.read_csv(data_path)
outliers = pd.read_csv(outlier_path)

# Combine and shuffle data
df = pd.concat([data, outliers])
df = df.sample(
    frac=1, random_state=RANDOM_STATE
).reset_index(drop=True)

In [4]:
# Partition the source data so that optimize can process it in parallel.
# Note: this is not strictly necessary for the LCL data, which is small,
# but the same process is needed for larger datasets.

NUM_GROUPS = 5
# Ceiling division, so the final group picks up any remainder rows
GROUP_SIZE = -(-len(df) // NUM_GROUPS)
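
The group size matters because any row that falls outside every slice never reaches the streaming output. The sketch below uses made-up sizes (23 rows, 5 groups) and a hypothetical helper, `partition_bounds`, to compare how many rows floor and ceiling division cover. It is an illustration of the arithmetic only, not part of the OpenSynth code.

```python
def partition_bounds(n_rows: int, n_groups: int) -> list[tuple[int, int]]:
    """Return (start, stop) row offsets per group so that every row is covered."""
    group_size = -(-n_rows // n_groups)  # ceiling division
    return [
        (min(i * group_size, n_rows), min((i + 1) * group_size, n_rows))
        for i in range(n_groups)
    ]


n_rows, n_groups = 23, 5  # hypothetical sizes, chosen to leave a remainder

# Floor division: each group holds 4 rows, so the last 3 rows belong to no group
floor_size = n_rows // n_groups
rows_covered_by_floor = floor_size * n_groups

# Ceiling division: each group holds up to 5 rows, and the last group is shorter
bounds = partition_bounds(n_rows, n_groups)
rows_covered_by_ceil = sum(stop - start for start, stop in bounds)

print(rows_covered_by_floor, rows_covered_by_ceil, bounds[-1])
```

With ceiling division the final group is simply smaller than the others, which is harmless when each group is sliced with an end offset clamped to the data length.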

In [5]:
# litdata.optimize calls this function once per input to yield records in the expected format.
# Things to take into account:
# - optimize runs in parallel, so each call receives a group index and slices out its own partition of df
# - Our streaming dataloaders expect each record to be in a particular format (TrainingData)

def yield_data(idx: int):
    data_slice = df.iloc[idx * GROUP_SIZE: (idx + 1) * GROUP_SIZE]

    for row in data_slice.itertuples():
        features: dict[str, torch.Tensor] = {
            "month": getattr(row, "month"),
            "dayofweek": getattr(row, "dayofweek"),
        }
        yield TrainingData(
            kwh=getattr(row, "kwh"),
            features=features,
        )
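
The pattern here is a generator that takes one input (a group index) and yields one record per row of its partition. The sketch below shows that contract without pandas or litdata. `Record`, `ROWS`, `yield_records` and `run_serially` are all hypothetical stand-ins: `Record` only mimics the shape passed to `TrainingData`, the row values are made up, and `run_serially` is a simplified serial loop, not how `optimize` is actually implemented.

```python
from collections import namedtuple
from typing import Callable, Iterable, Iterator, TypedDict


class Record(TypedDict):
    """Hypothetical stand-in with the same two fields passed to TrainingData."""
    kwh: str
    features: dict[str, int]


Row = namedtuple("Row", ["kwh", "month", "dayofweek"])

# Made-up rows standing in for the shuffled dataframe
ROWS = [
    Row("[0.1, 0.2]", 1, 3),
    Row("[0.3, 0.4]", 2, 5),
    Row("[0.5, 0.6]", 12, 0),
]
GROUP_SIZE = 2


def yield_records(idx: int) -> Iterator[Record]:
    """Yield one record per row in partition `idx`."""
    for row in ROWS[idx * GROUP_SIZE:(idx + 1) * GROUP_SIZE]:
        yield Record(
            kwh=row.kwh,
            features={"month": row.month, "dayofweek": row.dayofweek},
        )


def run_serially(fn: Callable[[int], Iterator[Record]], inputs: Iterable[int]) -> list[Record]:
    """Call `fn` once per input and collect everything it yields."""
    return [record for item in inputs for record in fn(item)]


records = run_serially(yield_records, inputs=[0, 1])
print(len(records), records[-1])
```

Running the generator serially on a handful of rows like this is a cheap way to check the record shape before handing the function to a parallel job.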

## ⚙️ Optimize into streaming format

Docs for the optimize function are [here](https://github.com/Lightning-AI/litdata/blob/cedc6a663ace221a98aa422cbc095055cb9fd43e/src/litdata/processing/functions.py#L295)

In [6]:
optimize(
    fn=yield_data,
    inputs=list(range(NUM_GROUPS)),
    output_dir=str(OUTPUT_DIR),
    num_workers=NUM_WORKERS,
    chunk_bytes=SIZE_LIMIT,
    compression=COMPRESSION,
)

Create an account on https://lightning.ai/ to optimize your data faster using multiple nodes and large machines.
Setting multiprocessing start_method to fork. Tip: Libraries relying on lock can hang with `fork`. To use `spawn` in notebooks, move your code to files and import it within the notebook.
Storing the files under /Users/gus.chadney/dev/OpenSynth/data/processed/historical/stream
Setup started with fast_dev_run=False.
Setup finished in 0.004 seconds. Found 5 items to process.
Starting 4 workers with 5 items. The progress bar is only updated when a worker finishes.
Rank 0 inferred the following `['str', 'int', 'int']` data format.
Rank 1 inferred the following `['str', 'int', 'int']` data format.
Rank 2 inferred the following `['str', 'int', 'int']` data format.
Workers are ready ! Starting data processing...
Rank 3 inferred the following `['str', 'int', 'int']` data format.

Progress:   0%|          | 0/5 [00:00<?, ?it/s]


Worker 0 is terminating.
Worker 1 is terminating.
Worker 2 is terminating.
Worker 0 is done.
Worker 1 is done.
Worker 2 is done.
Worker 3 is terminating.
Worker 3 is done.
Workers are finished.
Finished data processing!
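
One plausible reading of the `['str', 'int', 'int']` line in the log is that each record is flattened to its leaf values and one type is recorded per leaf. The sketch below illustrates that idea with a hypothetical helper, `flatten_leaves`. It is a simplified illustration and not litdata's actual implementation; the sample values are made up, and the assumption that `kwh` arrives as a string comes only from the log.

```python
def flatten_leaves(obj) -> list:
    """Return the leaf values of a nested dict, depth-first in insertion order."""
    if isinstance(obj, dict):
        leaves = []
        for value in obj.values():
            leaves.extend(flatten_leaves(value))
        return leaves
    return [obj]


# Made-up record with the same nesting as the one yielded per row
sample = {
    "kwh": "[0.1, 0.2]",
    "features": {"month": 1, "dayofweek": 3},
}

data_format = [type(leaf).__name__ for leaf in flatten_leaves(sample)]
print(data_format)
```

If `kwh` is expected to be numeric downstream, the `str` entry may be worth checking against how the CSV column was parsed.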
