In [None]:
from kaggle_datasets import KaggleDatasets
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow import keras
from tensorflow.keras.layers import DenseFeatures, RepeatVector, Dense
from tensorflow.keras.layers import ZeroPadding2D, GlobalAveragePooling2D
from tensorflow.keras import backend as K
from tensorflow.python.feature_column import feature_column_v2 as fc
from tensorflow.python.feature_column import sequence_feature_column as seq_fc
import pandas as pd
from random import randint

In [None]:
# Batching and sequence geometry.
BATCH_SIZE = 32
SEQ_LEN = 1941  # time steps per series record

# Split sizes: 16/25 of the records for training, 4/25 for validation.
SET_SIZE = 42840
TRAIN_SIZE = SET_SIZE * 16 // 25
VAL_SIZE = SET_SIZE * 4 // 25

GCS_PATH = KaggleDatasets().get_gcs_path(
    "complete-m5-uncertainty-validation-records")

### Prep for Data Import

In [None]:
# Calendar features as plain Python lists, keyed by column name.
# `day`/`month`/`year` are shifted to start at 0 so they can feed identity
# categorical columns; missing event names become the literal "none".
cal_dict = (
    pd.read_csv("../input/m5-forecasting-uncertainty/calendar.csv")
    .rename(columns={"wday": "day", "event_name_1": "event"})
    .fillna({"event": "none"})
    .astype({"event": pd.StringDtype()})
    .assign(
        day=lambda frame: frame["day"] - 1,
        month=lambda frame: frame["month"] - 1,
        year=lambda frame: frame["year"] - 2011,
    )
    # label slice relies on calendar.csv column order (day .. event)
    .loc[:, "day":"event"]
    .drop(columns="d")
    .to_dict(orient="list")
)

def to_apropros_tensor(array):
    """Convert one calendar column into a batched SparseTensor.

    The column is truncated to its first ``SEQ_LEN - 28`` entries, given a
    new leading axis, tiled ``BATCH_SIZE`` times along it, and returned as a
    SparseTensor that stores an explicit entry at every position.

    Every position is stored explicitly for numeric columns as well as for
    strings. ``tf.sparse.from_dense`` would drop zero-valued entries, but the
    integer calendar columns are zero-based, so 0 is a real category (e.g. the
    first weekday) and must not be treated as missing.

    Args:
        array: 1-D sequence (e.g. a list) of numbers or strings.

    Returns:
        tf.sparse.SparseTensor with dense shape ``[BATCH_SIZE, SEQ_LEN - 28]``.
    """
    column = tf.constant(array)
    window = column[tf.newaxis, :SEQ_LEN - 28]
    batched = tf.repeat(window, [BATCH_SIZE], axis=0)

    # Coordinates of every element in row-major order, which matches the
    # order of the flattened values below.
    indices = tf.where(tf.ones(batched.shape, dtype=tf.bool))
    return tf.sparse.SparseTensor(
        indices,
        tf.reshape(batched, [-1]),
        batched.shape)

# Replace each calendar list with its batched SparseTensor form.
cal_dict = dict(
    (name, to_apropros_tensor(values)) for name, values in cal_dict.items()
)

The following cell builds the feature column specifications used both to parse the `SequenceExample` protos and to apply the feature-column transformations.

In [None]:
# Context (per-series) feature columns: one-hot indicators for the small
# identity-encoded ids, plus a hashed embedding for `item_id`.
# NOTE(review): the bucket counts assume the ids were pre-encoded into
# 0..n-1 when the records were written -- confirm against the writer.
keys_and_buckets = {
    "dept_id": 3,
    "cat_id": 3,
    "store_id": 4,
    "state_id": 3
}

columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_identity(key, num_buckets))
    for key, num_buckets in keys_and_buckets.items()
]

item = fc.categorical_column_with_hash_bucket("item_id", 1000)
# Full context spec, consumed by `make_parse_example_spec` in the next cell.
cntx_ftr = [*columns, fc.embedding_column(item, 5)]

# Sequence (per-time-step) feature columns. `units` must stay last: the
# parse spec is built from `seq_ftr[-1:]`, while the calendar columns are
# supplied from `cal_dict` rather than parsed.
seq_ftr = [
    fc.indicator_column(
        seq_fc.sequence_categorical_column_with_identity(key, num_buckets))
    for key, num_buckets in zip(["day", "month", "year"], [7, 12, 6])
]

event = seq_fc.sequence_categorical_column_with_hash_bucket("event", 30)
seq_ftr.extend([
    fc.embedding_column(event, 2),
    seq_fc.sequence_numeric_column("units")
])

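In the next cell, `cntx_ftr_prs` and `seq_ftr_prs` are the parsing specs for the context and sequence features, respectively. Only `units` is parsed as a sequence feature; the calendar features are attached separately from `cal_dict`.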

In [None]:
# Context parse spec: everything the context feature columns need, plus the
# per-series sample weight and the series id.
cntx_ftr_prs = {
    **tf.feature_column.make_parse_example_spec(cntx_ftr),
    "weights": tf.io.FixedLenFeature([], tf.float32),
    "id": tf.io.FixedLenFeature([], tf.string),
}

# Sequence parse spec: only the last sequence column (`units`).
seq_ftr_prs = tf.feature_column.make_parse_example_spec(seq_ftr[-1:])

### Custom layers

In [None]:
# CHECK
class CausalConv2D(keras.layers.Conv2D):
    """Conv2D that pads causally along the width (time) axis and grows the
    height axis.

    Padding (channels_last / NHWC is assumed):
      * height: ``dilation_rate[0] * (kernel_size[0] // 2) + 1`` rows on each
        side, so for odd kernels and stride 1 the output is two rows taller
        than the input;
      * width: ``dilation_rate[1] * (kernel_size[1] - 1)`` columns on the left
        only, so output step ``t`` only sees inputs up to ``t``. With stride 1
        the width is preserved for any dilation.

    Bias is added only when ``use_bias`` is True.
    """

    def _explicit_spatial_padding(self):
        """Return ``((top, bottom), (left, right))`` padding for H and W."""
        pad_h = self.dilation_rate[0] * (self.kernel_size[0] // 2) + 1
        # Causal padding must cover the *dilated* kernel span.
        pad_w = self.dilation_rate[1] * (self.kernel_size[1] - 1)
        return (pad_h, pad_h), (pad_w, 0)

    def call(self, inputs):
        (top, bottom), (left, right) = self._explicit_spatial_padding()
        outputs = tf.nn.conv2d(
            inputs, filters=self.kernel, strides=self.strides,
            padding=[[0, 0], [top, bottom], [left, right], [0, 0]],
            dilations=self.dilation_rate)

        if self.use_bias:
            outputs = tf.nn.bias_add(outputs, self.bias)
        return self.activation(outputs)

    def compute_output_shape(self, input_shape):
        output_shape = tf.TensorShape(input_shape).as_list()
        paddings = self._explicit_spatial_padding()
        for axis, (before, after) in zip((1, 2), paddings):
            length = output_shape[axis]
            if length is None:  # unknown spatial size stays unknown
                continue
            kernel = self.kernel_size[axis - 1]
            dilation = self.dilation_rate[axis - 1]
            stride = self.strides[axis - 1]
            span = dilation * (kernel - 1) + 1
            output_shape[axis] = (length + before + after - span) // stride + 1
        output_shape[-1] = self.filters
        return tf.TensorShape(output_shape)

In [None]:
# CHECK
class CausalSeperableConv2D(keras.layers.SeparableConv2D):
    """Separable 2D convolution that is causal along the width (time) axis.

    Padding (channels_last / NHWC is assumed):
      * height: ``dilation_rate[0] * (kernel_size[0] - 1)`` rows split between
        top and bottom (SAME-style, height preserved);
      * width: ``dilation_rate[1] * (kernel_size[1] - 1)`` columns on the left
        only, so output step ``t`` only sees inputs up to ``t``.

    A depthwise convolution is applied to the padded input, followed by the
    1x1 pointwise projection to ``filters`` channels. Bias is added only when
    ``use_bias`` is True.
    """

    def _explicit_spatial_padding(self):
        """Return ``((top, bottom), (left, right))`` padding for H and W."""
        total_h = self.dilation_rate[0] * (self.kernel_size[0] - 1)
        pad_w = self.dilation_rate[1] * (self.kernel_size[1] - 1)
        return (total_h // 2, total_h - total_h // 2), (pad_w, 0)

    def call(self, inputs):
        padded = K.spatial_2d_padding(
            inputs, padding=self._explicit_spatial_padding())
        depthwise = tf.nn.depthwise_conv2d(
            padded, self.depthwise_kernel, strides=(1, *self.strides, 1),
            padding="VALID",
            dilations=self.dilation_rate)
        # The pointwise projection must consume the depthwise result;
        # applying it to `inputs` would silently skip the depthwise stage.
        outputs = tf.nn.conv2d(
            depthwise, self.pointwise_kernel, strides=1, padding="VALID")
        if self.use_bias:
            outputs = tf.nn.bias_add(outputs, self.bias)
        return self.activation(outputs)

    def compute_output_shape(self, input_shape):
        # Height and width are preserved by the padding above (stride 1).
        output_shape = tf.TensorShape(input_shape).as_list()
        output_shape[-1] = self.filters
        return tf.TensorShape(output_shape)

### Custom Blocks

In [None]:
# CHECK
class SEBlock(keras.layers.Layer):
    """Squeeze-and-excitation block (Hu et al., "Squeeze-and-Excitation
    Networks", CVPR 2018), optionally conditioned on a context vector.

    Squeeze: ``GlobalAveragePooling2D`` reduces the input to one value per
    channel. Excitation: a swish ``Dense(max(1, n_filters // squeeze_factor))``
    followed by a sigmoid ``Dense(n_filters)`` gives per-channel weights in
    (0, 1).

    With ``context=True`` the layer is called on ``[inputs, context]``. The
    context (same shape as the pooled features) is added to the pooled
    features before the excitation MLP, and the call returns
    ``(weights, new_context)``, where ``new_context`` is that sum. With
    ``context=False`` it is called on ``inputs`` alone and returns only the
    weights.

    Args:
        n_filters: channel count of the input and of the returned weights.
        context: whether a context vector is threaded through the block.
        squeeze_factor: reduction ratio of the bottleneck; the bottleneck
            always keeps at least one unit.
    """

    def __init__(self, n_filters, context=True, squeeze_factor=16, **kwargs):
        super().__init__(**kwargs)
        self.context = context
        self.squeeze_factor = squeeze_factor
        self.global_avg_pool = GlobalAveragePooling2D()
        # max(1, ...) avoids a zero-unit bottleneck when
        # n_filters < squeeze_factor.
        self.dense_1 = Dense(max(1, n_filters // squeeze_factor),
                             activation="swish",
                             kernel_initializer="he_normal")

        self.dense_2 = Dense(n_filters, activation="sigmoid",
                             kernel_initializer="glorot_normal")

    def call(self, inputs):
        if self.context:
            inputs, context = inputs

        pooled = self.global_avg_pool(inputs)
        if self.context:
            context = keras.layers.add([pooled, context])
            return self.dense_2(self.dense_1(context)), context
        return self.dense_2(self.dense_1(pooled))

    def get_config(self):
        base_config = super().get_config()
        return {
            **base_config,
            "n_filters": self.dense_2.units,
            "context": self.context,
            "squeeze_factor": self.squeeze_factor,
        }

In [None]:
# CHECK
# should add ability to stop context flow
class BeachModule(keras.layers.Layer):
    """Residual block: a causal separable atrous convolution gated by an SE block.

    ``CausalSeperableConv2D`` (kernel 3, dilation ``(1, dilation_rate)``,
    swish) pads SAME-style along the height (quantile) axis and causally along
    the width (time) axis. Its output is multiplied by the per-channel weights
    produced by ``SEBlock``.

    Context handling: when ``context`` is True, the incoming context vector is
    projected by a swish ``Dense(n_filters)`` layer and handed to the SE
    block. The SE block adds it to the pooled features and returns that sum
    as the updated context. When ``context`` is False, the SE block sees only
    the convolution output and the incoming context is passed through
    unchanged.

    NOTE(review): ``GlobalAveragePooling2D`` inside the SE block averages over
    the full width, so the gate applied at every time step depends on all time
    steps. Strictly, the block is therefore not causal -- confirm this is an
    acceptable leak.

    Call input:  ``[inputs, context]``. ``inputs`` must already have
    ``n_filters`` channels for the residual sum.
    Call output: ``[gated + inputs, context, gated]``, i.e. the residual
    output, the (possibly updated) context, and the gated features (skip).
    """

    def __init__(self, n_filters, dilation_rate,
                 squeeze_factor=16,
                 context=True, **kwargs):
        super().__init__(**kwargs)
        # Sublayers are created in a fixed order: conv, SE, then projection.
        self.causal_sep_atrous_conv = CausalSeperableConv2D(
            n_filters,
            kernel_size=3,
            dilation_rate=(1, dilation_rate),
            activation="swish",
            depthwise_initializer="he_normal",
            pointwise_initializer="he_normal",
        )
        self.context = context
        self.se_block = SEBlock(
            n_filters, context=context, squeeze_factor=squeeze_factor)
        if context:
            self.dense = Dense(
                n_filters, activation="swish", kernel_initializer="he_normal")

    def call(self, inputs):
        block_input, context = inputs
        features = self.causal_sep_atrous_conv(block_input)

        if not self.context:
            excitation = self.se_block(features)
        else:
            projected = self.dense(context)
            excitation, context = self.se_block([features, projected])

        gated = keras.layers.multiply([features, excitation])
        return [gated + block_input, context, gated]

    def get_config(self):
        conv = self.causal_sep_atrous_conv
        config = dict(super().get_config())
        config.update(
            n_filters=conv.filters,
            dilation_rate=conv.dilation_rate[1],
            context=self.context,
            squeeze_factor=self.se_block.get_config()["squeeze_factor"],
        )
        return config

In [None]:
# CHECK
class XWideWave(keras.layers.Layer):
    """A ``CausalConv2D`` stem followed by ``n_layers`` ``BeachModule`` blocks
    whose width dilation grows as 1, 3, 9, ...

    Call input:  ``[inputs, context]``.
    Call output: ``[outputs, context, skips]``. Here ``outputs`` is the last
    block's residual output and ``context`` is the context after the last
    block. ``skips`` is the sum of every block's skip output, starting from a
    zero tensor of shape ``(1, 1, 1, n_filters)`` that broadcasts against the
    skips.
    """

    def __init__(self, n_layers, n_filters, **kwargs):
        super().__init__(**kwargs)
        self.causal_conv = CausalConv2D(
            n_filters,
            kernel_size=3,
            activation="swish",
            kernel_initializer="he_normal",
        )

        # Width dilation grows geometrically with depth.
        self.layers = [
            BeachModule(n_filters, dilation)
            for dilation in (3 ** depth for depth in range(n_layers))
        ]

    def call(self, inputs):
        features, context = inputs
        features = self.causal_conv(features)

        skip_total = tf.zeros([1, 1, 1, self.causal_conv.filters])
        for block in self.layers:
            features, context, skip = block([features, context])
            skip_total = skip_total + skip

        return [features, context, skip_total]

    def get_config(self):
        config = super().get_config()
        config["n_filters"] = self.causal_conv.filters
        config["n_layers"] = len(self.layers)
        return config

### Loss Function

In [None]:
# Check
class ScaledPinballLoss(keras.losses.Loss):
    """Quantile (pinball) loss divided by an in-sample naive-forecast scale.

    The loss scores 28-step quantile forecasts at the nine levels in
    ``self.taus``. Each series' pinball loss is divided by the mean absolute
    one-step difference ("MAD") of that series' observed history; a zero MAD
    is replaced by 1.

    Args:
        memory: number of trailing forecast origins to score (>= 1).
            * ``memory == 1``: only the final origin is scored, scaled by the
              MAD over the whole observed history.
            * ``memory > 1``: the last ``memory`` origins are scored, each
              scaled by the running MAD of the history up to that origin.
        **kwargs: forwarded to ``keras.losses.Loss`` (e.g. ``reduction``,
            ``name``).

    Shapes (assumed from the slicing below -- confirm against the model):
        y_true: (batch, time, 1). The last 28 steps are the forecast horizon;
            everything before them is observed history.
        q_pred: (batch, 9, origins, 28).

    Raises:
        ValueError: if ``memory < 1``.
    """

    def __init__(self, memory=1, **kwargs):
        if memory < 1:
            raise ValueError(f"memory must be >= 1, got {memory}")
        super().__init__(**kwargs)
        # Stored negated so it can be used directly as a trailing slice start.
        self.memory = -memory
        self.taus = tf.constant(
            [0.005, 0.025, 0.165, 0.25, 0.5, 0.75, 0.835, 0.975, 0.995],
            shape=[9, 1, 1])

    def call(self, y_true, q_pred):
        # One-step absolute differences over the observed history only. The
        # last 28 steps are the horizon, so the scale must stop at y[-29];
        # reaching y[-28] would leak the first target into the denominator.
        # `axis=-1` keeps the batch axis even when the batch has one series.
        abs_diff = tf.squeeze(
            tf.math.abs(y_true[:, 1:-28] - y_true[:, :-29]), axis=-1)

        if self.memory == -1:
            mad = tf.math.reduce_mean(abs_diff, axis=-1, keepdims=True)
        else:
            # Running mean: entry k averages differences 0..k, i.e. the
            # history available at forecast origin k + 1.
            steps = tf.cast(tf.range(1, tf.shape(abs_diff)[-1] + 1),
                            abs_diff.dtype)
            mad = tf.math.cumsum(abs_diff, axis=-1) / steps
            mad = mad[:, self.memory:]
        # Flat histories have zero scale; fall back to 1 instead of dividing by 0.
        mad = tf.where(mad == 0, tf.ones_like(mad), mad)

        # The following reshapes and copies `y_true` into the form needed to
        # compare against `q_pred`: one row of 28 targets per scored origin.
        y_true = y_true[:, self.memory - 27:]
        q_pred = q_pred[..., self.memory:, :]

        if self.memory == -1:
            y_true = tf.transpose(y_true, perm=[0, 2, 1])
        else:
            stacks = [y_true[:, i: i - 27] for i in range(0, 27)]
            stacks.append(y_true[:, 27:])
            y_true = tf.concat(stacks, axis=-1)

        out = tf.expand_dims(y_true, axis=1) - q_pred
        # Pinball weighting: tau for under-prediction, tau - 1 for over.
        out *= tf.where(out < 0, self.taus - 1., self.taus)
        return tf.math.reduce_mean(out, [1, -1]) / mad

    def __call__(self, y_true, y_pred, sample_weight=None):
        # `parse` adds 1 to every weight; with memory == 1 that offset is
        # removed again. Rebind instead of `-=` so a caller's numpy array is
        # never modified in place.
        if sample_weight is not None and self.memory == -1:
            sample_weight = sample_weight - 1

        return super().__call__(y_true, y_pred, sample_weight=sample_weight)

    def get_config(self):
        """Include `memory` (un-negated) so the loss round-trips via from_config."""
        return {**super().get_config(), "memory": -self.memory}

### Training

In [None]:
# CHECK
def parse(batch):
    """Parse a batch of serialized SequenceExamples into model inputs.

    Args:
        batch: 1-D string tensor of serialized ``tf.train.SequenceExample``s,
            expected to contain ``BATCH_SIZE`` records.

    Returns:
        ``((cntx_dict, seq_dict), y_true, weights)``, where:
        * ``cntx_dict`` holds the context features from ``cntx_ftr_prs``,
          without ``id`` and ``weights``;
        * ``seq_dict`` holds ``units``, truncated to the first
          ``SEQ_LEN - 28`` steps and divided by each series' maximum over that
          window, plus the pre-built calendar tensors from ``cal_dict``;
        * ``y_true`` is the full, unscaled ``units`` sequence as a dense tensor;
        * ``weights`` is the stored weight plus 1.
    """
    cntx_dict, seq_dict, _ = tf.io.parse_sequence_example(
        batch,
        context_features=cntx_ftr_prs,
        sequence_features=seq_ftr_prs)

    cntx_dict.pop("id")  # not a model input
    # The +1 lets series that would otherwise be nearly weightless still
    # contribute to training.
    weights = cntx_dict.pop("weights") + 1

    units = seq_dict["units"]
    y_true = tf.sparse.to_dense(units)
    history = tf.sparse.slice(units, [0, 0, 0], [BATCH_SIZE, SEQ_LEN - 28, 1])
    peak = tf.sparse.reduce_max(history, axis=[1, 2], keepdims=True)
    # A series that is all zeros in the window has peak 0; dividing by it
    # would feed NaN/inf into the model, so scale such series by 1 instead.
    peak = tf.where(peak == 0, tf.ones_like(peak), peak)
    seq_dict["units"] = history / peak

    seq_dict.update(cal_dict)
    return (cntx_dict, seq_dict), y_true, weights
    

def repeat_batch_and_parse(dataset, batch_size=BATCH_SIZE):
    """Turn a dataset of serialized SequenceExamples into an endless stream
    of parsed, prefetched batches.

    Partial final batches are dropped.

    NOTE(review): `parse` and `cal_dict` are built for `BATCH_SIZE`, so a
    different `batch_size` will likely produce mismatched shapes.
    """
    autotune = tf.data.experimental.AUTOTUNE
    endless = dataset.repeat()
    batched = endless.batch(batch_size, drop_remainder=True)
    parsed = batched.map(parse, num_parallel_calls=autotune)
    return parsed.prefetch(autotune)

In [None]:
# Triangular cyclical learning rate between 1e-3 and 2e-3; `step_size` is
# the number of iterations needed to climb from the initial to the maximal
# rate.
lr_scheduler = tfa.optimizers.TriangularCyclicalLearningRate(
    step_size=4,
    maximal_learning_rate=0.002,
    initial_learning_rate=0.001,
)