### Notebook used to replicate the GRU model used for time series data.

##### The original publication is:
##### Cho, K., et al. (2014). Learning Phrase Representations using RNN Encoder-Decoder for Statistical Machine Translation. https://doi.org/10.48550/arXiv.1406.1078

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.tensorboard import SummaryWriter

In [2]:
writer = SummaryWriter('runs/GRU_1')
%load_ext tensorboard
%tensorboard --logdir runs

In [3]:
# Custom GRU cell implementation
class GRUCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(GRUCell, self).__init__()
        self.hidden_size = hidden_size
        self.W_ir = nn.Linear(input_size, hidden_size)
        self.W_hr = nn.Linear(hidden_size, hidden_size)
        self.W_iz = nn.Linear(input_size, hidden_size)
        self.W_hz = nn.Linear(hidden_size, hidden_size)
        self.W_in = nn.Linear(input_size, hidden_size)
        self.W_hn = nn.Linear(hidden_size, hidden_size)

    def forward(self, input, hidden):
        r = torch.sigmoid(self.W_ir(input) + self.W_hr(hidden))
        z = torch.sigmoid(self.W_iz(input) + self.W_hz(hidden))
        n = torch.tanh(self.W_in(input) + r * self.W_hn(hidden))
        hidden_next = (1 - z) * n + z * hidden
        return hidden_next

# Encoder and Decoder with custom GRU cell
class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru_cell = GRUCell(hidden_size, hidden_size)

    def forward(self, input, hidden):
        embedded = self.embedding(input).view(1, 1, -1)
        hidden = self.gru_cell(embedded, hidden)
        return hidden

    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size)

class DecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(DecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru_cell = GRUCell(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        output = self.embedding(input).view(1, 1, -1)
        output = torch.relu(output)
        hidden = self.gru_cell(output, hidden)
        output = self.softmax(self.out(hidden[0]))
        return output, hidden

    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size)


In [4]:
# Parameters
input_size = 10  # Vocabulary size
hidden_size = 16  # Size of hidden state
output_size = 10  # Output vocabulary size
max_length = 5  # Maximum sequence length

# Instantiate the model
encoder = EncoderRNN(input_size, hidden_size)
decoder = DecoderRNN(hidden_size, output_size)

# Loss and optimizer
criterion = nn.NLLLoss()
encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.01)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.01)

# Training
for epoch in range(100):
    for _ in range(10):
        # Generate a random sequence
        input_seq = torch.tensor(np.random.choice(input_size, max_length), dtype=torch.long)
        target = input_seq.numpy()[::-1].copy()
        target_seq = torch.tensor(target, dtype=torch.long)

        # Initialize hidden state and loss
        encoder_hidden = encoder.initHidden()
        loss = 0

        # Encoder
        for i in range(input_seq.size(0)):
            encoder_hidden = encoder(input_seq[i], encoder_hidden)

        # Decoder
        decoder_input = torch.tensor([input_seq.size(0) - 1])  # Start token
        decoder_hidden = encoder_hidden
        for i in range(target_seq.size(0)):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            loss += criterion(decoder_output, target_seq[i].unsqueeze(0))
            decoder_input = target_seq[i]  # Teacher forcing

        # Backpropagation
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        loss.backward()
        encoder_optimizer.step()
        decoder_optimizer.step()

    print(f'Epoch {epoch + 1}, Loss: {loss.item() / max_length:.4f}')


Epoch 1, Loss: 2.3165
Epoch 2, Loss: 2.4133
Epoch 3, Loss: 2.3337
Epoch 4, Loss: 1.9344
Epoch 5, Loss: 2.4961
Epoch 6, Loss: 2.1230
Epoch 7, Loss: 2.3756
Epoch 8, Loss: 2.0419
Epoch 9, Loss: 2.2257
Epoch 10, Loss: 2.1194
Epoch 11, Loss: 2.0342
Epoch 12, Loss: 1.8471
Epoch 13, Loss: 1.8658
Epoch 14, Loss: 1.6227
Epoch 15, Loss: 2.2170
Epoch 16, Loss: 1.6445
Epoch 17, Loss: 1.9594
Epoch 18, Loss: 2.0793
Epoch 19, Loss: 1.9135
Epoch 20, Loss: 1.6522
Epoch 21, Loss: 2.1921
Epoch 22, Loss: 1.4983
Epoch 23, Loss: 1.3690
Epoch 24, Loss: 1.5570
Epoch 25, Loss: 2.0763
Epoch 26, Loss: 1.5262
Epoch 27, Loss: 1.6865
Epoch 28, Loss: 2.0822
Epoch 29, Loss: 1.4526
Epoch 30, Loss: 1.4956
Epoch 31, Loss: 1.5620
Epoch 32, Loss: 1.7888
Epoch 33, Loss: 1.5322
Epoch 34, Loss: 1.3774
Epoch 35, Loss: 1.5272
Epoch 36, Loss: 1.0284
Epoch 37, Loss: 1.0567
Epoch 38, Loss: 1.4462
Epoch 39, Loss: 1.6150
Epoch 40, Loss: 1.4043
Epoch 41, Loss: 1.6527
Epoch 42, Loss: 0.9711
Epoch 43, Loss: 1.3923
Epoch 44, Loss: 1.73

In [5]:
x = torch.randn(5)
encoder_hidden = encoder.initHidden()
# encoder_hidden = encoder(input_seq[i], encoder_hidden)
# print(hasattr(encoder, "forward"))
writer.add_graph(encoder, (input_seq[i], encoder_hidden))
# writer.add_graph(encoder,x)

In [6]:
# Test the model
with torch.no_grad():
    input_seq = torch.tensor([1, 2, 3, 4, 5], dtype=torch.long)
    print(f'Input sequence: {input_seq.numpy()}')

    # Encode the input sequence
    encoder_hidden = encoder.initHidden()
    for i in range(input_seq.size(0)):
        encoder_hidden = encoder(input_seq[i], encoder_hidden)

    # Decode the encoded sequence
    decoder_input = torch.tensor([input_seq.size(0) - 1])  # Start token
    decoder_hidden = encoder_hidden
    output_seq = []
    for i in range(max_length):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
        topv, topi = decoder_output.topk(1)
        decoder_input = topi.squeeze().detach()
        output_seq.append(decoder_input.item())
        if decoder_input.item() == 0:
            break

    print(f'Output sequence: {output_seq}')

Input sequence: [1 2 3 4 5]
Output sequence: [5, 2, 3, 4, 4]
