# Elman RNN
Ashok Kumar Pant | AI Solution Architect | CTO and Co-founder at Treeleaf/Anydone


**1. Introduction**
Elman RNN is a type of recurrent neural network (RNN) proposed by Jeffrey Elman in 1990. It is one of the simplest RNN architectures and is often used to model sequential data.

**2. Architecture of Elman RNN**

Elman RNN consists of three layers:

- **Input Layer $x_t$** - Takes in sequential data at each time step.
- **Hidden Layer $h_t$** - Has **recurrent connections**, meaning it receives inputs from both:
   - The **current input** $x_t$
   - The **previous hidden state** $h_{t-1}$
- **Output Layer $y_t$** - Produces the network's output at each time step.

The **key feature** of Elman RNN is the presence of **context units**, which store past information and help in sequential learning.

**3. Mathematical Formulation**

At each time step $t$:

- **Hidden state update**:
   
   $h_t = f(W_{xh} x_t + W_{hh} h_{t-1} + b_h)$

   where:
   - $W_{xh} $ = Weight matrix for input to hidden layer
   - $W_{hh} $ = Weight matrix for hidden-to-hidden recurrence
   - $b_h $ = Bias term
   - $f $ = Activation function (e.g., tanh or ReLU)

- **Output computation**:
   
   $y_t = g(W_{hy} h_t + b_y)$
   where:
   - $W_{hy} $ = Weight matrix from hidden to output
   - $b_y $ = Bias term
   - $g $ = Activation function (e.g., softmax for classification)

- **Loss function (for training using Backpropagation Through Time - BPTT)**:
  
   $L = \sum_{t=1}^{T} \mathcal{L}(y_t, \hat{y}_t)$
   where $\mathcal{L} $ is the loss function (e.g., cross-entropy for classification, mean squared error for regression).

- **Gradient computation (for weight updates using BPTT)**:

   $\frac{\partial L}{\partial W_{xh}}, \frac{\partial L}{\partial W_{hh}}, \frac{\partial L}{\partial W_{hy}}$
   These gradients are calculated by unrolling the network in time and applying the chain rule.

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim


class ElmanRNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(ElmanRNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.rnn_cell = nn.RNNCell(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, inputs, hidden):
        for t in range(inputs.size(1)):  # Iterate over sequence length
            hidden = self.rnn_cell(inputs[:, t, :], hidden)
        output = self.fc(hidden)
        return output, hidden

    def init_hidden(self, batch_size, device):
        return torch.zeros(batch_size, self.hidden_size, device=device)


class RNNModelTrainer:
    def __init__(self, input_size=1, hidden_size=10, output_size=1, sequence_length=5, batch_size=10, num_epochs=200,
                 lr=0.01):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.sequence_length = sequence_length
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.lr = lr
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.model = ElmanRNNModel(input_size, hidden_size, output_size).to(self.device)
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    def train(self, inputs, targets):
        self.model.train() # Set the model to training mode
        inputs, targets = inputs.to(self.device), targets.to(self.device)
        dataset_size = len(inputs)
        for epoch in range(self.num_epochs):
            epoch_loss = 0.0
            for i in range(0, dataset_size, self.batch_size):
                batch_inputs = inputs[i:i + self.batch_size]
                batch_targets = targets[i:i + self.batch_size]
                batch_size = batch_inputs.shape[0]
                batch_hidden = self.model.init_hidden(batch_size, self.device)

                self.optimizer.zero_grad()
                output, _ = self.model(batch_inputs, batch_hidden)
                loss = self.criterion(output, batch_targets)
                loss.backward()
                self.optimizer.step()
                epoch_loss += loss.item()

            if epoch % 20 == 0:
                print(f'Epoch {epoch}, Loss: {epoch_loss / (dataset_size // self.batch_size):.4f}')

    def save_model(self, path='model.bin'):
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'hidden_size': self.hidden_size,
            'input_size': self.input_size,
            'output_size': self.output_size,
            'sequence_length': self.sequence_length
        }, path)
        print(f"Model saved to {path}")

    @classmethod
    def load_model(cls, path='model.bin'):
        checkpoint = torch.load(path)
        self = cls()
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.sequence_length = checkpoint['sequence_length']
        self.input_size = checkpoint['input_size']
        self.output_size = checkpoint['output_size']
        print(f"Model loaded from {path}")
        return self

    def infer(self, input_sequence):
        self.model.eval() # Set the model to evaluation mode
        with torch.no_grad():
            input_sequence = input_sequence.to(self.device)
            batch_size = input_sequence.shape[0]
            hidden = self.model.init_hidden(batch_size, self.device)
            output, _ = self.model(input_sequence, hidden)
        return output.cpu()

