# Problem: Implement an LSTM Model

### Problem Statement

You are tasked with implementing a simple **LSTM (Long Short-Term Memory)** model in PyTorch. The model should process sequential data using an LSTM layer followed by a fully connected (FC) layer. Your goal is two-fold: one is to implement a LSTM layer from scratch and another using inbuilt pytorch LSTM layer. Compare the results implementing the forward passes for both the LSTM models.

### Requirements

1. **Define the LSTM Model using Custom LSTM layer**:

   - Add a `Custom` LSTM layer to the model. The layer must take care of the hidden and cell states
   - Add a **fully connected (FC) layer** that maps the output of the LSTM to the final predictions.
   - Implement the `forward` method to:
     - Pass the input sequence through the LSTM.
     - Feed the output of the LSTM into the fully connected layer for the final output.

2. **Define the LSTM Model using in-built LSTM layer**:

- Same as `1` with only difference that this time define the LSTM layer using pytorch `nn.Module`

### Constraints

- The LSTM layer should be implemented with a single hidden layer.
- Use a suitable number of input features, hidden units, and output size for the task.
- Make sure the `forward` method returns the output of the fully connected layer after processing the LSTM output.

<details>
  <summary>💡 Hint</summary>
  Add the LSTM layer and FC layer in LSTMModel.__init__.
  <br>
  Implement the forward pass to process sequences using the LSTM and FC layers.
  <br> Review Hidden and cell states computation here: [D2l.ai](https://d2l.ai/chapter_recurrent-modern/lstm.html)
</details>


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# Generate synthetic sequential data
torch.manual_seed(42)
sequence_length = 10
num_samples = 100

# Create a sine wave dataset
X = torch.linspace(0, 4 * 3.14159, steps=num_samples).unsqueeze(1)
y = torch.sin(X)


# Prepare data for LSTM
def create_in_out_sequences(data, seq_length):
    in_seq = []
    out_seq = []
    for i in range(len(data) - seq_length):
        in_seq.append(data[i : i + seq_length])
        out_seq.append(data[i + seq_length])
    return torch.stack(in_seq), torch.stack(out_seq)


X_seq, y_seq = create_in_out_sequences(y, sequence_length)

In [6]:
X_seq.shape

torch.Size([90, 10, 1])

In [12]:
class CustomLSTMModel(nn.Module):
    def __init__(self, input_dim=1, hidden_dim=50):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.W_fh = nn.Parameter(torch.randn(hidden_dim, hidden_dim))
        self.W_fx = nn.Parameter(torch.randn(input_dim, hidden_dim))
        self.b_f = nn.Parameter(torch.zeros(hidden_dim))
        self.W_ih = nn.Parameter(torch.randn(hidden_dim, hidden_dim))
        self.W_ix = nn.Parameter(torch.randn(input_dim, hidden_dim))
        self.b_i = nn.Parameter(torch.zeros(hidden_dim))
        self.W_ch = nn.Parameter(torch.randn(hidden_dim, hidden_dim))
        self.W_cx = nn.Parameter(torch.randn(input_dim, hidden_dim))
        self.b_c = nn.Parameter(torch.zeros(hidden_dim))
        self.W_oh = nn.Parameter(torch.randn(hidden_dim, hidden_dim))
        self.W_ox = nn.Parameter(torch.randn(input_dim, hidden_dim))
        self.b_o = nn.Parameter(torch.zeros(hidden_dim))
        self.fc = nn.Linear(hidden_dim, 1)
        self.h = nn.Parameter(torch.randn(1, hidden_dim))
        self.c = nn.Parameter(torch.randn(1, hidden_dim))

    def forward(self, x):
        batch_size, seq_len, _ = x.shape
        c = self.c
        h = self.h
        for i in range(seq_len):
            x_t = x[:, i, :]
            f = torch.sigmoid(
                torch.matmul(x_t, self.W_fx)
                + torch.matmul(self.h, self.W_fh)
                + self.b_f
            )
            i = torch.sigmoid(
                torch.matmul(x_t, self.W_ix)
                + torch.matmul(self.h, self.W_ih)
                + self.b_i
            )
            c_d = torch.tanh(
                torch.matmul(x_t, self.W_cx)
                + torch.matmul(self.h, self.W_ch)
                + self.b_c
            )
            o = torch.sigmoid(
                torch.matmul(x_t, self.W_ox)
                + torch.matmul(self.h, self.W_oh)
                + self.b_o
            )
            c = f * c + i * c_d
            h = o * torch.tanh(c)

        return self.fc(h)


In [18]:
# Define the LSTM Model
# TODO: Add LSTM layer, forward implementation
class LSTMModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.lstm = nn.LSTM(1, 50, batch_first=True)
        self.fc = nn.Linear(50, 1)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])
        return out


