## Implementation of Conformal RNN

In [17]:
import numpy as np
import typing
import torch

In [16]:
# Seed NumPy's global RNG so the two draws below are reproducible.
np.random.seed(0)
# Draw 2000 integers uniformly (with replacement) from 0..99 for each array.
means = np.random.choice(range(100), size=(2000,))
sdv = np.random.choice(range(100), size=(2000,))

In [None]:
# Scratch draw from U[0, 5); the value is discarded.
# `np.random.random` only accepts a `size` argument, so the (low, high)
# form has to go through `np.random.uniform`.
np.random.uniform(0, 5)
# A single standard-normal sample (loc=0, scale=1).
# NOTE(review): presumably this should eventually be parameterised by the
# `means` / `sdv` arrays drawn in the previous cell -- confirm intent.
ys = np.random.normal()

In [None]:
class AuxiliaryForecaster(torch.nn.Module):
    """
    The auxiliary RNN issuing point predictions.
    Point predictions are used as baseline to which the (normalised)
    uncertainty intervals are added in the main CFRNN network.
    """

    def __init__(self, embedding_size, input_size=1, output_size=1, horizon=1, rnn_mode="LSTM", path=None):
        """
        Initialises the auxiliary forecaster.
        Args:
            embedding_size: hyperparameter indicating the size of the latent
                RNN embeddings.
            input_size: dimensionality of the input time-series
            output_size: dimensionality of the forecast
            horizon: forecasting horizon
            rnn_mode: type of the underlying RNN network: "RNN", "GRU" or
                "LSTM". Any other value falls back to an LSTM.
            path: optional path where to save the auxiliary model to be used
                in the main CFRNN network
        """
        super().__init__()
        # input_size indicates the number of features in the time series
        # input_size=1 for univariate series.
        self.input_size = input_size
        self.embedding_size = embedding_size
        self.horizon = horizon
        self.output_size = output_size
        self.path = path

        self.rnn_mode = rnn_mode
        if self.rnn_mode == "RNN":
            self.forecaster_rnn = torch.nn.RNN(input_size=input_size, hidden_size=embedding_size, batch_first=True)
        elif self.rnn_mode == "GRU":
            self.forecaster_rnn = torch.nn.GRU(input_size=input_size, hidden_size=embedding_size, batch_first=True)
        else:  # "LSTM", and the fallback for any unrecognised mode
            self.forecaster_rnn = torch.nn.LSTM(input_size=input_size, hidden_size=embedding_size, batch_first=True)
        # Maps the final hidden state to all horizon steps at once.
        self.forecaster_out = torch.nn.Linear(embedding_size, horizon * output_size)

    def forward(self, x, state=None):
        """
        Runs the recurrent network over `x` and issues the point forecast.
        Args:
            x: input series; it is cast to float and fed to a
                `batch_first=True` recurrent layer.
            state: optional `(h_0, c_0)` pair used as the initial state.
                `c_0` is only used by the LSTM; the other networks only
                read `h_0`.
        Returns:
            A tuple `(out, (h_n, c_n))` where `out` has shape
            [batch, horizon, output_size] and `(h_n, c_n)` is the final
            recurrent state (`c_n` is None when the network is not an LSTM).
        """
        # Branch on the module that was actually built rather than on the
        # `rnn_mode` string: __init__ falls back to an LSTM for unrecognised
        # modes, and an LSTM returns its state as an (h_n, c_n) tuple.
        if isinstance(self.forecaster_rnn, torch.nn.LSTM):
            _, (h_n, c_n) = self.forecaster_rnn(x.float(), state)
        else:
            h_0 = state[0] if state is not None else None
            _, h_n = self.forecaster_rnn(x.float(), h_0)
            c_n = None

        # [batch, horizon, output_size]
        out = self.forecaster_out(h_n).reshape(-1, self.horizon, self.output_size)

        return out, (h_n, c_n)

    def fit(self, train_dataset, batch_size, epochs, lr):
        """
        Trains the auxiliary forecaster to the training dataset.
        Args:
            train_dataset: a dataset of type `torch.utils.data.Dataset`
                yielding `(sequences, targets, lengths)` triples
            batch_size: batch size
            epochs: number of training epochs
            lr: learning rate
        """
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        criterion = torch.nn.MSELoss()

        self.train()
        for epoch in range(epochs):
            train_loss = 0.0

            for sequences, targets, lengths in train_loader:
                optimizer.zero_grad()

                out, _ = self(sequences)
                # NOTE(review): `get_lengths_mask` is not defined in this
                # class; it must be available at module level. Presumably it
                # zeroes out forecast positions that are invalid for shorter
                # sequences -- confirm against its definition.
                valid_out = out * get_lengths_mask(sequences, lengths, self.horizon)

                loss = criterion(valid_out.float(), targets.float())
                loss.backward()

                train_loss += loss.item()

                optimizer.step()

            # Average of the per-batch losses for this epoch.
            mean_train_loss = train_loss / len(train_loader)
            if epoch % 50 == 0:
                print("Epoch: {}\tTrain loss: {}".format(epoch, mean_train_loss))

        # Persist the whole module (not just the state dict) when a path is set.
        if self.path is not None:
            torch.save(self, self.path)

