<a href="https://colab.research.google.com/github/hissain/mlworks/blob/main/codes/Simple_RNN_Implementation_M2O.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import
%matplotlib inline
import math
import torch
from torch import nn
from torch.nn import functional as F

In [None]:
# Seed torch's default random generator with a fixed value so that code drawing
# from it (e.g. torch.randn) produces the same numbers on every fresh run.
torch.manual_seed(0)

<torch._C.Generator at 0x7dd5243219f0>

In [None]:
class RNNLayer(torch.nn.Module):
    """Single-layer tanh recurrent network that returns only its final hidden state.

    At every time step the hidden state is updated as
    ``h_t = tanh(x_t @ W_xh + h_{t-1} @ W_hh + b_h)``.
    """

    def __init__(self, input_size, hidden_size):
        """
        Creates the layer parameters.

        Args:
            input_size: Number of features in each input vector x_t.
            hidden_size: Number of features in the hidden state h_t.
        """
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Weight matrices for input and hidden layer connections
        self.W_xh = torch.nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_hh = torch.nn.Parameter(torch.randn(hidden_size, hidden_size))
        # Bias term for hidden layer
        self.b_h = torch.nn.Parameter(torch.zeros(hidden_size))

    def forward(self, input_data, hidden_state=None):
        """
        Performs a forward pass through the RNN layer.

        Args:
            input_data: A tensor of shape (batch_size, seq_len, input_size) representing the input sequence.
            hidden_state: A tensor of shape (batch_size, hidden_size) representing the initial hidden state
                (optional). When omitted, a zero state is used.

        Returns:
            hidden_state: A tensor of shape (batch_size, hidden_size) representing the final hidden state,
                i.e. the state after the last time step. If seq_len is 0 the initial state is returned as is.
        """
        batch_size, seq_len, _ = input_data.size()

        # Initialize hidden state if not provided. new_zeros() inherits the
        # dtype and device of the input, so the layer keeps working after
        # .double() / .to(device) instead of mixing in a CPU float32 tensor.
        if hidden_state is None:
            hidden_state = input_data.new_zeros(batch_size, self.hidden_size)

        # Loop through the sequence
        for t in range(seq_len):
            # (batch_size, input_size) slice for the current time step
            x_t = input_data[:, t, :]
            hidden_state = torch.tanh(
                # (batch_size, input_size) x (input_size, hidden_size)
                # = (batch_size, hidden_size)
                torch.mm(x_t, self.W_xh)
                # (batch_size, hidden_size) x (hidden_size, hidden_size)
                # = (batch_size, hidden_size)
                + torch.mm(hidden_state, self.W_hh)
                # (hidden_size,) broadcast over the batch dimension
                + self.b_h
            )

        return hidden_state


In [None]:
class RNNModel(torch.nn.Module):
    """Many-to-one sequence model: a recurrent layer followed by a linear output layer."""

    def __init__(self, input_size, hidden_size, output_size):
        """
        Builds the recurrent layer and the linear output layer.

        Args:
            input_size: Number of features in each input vector.
            hidden_size: Number of features in the recurrent hidden state.
            output_size: Number of values produced per input sequence.
        """
        super().__init__()
        self.rnn = RNNLayer(input_size, hidden_size)
        self.fc = torch.nn.Linear(hidden_size, output_size)

    def forward(self, inputs):
        """
        Performs a forward pass through the RNN model.

        Args:
            inputs: A tensor of shape (batch_size, seq_len, input_size) representing the input sequence.

        Returns:
            prediction: A tensor of shape (batch_size, output_size) holding one output vector per
                sequence, computed from the hidden state after the last time step.
        """
        # The recurrent layer collapses the sequence into its final hidden state
        final_hidden = self.rnn(inputs)

        # Map the final hidden state to the output dimension
        return self.fc(final_hidden)


In [None]:
# Model dimensions
input_size = 3    # Vocabulary size (assuming one-hot encoded words)
hidden_size = 20  # Width of the recurrent hidden state
output_size = 3   # Number of values the model emits per sequence

In [None]:
# Build the model from the dimensions chosen above
model = RNNModel(
    input_size=input_size,
    hidden_size=hidden_size,
    output_size=output_size,
)

In [None]:
# Example batch: two sequences of four tokens each, written as vocabulary
# indices and expanded to one-hot float vectors -> (batch=2, seq_len=4, vocab=3)
inputs = F.one_hot(
    torch.tensor([
        [0, 1, 2, 0],
        [1, 0, 2, 0],
    ]),
    num_classes=3,
).to(torch.float)
print(inputs.size())

