<a href="https://colab.research.google.com/github/RMichae1/PyroStudies/blob/master/M5_helper_implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyro-ppl



In [2]:
import torch
import pyro

import pyro.distributions as dist
from pyro.contrib.forecast import ForecastingModel
from pyro.contrib.forecast import Forecaster
from pyro.ops.tensor_utils import periodic_repeat
from pyro.nn import PyroModule
from pyro.nn import PyroParam
import pyro.poutine as poutine
from pyro.infer.reparam import LocScaleReparam, StableReparam

import os

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

  import pandas.util.testing as tm


In [0]:
torch.cuda.is_available()
pyro.set_rng_seed(1001)

In [4]:
# set parameters for runs
CUDA = torch.cuda.is_available()
if CUDA:
  torch.set_default_tensor_type(torch.cuda.FloatTensor)

DRIVE = "drive/My Drive/kaggle_m5/"
OUTPUT = os.path.join(DRIVE, "results")
OUTFILE = os.path.join(OUTPUT, "submission.csv")

if not os.path.exists(OUTPUT):
    os.makedirs(OUTPUT)

print(OUTFILE)

drive/My Drive/kaggle_m5/results/submission.csv


In [0]:
### UTILITY TO LOAD M5 DATA
import math
import os
import zipfile

import numpy as np
import pandas as pd
import torch


