In [None]:
import torch
import torch.nn as nn

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

In [None]:
class BiDirectionalRecurrentNetwork(nn.Module):
    """Bidirectional tanh recurrent network with a sigmoid read-out.

    Two independent recurrences scan the input sequence, one from the first
    step to the last and one from the last step to the first.  At every time
    step the two hidden states are combined linearly and squashed with a
    sigmoid, so each prediction lies in (0, 1).

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each directional hidden state.
        output_size: Number of values predicted per time step.
        learning_rate: Step size used by the SGD update in ``backward``.
    """

    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.1):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # Variances of the normal initialisers.  The parameters below are
        # created in a fixed order so that a given RNG seed always produces
        # the same weights.
        input_var = 2.0 / (input_size + hidden_size)
        recurrent_var = 1.0 / hidden_size
        output_var = 2.0 / (hidden_size + output_size)

        self.Wx_forward = self._scaled_normal(hidden_size, input_size, input_var)
        self.Wh_forward = self._scaled_normal(hidden_size, hidden_size, recurrent_var)
        self.bh_forward = nn.Parameter(torch.zeros(hidden_size, 1))

        self.Wx_backward = self._scaled_normal(hidden_size, input_size, input_var)
        self.Wh_backward = self._scaled_normal(hidden_size, hidden_size, recurrent_var)
        self.bh_backward = nn.Parameter(torch.zeros(hidden_size, 1))

        self.Wy_forward = self._scaled_normal(output_size, hidden_size, output_var)
        self.Wy_backward = self._scaled_normal(output_size, hidden_size, output_var)
        self.by = nn.Parameter(torch.zeros(output_size, 1))

        self.learning_rate = learning_rate
        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()
        self.mse_loss = nn.MSELoss()

    @staticmethod
    def _scaled_normal(rows, cols, variance):
        """Return a (rows, cols) parameter drawn from N(0, variance)."""
        return nn.Parameter(torch.randn(rows, cols) * torch.sqrt(torch.tensor(variance)))

    def _scan(self, x, time_steps, Wx, Wh, bh):
        """Run one tanh recurrence over ``x`` visiting ``time_steps`` in order.

        The hidden state starts at zero and is created with the dtype and
        device of ``bh`` so the module keeps working after ``.double()`` or
        ``.to(device)``.  Returns the hidden states in visiting order.
        """
        h = torch.zeros(self.hidden_size, 1, dtype=bh.dtype, device=bh.device)
        states = []
        for t in time_steps:
            x_t = x[t].reshape(-1, 1)
            h = self.tanh(Wx @ x_t + Wh @ h + bh)
            states.append(h)
        return states

    def _loss(self, x, y):
        """Mean squared error between the flattened predictions and ``y``."""
        predictions = torch.cat(self.forward(x)).view(-1)
        # nn.MSELoss is documented as (input, target); the value is symmetric.
        return self.mse_loss(predictions, y.view(-1))

    def forward(self, x):
        """Predict one output per time step of ``x``.

        Args:
            x: Sequence indexable along its first dimension; ``x[t]`` is
                reshaped to a column vector and must hold ``input_size``
                values.

        Returns:
            A list with ``len(x)`` tensors of shape ``(output_size, 1)``.
            The per-step hidden states are kept on ``self.h_forward_hist`` and
            ``self.h_backward_hist`` (both indexed by time step) and the
            predictions on ``self.y_pred``.
        """
        steps = range(len(x))
        self.h_forward_hist = self._scan(
            x, steps, self.Wx_forward, self.Wh_forward, self.bh_forward
        )
        # The reverse scan yields states from the last step to the first;
        # flip them so index t lines up with the forward states.
        self.h_backward_hist = self._scan(
            x, reversed(steps), self.Wx_backward, self.Wh_backward, self.bh_backward
        )[::-1]

        self.y_pred = [
            self.sigmoid(self.Wy_forward @ h_fwd + self.Wy_backward @ h_bwd + self.by)
            for h_fwd, h_bwd in zip(self.h_forward_hist, self.h_backward_hist)
        ]
        return self.y_pred

    def print_mse(self, X, y):
        """Print the mean squared error of the predictions for ``X`` against ``y``."""
        # Evaluation only: skip building an autograd graph.
        with torch.no_grad():
            mse = self._loss(X, y)
        print(f"MSE: {mse.item()}")

    def backward(self, x, y):
        """Perform one SGD step on the MSE over the whole sequence.

        Runs its own forward pass, so callers do not need to call ``forward``
        first.

        Returns:
            The loss value (a Python float) computed before the update.
        """
        # Plain SGD keeps no state between steps, so building the optimizer
        # per call is equivalent to reusing one and always picks up the
        # current parameters and ``self.learning_rate``.
        optimizer = torch.optim.SGD(self.parameters(), lr=self.learning_rate)
        optimizer.zero_grad()

        loss = self._loss(x, y)
        loss.backward()
        optimizer.step()
        return loss.item()


In [None]:
def get_train_test(url, split_percent=0.8):
    """Load a univariate series, scale it to [0, 1] and split it chronologically.

    Args:
        url: Anything ``pd.read_csv`` accepts (URL, path or file-like object).
            Only the second column (index 1) of the CSV is read.
        split_percent: Fraction of the series, counted from the start, that
            goes into the training split.  Must lie in [0, 1].

    Returns:
        ``(train_data, test_data, data)``: the leading ``int(n * split_percent)``
        scaled values, the remaining values, and the full scaled series.
        Both splits are slices of ``data``.

    Raises:
        ValueError: If ``split_percent`` is outside [0, 1].
    """
    if not 0.0 <= split_percent <= 1.0:
        raise ValueError(f"split_percent must be in [0, 1], got {split_percent}")

    df = pd.read_csv(url, usecols=[1], engine='python')
    values = df.values.astype('float32')
    # The scaler is fitted on the whole series, before it is split.
    scaler = MinMaxScaler(feature_range=(0, 1))
    data = scaler.fit_transform(values).flatten()

    # Point for splitting data into train and test
    split = int(len(data) * split_percent)
    train_data = data[:split]
    test_data = data[split:]
    return train_data, test_data, data
 
# CSV of monthly sunspot values, read straight from GitHub.
sunspots_url = 'https://raw.githubusercontent.com/jbrownlee/Datasets/master/monthly-sunspots.csv'

# Scaled training split, scaled test split and the full scaled series
# (default split ratio).
train_data, test_data, data = get_train_test(url=sunspots_url)

In [None]:
# One feature in, one value out, five hidden units per direction.
input_size, hidden_size, output_size = 1, 5, 1

# The weights are drawn with torch.randn; seed the generator so that
# re-running the notebook from a fresh kernel gives the same initial model.
torch.manual_seed(0)

# NOTE: the model is bound to the name `self` because later cells refer to it
# by that name.
self = BiDirectionalRecurrentNetwork(input_size, hidden_size, output_size)

In [None]:
# Next-step prediction: the target at position t is the input value at t + 1.
# The dtype is pinned to float32 so the tensors match the model's parameters
# (created in torch's default dtype, float32 unless it was changed)
# regardless of the dtype of the NumPy array.
x = torch.tensor(train_data[:-1], dtype=torch.float32)  # Input data
y = torch.tensor(train_data[1:], dtype=torch.float32)  # Target data

In [None]:
# Ten full-sequence gradient steps, printing the MSE after each update.
# `backward` runs its own forward pass before computing the loss, so a
# separate `forward` call beforehand would only repeat that work.
n_epochs = 10
for _ in range(n_epochs):
    self.backward(x, y)
    self.print_mse(x, y)