In [20]:
# Initialize the model, loss function, and optimizer
model = CustomLSTMModel()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [21]:
# Training loop
epochs = 500
for epoch in range(epochs):
    # Forward pass
    predictions = model(X_seq)
    loss = criterion(predictions, y_seq)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Log progress every 50 epochs
    if (epoch + 1) % 50 == 0:
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")

Epoch [50/500], Loss: 0.0043
Epoch [100/500], Loss: 0.0001
Epoch [150/500], Loss: 0.0000
Epoch [200/500], Loss: 0.0003
Epoch [250/500], Loss: 0.0000
Epoch [300/500], Loss: 0.0029
Epoch [350/500], Loss: 0.0001
Epoch [400/500], Loss: 0.0000
Epoch [450/500], Loss: 0.0000
Epoch [500/500], Loss: 0.0000


In [22]:
# Testing on new data
test_steps = 20  # Ensure this is greater than sequence_length
X_test = torch.linspace(4 * 3.14159, 5 * 3.14159, steps=test_steps).unsqueeze(1)
y_test = torch.sin(X_test)

# Create test input sequences
X_test_seq, _ = create_in_out_sequences(y_test, sequence_length)

with torch.no_grad():
    predictions = model(X_test_seq)
    print(f"Predictions for new sequence: {predictions.squeeze().tolist()}")


Predictions for new sequence: [1.0245596170425415, 1.002208948135376, 0.9538735747337341, 0.8793268799781799, 0.7788960337638855, 0.6545459628105164, 0.5104139447212219, 0.35212674736976624, 0.18525683879852295, 0.01429332047700882]


In [19]:
# Initialize the model, loss function, and optimizer
model = LSTMModel()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Training loop
epochs = 500
for epoch in range(epochs):
    # Forward pass
    predictions = model(X_seq)
    loss = criterion(predictions, y_seq)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Log progress every 50 epochs
    if (epoch + 1) % 50 == 0:
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")

# Testing on new data
test_steps = 20  # Ensure this is greater than sequence_length
X_test = torch.linspace(4 * 3.14159, 5 * 3.14159, steps=test_steps).unsqueeze(1)
y_test = torch.sin(X_test)

# Create test input sequences
X_test_seq, _ = create_in_out_sequences(y_test, sequence_length)

with torch.no_grad():
    predictions = model(X_test_seq)
    print(f"Predictions for new sequence: {predictions.squeeze().tolist()}")


Epoch [50/500], Loss: 0.0008
Epoch [100/500], Loss: 0.0000
Epoch [150/500], Loss: 0.0000
Epoch [200/500], Loss: 0.0000
Epoch [250/500], Loss: 0.0000
Epoch [300/500], Loss: 0.0000
Epoch [350/500], Loss: 0.0000
Epoch [400/500], Loss: 0.0000
Epoch [450/500], Loss: 0.0000
Epoch [500/500], Loss: 0.0000
Predictions for new sequence: [1.0424132347106934, 1.02143394947052, 0.9727498888969421, 0.8970206379890442, 0.7955341935157776, 0.6703622937202454, 0.5246646404266357, 0.36295509338378906, 0.19100329279899597, 0.015172097831964493]