class M5Data:
    """
    A helper class to read M5 source files and create submissions.
    :param str data_path: Path to the folder that contains M5 data files, which is
        either a single `.zip` file or some `.csv` files extracted from that zip file.
    """
    num_states = 3
    num_stores = 10
    num_cats = 3
    num_depts = 7
    num_items = 3049
    num_stores_by_state = [4, 3, 3]
    num_depts_by_cat = [2, 2, 3]
    num_items_by_cat = [565, 1047, 1437]
    num_items_by_dept = [416, 149, 532, 515, 216, 398, 823]
    num_timeseries = 30490  # store x item
    num_aggregations = 42840
    num_aggregations_by_level = [1, 3, 10, 3, 7, 9, 21, 30, 70, 3049, 9147, 30490]
    num_quantiles = 9
    quantiles = [0.005, 0.025, 0.165, 0.25, 0.5, 0.75, 0.835, 0.975, 0.995]
    aggregation_levels = [[],
                          ["state_id"],
                          ["store_id"],
                          ["cat_id"],
                          ["dept_id"],
                          ["state_id", "cat_id"],
                          ["state_id", "dept_id"],
                          ["store_id", "cat_id"],
                          ["store_id", "dept_id"],
                          ["item_id"],
                          ["state_id", "item_id"],
                          ["store_id", "item_id"]]
    event_types = ["Cultural", "National", "Religious", "Sporting"]

    def __init__(self, data_path=None):
        self.data_path = os.path.abspath("data") if data_path is None else data_path
        if not os.path.exists(self.data_path):
            raise FileNotFoundError(f"There is no folder '{self.data_path}'.")

        acc_path = os.path.join(self.data_path, "m5-forecasting-accuracy.zip")
        unc_path = os.path.join(self.data_path, "m5-forecasting-uncertainty.zip")
        self.acc_zipfile = zipfile.ZipFile(acc_path) if os.path.exists(acc_path) else None
        self.unc_zipfile = zipfile.ZipFile(unc_path) if os.path.exists(unc_path) else None

        self._sales_df = None
        self._calendar_df = None
        self._prices_df = None

    @property
    def num_days(self):
        return self.calendar_df.shape[0]

    @property
    def num_train_days(self):
        return self.sales_df.shape[1] - 5

    @property
    def sales_df(self):
        if self._sales_df is None:
            self._sales_df = self._read_csv("sales_train_validation.csv", index_col=0)
        return self._sales_df

    @property
    def calendar_df(self):
        if self._calendar_df is None:
            self._calendar_df = self._read_csv("calendar.csv", index_col=0)
        return self._calendar_df

    @property
    def prices_df(self):
        if self._prices_df is None:
            df = self._read_csv("sell_prices.csv")
            df["id"] = df.item_id + "_" + df.store_id + "_validation"
            df = pd.pivot_table(df, values="sell_price", index="id", columns="wm_yr_wk")
            self._prices_df = df.fillna(float('nan')).loc[self.sales_df.index]
        return self._prices_df

    def listdir(self):
        """
        List all files in `self.data_path` folder.
        """
        files = set(os.listdir(self.data_path))
        if self.acc_zipfile:
            files |= set(self.acc_zipfile.namelist())
        if self.unc_zipfile:
            files |= set(self.unc_zipfile.namelist())
        return files

    def _read_csv(self, filename, index_col=None, use_acc_file=True):
        """
        Returns the dataframe from csv file ``filename``.
        :param str filename: name of the file with trailing `.csv`.
        :param int index_col: indicates which column from csv file is considered as index.
        :param bool acc_file: whether to load data from accuracy.zip file or uncertainty.zip file.
        """
        assert filename.endswith(".csv")
        if filename not in self.listdir():
            raise FileNotFoundError(f"Cannot find either '{filename}' "
                                    "or 'm5-forecasting-*.zip' file "
                                    f"in '{self.data_path}'.")

        if use_acc_file and self.acc_zipfile and filename in self.acc_zipfile.namelist():
            return pd.read_csv(self.acc_zipfile.open(filename), index_col=index_col)

        if self.unc_zipfile and filename in self.unc_zipfile.namelist():
            return pd.read_csv(self.unc_zipfile.open(filename), index_col=index_col)

        return pd.read_csv(os.path.join(self.data_path, filename), index_col=index_col)

    def get_sales(self):
        """
        Returns `sales` torch.Tensor with shape `num_timeseries x num_train_days`.
        """
        return torch.from_numpy(self.sales_df.iloc[:, 5:].values).type(torch.get_default_dtype())

    def get_prices(self, fillna=0.):
        """
        Returns `prices` torch.Tensor with shape `num_timeseries x num_days`.
        In some days, there are some items not available, so their prices will be NaN.
        :param float fillna: a float value to replace NaN. Defaults to 0.
        """
        x = torch.from_numpy(self.prices_df.values).type(torch.get_default_dtype())
        x[torch.isnan(x)] = fillna
        x = x.repeat_interleave(7, dim=-1)[:, :self.calendar_df.shape[0]]
        assert x.shape == (self.num_timeseries, self.num_days)
        return x

    def get_snap(self):
        """
        Returns a `num_days x 3` boolean tensor which indicates whether
        SNAP purchases are allowed at a state in a particular day. The order
        of the first dimension indicates the states "CA", "TX", "WI" respectively.
        Usage::
            >>> m5 = M5Data()
            >>> snap = m5.get_snap()
            >>> assert snap.shape == (m5.num_states, m5.num_days)
            >>> snap = snap.repeat_interleave(torch.tensor(m5.num_stores_by_state), dim=0)
            >>> assert snap.shape == (m5.num_stores, m5.num_days)
        """
        snap = self.calendar_df[["snap_CA", "snap_TX", "snap_WI"]].values
        x = torch.from_numpy(snap).type(torch.get_default_dtype())
        assert x.shape == (self.num_days, 3)
        return x

    def get_event(self, by_types=False):
        """
        Returns a tensor with length `num_days` indicating whether there are
        special events on a particular day.
        There are 4 types of events: "Cultural", "National", "Religious", "Sporting".
        :param bool by_types: if True, returns a `num_days x 4` tensor indicating
            special event by type. Otherwise, only returns a `num_days x 1` tensor indicating
            whether there is a special event.
        """
        if not by_types:
            event = self.calendar_df["event_type_1"].notnull().values[..., None]
            x = torch.from_numpy(event).type(torch.get_default_dtype())
            assert x.shape == (self.num_days, 1)
            return x

        types = self.event_types
        event1 = pd.get_dummies(self.calendar_df["event_type_1"])[types].astype(bool)
        event2 = pd.DataFrame(columns=types)
        types2 = ["Cultural", "Religious"]
        event2[types2] = pd.get_dummies(self.calendar_df["event_type_2"])[types2].astype(bool)
        event2.fillna(False, inplace=True)
        x = torch.from_numpy(event1.values | event2.values).type(torch.get_default_dtype())
        assert x.shape == (self.num_days, 4)
        return x

    def get_dummy_day_of_month(self):
        """
        Returns dummy day of month tensor with shape `num_days x 31`.
        """
        dom = pd.get_dummies(pd.to_datetime(self.calendar_df.index).day).values
        x = torch.from_numpy(dom).type(torch.get_default_dtype())
        assert x.shape == (self.num_days, 31)
        return x

    def get_dummy_month_of_year(self):
        """
        Returns dummy month of year tensor with shape `num_days x 12`.
        """
        moy = pd.get_dummies(pd.to_datetime(self.calendar_df.index).month).values
        x = torch.from_numpy(moy).type(torch.get_default_dtype())
        assert x.shape == (self.num_days, 12)
        return x

    def get_dummy_day_of_week(self):
        """
        Returns dummy day of week tensor with shape `num_days x 7`.
        """
        dow = pd.get_dummies(self.calendar_df.wday).values
        x = torch.from_numpy(dow).type(torch.get_default_dtype())
        assert x.shape == (self.num_days, 7)
        return x

    def get_dummy_year(self):
        """
        Returns dummy year tensor with shape `num_days x 6`.
        """
        year = pd.get_dummies(pd.to_datetime(self.calendar_df.index).year).values
        x = torch.from_numpy(year).type(torch.get_default_dtype())
        assert x.shape == (self.num_days, 6)
        return x

    def get_christmas(self):
        """
        Returns a boolean 2D tensor with shape `num_days x 1` indicating
        if that day is Chrismas.
        """
        christmas = self.calendar_df.index.str.endswith("12-25")[..., None]
        x = torch.from_numpy(christmas).type(torch.get_default_dtype())
        assert x.shape == (self.num_days, 1)
        return x

    def get_dummy_state(self):
        """
        Returns dummy state tensor with shape `num_timeseries x num_states`.
        """
        state = pd.get_dummies(self.sales_df.state_id)[self.sales_df.state_id.unique()].values
        x = torch.from_numpy(state).type(torch.get_default_dtype())
        assert x.shape == (self.num_timeseries, self.num_states)
        return x

    def get_dummy_cat(self):
        """
        Returns dummy cat tensor with shape `num_timeseries x num_states`.
        """
        cat = pd.get_dummies(self.sales_df.cat_id)[self.sales_df.cat_id.unique()].values
        x = torch.from_numpy(cat).type(torch.get_default_dtype())
        assert x.shape == (self.num_timeseries, self.num_cats)
        return x

    def get_dummy_dept(self):
        """
        Returns dummy dept tensor with shape `num_timeseries x num_states`.
        """
        dept = pd.get_dummies(self.sales_df.dept_id)[self.sales_df.dept_id.unique()].values
        x = torch.from_numpy(dept).type(torch.get_default_dtype())
        assert x.shape == (self.num_timeseries, self.num_depts)
        return x

    def get_aggregated_sales(self, level):
        """
        Returns aggregated sales at a particular aggregation level.
        The result will be a tensor with shape `num_timeseries x num_train_days`.
        """
        if level == self.aggregation_levels[-1]:
            x = self.sales_df.iloc[:, 5:].values
        elif level == self.aggregation_levels[0]:
            x = self.sales_df.iloc[:, 5:].sum().values[None, :]
        else:
            df = self.sales_df.groupby(level, sort=False).sum()
            x = df.values

        return torch.from_numpy(x).type(torch.get_default_dtype())

    def get_aggregated_ma_dollar_sales(self, level):
        """
        Returns aggregated "moving average" dollar sales at a particular aggregation level
        during the last 28 days.
        The result can be used as `weight` for evaluation metrics.
        """
        prices = self.prices_df.fillna(0.).values.repeat(7, axis=1)[:, :self.sales_df.shape[1] - 5]
        df = (self.sales_df.iloc[:, 5:] * prices).T.rolling(28, min_periods=1).mean().T

        if level == self.aggregation_levels[-1]:
            x = df.values
        elif level == self.aggregation_levels[0]:
            x = df.sum().values[None, :]
        else:
            for g in level:
                df[g] = self.sales_df[g]

            df = df.groupby(level, sort=False).sum()
            x = df.values

        return torch.from_numpy(x).type(torch.get_default_dtype())

    def get_all_aggregated_sales(self):
        """
        Returns aggregated sales for all aggregation levels.
        """
        xs = []
        for level in self.aggregation_levels:
            xs.append(self.get_aggregated_sales(level))
        xs = torch.cat(xs, 0)
        assert xs.shape[0] == self.num_aggregations
        return xs

    def get_all_aggregated_ma_dollar_sales(self):
        """
        Returns aggregated "moving average" dollar sales for all aggregation levels.
        """
        xs = []
        for level in self.aggregation_levels:
            xs.append(self.get_aggregated_ma_dollar_sales(level))
        xs = torch.cat(xs, 0)
        assert xs.shape[0] == self.num_aggregations
        return xs

    def aggregate_samples(self, samples, level, *extra_levels):
        """
        Aggregates samples (at the lowest level) to a specific level.
        Usage::
            >>> m5 = M5Data()
            >>> o = []
            >>> for level in m5.aggregation_levels:
            ...     print("Level", level)
            ...     o.append(m5.aggregate_samples(samples, level))
            >>> o = torch.cat(o, 1)
            >>> q = np.quantile(o.numpy(), m5.quantiles, axis=0)  # compute quantiles
            >>> m5.make_uncertainty_submission("foo.csv", q)
        :param torch.Tensor samples: a tensor with shape `num_samples x num_timeseries x num_days`
        :param list level: which level to aggregate
        :param extra_levels: additional levels to aggregate; the results for all levels will be
            concatenated together.
        :returns: a tensor with shape `num_samples x num_aggregated_timeseries x num_days`.
        """
        assert torch.is_tensor(samples)
        assert samples.dim() == 3
        assert samples.size(1) == self.num_timeseries
        num_samples, duration = samples.size(0), samples.size(-1)
        x = samples.reshape(num_samples, self.num_stores, self.num_items, duration)

        if "state_id" in level:
            tmp = []
            pos = 0
            for n in self.num_stores_by_state:
                tmp.append(x[:, pos:pos + n].sum(1, keepdim=True))
                pos = pos + n
            x = torch.cat(tmp, dim=1)
        elif "store_id" in level:
            pass
        else:
            x = x.sum(1, keepdim=True)

        if "cat_id" in level:
            tmp = []
            pos = 0
            for n in self.num_items_by_cat:
                tmp.append(x[:, :, pos:pos + n].sum(2, keepdim=True))
                pos = pos + n
            x = torch.cat(tmp, dim=2)
        elif "dept_id" in level:
            tmp = []
            pos = 0
            for n in self.num_items_by_dept:
                tmp.append(x[:, :, pos:pos + n].sum(2, keepdim=True))
                pos = pos + n
            x = torch.cat(tmp, dim=2)
        elif "item_id" in level:
            pass
        else:
            x = x.sum(2, keepdim=True)

        n = self.num_aggregations_by_level[self.aggregation_levels.index(level)]
        x = x.reshape(num_samples, n, duration)
        if extra_levels:
            tmp = [x]
            for level in extra_levels:
                tmp.append(self.aggregate_samples(samples, level))
            x = torch.cat(tmp, 1)
        return x

    def make_accuracy_submission(self, filename, prediction):
        """
        Makes submission file given prediction result.
        :param str filename: name of the submission file.
        :param torch.Tensor predicition: the prediction tensor with shape `num_timeseries x 28`.
        """
        df = self._read_csv("sample_submission.csv", index_col=0)
        if torch.is_tensor(prediction):
            prediction = prediction.detach().cpu().numpy()
        assert isinstance(prediction, np.ndarray)
        assert prediction.shape == (self.num_timeseries, 28)
        # the later 28 days only available 1 month before the deadline
        assert df.shape[0] == prediction.shape[0] * 2
        df.iloc[:prediction.shape[0], :] = prediction
        df.to_csv(filename)

    def make_uncertainty_submission(self, filename, prediction, float_format='%.3g'):
        """
        Makes submission file given prediction result.
        :param str filename: name of the submission file.
        :param torch.Tensor predicition: the prediction tensor with shape
            `9 x num_aggregations x 28`. The first dimension indicates
            9 quantiles defined in `self.quantiles`. The second dimension
            indicates aggreated series defined in `self.aggregation_levels`,
            with corresponding order. This is also the order of
            submission file.
        """
        df = self._read_csv("sample_submission.csv", index_col=0, use_acc_file=False)
        if torch.is_tensor(prediction):
            prediction = prediction.detach().cpu().numpy()
        assert isinstance(prediction, np.ndarray)
        assert prediction.shape == (9, self.num_aggregations, 28)

        # correct the messy index in submission file
        tmp = []
        pos = 0
        for level, n in zip(self.aggregation_levels, self.num_aggregations_by_level):
            if level == self.aggregation_levels[0] or level == self.aggregation_levels[-1]:
                tmp.append(prediction[:, pos:pos+n])
            else:
                tmp_df = self.sales_df.groupby(level, sort=False)[["item_id"]].count()
                tmp_df["id"] = range(tmp_df.shape[0])
                tmp_df = tmp_df.sort_index()
                if level == self.aggregation_levels[-2]:
                    tmp_df = tmp_df.reindex(["WI", "CA", "TX"], level=0)
                new_index = tmp_df["id"].values
                tmp.append(prediction[:, pos:pos+n][:, new_index])
            pos = pos + n
        prediction = np.concatenate(tmp, axis=1)

        prediction = prediction.reshape(-1, 28)
        # the later 28 days only available 1 month before the deadline
        assert df.shape[0] == prediction.shape[0] * 2
        df.iloc[:prediction.shape[0], :] = prediction
        # use float_format to reduce the size of output file,
        # recommended at https://www.kaggle.com/c/m5-forecasting-uncertainty/discussion/135049
        df.to_csv(filename, float_format=float_format)