In [None]:
NUM_HORIZON = 5  # default number of forecast steps
ALPHA = .05  # default target miscoverage level


def conformal_rnn(model, calibration, test, H=NUM_HORIZON, alpha=ALPHA):
    """
    Computes one conformal interval half-width per forecast step from a
    calibration set.

    Each calibration row is split into an input window (all but the last
    `H` values) and a target window (the last `H` values). The model
    forecasts the target window, the absolute errors are the
    nonconformity scores, and for every step `h` the returned width is the
    empirical quantile of that step's scores at level
    `ceil((M + 1) * (1 - alpha / H)) / M`, capped at 1. Dividing `alpha`
    by `H` is a Bonferroni correction across the horizon steps.

    Args:
        model: callable taking a float tensor of shape (M, T, 1) and
            returning `(forecast, state)`, where `forecast` holds M * H
            values (e.g. shape (M, H, 1)).
        calibration: 2-D tensor (or array-like) of shape (M, T + H) with
            M calibration series.
        test: unused; kept so existing callers keep working.
        H: number of forecast steps.
        alpha: target miscoverage level, split evenly across the H steps.

    Returns:
        A list of H floats, the interval half-width for each forecast step.

    Raises:
        ValueError: if `H` is not positive, or `calibration` is not 2-D,
            is empty, or has no columns left for the input window.
    """
    if H < 1:
        raise ValueError("H must be a positive integer")
    calibration = torch.as_tensor(calibration)
    if calibration.dim() != 2:
        raise ValueError("calibration must be a 2-D tensor of shape (M, T + H)")
    M, total_length = calibration.shape
    if M == 0:
        raise ValueError("calibration must contain at least one series")
    T = total_length - H
    if T < 1:
        raise ValueError("calibration rows must be longer than the horizon H")

    X_t = calibration[:, :T]  # (M, T) input windows
    Y_h = calibration[:, T:]  # (M, H) targets

    # One batched forward pass; the trailing axis is the single input feature.
    with torch.no_grad():
        forecast, _ = model(X_t.unsqueeze(-1))
    forecast = forecast.reshape(M, H)

    # Nonconformity scores: absolute forecast error, one column per step.
    calibration_scores = (forecast - Y_h).abs().detach().cpu().numpy()

    # Finite-sample corrected level; it exceeds 1 for small M, so cap it.
    cut_point = min(np.ceil((M + 1) * (1 - alpha / H)) / M, 1.0)

    return [float(np.quantile(calibration_scores[:, h], cut_point)) for h in range(H)]

In [None]:
# Hyperparameters for the auxiliary forecaster.
embedding_size = 20
# NOTE(review): learning_rate, batch_size and epochs are not used in this
# cell; presumably they are meant for a later `fit` call -- confirm.
learning_rate = 0.01
batch_size = 100
epochs = 1000
# All other constructor arguments keep their defaults.
LSTM = AuxiliaryForecaster(embedding_size)

In [None]:
## Adaptive Conformal RNN