In [None]:
import pandas as pd
import numpy as np
import torch
import matplotlib.pyplot as plt

## Load Data

In [None]:
# The CSV is read without a header row, so pandas labels the columns 0..N-1.
# Give the three columns used below readable names and drop the rest.
df = (
    pd.read_csv("csv_agile_H_Southern_England.csv", header=None)
    .rename(columns={0: "date", 1: "time", 4: "cost"})
    [["date", "time", "cost"]]
)


In [None]:
# Chronological split of the rows on the "date" column.
# NOTE(review): the comparison uses the raw column values; if "date" holds
# strings it is lexicographic -- confirm the file stores ISO-formatted dates.

# Training dataset: rows whose date is <= '2022-01-01'.
df_train = df.loc[df["date"].le('2022-01-01')]
# Test dataset: rows whose date is > '2022-01-01'.
df_test = df.loc[df["date"].gt('2022-01-01')]

# To inspect the split ratio: len(df_test) / (len(df_train) + len(df_test))

In [None]:
# Bare expression: the notebook renders the held-out frame as this cell's output.
df_test

## Pre-process

In [None]:
length = 128  # number of points in each sequence window (original note: ~2.6 days)


def sliding_window(array, window_size=None, window_step=1):
    """Return overlapping fixed-length windows over ``array``.

    Parameters
    ----------
    array : array-like
        Input sequence; the window slides along its last axis.
    window_size : int, optional
        Number of points per window. Defaults to the module-level ``length``.
    window_step : int, optional
        Offset between the starts of consecutive windows. Defaults to 1,
        i.e. every possible window is returned.

    Returns
    -------
    numpy.ndarray
        For 1-D input, an array of shape ``(n_windows, window_size)``. The
        result is a read-only view of the input, not a copy.

    Raises
    ------
    ValueError
        If ``window_step`` is smaller than 1, or (raised by numpy) if
        ``window_size`` is larger than the input.
    """
    if window_size is None:
        window_size = length
    if window_step < 1:
        raise ValueError("window_step must be a positive integer")

    windows = np.lib.stride_tricks.sliding_window_view(array, window_size)
    # Keep every `window_step`-th window; a step of 1 keeps all of them.
    return windows[::window_step]

# Turn the "cost" column of each split into a 2-D array of sliding windows.
train, test = (
    sliding_window(frame["cost"].to_numpy()) for frame in (df_train, df_test)
)

# Report the shape of each windowed array, one per line.
print(train.shape, test.shape, sep="\n")

## Define RNN

In [None]:
# Define RNN Model
# https://pytorch.org/docs/stable/generated/torch.nn.RNN.html

# Model hyper-parameters.
input_size = 1   # features per time step: each window holds a single "cost" value per step
output_size = 1  # predicted values per time step
hidden_size = 5
num_layers = 2
activation = "relu" #relu or tanh
bidirectional = False


class Net(torch.nn.Module):
    """Multi-layer Elman RNN followed by a linear read-out applied at every time step.

    The architecture is taken from the module-level settings ``input_size``,
    ``hidden_size``, ``num_layers``, ``activation``, ``bidirectional`` and
    ``output_size`` at construction time.
    """

    def __init__(self):
        super().__init__()

        self.rnn = torch.nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=0,
            nonlinearity=activation,
            bidirectional=bidirectional,
            batch_first=False
        )
        # A bidirectional RNN concatenates both directions, doubling the
        # feature width of its output; size the read-out layer accordingly.
        num_directions = 2 if bidirectional else 1
        # compress the RNN output to the same feature dim as y
        self.linear = torch.nn.Linear(num_directions * hidden_size, output_size)

    def forward(self, x, hidden_prev):
        """Run the RNN over one sequence and map every step to an output value.

        Parameters
        ----------
        x : torch.Tensor
            Input of shape ``[seq, batch, input_size]`` (``batch_first=False``).
        hidden_prev : torch.Tensor
            Initial hidden state of shape
            ``[num_directions * num_layers, batch, hidden_size]``.

        Returns
        -------
        out : torch.Tensor
            Per-step outputs of shape ``[1, seq * batch, output_size]``.
        hidden_prev : torch.Tensor
            Final hidden state, same shape as the ``hidden_prev`` argument.
        """
        out, hidden_prev = self.rnn(x, hidden_prev)  # [seq, batch, num_directions * h]
        # Stack seq and batch; use the actual feature width so the
        # bidirectional case is flattened correctly as well.
        out = out.reshape(-1, out.size(-1))  # => [seq * batch, num_directions * h]

        # linear layer so that the output carries output_size features per
        # step and is comparable with y for the loss calculation
        out = self.linear(out)  # => [seq * batch, output_size]
        out = out.unsqueeze(dim=0)  # => [1, seq * batch, output_size]
        return out, hidden_prev