class BatchDataLoader:
    """
    DataLoader class which iterates over the dataset (data_x, data_y) in batch.
    Usage::
        >>> data_loader = BatchDataLoader(data_x, data_y, batch_size=1000)
        >>> for batch_x, batch_y in data_loader:
        ...     # do something with batch_x, batch_y
    """
    def __init__(self, data_x, data_y, batch_size, shuffle=True):
        super().__init__()
        self.data_x = data_x
        self.data_y = data_y
        self.batch_size = batch_size
        self.shuffle = shuffle
        assert self.data_x.size(0) == self.data_y.size(0)
        assert len(self) > 0

    @property
    def size(self):
        return self.data_x.size(0)

    def __len__(self):
        # XXX: should we remove or include the tailing data (which has len < batch_size)?
        return math.ceil(self.size / self.batch_size)

    def _sample_batch_indices(self):
        if self.shuffle:
            idx = torch.randperm(self.size)
        else:
            idx = torch.arange(self.size)
        return idx, len(self)

    def __iter__(self):
        idx, n_batches = self._sample_batch_indices()
        for i in range(n_batches):
            _slice = idx[i * self.batch_size: (i + 1) * self.batch_size]
            yield self.data_x[_slice], self.data_y[_slice]

**See Model 1**

In [0]:
class TopDownModel(ForecastingModel):
  """
  Forecasting Model 1
  """
  def model(self, zero_data, covariates):
    # check univariate data
    assert zero_data.size(-1) == 1 
    duration = zero_data.size(-2)

    time, feature = covariates[..., 0], covariates[..., 1:]

    bias = pyro.sample("bias", dist.Normal(0, 10))
    trend_coef = pyro.sample("trend", dist.LogNormal(-2, 1))
    trend = trend_coef * time

    weight = pyro.sample("weight", dist.Normal(0, 1).expand(
        [feature.size(-1)]).to_event(1))
    regressor = (weight * feature).sum(-1)

    # weekly seasonality as indpendent events
    with pyro.plate("day_of_week", 7, dim=-1):
      seasonal = pyro.sample("seasonal", dist.Normal(0, 5))
    seasonal = periodic_repeat(seasonal, duration, dim=-1)

    # predict
    prediction = bias + trend + seasonal + regressor
    # Pyro Forecast is multivariate - univariate timeseries is needed
    prediction = prediction.unsqueeze(-1)

    # heavy tail nose to account for outliers
    stability = pyro.sample("noise_stability", dist.Uniform(1, 2).expand([1]).to_event(1))
    skew = pyro.sample("noise_skew", dist.Uniform(-1, 1).expand([1]).to_event(1))
    scale = pyro.sample("noise_scale", dist.LogNormal(-5, 5).expand([1]).to_event(1))
    noise_dist = dist.Stable(stability, skew, scale)
    with poutine.reparam(config={"residual": StableReparam()}):
      self.predict(noise_dist, prediction)