In [2]:
class Dataset:
    def __init__(self, eq="x**2", num_samples=1000, sequence_length=5):
        self.eq = eq
        self.num_samples = num_samples
        self.sequence_length = sequence_length
        self.x_values, self.y_values = self.generate_data()
        self.input_dim = 1
        self.output_dim = 1

    def generate_data(self):
        X = range(self.num_samples)
        Y = [eval(self.eq, {"x": x}) for x in X]
        x_values = np.array(X, dtype=np.float32).reshape(-1, 1)
        y_values = np.array(Y, dtype=np.float32).reshape(-1, 1)
        return x_values, y_values

    def prepare_data(self):
        inputs = np.array(
            [self.x_values[i:i + self.sequence_length] for i in range(len(self.x_values) - self.sequence_length)],
            dtype=np.float32)
        targets = np.array(
            [self.y_values[i + self.sequence_length] for i in range(len(self.y_values) - self.sequence_length)],
            dtype=np.float32)
        inputs = torch.tensor(inputs, dtype=torch.float32)  # Shape: (batch_size, seq_len, input_size)
        targets = torch.tensor(targets, dtype=torch.float32).reshape(-1, 1)  # Shape: (batch_size, output_size)
        return inputs, targets

    def generate_input_sequence(self, x):
        """Generate a sequence of numbers to be used as input for inference."""
        x_values = np.array([x - self.sequence_length + i + 1 for i in range(self.sequence_length)],
                            dtype=np.float32).reshape(1, self.sequence_length, self.input_dim)
        return torch.tensor(x_values, dtype=torch.float32)


In [6]:
# Explore dataset
d = Dataset(eq="2*x+1", num_samples=100, sequence_length=3)
x,y = d.prepare_data()
print("Training Data (Input -> Target):")
for i in range(min(10, len(x))):
    print(f"Input: {list(x[i])} -> Target: {list(y[i])}")

Training Data (Input -> Target):
Input: [tensor([0.]), tensor([1.]), tensor([2.])] -> Target: [tensor(7.)]
Input: [tensor([1.]), tensor([2.]), tensor([3.])] -> Target: [tensor(9.)]
Input: [tensor([2.]), tensor([3.]), tensor([4.])] -> Target: [tensor(11.)]
Input: [tensor([3.]), tensor([4.]), tensor([5.])] -> Target: [tensor(13.)]
Input: [tensor([4.]), tensor([5.]), tensor([6.])] -> Target: [tensor(15.)]
Input: [tensor([5.]), tensor([6.]), tensor([7.])] -> Target: [tensor(17.)]
Input: [tensor([6.]), tensor([7.]), tensor([8.])] -> Target: [tensor(19.)]
Input: [tensor([7.]), tensor([8.]), tensor([9.])] -> Target: [tensor(21.)]
Input: [tensor([8.]), tensor([9.]), tensor([10.])] -> Target: [tensor(23.)]
Input: [tensor([9.]), tensor([10.]), tensor([11.])] -> Target: [tensor(25.)]


In [7]:
dataset = Dataset(num_samples=1000, sequence_length=5)
model = RNNModelTrainer(input_size=dataset.input_dim, hidden_size=10, output_size=dataset.output_dim,
                 sequence_length=dataset.sequence_length, batch_size=32, num_epochs=1000, lr=0.01)
inputs, targets = dataset.prepare_data()

model.train(inputs, targets)
model.save_model('elmanrnn.bin')

Epoch 0, Loss: 230107832875.1633
Epoch 20, Loss: 230068437014.3377
Epoch 40, Loss: 230029939044.1391
Epoch 60, Loss: 229991530483.3286
Epoch 80, Loss: 229953145283.8508
Epoch 100, Loss: 229914777111.4753
Epoch 120, Loss: 229876427701.5670
Epoch 140, Loss: 229838081461.1220
Epoch 160, Loss: 229799757704.1593
Epoch 180, Loss: 229761430635.7896
Epoch 200, Loss: 229723119812.0096
Epoch 220, Loss: 229684896921.6326
Epoch 240, Loss: 229646511451.4000
Epoch 260, Loss: 229608230779.5515
Epoch 280, Loss: 229569945025.5795
Epoch 300, Loss: 229531673080.3809
Epoch 320, Loss: 229493405073.0851
Epoch 340, Loss: 229455152433.7860
Epoch 360, Loss: 229416896395.1168
Epoch 380, Loss: 229378659062.9243
Epoch 400, Loss: 229340416548.1173
Epoch 420, Loss: 229302185908.5614
Epoch 440, Loss: 229263969219.6527
Epoch 460, Loss: 229225758506.0052
Epoch 480, Loss: 229187555442.3069
Epoch 500, Loss: 229149367272.9478
Epoch 520, Loss: 229111173912.3839
Epoch 540, Loss: 229072993874.9788
Epoch 560, Loss: 229034826

In [8]:
# Inference
model = RNNModelTrainer.load_model(path='elmanrnn.bin')
dataset = Dataset(sequence_length=model.sequence_length)
test_seq = dataset.generate_input_sequence(50)
predicted_output = model.infer(test_seq)

print(f"Input Sequence: {test_seq.flatten().tolist()}")
print(f"Predicted Output: {predicted_output.item():.4f}")

Model loaded from elmanrnn.bin
Input Sequence: [46.0, 47.0, 48.0, 49.0, 50.0]
Predicted Output: 2622.9868
