# **Ágora - RNNs & LSTMs**


---



This notebook is divided into two parts.

Part one:

* Introduction to sequential data and Recurrent Neural Networks (RNNs)
* Implementing RNNs using PyTorch
* Applications: Text classification and time series prediction.

Part two:

* Understanding LSTM networks for long-term dependencies.
* Implementing LSTM in PyTorch for sequence modeling.
* Use cases: Sentiment analysis and language modeling.

## **1. RNNs implementation**

First of all, we are going to define an RNN in PyTorch. We'll use `nn.RNN` to create an RNN layer, then we'll add a last, fully-connected layer to get the output size that we want. An RNN takes in a number of parameters:

* **input_size** - the number of features in each element of the input sequence. The input tensor itself has shape `(seq_len, batch, input_size)`.

* **hidden_size** - the number of features in the RNN output and in the hidden state. The hidden state has shape `(num_layers * num_directions, batch, hidden_size)`, where `num_layers` is the number of stacked RNNs and `num_directions` is 2 for bidirectional RNNs and 1 otherwise.
* **num_layers** - the number of layers that make up the RNN, typically 1-3; a value greater than 1 creates a stacked RNN.
* **batch_first** - whether the input and output tensors have the batch as their first dimension, i.e. `(batch, seq_len, input_size)` for the input and `(batch, seq_len, num_directions * hidden_size)` for the output. The default is `False`.

In the output:
* **out** is the output of the last RNN layer at every timestep. With the default `batch_first=False` it has size `(seq_len, batch, num_directions * hidden_size)`.

* **h_n** is the hidden value from the last time-step of all RNN layers. It is of the size `(num_layers * num_directions, batch, hidden_size)`.

Take a look at the RNN [documentation](https://pytorch.org/docs/stable/nn.html#rnn) to read more about recurrent layers.
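To make the shapes listed above concrete, here is a minimal sketch that runs random tensors through `nn.RNN`. The sizes are arbitrary illustrative values, not taken from the rest of the notebook.

```python
import torch
import torch.nn as nn

# Illustrative sizes only; none of these values come from the notebook.
seq_len, batch, input_size, hidden_size, num_layers = 5, 3, 10, 20, 2

# Default layout (batch_first=False): the input is (seq_len, batch, input_size).
rnn = nn.RNN(input_size, hidden_size, num_layers=num_layers, bidirectional=True)
out, h_n = rnn(torch.randn(seq_len, batch, input_size))
print(out.shape)  # torch.Size([5, 3, 40]) -> (seq_len, batch, 2 * hidden_size)
print(h_n.shape)  # torch.Size([4, 3, 20]) -> (num_layers * 2, batch, hidden_size)

# batch_first=True changes the layout of the input and of `out` only;
# h_n keeps the (num_layers * num_directions, batch, hidden_size) layout.
rnn_bf = nn.RNN(input_size, hidden_size, num_layers=num_layers, batch_first=True)
out_bf, h_n_bf = rnn_bf(torch.randn(batch, seq_len, input_size))
print(out_bf.shape)  # torch.Size([3, 5, 20])
print(h_n_bf.shape)  # torch.Size([2, 3, 20])
```

Note that `h_n` is never batch-first, which is why the classifier later in this notebook can index it with `hidden[-1]` to get one vector per sample.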

We will start by implementing a basic RNN in PyTorch for text classification. Let's use a small text dataset for binary classification.

In [2]:
# Importing the necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import numpy as np

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

We will create a small custom dataset for binary text classification, and check one batch tensor to verify its dimensions and padding correctness.

In [5]:
import torch
from torch.nn.utils.rnn import pad_sequence

class TextDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels
        self.vocab = set(' '.join(texts).split())  # Simple word-level vocab
        # Dictionary mapping each word to a unique integer.
        # Caveat: index 0 is also the value pad_sequence pads with by default.
        self.word2idx = {word: idx for idx, word in enumerate(self.vocab)}

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx].split()
        label = self.labels[idx]
        encoded_text = [self.word2idx[word] for word in text]
        return torch.tensor(encoded_text), torch.tensor(label)

# Example data
texts = [
    'The sky is blue',
    'The sun is bright',
    'Aliens have landed on Earth',
    'The moon is made of cheese',
    'NASA launches a new satellite',
    'Breaking news: The world is flat!',
]
labels = [0, 0, 1, 1, 0, 1]  # 0 = Real, 1 = Fake

# Create dataset and DataLoader
dataset = TextDataset(texts, labels)

# Custom collate function to pad sequences
def collate_fn(batch):
    texts, labels = zip(*batch)
    texts_padded = pad_sequence(texts, batch_first=True)
    return texts_padded, torch.tensor(labels)

# Create DataLoader with custom collate function
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

# Verify batch data (just to test)
for inputs, labels in dataloader:
    print(f'Input batch shape: {inputs.shape}')
    print(f'Label batch: {labels}')
    break


Input batch shape: torch.Size([2, 6])
Label batch: tensor([0, 1])
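One detail worth noting about the batch above: `pad_sequence` pads with 0 by default, and the vocabulary built in `TextDataset` also assigns index 0 to a real word, so the model cannot tell that word from padding. A common remedy is to reserve dedicated indices for padding and for unknown words. The sketch below shows the idea in plain Python; the names `PAD`, `UNK`, `build_vocab`, `encode` and `pad_batch` are hypothetical helpers, not part of the notebook.

```python
# Hypothetical helper names; this is an illustration, not the notebook's code.
PAD, UNK = "<pad>", "<unk>"

def build_vocab(texts):
    """Map each word to an index, reserving 0 for padding and 1 for unknown words."""
    words = sorted(set(" ".join(texts).split()))  # sorted for a reproducible mapping
    word2idx = {PAD: 0, UNK: 1}
    for word in words:
        word2idx[word] = len(word2idx)
    return word2idx

def encode(sentence, word2idx):
    """Encode a sentence, sending out-of-vocabulary words to the <unk> index."""
    return [word2idx.get(word, word2idx[UNK]) for word in sentence.split()]

def pad_batch(encoded, pad_idx=0):
    """Right-pad every sequence with pad_idx up to the longest length in the batch."""
    max_len = max(len(seq) for seq in encoded)
    return [seq + [pad_idx] * (max_len - len(seq)) for seq in encoded]

texts = ["The sky is blue", "Aliens have landed on Earth"]
word2idx = build_vocab(texts)
batch = pad_batch([encode(t, word2idx) for t in texts])
print(batch)                                 # [[4, 10, 7, 5, 0], [2, 6, 8, 9, 3]]
print(encode("The sky is green", word2idx))  # [4, 10, 7, 1]
```

With a mapping like this, the embedding layer would need `len(word2idx)` rows, and `nn.Embedding` accepts a `padding_idx` argument that keeps the padding vector fixed at zero.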


### Define RNN model

We will now define an RNN model for text classification, distinguishing between real and fake news.

* **Embedding layer:** turns word indices into dense vectors.
* **RNN layer:** processes the sequence and captures temporal patterns.
* **Fully connected layer:** classifies based on the final hidden state.
* **Output:** `log_softmax` returns log probabilities for each class.
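To see what "processes the sequence" means for the RNN layer, the sketch below unrolls the recurrence by hand and compares it with the built-in layer. It assumes a single-layer, unidirectional `nn.RNN` with the default `tanh` nonlinearity; the sizes are arbitrary.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
rnn = nn.RNN(input_size=4, hidden_size=3, batch_first=True)  # illustrative sizes
x = torch.randn(2, 5, 4)  # (batch, seq_len, input_size)

with torch.no_grad():
    # Reference result from the built-in layer.
    out_ref, h_ref = rnn(x)

    # Manual unrolling: h_t = tanh(x_t W_ih^T + b_ih + h_{t-1} W_hh^T + b_hh)
    h = torch.zeros(2, 3)  # the initial hidden state defaults to zeros
    steps = []
    for t in range(x.size(1)):
        h = torch.tanh(x[:, t] @ rnn.weight_ih_l0.T + rnn.bias_ih_l0
                       + h @ rnn.weight_hh_l0.T + rnn.bias_hh_l0)
        steps.append(h)
    out_manual = torch.stack(steps, dim=1)  # (batch, seq_len, hidden_size)

# Both comparisons should agree up to floating-point tolerance.
print(torch.allclose(out_ref, out_manual, atol=1e-5))
print(torch.allclose(h_ref[-1], h, atol=1e-5))
```

The same weights are reused at every timestep, which is what lets the layer handle sequences of any length.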


In [116]:
class RNNClassifier(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, output_size):
        super(RNNClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.RNN(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.embedding(x)
        output, hidden = self.rnn(x)
        out = self.fc(hidden[-1])  # Use the last hidden state
        return F.log_softmax(out, dim=1)

# Hyperparameters
vocab_size = len(dataset.vocab) #is the number of unique words in your dataset
embed_size = 10 #size of each word vector (input size)
hidden_size = 20
output_size = 2  # Binary classification

# Initialize model
model = RNNClassifier(vocab_size, embed_size, hidden_size, output_size).to(device)
print(model)


RNNClassifier(
  (embedding): Embedding(24, 10)
  (rnn): RNN(10, 20, batch_first=True)
  (fc): Linear(in_features=20, out_features=2, bias=True)
)
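The classifier above reads `hidden[-1]`, which for a padded batch is the state after the RNN has also stepped over the trailing padding. One way to avoid this, shown here as a sketch rather than as the notebook's method, is to pack the batch with `pack_padded_sequence` so that each sequence stops at its true length. The sizes and token indices are made up, and index 0 is assumed to be reserved for padding.

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_sequence

torch.manual_seed(0)
embedding = nn.Embedding(10, 4, padding_idx=0)  # illustrative sizes; 0 = padding
rnn = nn.RNN(4, 3, batch_first=True)

# Two sequences of different lengths (made-up indices, starting at 1).
seqs = [torch.tensor([1, 2, 3, 4, 5]), torch.tensor([6, 7])]
lengths = torch.tensor([len(s) for s in seqs])  # lengths must be on the CPU
padded = pad_sequence(seqs, batch_first=True)   # shape (2, 5), padded with 0

with torch.no_grad():
    packed = pack_padded_sequence(embedding(padded), lengths,
                                  batch_first=True, enforce_sorted=False)
    _, h_packed = rnn(packed)                    # stops at each true length
    _, h_padded = rnn(embedding(padded))         # also steps over the padding
    _, h_alone = rnn(embedding(seqs[1]).unsqueeze(0))  # short sequence by itself

# The packed state matches the sequence processed alone; the padded one does not.
print(torch.allclose(h_packed[-1][1], h_alone[-1][0], atol=1e-5))
print(torch.allclose(h_padded[-1][1], h_alone[-1][0], atol=1e-5))
```

In a model like `RNNClassifier`, this would mean passing the sequence lengths from the collate function into `forward` alongside the padded batch.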


### Training the model

For each batch, the training loop:
* Performs a forward pass to generate predictions.
* Computes the loss using a criterion such as negative log likelihood or cross-entropy.
* Uses **BPTT (Backpropagation Through Time)** to compute gradients.
* Updates the model parameters with a gradient-based optimizer (Adam in the code below).

At the end of each epoch, it logs the average loss to monitor convergence.
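The two criteria mentioned above are closely related: `nn.NLLLoss` applied to `log_softmax` outputs gives the same value as `nn.CrossEntropyLoss` applied to raw logits. A quick check with made-up logits and labels:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
logits = torch.randn(4, 2)            # raw scores: 4 samples, 2 classes (made-up)
targets = torch.tensor([0, 1, 1, 0])  # made-up labels

# Option 1: the model returns log-probabilities, the loss is NLLLoss.
nll = nn.NLLLoss()(F.log_softmax(logits, dim=1), targets)

# Option 2: the model returns raw logits, the loss is CrossEntropyLoss.
ce = nn.CrossEntropyLoss()(logits, targets)

print(nll.item(), ce.item())  # the two values should match
```

This is why a model that ends in `log_softmax` is paired with `NLLLoss`, while a model that returns raw logits is paired with `CrossEntropyLoss`.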

In [117]:
# Loss function and optimizer
criterion = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
losses = []

# Training loop
epochs = 100  # Number of times the model will see the full training data
for epoch in range(epochs):
    total_loss = 0  # To track cumulative loss per epoch

    # Iterate through batches of the DataLoader
    for inputs, labels in dataloader:
        # Move input and labels to the GPU or CPU as per availability
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()  # Clear previous gradients
        outputs = model(inputs)  # Forward pass through the RNN model
        loss = criterion(outputs, labels)  # Compute loss between predictions and ground truth
        loss.backward()  # Backpropagate error (Backpropagation Through Time)
        optimizer.step()  # Update model parameters using gradients

        total_loss += loss.item()  # Accumulate batch loss

    # Record and print the average loss for the epoch
    avg_loss = total_loss / len(dataloader)
    losses.append(avg_loss)
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}')


Epoch [1/100], Loss: 0.8714
Epoch [2/100], Loss: 0.6872
Epoch [3/100], Loss: 0.8098
Epoch [4/100], Loss: 0.7540
Epoch [5/100], Loss: 0.7339
Epoch [6/100], Loss: 0.6482
Epoch [7/100], Loss: 0.7041
Epoch [8/100], Loss: 0.7179
Epoch [9/100], Loss: 0.6612
Epoch [10/100], Loss: 0.6681
Epoch [11/100], Loss: 0.6458
Epoch [12/100], Loss: 0.6306
Epoch [13/100], Loss: 0.6328
Epoch [14/100], Loss: 0.6307
Epoch [15/100], Loss: 0.6096
Epoch [16/100], Loss: 0.5998
Epoch [17/100], Loss: 0.5876
Epoch [18/100], Loss: 0.5550
Epoch [19/100], Loss: 0.5504
Epoch [20/100], Loss: 0.5575
Epoch [21/100], Loss: 0.5257
Epoch [22/100], Loss: 0.5283
Epoch [23/100], Loss: 0.5048
Epoch [24/100], Loss: 0.4834
Epoch [25/100], Loss: 0.5153
Epoch [26/100], Loss: 0.4553
Epoch [27/100], Loss: 0.4494
Epoch [28/100], Loss: 0.4713
Epoch [29/100], Loss: 0.4166
Epoch [30/100], Loss: 0.4352
Epoch [31/100], Loss: 0.4188
Epoch [32/100], Loss: 0.3679
Epoch [33/100], Loss: 0.3542
Epoch [34/100], Loss: 0.4082
Epoch [35/100], Loss: 0

### Testing the Prediction

Now, let's test the model on new text data to see whether it can correctly predict if a given statement is real or fake. The `predict_text` function tokenizes and encodes the input text (skipping words that are not in the vocabulary), passes it through the network, and returns the class with the highest probability.

In [119]:
def predict_text(model, sentence, word2idx):
    model.eval()
    with torch.no_grad():
        words = sentence.split()
        encoded = [word2idx[word] for word in words if word in word2idx]

        if not encoded:
            return "Unknown input"

        input_tensor = torch.tensor(encoded).unsqueeze(0).to(device)  # shape: (1, seq_len)
        output = model(input_tensor)
        pred_label = torch.argmax(output, dim=1).item()

        return "Fake" if pred_label == 1 else "Real"

# Example usage
test_sentences = [
    "Aliens invade Earth again",
    "NASA discovers a new planet",
    "The world is flat according to new study",
    "Bright sun shines over the mountains"
]

for sentence in test_sentences:
    prediction = predict_text(model, sentence, dataset.word2idx)
    print(f"'{sentence}' => {prediction}")


'Aliens invade Earth again' => Fake
'NASA discovers a new planet' => Real
'The world is flat according to new study' => Fake
'Bright sun shines over the mountains' => Real


## **2. Long Short-Term Memory (LSTM)**

We will implement an LSTM network in PyTorch for a text classification task (sentiment analysis).

We will follow the same steps as in the previous section: first we load a small dataset of movie reviews (0 = negative, 1 = positive), and then we define the model.

In [130]:
class TextDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels
        self.vocab = set(' '.join(texts).split())  # Simple word-level vocab
        self.word2idx = {word: idx for idx, word in enumerate(self.vocab)}

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx].split()
        label = self.labels[idx]
        encoded_text = [self.word2idx[word] for word in text]
        return torch.tensor(encoded_text), torch.tensor(label)

# Example movie reviews
texts = [
    'This movie is fantastic',
    'I hated this movie',
    'What a great film',
    'Terrible acting and boring plot',
    'Best movie I have seen this year',
    'Waste of time',
]
labels = [1, 0, 1, 0, 1, 0]  # 1 = Positive, 0 = Negative

# Create dataset and DataLoader
dataset = TextDataset(texts, labels)

# Custom collate function to pad sequences
def collate_fn(batch):
    texts, labels = zip(*batch)
    texts_padded = pad_sequence(texts, batch_first=True)
    return texts_padded, torch.tensor(labels)

dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

In [131]:
class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, output_size, num_layers=1):
        super(LSTMClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.embedding(x)
        output, (hidden, cell) = self.lstm(x)
        out = self.fc(hidden[-1])  # Use the last hidden state
        return out

# Hyperparameters
vocab_size = len(dataset.vocab)
embed_size = 10
hidden_size = 20
output_size = 2  # Binary classification (Positive/Negative)

# Initialize the model
model = LSTMClassifier(vocab_size, embed_size, hidden_size, output_size).to(device)
print(model)

LSTMClassifier(
  (embedding): Embedding(23, 10)
  (lstm): LSTM(10, 20, batch_first=True)
  (fc): Linear(in_features=20, out_features=2, bias=True)
)
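Unlike `nn.RNN`, `nn.LSTM` returns a tuple of two states: the hidden state `h_n` and the cell state `c_n`. The sketch below checks their shapes on random input; the sizes are illustrative and the layer is assumed to be unidirectional.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
lstm = nn.LSTM(input_size=10, hidden_size=20, num_layers=2, batch_first=True)
x = torch.randn(3, 7, 10)  # (batch, seq_len, input_size), illustrative values

output, (h_n, c_n) = lstm(x)
print(output.shape)  # torch.Size([3, 7, 20]) -> (batch, seq_len, hidden_size)
print(h_n.shape)     # torch.Size([2, 3, 20]) -> (num_layers, batch, hidden_size)
print(c_n.shape)     # torch.Size([2, 3, 20]) -> same layout as h_n

# For a unidirectional LSTM on unpadded input, the last timestep of `output`
# is the final hidden state of the top layer.
print(torch.allclose(output[:, -1, :], h_n[-1]))
```

This is why `LSTMClassifier` unpacks `(hidden, cell)` and feeds only `hidden[-1]` to the linear layer.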


### Training the model

In [132]:
# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
epochs = 100
for epoch in range(epochs):
    total_loss = 0
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(dataloader):.4f}')


Epoch [1/100], Loss: 0.7091
Epoch [2/100], Loss: 0.6761
Epoch [3/100], Loss: 0.6550
Epoch [4/100], Loss: 0.6733
Epoch [5/100], Loss: 0.6494
Epoch [6/100], Loss: 0.6970
Epoch [7/100], Loss: 0.6904
Epoch [8/100], Loss: 0.6892
Epoch [9/100], Loss: 0.6577
Epoch [10/100], Loss: 0.6614
Epoch [11/100], Loss: 0.6471
Epoch [12/100], Loss: 0.6517
Epoch [13/100], Loss: 0.6256
Epoch [14/100], Loss: 0.6432
Epoch [15/100], Loss: 0.6345
Epoch [16/100], Loss: 0.6426
Epoch [17/100], Loss: 0.6073
Epoch [18/100], Loss: 0.5992
Epoch [19/100], Loss: 0.6219
Epoch [20/100], Loss: 0.5877
Epoch [21/100], Loss: 0.5958
Epoch [22/100], Loss: 0.6065
Epoch [23/100], Loss: 0.5940
Epoch [24/100], Loss: 0.5720
Epoch [25/100], Loss: 0.5799
Epoch [26/100], Loss: 0.5742
Epoch [27/100], Loss: 0.5472
Epoch [28/100], Loss: 0.5359
Epoch [29/100], Loss: 0.5265
Epoch [30/100], Loss: 0.5170
Epoch [31/100], Loss: 0.5007
Epoch [32/100], Loss: 0.4904
Epoch [33/100], Loss: 0.5034
Epoch [34/100], Loss: 0.4652
Epoch [35/100], Loss: 0

### Evaluation of the model

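We are going to try the model on two different tasks:
1. Predicting the next word, taking the sentence context into account.
2. Sentiment analysis of sentences.

Note that `LSTMClassifier` was trained only for the second task and outputs two logits, one per sentiment class. In the next-word cell below, `argmax` can therefore only return 0 or 1, which `idx2word` maps to whichever words happen to hold those two indices, so its output should not be read as a real language-model prediction. A next-word model would need an output layer with `vocab_size` units, trained on next-word targets.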



In [142]:
# Example: Language modeling (next word prediction)

def predict_next_word(model, input_sequence, vocab, word2idx, idx2word):
    model.eval()
    with torch.no_grad():
        input_tensor = torch.tensor([word2idx[word] for word in input_sequence.split()], device=device).unsqueeze(0)
        output = model(input_tensor)
        predicted_idx = torch.argmax(output, dim=1).item()
        return idx2word[predicted_idx]

# Inverse mapping for word2idx
idx2word = {idx: word for word, idx in dataset.word2idx.items()}

# Test next word prediction
input_seq = 'Best movie I have'
predicted_word = predict_next_word(model, input_seq, dataset.vocab, dataset.word2idx, idx2word)
print(f'Next word prediction: {predicted_word}')

Next word prediction: seen
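Because the sentiment classifier has only two output units, genuine next-word prediction needs a model whose output layer has one unit per vocabulary word. The following is a minimal sketch under that assumption, not the notebook's model: `NextWordLSTM` is a hypothetical name, the two training sentences are borrowed from the reviews above, and the model simply memorizes them.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

texts = ["This movie is fantastic", "Best movie I have seen this year"]
vocab = sorted(set(" ".join(texts).split()))
word2idx = {w: i for i, w in enumerate(vocab)}
idx2word = {i: w for w, i in word2idx.items()}

# Training pairs: every prefix of a sentence -> the word that follows it.
pairs = []
for text in texts:
    ids = [word2idx[w] for w in text.split()]
    for t in range(1, len(ids)):
        pairs.append((torch.tensor(ids[:t]), ids[t]))

class NextWordLSTM(nn.Module):  # hypothetical name, for illustration only
    def __init__(self, vocab_size, embed_size=10, hidden_size=20):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)  # one logit per word

    def forward(self, x):
        _, (hidden, _) = self.lstm(self.embedding(x))
        return self.fc(hidden[-1])

lm = NextWordLSTM(len(vocab))
optimizer = torch.optim.Adam(lm.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

for epoch in range(100):
    for prefix, target in pairs:  # one prefix at a time, so no padding is needed
        optimizer.zero_grad()
        loss = criterion(lm(prefix.unsqueeze(0)), torch.tensor([target]))
        loss.backward()
        optimizer.step()

lm.eval()
with torch.no_grad():
    prefix = torch.tensor([word2idx[w] for w in "Best movie I have".split()])
    logits = lm(prefix.unsqueeze(0))  # shape (1, vocab_size)
    predicted_word = idx2word[logits.argmax(dim=1).item()]
    print(predicted_word)
```

The key difference from `LSTMClassifier` is the final layer: its output size equals the vocabulary size, so `argmax` ranges over all words instead of over two sentiment labels.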


In [140]:
def predict_sentiment(model, text, word2idx):
    model.eval()
    with torch.no_grad():
        words = text.split()
        encoded = [word2idx[word] for word in words if word in word2idx]
        if not encoded:
            return "Unknown input"

        input_tensor = torch.tensor(encoded).unsqueeze(0).to(device)  # (1, seq_len)
        output = model(input_tensor)
        predicted_label = torch.argmax(output, dim=1).item()

        return "Positive" if predicted_label == 1 else "Negative"

test_sentences = [
    "Amazing performance and great story",
    "I regret watching this movie",
    "Not bad, could be better",
    "Horrible direction and poor acting"
]

for sentence in test_sentences:
    sentiment = predict_sentiment(model, sentence, dataset.word2idx)
    print(f"'{sentence}' => {sentiment}")

'Amazing performance and great story' => Positive
'I regret watching this movie' => Negative
'Not bad, could be better' => Unknown input
'Horrible direction and poor acting' => Negative
