**<div style="font-size:200%">Create gluonts DataSet</div>**

This notebook demonstrates how to

1. create a gluonts dataset that consists of a train split, a test split, and metadata.
2. add label encodings to the gluonts dataset as custom metadata.

In [None]:
%matplotlib inline
%load_ext autoreload
%autoreload 2
%config InlineBackend.figure_format = 'retina'

from IPython.display import Markdown

import json
from pathlib import Path
from typing import Any, Callable, Dict, Sequence

import pandas as pd
from gluonts.dataset.common import (
    CategoricalFeatureInfo,
    ListDataset,
    MetaData,
    TrainDatasets,
    load_datasets,
)

from gluonts_nb_utils import fill_dt_all

# Global Config

In [None]:
# This is the .csv file produced by notebook 00-setup-env.ipynb.
csv_fname = '../data/input_to_forecast.csv'

# Constants used by this example.
freq, fcast_length = "D", 30
min_date = '2014-01-01'
max_date = '2014-12-31'

# Output
bucket = 'OUTPUT_BUCKET'
prefix = 'gluonts-examples-dataset'
dataset_name = 'synthetic-dataset'

%set_env BUCKET=$bucket
%set_env PREFIX=$prefix
%set_env DATASET_NAME=$dataset_name
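As a quick sanity check on these constants, the sketch below works out the series lengths they imply. It assumes a SKU whose first observation falls on `min_date`. After padding, every series ends on `max_date`, but series that start later will be shorter.

```python
import pandas as pd

# Values copied from the Global Config cell above.
freq, fcast_length = "D", 30
min_date, max_date = "2014-01-01", "2014-12-31"

# Number of daily timestamps in [min_date, max_date], both ends inclusive.
full_len = len(pd.date_range(min_date, max_date, freq=freq))
# Length of the train target once the last fcast_length points are held out.
train_len = full_len - fcast_length
print(full_len, train_len)  # 365 335
```

2014 is not a leap year, so such a series has 365 points in the test split and 335 in the train split.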

# Load and Filter Timeseries

We load the synthetic data generated by notebook `00-setup-env.ipynb` as follows:

1. Read the .csv file.
2. Select a specific period.
3. Zero-fill gaps inside each timeseries (mid-padding) and extend each timeseries with zeros up to `max_date` (right-padding), so that gluonts sees contiguous timeseries that all end at the same timestamp.
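The padding in step 3 is done by `fill_dt_all` from the local `gluonts_nb_utils` module, whose implementation is not shown here. Below is a minimal pandas sketch of the same idea, not that function's actual code. It assumes one row per (SKU, timestamp), the `x`/`y` column names used in the next cell, and that each series starts at its own first observation, which is our reading of `dates=("min", max_date, "D")`. `pad_series` is a hypothetical name.

```python
import pandas as pd


def pad_series(df: pd.DataFrame, id_col: str, end_date: str, freq: str = "D") -> pd.DataFrame:
    """Zero-fill each series from its first timestamp up to end_date.

    Hypothetical illustration only; assumes columns `x` (timestamp) and `y` (target)
    and at most one row per (id, timestamp).
    """
    padded = []
    for series_id, g in df.groupby(id_col):
        # Gap-free index from this series' first timestamp to the common end date.
        idx = pd.date_range(g["x"].min(), end_date, freq=freq)
        # Reindexing inserts missing inner timestamps (mid-padding) and trailing ones (right-padding).
        filled = g.set_index("x")["y"].reindex(idx, fill_value=0)
        padded.append(pd.DataFrame({id_col: series_id, "x": filled.index, "y": filled.to_numpy()}))
    return pd.concat(padded, ignore_index=True)
```

For example, a SKU observed on 2014-01-01 and 2014-01-03 and padded to 2014-01-04 becomes four daily rows with targets `[1, 0, 2, 0]` if its observed values were 1 and 2.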

In [None]:
%%time

# Load the .csv; individual timeseries may have gaps (missing dates).
ts = pd.read_csv(csv_fname, parse_dates=['timestamp'])
ts.rename(columns={'timestamp': 'x', 'quantity': 'y'}, inplace=True)

# Select only raw data within a specific year.
ts = ts[(ts['x'] >= min_date) & (ts['x'] <= max_date)]

# For each SKU, zero-fill missing dates (mid-padding) and extend to max_date (right-padding).
ts_filled = fill_dt_all(ts, ts_id=['sku'], dates=("min", max_date, "D"), freq=freq)

# Generate gluonts TRAIN dataset

Implementation notes:

- For simplicity, this implementation keeps all the generated gluonts timeseries in memory.

- We use `gluonts.dataset.common.TrainDatasets.save()`, which writes to the local filesystem, so a follow-up step is needed to upload the files to S3.

## TRAIN: in-memory gluonts data

Define helper functions.

In [None]:
def encode_cat(cats):
    return {c: i for i, c in enumerate(cats)}


def df2gluonts(
    df,
    cat_idx,
    fcast_len: int,
    freq: str = "D",
    ts_id: Sequence[str] = ["cat", "cc"],
    static_cat: Sequence[str] = ["cat", "cc"],
    item_id_fn: Callable = None,
) -> ListDataset:
    """Convert a dataframe of multiple timeseries to a gluonts ListDataset.

    This function supports gluonts static features, but not dynamic features.

    Args:
        df (pd.DataFrame): Dataframe of multiple timeseries sorted by the timestamp column `x`,
            where the target variable must be in column `y`.
        cat_idx (Dict[str, Dict[str, int]]): Mapper for static categories.
        fcast_len (int): Forecast horizon. If > 0, the last `fcast_len` timestamps of each
            timeseries are dropped (train split); if 0, each timeseries is kept whole (test split).
        freq (str, optional): Frequency of timeseries. Defaults to 'D'.
        ts_id (Sequence[str], optional): Identifier columns in the dataframe. Defaults to ['cat', 'cc'].
        static_cat (Sequence[str], optional): Columns that denote the static category features of each timeseries.
            Defaults to ['cat', 'cc'].
        item_id_fn (Callable, optional): Function to format `item_id`. Defaults to None.

    Returns:
        ListDataset: One entry per timeseries.
    """
    data_iter = []

    # Build the payload
    for item_id, dfg in df.groupby(ts_id, as_index=False):
        # Depending on the pandas version, grouping by a one-element list yields either
        # scalar or 1-tuple keys; normalize both to a tuple.
        if not isinstance(item_id, tuple):
            item_id = (item_id,)

        if fcast_len > 0:
            # Train split excludes the last fcast_len timestamps.
            target = dfg["y"].iloc[:-fcast_len]
        else:
            # Test split includes the whole timeseries. During backtesting,
            # gluonts treats the last prediction_length timestamps as ground truth.
            target = dfg["y"]

        feat_static_cat = []
        for col in static_cat:
            # Construct all static category features of current timeseries.
            assert dfg[col].nunique() == 1
            cat_value = dfg[col].iloc[0]
            # Encode the category value to its zero-based index for feat_static_cat.
            feat_static_cat.append(cat_idx[col][cat_value])

        if item_id_fn is None:
            # NOTE: our sm-gluonts entrypoint interprets '|' as '\n'
            # in the plot title.
            item_id = "|".join(str(i) for i in item_id)
        else:
            item_id = item_id_fn(*item_id)

        data_iter.append(
            {"start": dfg.iloc[0]["x"], "target": target, "feat_static_cat": feat_static_cat, "item_id": item_id}
        )

    # Finally, call the gluonts API to wrap data_iter, together with the
    # observation frequency of the timeseries.
    data = ListDataset(data_iter, freq=freq)
    return data
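To make the train/test split rule in `df2gluonts` concrete without depending on gluonts, here is a small sketch that builds the same kind of dict entries from plain pandas. `split_entries` is a hypothetical helper. Like `df2gluonts`, it assumes each group is already sorted by `x`.

```python
from typing import Dict, List

import pandas as pd


def split_entries(df: pd.DataFrame, id_col: str, fcast_len: int) -> Dict[str, List[dict]]:
    """Build gluonts-style dict entries for the train and test splits.

    Illustrative sketch only: mirrors how df2gluonts slices `target`,
    but returns plain dicts instead of a ListDataset.
    """
    # y[:-0] would be empty, which is why df2gluonts branches on fcast_len > 0.
    if fcast_len <= 0:
        raise ValueError("fcast_len must be positive")
    splits: Dict[str, List[dict]] = {"train": [], "test": []}
    for item_id, g in df.groupby(id_col):
        y = g["y"].to_numpy()
        start = g["x"].iloc[0]
        # Train: hold out the last fcast_len points as the forecast horizon.
        splits["train"].append({"start": start, "target": y[:-fcast_len], "item_id": item_id})
        # Test: keep the full series; its tail doubles as ground truth.
        splits["test"].append({"start": start, "target": y, "item_id": item_id})
    return splits
```

Both splits share the same `start`. Only the end of the train target moves, which is what lets a backtest line up forecasts with the held-out tail.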

Generate train and test splits.

In [None]:
# Zero-based SKU encoding.
# ts_filled is a pd.DataFrame that contains one timeseries per SKU,
# where all timeseries have the same frequency.
cat_inverted_idx = {'sku': encode_cat(ts_filled['sku'].unique())}

# Train data: drop the final fcast_length timestamps of each timeseries.
train_data = df2gluonts(
    ts_filled,
    cat_inverted_idx,
    fcast_len=fcast_length,
    freq=freq,
    ts_id=['sku'],
    static_cat=['sku'],
)

# Test data: keep the full timeseries; the final fcast_length timestamps serve as ground truth.
test_data = df2gluonts(
    ts_filled,
    cat_inverted_idx,
    fcast_len=0,
    freq=freq,
    ts_id=['sku'],
    static_cat=['sku'],
)
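Before saving, it can be worth checking that the two splits line up. The sketch below is a hypothetical helper, not part of gluonts. It works on any iterables of dict entries with `item_id`, `start`, and `target` keys. We assume, without having verified it here, that iterating a `ListDataset` yields entries of that shape, so a call like `check_splits(train_data, test_data, fcast_length)` should apply.

```python
from typing import Iterable, Mapping


def check_splits(train: Iterable[Mapping], test: Iterable[Mapping], prediction_length: int) -> int:
    """Check that each test entry extends its train entry by exactly prediction_length points.

    Returns the number of timeseries checked; raises ValueError on the first mismatch.
    """
    train_by_id = {entry["item_id"]: entry for entry in train}
    checked = 0
    for entry in test:
        item_id = entry["item_id"]
        if item_id not in train_by_id:
            raise ValueError(f"{item_id}: missing from train split")
        tr = train_by_id[item_id]
        if tr["start"] != entry["start"]:
            raise ValueError(f"{item_id}: train and test start timestamps differ")
        n_train, n_test = len(tr["target"]), len(entry["target"])
        if n_test - n_train != prediction_length:
            raise ValueError(f"{item_id}: expected {prediction_length} extra points, got {n_test - n_train}")
        # The train target should be an exact prefix of the test target (no NaNs expected after zero-padding).
        if list(entry["target"][:n_train]) != list(tr["target"]):
            raise ValueError(f"{item_id}: train target is not a prefix of test target")
        checked += 1
    if checked != len(train_by_id):
        raise ValueError("train and test splits contain different numbers of timeseries")
    return checked
```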

## TRAIN: write to local fs, then s3

In [None]:
gluonts_datasets = TrainDatasets(
    metadata=MetaData(
        freq=freq,
        target={'name': 'quantity'},
        feat_static_cat=[
            # Cardinality reserves one extra slot for an 'unknown' category.
            CategoricalFeatureInfo(name=k, cardinality=len(v) + 1)
            for k, v in cat_inverted_idx.items()
        ],
        prediction_length=fcast_length,
    ),
    train=train_data,
    test=test_data,
)

# With `overwrite=True`, save() first deletes path_str (like `rm -rf`), recreates it, then writes the individual files.
local_path = f'../data/processed/{dataset_name}'
gluonts_datasets.save(path_str=local_path, overwrite=True)

# Also save the category encodings (label -> zero-based index) next to metadata.json.
with open(Path(local_path) / 'metadata' / 'cat.json', 'w') as f:
    json.dump(cat_inverted_idx, f)

# Preview the generated json files.
# NOTE: you can safely ignore 'Broken pipe' errors in the cell's output.
%set_env LOCAL_PATH=$local_path
!cat $LOCAL_PATH/metadata/metadata.json | head -1 | jq
!cat $LOCAL_PATH/train/data.json | head -1 | jq
!cat $LOCAL_PATH/test/data.json | head -1 | jq
!cat $LOCAL_PATH/metadata/cat.json | jq '.' | head
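The saved `cat.json` maps each label to its zero-based index. To read model output back in terms of SKUs, the mapping has to be inverted. The sketch below shows one way to do that. It also shows one possible convention for the extra slot added by `cardinality=len(v) + 1`: unseen labels map to index `len(v)`. That convention is our assumption, not something gluonts enforces, and all three helper names are hypothetical.

```python
import json
from pathlib import Path
from typing import Dict


def invert_encodings(encoders: Dict[str, Dict[str, int]]) -> Dict[str, Dict[int, str]]:
    """Turn {feature: {label: index}} into {feature: {index: label}}."""
    return {feat: {i: label for label, i in idx.items()} for feat, idx in encoders.items()}


def load_decoders(cat_json: Path) -> Dict[str, Dict[int, str]]:
    """Read a cat.json file like the one written above and invert it."""
    with open(cat_json) as f:
        return invert_encodings(json.load(f))


def encode_or_unknown(label: str, idx: Dict[str, int]) -> int:
    """Map unseen labels to len(idx), the extra slot reserved by cardinality=len(v) + 1.

    Assumed convention for the 'unknown' category; gluonts does not enforce it.
    """
    return idx.get(label, len(idx))
```

Usage, under the same assumptions: `decoders = load_decoders(Path(local_path) / 'metadata' / 'cat.json')`, then `decoders['sku'][entry['feat_static_cat'][0]]` recovers the SKU of a dataset entry.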

Verify that the output files can be read back.

In [None]:
import os

reloaded_dataset = load_datasets(
    metadata=os.path.join(local_path, "metadata"),
    train=os.path.join(local_path, "train"),
    test=os.path.join(local_path, "test"),
)
display(
    reloaded_dataset.metadata,
    reloaded_dataset.train,
    reloaded_dataset.test,
)
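As an extra check that does not go through gluonts, the split files can be scanned directly. The notebook's own `head -1 | jq` preview treats each `data.json` as JSON lines (one object per line), and this sketch makes the same assumption. `summarize_records` is a hypothetical helper that counts series and reports the shortest and longest target.

```python
import json
from typing import Dict, Iterable


def summarize_records(lines: Iterable[str]) -> Dict[str, int]:
    """Count JSON-lines records and report min/max target length.

    Assumes one JSON object per line, each with at least `start` and `target`.
    """
    lengths = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        rec = json.loads(line)
        missing = {"start", "target"} - rec.keys()
        if missing:
            raise ValueError(f"record missing keys: {sorted(missing)}")
        lengths.append(len(rec["target"]))
    if not lengths:
        raise ValueError("no records found")
    return {"series": len(lengths), "min_len": min(lengths), "max_len": max(lengths)}
```

For example, `with open(Path(local_path) / 'train' / 'data.json') as f: print(summarize_records(f))`. The train and test files should report the same number of series. Their `max_len` should differ by `fcast_length`.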

If the reloaded metadata, train split, and test split look as expected, upload the dataset to S3.

In [None]:
!aws s3 cp --recursive $LOCAL_PATH s3://$BUCKET/$PREFIX/$DATASET_NAME/ --storage-class ONEZONE_IA

List the uploaded files.

In [None]:
!aws s3 ls s3://$BUCKET/$PREFIX/$DATASET_NAME/ --recursive