# RNN
Implementation of a Elman RNN (the same as in PyTorch) and Jordan networks.

The goal is to train them on the same data and comapre the results.

[Paper1 - Elman](https://doi.org/10.1207/s15516709cog1402_1)

[Paper2 - Jordan](https://www.sciencedirect.com/science/article/abs/pii/S0166411597801112)

In [259]:
import torch
from torch import nn
import torch.optim as optim
from tqdm.auto import tqdm
import torch.nn.init as init  # Add this import to access the init module

In [None]:
class ElmanRNN(nn.Module):
    """
    Elman Recurrent Neural Network (ELM-RNN) is a type of recurrent neural network that includes a context memory unit. This class provides a complete implementation of the Elman RNN, including initialization, forward pass, and methods to reset the context units.

    Parameters:
    sequence_dim (int): Dimensionality of the input sequence.
    hidden_size (int): Number of units in the hidden layer.

    Attributes:
    inputs_weight (torch.Tensor): Weight matrix for input connection.
    context_weight (torch.Tensor): Weight matrix for context connection.
    output_weight (torch.Tensor): Weight matrix for output connection.
    hidden_bias (torch.Tensor): Bias vector for the hidden layer.
    input_bias (torch.Tensor): Bias vector for the input layer.
    output_bias (torch.Tensor): Bias vector for the output layer.
    context_units (torch.Tensor): Tensor to store context units, used only within a single sequence.

    Methods:
    reset_context(batch_size, hidden_size): Resets the context units before processing the next batch.
    forward(input_units): Performs the forward pass of the Elman RNN on the input sequence. Returns the output tensor.
    """    
    def __init__(self, sequence_dim, hidden_size) -> None:
        super().__init__()
        self.inputs_weight = nn.Parameter(torch.empty(sequence_dim,
                                                      hidden_size),
                                           requires_grad=True)
        self.context_weight = nn.Parameter(torch.empty(hidden_size,
                                                       hidden_size),
                                           requires_grad=True)
        self.output_weight = nn.Parameter(torch.empty(hidden_size,
                                                1),
                                    requires_grad=True)

        self.hidden_bias = nn.Parameter(torch.empty(hidden_size), requires_grad=True)
        self.input_bias = nn.Parameter(torch.empty(hidden_size), requires_grad=True)
        self.output_bias = nn.Parameter(torch.empty(1), requires_grad=True)

        self.context_units = None

        self.activation = nn.Tanh()

        # Initialize weights with Xavier (Glorot) initialization
        init.xavier_uniform_(self.inputs_weight)
        init.xavier_uniform_(self.context_weight)
        init.xavier_uniform_(self.output_weight)

        init.zeros_(self.hidden_bias)
        init.zeros_(self.input_bias)
        init.zeros_(self.output_bias)

    def reset_context(self, batch_size, hidden_size):
        """Remember reset context after each sample from batch.
        The idea is that the context is valid only for given sequence.
        """
        self.context_units = torch.zeros(batch_size, hidden_size, requires_grad=False)
        
    def forward(self, input_units):
        batch_size, _, sequence_length = input_units.size()
        hidden_size = self.hidden_bias.size(0)
        
        # Reset context units for the new batch
        self.reset_context(batch_size, hidden_size)
        
        output = []
        
        for t in range(sequence_length):
            # Extract the input at time step t
            x_t = input_units[:, :, t] # Shape: (batch_size, sequence_dim)
            
            h1 = x_t@self.inputs_weight + self.input_bias
            h2 = self.context_units@self.context_weight  + self.hidden_bias # Shape: (batch_size, hidden_size)
            hidden = self.activation(h1 + h2)
            self.context_units = hidden
            output.append(hidden)
            
        hidden_output = torch.stack(output, dim=2)
        output = nn.functional.sigmoid(torch.transpose(hidden_output, 1,2)@self.output_weight + self.output_bias)
        return torch.transpose(output,1,2)
            
batch_size = 64
sequence_length = 512
sequence_dim = 1
hidden_size = 10
 
layer = ElmanRNN(sequence_dim, hidden_size)
test_input = torch.randn(batch_size, sequence_dim, sequence_length)
print(test_input.shape)
output = layer(test_input)
print(output.shape)

torch.Size([64, 1, 512])
torch.Size([64, 1, 512])


# Prepare data

In [299]:
def generate_xor_sequence_data(batch_size, sequence_length):
    # Each sequence will have a structure of (input1, input2, output)
    seq_dim = 1  # Single dimensional input

    # Initialize the input and output data tensors
    input_data = torch.zeros(batch_size, sequence_length, seq_dim)
    output_data = torch.zeros(batch_size, sequence_length, seq_dim)

    for i in range(batch_size):
        sequence = []
        output_sequence = []

        for _ in range(sequence_length // 3):  # Divide by 3 because each (input1, input2, output) is 3 units
            input1 = torch.randint(0, 2, (1,))
            input2 = torch.randint(0, 2, (1,))
            xor_output = input1 ^ input2

            sequence.extend([input1, input2, xor_output])  # Add inputs and output to the sequence

        for t in range(1, sequence_length-1):
            output_data[i][t] = sequence[t-1] ^ sequence[t]

        # Store the input sequence
        input_data[i] = torch.tensor(sequence).view(-1, 1)  # Flatten to make it 2D


    return input_data.view(batch_size, 1, sequence_length), output_data.view(batch_size, 1, sequence_length)

# Example usage
batch_size = 64
sequence_length = 5*3  # This should be a multiple of 3 for the (input1, input2, output) structure

input_data, output_data = generate_xor_sequence_data(batch_size, sequence_length)
print("Input Data Shape:", input_data.shape)
print("Output Data Shape:", output_data.shape)
print("Sample Input Data:", input_data[0])
print("Sample Output Data:", output_data[0])

Input Data Shape: torch.Size([64, 1, 15])
Output Data Shape: torch.Size([64, 1, 15])
Sample Input Data: tensor([[1., 1., 0., 1., 0., 1., 1., 1., 0., 1., 0., 1., 1., 0., 1.]])
Sample Output Data: tensor([[0., 0., 1., 1., 1., 1., 0., 0., 1., 1., 1., 1., 0., 1., 0.]])


# train the model

In [300]:
# Training loop
num_epochs = 1000  # Number of epochs

#  model
layer = ElmanRNN(sequence_dim=1, hidden_size=16)
loss_function = nn.MSELoss()
optimizer = optim.Adam(layer.parameters(), lr=0.1)

# Generate XOR sequence data (make sure you have the generate_xor_sequence_data function)
batch_size = 64
sequence_length = 90  # This should be a multiple of 3 for the (input1, input2, output) structure
input_data, target_data = generate_xor_sequence_data(batch_size, sequence_length)

p_bar = tqdm(range(num_epochs))
for epoch in p_bar:
    optimizer.zero_grad()
    output = layer(input_data)
    loss = loss_function(output, target_data)
    loss.backward()
    optimizer.step()

    p_bar.set_description(f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item()}")

  0%|          | 0/1000 [00:00<?, ?it/s]

Epoch 1000/1000, Loss: 0.007366681005805731: 100%|██████████| 1000/1000 [00:05<00:00, 181.08it/s]


In [301]:
with torch.no_grad():
    test_input = torch.tensor([[[1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0]]])
    test_output = layer(test_input)
    
    
def apply_threshold(x, threshold):
    # Apply threshold: 1 if element >= threshold, else 0
    return (x >= threshold).float()

print(test_input)
print(apply_threshold(test_output, 0.5))

tensor([[[1., 0., 1., 0., 0., 0., 1., 1., 1., 1., 0., 1., 0., 1.]]])
tensor([[[0., 1., 1., 1., 0., 0., 1., 0., 0., 0., 1., 1., 1., 1.]]])


In [302]:
layer.inputs_weight

Parameter containing:
tensor([[ 1.1417,  4.0154,  4.4737, -5.0839,  6.1151, -4.9540, -0.7877,  4.8677,
         -0.4399, -3.3194,  0.4632, -1.3784,  0.9087,  5.5866, -3.4849,  1.0458]],
       requires_grad=True)

In [303]:
layer.input_bias

Parameter containing:
tensor([ 0.4613, -1.7699,  0.4481,  0.6518,  0.9535,  0.2729, -0.9101, -0.4545,
        -1.0467,  1.5609,  1.0792,  1.2492,  1.3319, -1.3297, -0.1763, -1.3552],
       requires_grad=True)

In [304]:
layer.output_weight

Parameter containing:
tensor([[ 0.1729],
        [ 1.1632],
        [-1.2871],
        [ 4.1294],
        [-2.0377],
        [-2.0161],
        [ 1.4098],
        [ 2.0389],
        [ 0.2062],
        [ 1.4368],
        [-0.6120],
        [-0.9037],
        [-0.5836],
        [ 2.3551],
        [-0.5792],
        [ 0.5402]], requires_grad=True)