torch.Size([2, 4, 3])


In [None]:
# Target class index for each of the two example sequences
outputs = torch.tensor([1, 2], dtype=torch.long)

In [None]:
# Pass the sequences through the model; gradients are not needed for inference
with torch.no_grad():
    # Raw model outputs, one row per sequence
    logits = model(inputs)
    print(logits.size())
    # Predicted class = index of the largest output in each row
    predictions = torch.argmax(logits, dim=1)
    print(predictions.size())

torch.Size([2, 3])
torch.Size([2])


In [None]:
# Show the target class indices as a plain Python list
outputs.tolist()

[1, 2]

In [None]:
# Show the predicted class indices as a plain Python list, for comparison with the targets
predictions.tolist()

[2, 2]

In [None]:
# Accuracy before training: fraction of sequences whose predicted class equals the target
acc = (torch.sum(predictions == outputs) / predictions.numel()).item()
# Plain string: the message has no placeholders, so no f-string is needed
print("Accuracy: ", acc)

Accuracy:  0.5


In [None]:
# Optimisation settings
num_epochs = 50       # Number of passes over the example sequences
learning_rate = 1e-3  # Step size handed to the optimizer

In [None]:
# Loss function and optimizer used by the training loop
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# Training loop: one optimizer step per sequence (batch size 1)
for epoch in range(num_epochs):
    total_loss = 0.0
    for i in range(len(inputs)):
        # Slice (rather than index) so the leading batch dimension of size 1 is kept.
        # Named `sequence`/`target` to avoid shadowing the builtin `input`.
        sequence, target = inputs[i:i+1], outputs[i:i+1]

        optimizer.zero_grad()
        logits = model(sequence)

        # CrossEntropyLoss takes raw logits and integer class indices directly,
        # so no one-hot encoding (and no hard-coded class count) is needed.
        loss = criterion(logits, target)

        loss.backward()
        optimizer.step()

        # Accumulate the per-sequence losses for this epoch
        total_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {total_loss}")

Epoch 1, Loss: 1.8650309443473816
Epoch 2, Loss: 1.7577070593833923
Epoch 3, Loss: 1.6827844977378845
Epoch 4, Loss: 1.6224281787872314
Epoch 5, Loss: 1.569210708141327
Epoch 6, Loss: 1.5195696949958801
Epoch 7, Loss: 1.4719154834747314
Epoch 8, Loss: 1.4255220293998718
Epoch 9, Loss: 1.380059391260147
Epoch 10, Loss: 1.3353986144065857
Epoch 11, Loss: 1.2915146052837372
Epoch 12, Loss: 1.248432219028473
Epoch 13, Loss: 1.2061977088451385
Epoch 14, Loss: 1.1648686528205872
Epoch 15, Loss: 1.1245061755180359
Epoch 16, Loss: 1.0851654708385468
Epoch 17, Loss: 1.0468702912330627
Epoch 18, Loss: 1.0095640420913696
Epoch 19, Loss: 0.973041832447052
Epoch 20, Loss: 0.9369140565395355
Epoch 21, Loss: 0.9006874263286591
Epoch 22, Loss: 0.8640312254428864
Epoch 23, Loss: 0.8271685838699341
Epoch 24, Loss: 0.7911084294319153
Epoch 25, Loss: 0.7572945356369019
Epoch 26, Loss: 0.7266700863838196
Epoch 27, Loss: 0.699016660451889
Epoch 28, Loss: 0.6733288466930389
Epoch 29, Loss: 0.6487050354480743

In [None]:
# Pass the sequences through the model again; gradients are not needed for inference
with torch.no_grad():
    # Raw model outputs, one row per sequence
    logits = model(inputs)
    print(logits.size())
    # Predicted class = index of the largest output in each row
    predictions = torch.argmax(logits, dim=1)
    print(predictions.size())

torch.Size([2, 3])
torch.Size([2])


In [None]:
# Show the predicted class indices from the preceding evaluation cell as a plain Python list
predictions.tolist()

[1, 2]

In [None]:
# Accuracy after training: fraction of sequences whose predicted class equals the target
acc = (torch.sum(predictions == outputs) / predictions.numel()).item()
# Plain string: the message has no placeholders, so no f-string is needed
print("Accuracy: ", acc)

Accuracy:  1.0