In [0]:
def transform(pred, truth):
  """
  Helper function to undo transformation
  """
  return pred.exp(), truth.exp()

In [0]:
def bounded_exp(x, bound=1e3):
  return (x-math.log(bound)).sigmoid() * bound

In [0]:
def transform_agg(pred, truth):
  num_samples, duration = pred.size(0), pred.size(-2)
  pred = pred.reshape(num_sample, -1, duration)
  truth = truth.round().reshape(-1, duration).cpu()
  agg_pred = m5_data.aggregate_samples(pred, *m5_data.aggregation_levels)
  agg_truth = m5_data.aggregate_samples(truth.unsqueeze(0),
                                        *m5_data.aggregation_levels).squeeze(0)
  return agg_pred.unsqueeze(-1), agg_truth.unsqueeze(-1)

In [0]:
def forecaster_opt_fn(t0=None, t1=None, t2=None):
  forecaster_opt = {
      "create_plates": create_plates,
      "learning_rate": 0.1,
      "learning_rate_decay": 0.1,
      "clip_norm": 10.,
      "num_steps": 1001,
      "log_every": 100,
      "guide": NormalGuide(create_plates),
  }
  return forecaster_opt

In [11]:
m5_data = M5Data(data_path=DRIVE)
data = m5_data.get_aggregated_sales(m5_data.aggregation_levels[0])[0].unsqueeze(-1)

