# Chapter 6 Bonus: Practical RNN & LSTM Lab

This bonus notebook extends **Chapter 6 – Recurrent Neural Networks** with hands-on sequence modeling experiments.

We will focus on:

1. **Sequence Generation (Toy Text)** – character-level RNN/LSTM that generates text
2. **Sequence Prediction (Sine Wave)** – RNN/LSTM predicting the next step in a time series
3. **RNN vs LSTM on Long Sequences** – illustrating vanishing gradients and long-term dependencies
4. **Bidirectional RNN for Classification** – classifying fixed-length digit sequences as palindromes or non-palindromes
5. **Hidden State Visualization (Optional)** – seeing how hidden states evolve over time

These experiments bring the abstract concepts from Chapter 6 to life and give you intuition for how recurrent networks behave on real sequence tasks.

## Setup and Imports

We will use **PyTorch** for RNNs/LSTMs and **NumPy** / **matplotlib** for data generation and visualization. Section 5 additionally uses **scikit-learn** for PCA.

In [None]:
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# Plot style
plt.style.use('seaborn-v0_8-darkgrid')  # this style name requires matplotlib >= 3.6
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 11

# Reproducibility
np.random.seed(42)
torch.manual_seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# 1. Sequence Generation (Toy Character-Level RNN)

RNNs (and LSTMs) are often used for **language modeling** – predicting the next character or word in a sequence.

In this section we:

- Take a small text corpus (e.g., a nursery rhyme or short paragraph)
- Build a **character-level vocabulary**
- Train a small LSTM to predict the next character
- Use the trained model to **generate new text** one character at a time

This demonstrates a simple many-to-many generation setup, directly connected to the sequence modeling ideas in Chapter 6.
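To make the next-character objective concrete, here is a minimal sketch of how input/target windows are formed: the target is simply the input shifted by one position. `make_char_pairs` is a hypothetical helper used only for illustration; it works on raw strings rather than on encoded indices.

```python
def make_char_pairs(text, seq_len):
    """Return (input, target) windows; each target is its input shifted by one character."""
    pairs = []
    for i in range(len(text) - seq_len):
        x = text[i:i + seq_len]
        y = text[i + 1:i + seq_len + 1]
        pairs.append((x, y))
    return pairs


pairs = make_char_pairs("twinkle", seq_len=4)
for x, y in pairs:
    print(f"{x!r} -> {y!r}")
# 'twin' -> 'wink'
# 'wink' -> 'inkl'
# 'inkl' -> 'nkle'
```

At every position the model is asked to predict the character that follows, so a window of length 4 contributes 4 training signals rather than one.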

In [None]:
# Toy text corpus (you can replace this with any short text you like)
text = (
    "Twinkle, twinkle, little star,\n"
    "How I wonder what you are!\n"
    "Up above the world so high,\n"
    "Like a diamond in the sky.\n"
)

chars = sorted(list(set(text)))
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for ch, i in stoi.items()}

vocab_size = len(chars)
print("Vocab size:", vocab_size)
print("Characters:", chars)

# Encode entire text as indices
encoded = np.array([stoi[ch] for ch in text], dtype=np.int64)

class CharDataset(Dataset):
    def __init__(self, data, seq_len=40):
        self.data = data
        self.seq_len = seq_len
    
    def __len__(self):
        return len(self.data) - self.seq_len
    
    def __getitem__(self, idx):
        x = self.data[idx:idx+self.seq_len]
        y = self.data[idx+1:idx+self.seq_len+1]
        return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

seq_len = 40
batch_size = 32

dataset = CharDataset(encoded, seq_len=seq_len)
loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


class CharLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim=32, hidden_dim=64, num_layers=1):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)
    
    def forward(self, x, hidden=None):
        x = self.embed(x)
        out, hidden = self.lstm(x, hidden)
        logits = self.fc(out)
        return logits, hidden


model = CharLSTM(vocab_size).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()


def train_char_model(model, loader, epochs=20):
    model.train()
    losses = []
    for epoch in range(epochs):
        epoch_loss = 0.0
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            logits, _ = model(xb)
            loss = criterion(logits.view(-1, vocab_size), yb.view(-1))
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item() * xb.size(0)
        epoch_loss /= len(loader.dataset)
        losses.append(epoch_loss)
        print(f"Epoch {epoch+1}/{epochs} - loss={epoch_loss:.3f}")
    return losses

losses_char = train_char_model(model, loader, epochs=20)

plt.figure(figsize=(8,4))
plt.plot(losses_char)
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Char-level LSTM Training Loss")
plt.tight_layout()
plt.show()


def generate_text(model, start_text="Twinkle, ", length=200):
    model.eval()
    chars_out = list(start_text)
    hidden = None
    # Start with the full prompt so the hidden state is primed on all of start_text
    x = torch.tensor([[stoi[ch] for ch in start_text]], dtype=torch.long, device=device)
    
    with torch.no_grad():
        for i in range(length):
            logits, hidden = model(x, hidden)
            probs = F.softmax(logits[:, -1, :], dim=-1)
            idx = torch.multinomial(probs, num_samples=1)  # shape (1, 1)
            chars_out.append(itos[idx.item()])
            # From now on feed only the new character; hidden carries the history
            x = idx
    return "".join(chars_out)

sample = generate_text(model, start_text="Twinkle, ", length=200)
print("\nGenerated text:\n", sample)

Even with a tiny dataset and a small model, you should see that the generated text gradually starts to mimic the structure and vocabulary of the input rhyme (though it may still be noisy). This is a concrete example of **sequence generation** with RNNs/LSTMs.
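The sampling step inside `generate_text` turns logits into probabilities and then draws one index at random. The sketch below reproduces that step in plain Python. The `temperature` parameter is an assumption added for illustration (the notebook effectively uses a temperature of 1.0), and `softmax` / `sample_index` are hypothetical helper names.

```python
import math
import random


def softmax(logits, temperature=1.0):
    """Convert raw scores to probabilities; temperature < 1 sharpens, > 1 flattens."""
    scaled = [z / temperature for z in logits]
    m = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(z - m) for z in scaled]
    total = sum(exps)
    return [e / total for e in exps]


def sample_index(probs, rng):
    """Draw one index with probability proportional to probs (inverse-CDF sampling)."""
    r = rng.random()
    cumulative = 0.0
    for i, p in enumerate(probs):
        cumulative += p
        if r < cumulative:
            return i
    return len(probs) - 1  # guard against floating-point round-off


logits = [2.0, 1.0, 0.1]  # made-up scores for three characters
rng = random.Random(0)
for t in (1.0, 0.5, 0.1):
    probs = softmax(logits, temperature=t)
    print(t, [round(p, 3) for p in probs], "sampled:", sample_index(probs, rng))
```

Lower temperatures concentrate probability on the highest-scoring character, which tends to give more repetitive but less noisy text. Higher temperatures do the opposite.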

# 2. Sequence Prediction: Sine Wave Forecasting

RNNs are natural models for **time series**.

Here we:

- Generate noisy sine waves
- Build input–target pairs where the model sees a window of $T$ steps and predicts the **next value**
- Train a small LSTM to perform this one-step-ahead prediction
- Plot the model’s predictions vs the true sine wave

This demonstrates how RNNs/LSTMs maintain a summary of recent history to make future predictions.
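Two reference numbers help when reading the MSE values from training. The sketch below computes them on a noisy sine built the same way as in this section, but with its own local random generator (an assumption, so the values will not match the notebook's series exactly). The first is the noise floor, the error of an oracle that knows the clean sine. The second is a persistence baseline that predicts "next value = current value".

```python
import numpy as np

rng = np.random.default_rng(0)  # local generator so the global seed is untouched
n_samples, noise_std = 1000, 0.1
t = np.linspace(0, 40 * np.pi, n_samples)
clean = np.sin(t)
noisy = clean + noise_std * rng.standard_normal(n_samples)

# Noise floor: error of an oracle that predicts the clean sine exactly.
mse_oracle = np.mean((noisy[1:] - clean[1:]) ** 2)

# Persistence baseline: predict that the next value equals the current one.
mse_persistence = np.mean((noisy[1:] - noisy[:-1]) ** 2)

print(f"oracle MSE      ~ {mse_oracle:.4f} (close to noise_std**2 = {noise_std**2:.4f})")
print(f"persistence MSE ~ {mse_persistence:.4f}")
```

A model that has learned something useful should land below the persistence baseline. Because the targets themselves are noisy, a test MSE much below `noise_std**2` is not to be expected.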

In [None]:
# Generate sine wave data

def generate_sine_data(n_samples=1000, noise_std=0.1):
    x = np.linspace(0, 40 * np.pi, n_samples)
    y = np.sin(x) + noise_std * np.random.randn(n_samples)
    return y

series = generate_sine_data()

plt.figure(figsize=(10, 3))
plt.plot(series[:400])
plt.title("Noisy Sine Wave (first 400 points)")
plt.xlabel("t")
plt.ylabel("y")
plt.tight_layout()
plt.show()


def create_sequences(series, seq_len=50):
    X, Y = [], []
    for i in range(len(series) - seq_len):
        X.append(series[i:i+seq_len])
        Y.append(series[i+seq_len])
    X = np.array(X, dtype=np.float32)
    Y = np.array(Y, dtype=np.float32)
    return X, Y

seq_len_sine = 50
X, Y = create_sequences(series, seq_len=seq_len_sine)

# Train/test split
split = int(0.8 * len(X))
X_train_s, X_test_s = X[:split], X[split:]
Y_train_s, Y_test_s = Y[:split], Y[split:]

class SineDataset(Dataset):
    def __init__(self, X, Y):
        self.X = X
        self.Y = Y
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return torch.tensor(self.X[idx]).unsqueeze(-1), torch.tensor(self.Y[idx])

train_ds_s = SineDataset(X_train_s, Y_train_s)
test_ds_s = SineDataset(X_test_s, Y_test_s)

train_loader_s = DataLoader(train_ds_s, batch_size=32, shuffle=True)
test_loader_s = DataLoader(test_ds_s, batch_size=32, shuffle=False)


class SineLSTM(nn.Module):
    def __init__(self, hidden_dim=32, num_layers=1):
        super().__init__()
        self.lstm = nn.LSTM(input_size=1, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)
    
    def forward(self, x, hidden=None):
        out, hidden = self.lstm(x, hidden)
        out_last = out[:, -1, :]
        y_hat = self.fc(out_last)
        return y_hat


sine_model = SineLSTM(hidden_dim=32).to(device)
optimizer_s = torch.optim.Adam(sine_model.parameters(), lr=1e-3)
criterion_s = nn.MSELoss()


def train_sine(model, train_loader, test_loader, epochs=15):
    train_losses, test_losses = [], []
    for epoch in range(epochs):
        model.train()
        running = 0.0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device).unsqueeze(-1)
            optimizer_s.zero_grad()
            y_hat = model(xb)
            loss = criterion_s(y_hat, yb)
            loss.backward()
            optimizer_s.step()
            running += loss.item() * xb.size(0)
        train_loss = running / len(train_loader.dataset)
        train_losses.append(train_loss)
        
        model.eval()
        running = 0.0
        with torch.no_grad():
            for xb, yb in test_loader:
                xb, yb = xb.to(device), yb.to(device).unsqueeze(-1)
                y_hat = model(xb)
                loss = criterion_s(y_hat, yb)
                running += loss.item() * xb.size(0)
        test_loss = running / len(test_loader.dataset)
        test_losses.append(test_loss)
        print(f"Epoch {epoch+1}/{epochs} - train_loss={train_loss:.4f}, test_loss={test_loss:.4f}")
    return train_losses, test_losses

train_losses_s, test_losses_s = train_sine(sine_model, train_loader_s, test_loader_s, epochs=15)

plt.figure(figsize=(8,4))
plt.plot(train_losses_s, label="Train loss")
plt.plot(test_losses_s, label="Test loss")
plt.xlabel("Epoch")
plt.ylabel("MSE")
plt.title("Sine Wave Prediction Loss")
plt.legend()
plt.tight_layout()
plt.show()

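# Visualize predictions on a segment of the test set.
# Loop over all test batches: a single batch holds only 32 points.
sine_model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for xb, yb in test_loader_s:
        y_hat = sine_model(xb.to(device)).squeeze(-1)
        y_pred.append(y_hat.cpu().numpy())  # move to CPU before converting to NumPy
        y_true.append(yb.numpy())
y_true = np.concatenate(y_true)
y_pred = np.concatenate(y_pred)

plt.figure(figsize=(10,3))
plt.plot(y_true[:100], label="True")
plt.plot(y_pred[:100], label="Predicted")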
plt.title("Sine Wave: True vs Predicted (segment)")
plt.legend()
plt.tight_layout()
plt.show()

# 3. RNN vs LSTM on Long Sequences (Echo Task)

One of the key motivations for LSTMs is handling **long-term dependencies**.

We will create an **echo task**:

- Input: a sequence of length $T$ containing random bits (0 or 1)
- Target: output the **first element** of the sequence at the final time step

This requires the network to remember information from the very beginning. We will:

- Train a small **vanilla RNN** on this task
- Train a small **LSTM** of the same size
- Compare their training losses and final accuracies

We expect the LSTM to succeed much more easily, while the RNN struggles due to **vanishing gradients** over long sequences.
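To see why gradients vanish, consider a deliberately simplified scalar RNN, $h_t = \tanh(w\,h_{t-1} + x_t)$. This is an illustrative assumption, not the `nn.RNN` model trained in this section. By the chain rule, the sensitivity of $h_t$ to $h_0$ is a product of $t$ factors $w\,(1 - h_k^2)$, each smaller than 1 in magnitude when $|w| < 1$. The sketch below tracks that product over 50 steps; `gradient_magnitudes` is a hypothetical helper.

```python
import math
import random


def gradient_magnitudes(w, inputs, h0=0.0):
    """|d h_t / d h_0| for t = 1..T in the scalar RNN h_t = tanh(w * h_{t-1} + x_t)."""
    h, grad, mags = h0, 1.0, []
    for x in inputs:
        h = math.tanh(w * h + x)
        grad *= w * (1.0 - h * h)  # chain-rule factor d h_t / d h_{t-1}
        mags.append(abs(grad))
    return mags


random.seed(0)
bits = [float(random.randint(0, 1)) for _ in range(50)]  # random bits, as in the echo task
mags = gradient_magnitudes(w=0.9, inputs=bits)
for step in (1, 10, 25, 50):
    print(f"step {step:2d}: |dh_t/dh_0| = {mags[step - 1]:.3e}")
```

With `w=0.9` the magnitude after 50 steps is bounded by `0.9**50`, roughly `5e-3`, and is usually far smaller because the `tanh` derivative shrinks it further. The learning signal that would tell the network "remember the first bit" is therefore very weak.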

In [None]:
# Echo dataset

class EchoDataset(Dataset):
    def __init__(self, n_sequences=2000, seq_len=50):
        self.n_sequences = n_sequences
        self.seq_len = seq_len
        self.data = np.random.randint(0, 2, size=(n_sequences, seq_len)).astype(np.float32)
        self.targets = self.data[:, 0]  # echo of first element
    
    def __len__(self):
        return self.n_sequences
    
    def __getitem__(self, idx):
        x = self.data[idx]
        y = self.targets[idx]
        return torch.tensor(x).unsqueeze(-1), torch.tensor(y)

seq_len_echo = 50
train_ds_e = EchoDataset(n_sequences=2000, seq_len=seq_len_echo)
test_ds_e = EchoDataset(n_sequences=500, seq_len=seq_len_echo)

train_loader_e = DataLoader(train_ds_e, batch_size=64, shuffle=True)
test_loader_e = DataLoader(test_ds_e, batch_size=64, shuffle=False)


class EchoRNN(nn.Module):
    def __init__(self, hidden_dim=16):
        super().__init__()
        self.rnn = nn.RNN(input_size=1, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)
    
    def forward(self, x):
        out, _ = self.rnn(x)
        out_last = out[:, -1, :]
        y_hat = torch.sigmoid(self.fc(out_last))
        return y_hat


class EchoLSTM(nn.Module):
    def __init__(self, hidden_dim=16):
        super().__init__()
        self.lstm = nn.LSTM(input_size=1, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)
    
    def forward(self, x):
        out, _ = self.lstm(x)
        out_last = out[:, -1, :]
        y_hat = torch.sigmoid(self.fc(out_last))
        return y_hat



def train_echo(model, train_loader, test_loader, epochs=20, lr=1e-2):
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.BCELoss()
    train_losses, test_losses, test_accs = [], [], []
    
    for epoch in range(epochs):
        model.train()
        running = 0.0
        for xb, yb in train_loader:
            xb = xb.to(device)
            yb = yb.to(device).unsqueeze(-1)
            optimizer.zero_grad()
            y_hat = model(xb)
            loss = criterion(y_hat, yb)
            loss.backward()
            optimizer.step()
            running += loss.item() * xb.size(0)
        train_loss = running / len(train_loader.dataset)
        train_losses.append(train_loss)
        
        model.eval()
        running = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for xb, yb in test_loader:
                xb = xb.to(device)
                yb = yb.to(device).unsqueeze(-1)
                y_hat = model(xb)
                loss = criterion(y_hat, yb)
                running += loss.item() * xb.size(0)
                preds = (y_hat > 0.5).float()
                correct += (preds == yb).sum().item()
                total += yb.size(0)
        test_loss = running / len(test_loader.dataset)
        test_acc = correct / total
        test_losses.append(test_loss)
        test_accs.append(test_acc)
        print(f"Epoch {epoch+1}/{epochs} - train_loss={train_loss:.3f}, test_loss={test_loss:.3f}, test_acc={test_acc:.3f}")
    
    return train_losses, test_losses, test_accs


print("\nTraining EchoRNN (vanilla RNN)...")
echo_rnn = EchoRNN(hidden_dim=16)
train_rnn_loss, test_rnn_loss, test_rnn_acc = train_echo(echo_rnn, train_loader_e, test_loader_e, epochs=20, lr=1e-2)

print("\nTraining EchoLSTM...")
echo_lstm = EchoLSTM(hidden_dim=16)
train_lstm_loss, test_lstm_loss, test_lstm_acc = train_echo(echo_lstm, train_loader_e, test_loader_e, epochs=20, lr=1e-2)

# Plot test accuracy
plt.figure(figsize=(8,4))
plt.plot(test_rnn_acc, label="RNN test acc")
plt.plot(test_lstm_acc, label="LSTM test acc")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Echo Task: RNN vs LSTM")
plt.legend()
plt.tight_layout()
plt.show()

In a typical run the LSTM reaches near-perfect accuracy on this task, while the vanilla RNN often plateaus close to chance level (0.5, since the target bit is uniformly random). Exact curves depend on the random seed and hyperparameters. This illustrates the **long-term dependency problem** and why LSTMs (and GRUs) are so useful.
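A short derivation sketch shows where the difference comes from. The symbols below follow the standard RNN/LSTM formulation and are assumptions here, since they do not appear in this notebook's code. For a vanilla RNN with $h_t = \tanh(W_{hh} h_{t-1} + W_{xh} x_t + b)$, the gradient through time is a product of Jacobians:

$$
\frac{\partial h_T}{\partial h_0} = \prod_{t=T}^{1} \operatorname{diag}\!\left(1 - h_t^2\right) W_{hh}
$$

For an LSTM, the cell state is updated additively, $c_t = f_t \odot c_{t-1} + i_t \odot g_t$. Along the direct cell-state path (ignoring the gates' indirect dependence on earlier states) this gives:

$$
\frac{\partial c_T}{\partial c_0} \approx \prod_{t=1}^{T} \operatorname{diag}(f_t)
$$

If the network learns forget gates close to 1, this product decays slowly: $0.99^{50} \approx 0.61$, compared with $0.9^{50} \approx 0.005$ for a per-step factor of 0.9.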

# 4. Bidirectional RNN for Palindrome Classification

Bidirectional RNNs process a sequence **from both directions**, giving the model access to past and future context.

We will build a toy classification task:

- Input: a sequence of digits (0–9) of fixed length
- Label: 1 if the sequence is a **palindrome**, 0 otherwise

We will:

- Train a simple unidirectional RNN classifier
- Train a bidirectional RNN classifier
- Compare their accuracies

This shows how using information from both ends of the sequence can improve performance on tasks that depend on the entire sequence.

In [None]:
class PalindromeDataset(Dataset):
    def __init__(self, n_samples=2000, seq_len=5):
        self.n_samples = n_samples
        self.seq_len = seq_len
        self.data, self.labels = self._generate()
    
    def _generate(self):
        data = []
        labels = []
        for _ in range(self.n_samples):
            if np.random.rand() < 0.5:
                # Palindrome
                half = np.random.randint(0, 10, size=(self.seq_len // 2,))
                if self.seq_len % 2 == 1:
                    middle = np.random.randint(0, 10, size=(1,))
                    seq = np.concatenate([half, middle, half[::-1]])
                else:
                    seq = np.concatenate([half, half[::-1]])
                label = 1
            else:
                # Non-palindrome
                seq = np.random.randint(0, 10, size=(self.seq_len,))
                # Ensure it's not accidentally a palindrome
                if np.all(seq == seq[::-1]):
                    seq[0] = (seq[0] + 1) % 10
                label = 0
            data.append(seq)
            labels.append(label)
        return np.array(data, dtype=np.int64), np.array(labels, dtype=np.int64)
    
    def __len__(self):
        return self.n_samples
    
    def __getitem__(self, idx):
        return torch.tensor(self.data[idx]), torch.tensor(self.labels[idx])

seq_len_pal = 7
train_ds_p = PalindromeDataset(n_samples=3000, seq_len=seq_len_pal)
test_ds_p = PalindromeDataset(n_samples=1000, seq_len=seq_len_pal)

train_loader_p = DataLoader(train_ds_p, batch_size=64, shuffle=True)
test_loader_p = DataLoader(test_ds_p, batch_size=64, shuffle=False)


class UniRNNClassifier(nn.Module):
    def __init__(self, vocab_size=10, embed_dim=8, hidden_dim=16):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim)
        self.rnn = nn.RNN(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)
    
    def forward(self, x):
        x = self.embed(x)
        out, _ = self.rnn(x)
        out_last = out[:, -1, :]
        y_hat = torch.sigmoid(self.fc(out_last))
        return y_hat


class BiRNNClassifier(nn.Module):
    def __init__(self, vocab_size=10, embed_dim=8, hidden_dim=16):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim)
        self.rnn = nn.RNN(embed_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, 1)
    
    def forward(self, x):
        x = self.embed(x)
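        _, h_n = self.rnn(x)  # h_n: (2, batch, hidden_dim)
        # Concatenate the final forward and final backward hidden states.
        # out[:, -1, :] would pair the forward state with a backward state that has only seen the last token.
        out_last = torch.cat([h_n[0], h_n[1]], dim=-1)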
        y_hat = torch.sigmoid(self.fc(out_last))
        return y_hat


def train_palindrome(model, train_loader, test_loader, epochs=15, lr=1e-2):
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.BCELoss()
    train_accs, test_accs = [], []
    
    for epoch in range(epochs):
        model.train()
        correct = 0
        total = 0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device).unsqueeze(-1).float()
            optimizer.zero_grad()
            y_hat = model(xb)
            loss = criterion(y_hat, yb)
            loss.backward()
            optimizer.step()
            preds = (y_hat > 0.5).float()
            correct += (preds == yb).sum().item()
            total += yb.size(0)
        train_acc = correct / total
        train_accs.append(train_acc)
        
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for xb, yb in test_loader:
                xb, yb = xb.to(device), yb.to(device).unsqueeze(-1).float()
                y_hat = model(xb)
                preds = (y_hat > 0.5).float()
                correct += (preds == yb).sum().item()
                total += yb.size(0)
        test_acc = correct / total
        test_accs.append(test_acc)
        print(f"Epoch {epoch+1}/{epochs} - train_acc={train_acc:.3f}, test_acc={test_acc:.3f}")
    return train_accs, test_accs


print("\nTraining unidirectional RNN classifier...")
uni_model = UniRNNClassifier()
train_uni_acc, test_uni_acc = train_palindrome(uni_model, train_loader_p, test_loader_p, epochs=15, lr=1e-2)

print("\nTraining bidirectional RNN classifier...")
bi_model = BiRNNClassifier()
train_bi_acc, test_bi_acc = train_palindrome(bi_model, train_loader_p, test_loader_p, epochs=15, lr=1e-2)

plt.figure(figsize=(8,4))
plt.plot(test_uni_acc, label="Unidirectional test acc")
plt.plot(test_bi_acc, label="Bidirectional test acc")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Palindrome Classification: Uni vs Bi RNN")
plt.legend()
plt.tight_layout()
plt.show()

In many runs, the bidirectional RNN should achieve higher and more stable accuracy, because it can use information from **both ends** of the sequence, which is especially helpful for palindrome detection.
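When reading out a bidirectional RNN in PyTorch, it helps to know where each direction's final state lives. The sketch below uses a small, randomly initialised `nn.RNN` (the sizes are arbitrary assumptions) to check that the forward direction finishes at the last time step of `out`, while the backward direction finishes at the first time step, and that both match `h_n`.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
hidden_dim = 4
rnn = nn.RNN(input_size=3, hidden_size=hidden_dim, batch_first=True, bidirectional=True)

x = torch.randn(2, 7, 3)               # (batch, time, features)
out, h_n = rnn(x)                      # out: (2, 7, 8), h_n: (2, 2, 4)

fwd_final = out[:, -1, :hidden_dim]    # forward direction after reading all 7 steps
bwd_final = out[:, 0, hidden_dim:]     # backward direction after reading all 7 steps
bwd_at_last = out[:, -1, hidden_dim:]  # backward direction after reading only the last step

print(torch.allclose(fwd_final, h_n[0]))  # forward final state equals h_n[0]
print(torch.allclose(bwd_final, h_n[1]))  # backward final state equals h_n[1]

# Changing the first time step leaves the backward state at the last position untouched.
x2 = x.clone()
x2[:, 0, :] += 1.0
out2, _ = rnn(x2)
print(torch.allclose(out2[:, -1, hidden_dim:], bwd_at_last))
```

For a task like palindrome detection, a summary in which both directions have seen the whole sequence is `torch.cat([h_n[0], h_n[1]], dim=-1)`, or equivalently the two slices `fwd_final` and `bwd_final` above.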

# 5. Hidden State Visualization (Optional)

Finally, we visualize how hidden states **evolve over time** in the sine-wave LSTM.

We will:

- Take a trained `SineLSTM`
- Feed a sine segment through the network
- Collect hidden states at each time step
- Use PCA to project hidden states to 2D
- Plot the trajectory of hidden states over time

This gives some geometric intuition for how the RNN’s internal state changes as it processes a sequence.
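As a sketch of what the PCA step does, the example below applies PCA via SVD to a synthetic stand-in for hidden states: a noisy circle embedded in 16 dimensions. The data is made up for illustration and is not produced by `SineLSTM`. The point is that when the states vary mostly within a 2D plane, the first two principal components recover that plane.

```python
import numpy as np

# Synthetic stand-in for hidden states: a circle embedded in 16 dimensions.
rng = np.random.default_rng(0)
T, hidden_dim = 50, 16
phase = np.linspace(0, 2 * np.pi, T, endpoint=False)
circle = np.stack([np.cos(phase), np.sin(phase)], axis=1)      # (T, 2)
basis, _ = np.linalg.qr(rng.standard_normal((hidden_dim, 2)))  # (16, 2), orthonormal columns
states = circle @ basis.T + 0.01 * rng.standard_normal((T, hidden_dim))  # (T, 16)

# PCA via SVD of the mean-centred data.
centred = states - states.mean(axis=0)
U, S, Vt = np.linalg.svd(centred, full_matrices=False)
projected = centred @ Vt[:2].T          # (T, 2) coordinates on PC1/PC2
explained = (S ** 2) / np.sum(S ** 2)   # variance ratio per component

print("projected shape:", projected.shape)
print("variance explained by PC1+PC2:", round(float(explained[:2].sum()), 4))
```

With real hidden states, the variance ratio explained by the first two components gives a quick check on how much of the dynamics the 2D plot captures. scikit-learn exposes this as `pca.explained_variance_ratio_`.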

In [None]:
from sklearn.decomposition import PCA

# Take a segment from the test region of the sine series.
# Only its first seq_len_sine-step window is fed through the LSTM below.
segment = series[split:split+seq_len_sine+100]
x_segment, _ = create_sequences(segment, seq_len=seq_len_sine)

x0 = torch.tensor(x_segment[0]).unsqueeze(0).unsqueeze(-1).to(device)  # (1,T,1)

sine_model.eval()
with torch.no_grad():
    out, (h_n, c_n) = sine_model.lstm(x0)
    # out: (1,T,hidden_dim), the hidden state h_t at every time step
    hidden_states = out.squeeze(0).cpu().numpy()  # (T,hidden_dim)

# PCA to 2D
pca = PCA(n_components=2)
hs_2d = pca.fit_transform(hidden_states)

plt.figure(figsize=(6,5))
plt.scatter(hs_2d[:,0], hs_2d[:,1], c=np.arange(len(hs_2d)), cmap="viridis")
for i in range(0, len(hs_2d), 5):
    plt.text(hs_2d[i,0], hs_2d[i,1], str(i), fontsize=8)
plt.colorbar(label="Time step")
plt.title("Hidden State Trajectory (SineLSTM)")
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.tight_layout()
plt.show()

You should see the hidden state move along a fairly smooth trajectory as it tracks the phase of the sine wave. The 50-step window spans about one period of the signal (the 1000 samples cover 20 periods), so different regions of the trajectory roughly correspond to different phases of the underlying periodic signal.

## Summary

In this RNN/LSTM lab we:

- Built a **character-level LSTM** to generate text from a tiny corpus.
- Trained an LSTM to **predict the next value** in a noisy sine wave.
- Compared a vanilla RNN vs an LSTM on a synthetic **long-term dependency** task (echo), showing why gating helps.
- Used a **bidirectional RNN** for a toy palindrome classification task and saw the benefit of looking both forward and backward.
- Visualized the **hidden state trajectory** of an LSTM over time using PCA.

These experiments complement Chapter 6 by giving concrete, runnable examples of sequence generation, prediction, long-term dependency handling, bidirectionality, and internal state dynamics.