rnn = Net()
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(rnn.parameters(), lr=0.001)

# rnn= torch.nn.RNN(input_size=input_size, hidden_size=hidden_size,num_layers=num_layers,
#                   dropout=0, nonlinearity="relu", bidirectional=bidirectional)



## Train

In [None]:
batch_size = 1  # number of sequences fed to the model per optimisation step


def train_rnn(rnn, n_steps, print_every):
    """Train ``rnn`` for one-step-ahead prediction on the sliding windows in ``train``.

    Each step takes one window, feeds its first ``length - 1`` points as input
    and uses the same window shifted by one point as the target. The hidden
    state is carried from one step to the next but detached, so gradients
    never flow through more than one window.

    Uses the module-level ``train`` array, ``criterion``, ``optimizer`` and the
    model hyper-parameters. ``input_size`` must equal the number of values per
    time step in a window (1 for a single series), otherwise reshaping the
    input raises.

    Parameters
    ----------
    rnn : torch.nn.Module
        Model called as ``rnn(x, hidden)`` and returning ``(prediction, hidden)``.
    n_steps : int
        Number of optimisation steps. Windows are reused from the start when
        ``n_steps`` exceeds the number of available windows.
    print_every : int
        Print the loss every ``print_every`` steps; 0 disables printing.

    Returns
    -------
    torch.nn.Module
        The same ``rnn`` instance, updated in place.

    Raises
    ------
    ValueError
        If there are no training windows.
    """
    n_windows = len(train)
    if n_windows == 0:
        raise ValueError("no training windows available")

    # initialize the hidden state (one set of layers per direction)
    num_directions = 2 if bidirectional else 1
    hidden = torch.zeros(num_directions * num_layers, batch_size, hidden_size)

    for step in range(n_steps):
        # wrap around instead of indexing past the last window
        data = train[step % n_windows]

        # x: points 0 .. length-2 of the window, [length - 1, batch_size, input_size]
        x = torch.tensor(data[:-1]).float().reshape(length - 1, batch_size, input_size)

        # outputs from the rnn
        prediction, hidden = rnn(x, hidden)

        # Detach the hidden state from its history so backprop stops at the
        # start of the current window.
        hidden = hidden.detach()

        # y: points 1 .. length-1 of the window, shaped exactly like the
        # prediction. A mismatched but broadcastable target would make
        # MSELoss compare every prediction with every target.
        y = torch.tensor(data[1:]).float().reshape(prediction.shape)

        # calculate the loss
        loss = criterion(prediction, y)

        # clear gradients left over from the previous step
        optimizer.zero_grad()

        # perform backprop and update weights
        loss.backward()
        optimizer.step()

        # display loss
        if print_every and step % print_every == 0:
            print('Loss: ', loss.item())
    return rnn

# Train for 2000 steps, reporting the loss on every 10th step.
rnn = train_rnn(rnn, n_steps=2000, print_every=10)

# Show the structure of the trained module.
print(rnn)

## Test

In [None]:
# Evaluate one-step-ahead prediction on every test window, starting from an
# all-zero hidden state that is then carried from window to window.
if bidirectional:
    hidden = torch.zeros(2*num_layers,batch_size,hidden_size)
else:
    hidden = torch.zeros(num_layers,batch_size,hidden_size)

total_loss = 0.0

# No gradients are needed for evaluation; disabling them avoids building an
# autograd graph for every window.
with torch.no_grad():
    for batch_i in range(len(test)):
        data = test[batch_i]
        x = torch.tensor(data[:-1]).float().reshape(length - 1, batch_size, input_size)  # [seq_len, b, fea_len]

        prediction, hidden = rnn(x, hidden)

        # Shape the target exactly like the prediction so MSELoss compares
        # element by element instead of broadcasting the two tensors.
        y = torch.tensor(data[1:]).float().reshape(prediction.shape)

        # accumulate a plain float rather than a tensor
        total_loss += criterion(prediction, y).item()

# mean loss per window
if len(test) > 0:
    print(total_loss / len(test))
else:
    print("No test windows available")

In [None]:
# Plot the last evaluated test window against the model output for it.
# prediction[t] is the model's estimate of data[t + 1], so the prediction is
# drawn against data[1:] on integer positions 1 .. len(data) - 1.
steps = np.arange(1, len(data))
plt.plot(steps, data[1:], label="Test")
plt.plot(steps, prediction.detach().reshape(-1), label="Prediction")
plt.xlabel("Position in window")
plt.ylabel("cost")
plt.legend()