print("Tensor Shape aggregated data: {}".format(data.shape))

Tensor Shape aggregated data: torch.Size([1913, 1])


In [0]:
# log transform the sh*t out of it
data = data.log()

T0 = 0
T2 = data.size(-2) + 28 # end + submission
time = torch.arange(T0, float(T2), device="cpu") / 365
covariates = torch.cat([
                        time.unsqueeze(-1),
                        # dummy months as features
                        m5_data.get_dummy_day_of_month()[T0:T2],], dim=-1)

if CUDA:
  data = data.cuda()
  covariates = covariates.cuda()
  torch.set_default_tensor_type(torch.cuda.FloatTensor)

# forecaster_opt = {
#     "learning_rate": 0.045,
#     "learning_rate_decay": 0.09,
#     "clip_norm": 10,
#     "num_steps": 1001,
#     "log_every": 100,
# }

forecaster_opt = {
    "learning_rate": 0.05,
    "learning_rate_decay": 0.08,
    "clip_norm": 5,
    "num_steps": 1001,
    "log_every": 100,
}




In [13]:
# do the forecasting and prediction
forecaster = Forecaster(TopDownModel(), data, covariates[:-28], **forecaster_opt)
samples = forecaster(data, covariates, num_samples=1000).exp().squeeze(-1).cpu()
pred = samples.mean(0)

	add_(Number alpha, Tensor other)
Consider using one of the following signatures instead:
	add_(Tensor other, *, Number alpha)
INFO 	 step    0 loss = 4.86266e+06
INFO 	 step  100 loss = 10.9222
INFO 	 step  200 loss = 4.59061
INFO 	 step  300 loss = 1.42875
INFO 	 step  400 loss = 0.0327811
INFO 	 step  500 loss = 0.0363195
INFO 	 step  600 loss = -0.229535
INFO 	 step  700 loss = -0.244174
INFO 	 step  800 loss = -0.262966
INFO 	 step  900 loss = -0.273359
INFO 	 step 1000 loss = -0.265945


In [14]:
print(OUTFILE)

drive/My Drive/kaggle_m5/results/submission.csv


In [0]:
# evaluate accuracy and uncertainty

# top down distributed aggregated forecast sales `pred`
sales_last28 = m5_data.get_aggregated_sales(
    m5_data.aggregation_levels[-1])[:, -28:]
proportion = sales_last28.sum(-1) / sales_last28.sum()
prediction = proportion.ger(pred)

print
m5_data.make_accuracy_submission(OUTFILE, 
                                 prediction)


In [0]:

# use top down for uncertainty prediction
non_agg_samples = samples.unsqueeze(1) * proportion.unsqueeze(-1)
# apply poisson distribution to account for low std and quantile values due to aggregate
non_agg_samples = torch.poisson(non_agg_samples)
agg_samples = m5_data.aggregate_samples(non_agg_samples, 
                                        *m5_data.aggregation_levels)
print("Calculate quantiles...")
q = np.quantile(agg_samples.numpy(), m5_data.quantiles, axis=0)
print("Make Uncertainty Submission...")
filename, ext = os.path.splitext(OUTFILE)
m5_data.make_uncertainty_submission(OUTFILE + "_uncertainty" + ext, q, 
                                    float_format="%.3f")

In [0]:
## Model2 hierarchical 
class HierarchyModel(ForecastingModel):
  def __init__(self, snap, dept, saled, log_ma):
    """
    Store covariates in the constructor, reshaping is expensive
    """
    super().__init__()
    # boolean for indicating SNAP
    assert snap.shape == (10, 1, snap.size(2), 1)
    self.snap = snap
    # one-hot encoding for department of each product
    assert dept.shape == (10, 3049, 1, 7)
    self.dept = dept
    # product availability
    assert saled.shape == (10, 3049, snap.size(2), 3)
    self.saled = saled
    # moving average features - covariate as time-local feature
    assert log_ma.shape == (10, 3049, log_ma.size(2), 3)
    self.log_ma = log_ma

  def model(self, zero_data, covariates):
    # univariate data
    assert zero_data.size(-1) == 1 
    time_index = covariates.squeeze(-1)

    store_plate = pyro.plate("store", num_stores, dim=-3)
    product_plate = pyro.plate("product", num_products, dim=-2)
    day_of_week_plate = pyro.plate("day_of_week", 7, dim=-1)

    snap = self.snap[..., time_index, :]
    # subsample data
    with product_plate:
      dept = pyro.subsample(self.dept, event_dim=1)
      saled = pyro.subsample(self.saled, event_dim=1)[..., time_index, :]
      log_ma = pyro.subsample(self.log_ma, event_dim=1)[..., time_index, :]

    # latent variable for each store and dept
    with store_plate:
      ma_weight = pyro.sample("ma_weight", 
                              dist.Normal(0, 1).expand([2, log_ma.size(-1), 7]).to_event(3))
      ma_weight = ma_weight.matmul(dept.unsqueeze(-2).unsqueeze(-1)).squeeze(-1)
      moving_average = ma_weight.matmul(log_ma.unsqueeze(-1)).squeeze(-1)

      snap_weight = pyro.sample("snap_weight",
                                dist.Normal(0, 1).expand([2, 7]).to_event(2))
      snap_weight = snap_weight.matmul(dept.unsqueeze(-1)).squeeze(-1)
      snap_effect = snap_weight * snap

      with day_of_week_plate:
        seasonal = pyro.sample("seasonal", dist.Normal(0, 1).expand([2, 7]).to_event(2))
      seasonal = seasonal.matmul(dept.unsqueeze(-1)).squeeze(-1)
      seasonal = periodic_repeat(seasonal, duration, dim=-2)
    
    prediction = moving_average + snap_effect + seasonal
    log_mean, log_scale = prediction[..., :1], prediction[..., 1:]
    # add small bias 1e-3 to avoid mean=scale=0
    mean = bounded_exp(log_mean) * saled + 1e-3
    scale = bounded_exp(log_scale) * saled + 1e-3

    rate = scale.reciprocal()
    concentration = mean * rate
    # alt GammaPoisson or NegativeBinomial
    noise_dist = dist.Gamma(concentration, rate)

    with store_plate, product_plate:
      self.predict(noise_dist, mean.new_zeros(mean.shape))



In [0]:
class NormalGuide(PyroModule):
  """
  Custom guide as to not use the AutoNormal default guide 
  """
  def __init__(self, create_plates=None):
    super().__init__()
    # define shapes of sample sites
    self.ma_weight_loc = PyroParam(torch.zeros(10, 1, 1, 2, 3, 7), event_dim=3)
    self.ma_weight_scale = PyroParam(torch.ones(10, 1, 1, 2, 3, 7) * 0.1,
                                     dist.constraints.positive, event_dim=3)
    self.snap_weight_loc = PyroParam(torch.zeros(10, 1, 1, 2, 7), event_dim=2)
    self.snap_weight_scale = PyroParam(torch.ones(10, 1, 1, 2, 7) * 0.1,
                                       dist.constraints.positive, event_dim=2)
    self.seasonal_loc = PyroParam(torch.zeros(10, 1, 7, 2, 7), event_dim=2)
    self.seasonal_scale = PyroParam(torch.ones(10, 1, 7, 2, 7) * 0.1, 
                                    dist.constraints.positive, event_dim=2)
    self.create_plates = create_plates

  def forward(self, data, covariates):
    num_stores = data.size(0)
    if self.create_plates is not None:
      product_plate = self.create_plates(data, covatiates)
      store_plate = pyro.plate("store", num_stores, dim=-3)
      day_of_week_plate = pyro.plate("day_of_week", 7, dim=-1)

    with store_plate:
      pyro.sample("ma_weight", dist.Normal(self.ma_weight_loc, 
                                           self.ma_weight_scale).to_event(3))
      pyro.sample("snap_weight", dist.Normal(self.snap_weight_loc, 
                                             self.snap_weight_scale).to_event(2))
      with day_of_week_plate:
        pyro.sample("seasonal", 
                    dist.Normal(self.seasonal_loc, self.seasonal_scale).to_event(2))

def create_plates(zero_data, covariates):
  return pyro.plate("product", zero_data.shape[1], subsample_size=60, dim=-2)

In [0]:
class M5Forecaster(Forecaster):
  """
  Forecaster that draws subsamples and casts batches to CPU
  will skip unnecessary training data
  """
  def forward(self, data, covariates, num_samples, batch_size=None):
    if batch_size is not None:
      batches = []
      while num_samples > 0:
        batch = self.forward(data, covariates, min(num_samples, batch_size))
        batches.append(batch)
        num_samples -= batch_size
      return torch.cat(batches)
    # skip part that has no conflict with weekly seasonal patterns
    skip = data.size(-2) // 7 * 7
    return super().forward(data[..., skip:, :], covariates[skip:], num_samples).cpu()

In [0]:
data = m5_data.get_aggregated_sales(m5_data.aggregation_levels[-1])
data = data.reshape(10, 3049, -1, 1)

T0 = 37 + 28*3 # skip small begin to calc moving average
T2 = data.size(-2) + 28 
T1 = T2 - 28 # train/test split
assert (T2 - T0) % 28 == 0

covariates = torch.arange(T2).unsqueeze(-1)
snap = m5_data.get_snap().repeat_interleave(torch.tensor([4, 3, 3]), dim=-1)
snap = snap.t().unsqueeze(1).unsqueeze(-1)
dept = m5_data.get_dummy_dept().reshape(10, -1, 7).unsqueeze(-2)
saled = (m5_data.get_prices() != 0).type(torch.get_default_dtype()).reshape(
    10, 3049, -1, 1)

ma28x1 = data.unfold(-2, 28 * 1, 1).mean(-1)
ma28x1 = torch.nn.functional.pad(ma28x1, (0, 0, 27+28*1, 0))
ma28x2 = data.unfold(-2, 28 * 2, 1).mean(-1)
ma28x2 = torch.nn.functional.pad(ma28x2, (0, 0, 27+28*2, 0))
ma28x3 = data.unfold(-2, 28 * 3, 1).mean(-1)
ma28x3 = torch.nn.functional.pad(ma28x3, (0, 0, 27+28*3, 0))
log_ma = torch.cat([ma28x1, ma28x2, ma28x3], -1).clamp(min=1e-3).log()

del ma28x1, ma28x2, ma28x3

DEVICE = "gpu"
data = data.clamp(min=1e-3).to(DEVICE)
covariates = covariates.to(DEVICE)
snap = snap.to(DEVICE)
dept = dept.to(DEVICE)
saled = saled.to(DEVICE)
log_ma = log_ma.to(DEVICE)

if CUDA:
  torch.set_default_tensor_type(torch.cuda.FloatTensor)



RuntimeError: ignored