# Challenge 5 - RNNs

Welcome to challenge #5!

In this challenge, you will implement a simple RNN classifier using an LSTM cell for sentiment analysis on the YelpReviewPolarity dataset (a binary sentiment classification dataset).

Your model should include:
- An **Embedding** layer (with a specified `padding_idx`)
- An **LSTM** layer (with `batch_first=True`)
- A **Fully Connected (fc)** layer to map the final hidden state to the output
- A **Sigmoid** activation to produce a probability between 0 and 1

Your tasks are:

1. **Data Preprocessing & DataLoader Setup** (2 points):  
   Import the YelpReviewPolarity dataset, tokenize the text, build a vocabulary, and create a DataLoader to supply batches to your model.

2. **RNNClassifier** (2 points):  
   Implement a class that builds and returns the RNN classifier model using the parameters provided.

3. **Training and evaluating the model** (2 points):  
   Implement a function that trains your model for a number of epochs and then tests it on sample Yelp reviews.

4. **Q&A Section** (3 points total, 1 point each):  
   Answer three questions (in markdown) about your implementation and key concepts.

When you are finished, the provided pytest tests at the end of this notebook will automatically evaluate your code.


## Important Note on Environment:
This challenge will <span style="color: red">not work properly on Codespaces</span> because of the lack of GPU and PyTorch support (at least I haven't figured out the setup yet). So for this challenge, open your repository's notebook in Google Colab and continue there. You can do so by clicking the "Open in Colab" badge below and opening this notebook from your repository.

<a href="https://colab.research.google.com/" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

After completing the code part, please make sure to run the TESTING section of this notebook for the test cases. Again, the pytest run on GitHub will fail. For this challenge I will enter your grades manually, without the autograder, based on the output of the test section at the end.

## Imports and setup

In [73]:
# # Downloading the yelp review dataset
# !wget "https://drive.usercontent.google.com/download?id=0Bz8a_Dbh9QhbNUpYQ2N3SGlFaDg&export=download&authuser=0&confirm=t&uuid=08839d6e-0170-44f8-a1c1-2f829c484617&at=AIrpjvOJpeXNKY4yGqP9mw6bXpQS:1739966900676" -O yelp_review_polarity_csv.tar

# # Extracting the dataset
# !tar -xvf yelp_review_polarity_csv.tar

# # Downloading python package dependencies
# !pip install torchdata==0.6.1 torchtext portalocker==2.7.0

In [74]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from torchtext.datasets import YelpReviewPolarity
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

import csv

# Set random seed for reproducibility.
torch.manual_seed(42)

<torch._C.Generator at 0x7ecbf07663f0>

## Task 1: Data Preprocessing (2 points)

Note: the YelpReviewPolarity dataset label includes
- 1 : Negative polarity.
- 2 : Positive polarity.

Refer to the [PyTorch docs](https://pytorch.org/text/0.8.1/datasets.html#yelpreviewpolarity) for more information on the dataset.

The code cell below loads the dataset for you from the local CSV files extracted in the setup step.

In [75]:
def load_local_yelp_list(train_csv_path, test_csv_path, has_header=True, sample_size=50000):
    """
    Reads the local train.csv and test.csv for Yelp Review Polarity
    and returns two lists: train_list, test_list,
    where each element is (label, text).
    label is int (1 or 2), text is the review string.
    """
    train_list = []
    with open(train_csv_path, 'r', encoding='utf-8') as f:
        reader = csv.reader(f)
        if has_header:
            next(reader, None)  # skip the header row
        for row in reader:
            if len(train_list) >= sample_size:
                break
            label_str, text = row
            label = int(label_str)
            train_list.append((label, text))

    test_list = []
    with open(test_csv_path, 'r', encoding='utf-8') as f:
        reader = csv.reader(f)
        if has_header:
            next(reader, None)
        for row in reader:
            label_str, text = row
            label = int(label_str)
            test_list.append((label, text))

    return train_list, test_list

train_list, test_list = load_local_yelp_list('./yelp_review_polarity_csv/train.csv', './yelp_review_polarity_csv/test.csv', has_header=False)
print(f"Number of training examples: {len(train_list)}")
print(f"Number of testing examples: {len(test_list)}")

Number of training examples: 50000
Number of testing examples: 38000


In [76]:

# TODO: Define a tokenizer using torchtext's basic_english tokenizer.
tokenizer = get_tokenizer('basic_english')

# TODO: Write a function 'yield_tokens' that takes the dataset iterator and yields tokens.
def yield_tokens(data_iter):
    # Each item is a (label, text) pair; only the text is tokenized.
    for _, text in data_iter:
        yield tokenizer(text)

# TODO: Print the first datapoint from your train list
print(train_list[0])


# TODO: Build the vocabulary using 'build_vocab_from_iterator' with special tokens '<unk>' and '<pad>'.
# Set the default index for unknown tokens to the index of '<unk>'.
specials = ['<unk>', '<pad>']
vocab = build_vocab_from_iterator(yield_tokens(train_list), specials=specials, special_first=True)
vocab.set_default_index(vocab['<unk>'])

# TODO: Define a text pipeline function that converts a text string into a list of token IDs using the vocabulary.
def text_pipeline(text):
    return [vocab[token] for token in tokenizer(text)]

# TODO: Define a label pipeline function that converts the dataset label (1 = negative, 2 = positive) into a binary integer (0 = negative, 1 = positive).
def label_pipeline(label):
    return 1 if label == 2 else 0

(1, "Unfortunately, the frustration of being Dr. Goldberg's patient is a repeat of the experience I've had with so many other doctors in NYC -- good doctor, terrible staff.  It seems that his staff simply never answers the phone.  It usually takes 2 hours of repeated calling to get an answer.  Who has time for that or wants to deal with it?  I have run into this problem with many other doctors and I just don't get it.  You have office workers, you have patients with medical needs, why isn't anyone answering the phone?  It's incomprehensible and not work the aggravation.  It's with regret that I feel that I have to give Dr. Goldberg 2 stars.")
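To make the vocabulary step more concrete, here is a minimal pure-Python sketch of the idea behind building a vocabulary with special tokens and falling back to `<unk>` for unseen words. It is not torchtext's implementation: `build_toy_vocab` and `lookup` are hypothetical helper names, and the ordering of regular tokens (by frequency, then alphabetically) is an assumption of this sketch.

```python
from collections import Counter

def build_toy_vocab(token_lists, specials=("<unk>", "<pad>")):
    """Map each token to an integer ID, with the special tokens placed first."""
    counts = Counter(tok for toks in token_lists for tok in toks)
    # Assumption of this sketch: order by descending frequency, then alphabetically.
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    itos = list(specials) + [tok for tok, _ in ordered if tok not in specials]
    return {tok: idx for idx, tok in enumerate(itos)}

def lookup(stoi, tokens):
    """Convert tokens to IDs, using the '<unk>' ID for tokens not in the vocabulary."""
    unk_id = stoi["<unk>"]
    return [stoi.get(tok, unk_id) for tok in tokens]

toy_vocab = build_toy_vocab([["good", "food"], ["bad", "food"]])
toy_ids = lookup(toy_vocab, ["good", "food", "pizza"])
print(toy_ids)  # "pizza" was never seen, so it maps to the '<unk>' ID
```

Setting a default index on the real vocabulary plays the same role as the `stoi.get(tok, unk_id)` fallback here: out-of-vocabulary words still produce a valid ID instead of raising an error.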


In [77]:
# -------------------------
# Create a custom Dataset and DataLoader
# -------------------------
class YelpDataset(Dataset):
    def __init__(self, data_iter):
        # TODO: Store the data and process it if necessary.
        self.data = data_iter

    def __len__(self):
        # TODO: Return the total number of examples.
        return len(self.data)

    def __getitem__(self, idx):
        # TODO: Return the processed (text_tensor, label) tuple for index idx.
        processed_text = text_pipeline(self.data[idx][1])
        processed_label = label_pipeline(self.data[idx][0])
        return torch.tensor(processed_text), torch.tensor(processed_label)

# TODO: Create a collate function that pads sequences in a batch.
def collate_batch(batch):
    text_list, label_list = zip(*batch)
    text_list = pad_sequence(text_list, batch_first=True, padding_value=vocab['<pad>'])
    labels = torch.tensor(label_list)
    return text_list, labels

# TODO: Create a DataLoader for the training data.
batch_size = 32
train_dataset = YelpDataset(train_list)  # Replace with your code to create an instance of YelpDataset.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_batch) # Replace with your code to create a DataLoader using collate_batch.
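As a rough illustration of what the collate function has to achieve, the sketch below pads variable-length ID lists to the length of the longest one in the batch, using plain Python lists. It only mirrors the batch-first layout conceptually; `pad_to_longest` is a hypothetical helper, and the padding ID of 1 is just an example value.

```python
def pad_to_longest(sequences, pad_id):
    """Right-pad every sequence with pad_id so all rows share the same length."""
    max_len = max(len(seq) for seq in sequences)
    return [list(seq) + [pad_id] * (max_len - len(seq)) for seq in sequences]

# Three "reviews" of different lengths, already converted to token IDs.
toy_batch = [[5, 6, 7], [8], [9, 10]]
padded = pad_to_longest(toy_batch, pad_id=1)
print(padded)  # each row now has length 3
```

Padding per batch (rather than to one global maximum) keeps short batches small, which is why the padding happens inside the collate function instead of in the Dataset.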

## Task 2: Build the RNN Classifier (2 points)

In [78]:
# TODO: Implement the RNNClassifier class.
class RNNClassifier(nn.Module):
    def __init__(self, vocab_size: int, embed_dim: int, hidden_dim: int, output_dim: int, num_layers: int, padding_idx: int):
        super(RNNClassifier, self).__init__()
        # TODO: Create an Embedding layer.
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=padding_idx)
        # TODO: Create an LSTM layer (batch_first=True).
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        # TODO: Create a fully connected layer mapping hidden_dim to output_dim.
        self.fc = nn.Linear(hidden_dim, output_dim)
        # TODO: Create a Sigmoid activation.
        self.sigmoid = nn.Sigmoid()

    def forward(self, text):
        embedded = self.embedding(text)
        output, (hidden, cell) = self.lstm(embedded)
        hidden_last = hidden[-1]
        logits = self.fc(hidden_last)
        # Squeeze only the output dimension so a batch of size 1 keeps its batch axis.
        return self.sigmoid(logits).squeeze(-1)

## Task 3: Training and evaluating your model (2 points)

In [79]:
def train_and_evaluate():
    # TODO: Define the device (e.g., "cuda")
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Use the vocabulary built in the Data Preprocessing section.
    vocab_size = len(vocab)
    embed_dim = 100        # You can choose a value (e.g., 100)
    hidden_dim = 128       # You can choose a value (e.g., 128)
    output_dim = 1         # For binary classification, output dimension is 1
    num_layers = 1         # Number of LSTM layers, e.g., 1
    padding_idx = vocab['<pad>']

    # TODO: Build your RNN classifier using your RNNClassifier class.
    model = RNNClassifier(vocab_size, embed_dim, hidden_dim, output_dim, num_layers, padding_idx)
    model = model.to(device)

    # TODO: Define your loss function
    criterion = nn.BCELoss()

    # TODO: Define your optimizer with a chosen learning rate.
    optimizer = optim.Adam(model.parameters(), lr=.0001)

    # Set the number of epochs for training.
    num_epochs = 10  # You can adjust the number of epochs.

    for epoch in range(num_epochs):
        # TODO: Set the model to training mode.
        model.train()

        epoch_loss = 0.0  # Initialize epoch loss.

        # Iterate over batches in your training DataLoader.
        for batch_idx, (text_batch, label_batch) in enumerate(train_loader):
            # TODO: Move text_batch and label_batch to the defined device.
            text_batch, label_batch = text_batch.to(device), label_batch.float().to(device)

            # TODO: Zero out the gradients.
            optimizer.zero_grad()

            # TODO: Perform a forward pass: obtain predictions from the model.
            predictions = model(text_batch)

            # TODO: Compute the loss using your criterion.
            loss = criterion(predictions, label_batch)

            # TODO: Perform the backward pass to compute gradients.
            loss.backward()


            # TODO: Update the model parameters.
            optimizer.step()


            # TODO: Accumulate the loss for reporting.
            epoch_loss += loss.item()

        # Compute the average loss for the epoch and print it.
        avg_loss = epoch_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

    # -------------------------
    # Evaluation on Sample Yelp Reviews
    # -------------------------

    # Define some sample Yelp reviews for evaluation.
    sample_reviews = [
        "The food was amazing and the service was excellent!",
        "I did not enjoy my visit at all. The experience was terrible.",
        "The ambiance was pleasant but the food was just okay.",
        "Absolutely loved the place! Will come back again."
    ]

    # TODO: Set the model to evaluation mode.
    model.eval()

    predictions = []  # Store the predictions for the sample reviews.
    print("Evaluation on Sample Yelp Reviews:")
    for review in sample_reviews:
        with torch.no_grad():
            # TODO: Convert the review text to a tensor using your text_pipeline function.
            # Convert text to tensor and add batch dimension.
            text_tensor = torch.tensor(text_pipeline(review)).unsqueeze(0).to(device)
            prediction = model(text_tensor)
            # Return sentiment label based on threshold 0.5.
            sentiment = "Positive" if prediction.item() >= 0.5 else "Negative"

            # Print the review and its predicted sentiment along with the prediction score.
            print(f"Review: {review}\nPredicted Sentiment: {sentiment} (Score: {prediction.item():.4f})\n")

            # TODO: Append the prediction score (as a float) to the predictions list.
            predictions.append(prediction.item())

    # Return a dictionary with keys "avg_loss" and "predictions".
    return {"avg_loss": avg_loss, "predictions": predictions}
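To see what the loss value printed each epoch measures, here is a small hand-computed sketch of mean binary cross-entropy on predicted probabilities. It is only an illustration of the formula, not the library's code: `binary_cross_entropy` is a hypothetical helper, and the `eps` clipping is a choice made in this sketch to avoid `log(0)`.

```python
import math

def binary_cross_entropy(probs, targets, eps=1e-7):
    """Mean of -[y*log(p) + (1-y)*log(1-p)] over all examples."""
    total = 0.0
    for p, y in zip(probs, targets):
        p = min(max(p, eps), 1.0 - eps)  # clip to keep the logarithm finite
        total += -(y * math.log(p) + (1.0 - y) * math.log(1.0 - p))
    return total / len(probs)

# Confident and correct predictions give a small loss ...
good_loss = binary_cross_entropy([0.9, 0.2], [1.0, 0.0])
# ... while confident but wrong predictions are penalized heavily.
bad_loss = binary_cross_entropy([0.1, 0.8], [1.0, 0.0])
print(f"{good_loss:.4f} {bad_loss:.4f}")
```

Because the loss works on probabilities, the model's output must already be in the range 0 to 1, which is why the classifier ends with a Sigmoid.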

## Task 4: Q&A Section (3 points)

Please answer the questions below briefly. Each carries 1 point. This section is also open-book, i.e., you can refer to documentation to inform your response.


**Question 1:**  
What is the purpose of specifying a `padding_idx` in the Embedding layer, and how does it affect the model's training and output?

<font color='red'>*Your Answer:* </font>

> *(Type your answer here)*

**Question 2:**  
Why do we use the final hidden state from the LSTM for classification? How does this hidden state encapsulate the overall information of the input sequence in the context of sentiment analysis?

<font color='red'>*Your Answer:* </font>

> *(Type your answer here)*

**Question 3:**  
What is the role of the Sigmoid activation in this binary classification model? How might this change if you were working on a multi-class classification task? (think of other activation functions)

<font color='red'>*Your Answer:* </font>

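It maps the raw output (logit) of the fully connected layer to a probability between 0 and 1. Thresholding that probability at 0.5 then gives the binary prediction ("Positive" or "Negative"). For a multi-class task, the output layer would have one unit per class, and a Softmax activation would turn the logits into a probability distribution over the classes.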
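To make the two activation functions in Question 3 concrete, the sketch below computes a sigmoid on a single logit and a softmax on a vector of logits in plain Python. The function names and the example logits are illustrative assumptions, not part of the challenge code.

```python
import math

def sigmoid(logit):
    """Squash one logit into a probability between 0 and 1."""
    return 1.0 / (1.0 + math.exp(-logit))

def softmax(logits):
    """Turn a list of logits into probabilities that sum to 1."""
    shift = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(x - shift) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

binary_prob = sigmoid(0.0)              # a logit of 0 means "undecided"
class_probs = softmax([2.0, 1.0, 0.1])  # three hypothetical classes
print(binary_prob, class_probs)
```

In the binary case a single probability is enough, since the probability of the other class is one minus it; with more than two classes, each class needs its own score.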

---
# Autograder section.

After you finish your code implementation, please run this part of the notebook to see your score for the coding section (6 points).

In [None]:
# ================================
# Pytest Code Cells for Evaluation
# ================================

def run_tests_and_accumulate_score():
    score = 0
    total = 6  # Total code points available: 2 + 2 + 2

    # ------------------------------
    # Part 1: Data Preprocessing (2 points)
    # ------------------------------
    try:
        # Test that tokenizer is defined and callable.
        assert tokenizer is not None, "Tokenizer is not defined."
        assert callable(tokenizer), "Tokenizer is not callable."

        # Test that yield_tokens is implemented.
        assert callable(yield_tokens), "yield_tokens function is not defined or callable."

        # Test that train_list is loaded and non-empty.
        assert len(train_list) > 0, "train_list appears to be empty."

        # Test that vocab is defined and includes the '<pad>' token.
        assert vocab is not None, "vocab is not defined."
        itos = vocab.get_itos() if hasattr(vocab, "get_itos") else []
        assert '<pad>' in itos, "vocab does not contain the '<pad>' token."

        # Test text_pipeline: it should return a list of integers.
        sample_text = "this is a test"
        token_ids = text_pipeline(sample_text)
        assert isinstance(token_ids, list), "text_pipeline should return a list."
        for tid in token_ids:
            assert isinstance(tid, int), "Each token ID from text_pipeline should be an integer."

        # Test label_pipeline: check that it converts labels (e.g., "positive") to an integer.
        sample_label = "positive"
        label_int = label_pipeline(sample_label)
        assert isinstance(label_int, int), "label_pipeline should return an integer."

        # Optionally, test that the Dataset and DataLoader work.
        dataset = YelpDataset(train_list)
        # Ensure __len__ and __getitem__ work.
        assert len(dataset) > 0, "Dataset __len__ returned zero."
        sample_item = dataset[0]
        assert isinstance(sample_item, tuple) and len(sample_item) == 2, "Dataset __getitem__ should return a tuple (text_tensor, label)."
        # Test collate_batch by creating a mini-batch.
        batch = [dataset[i] for i in range(min(3, len(dataset)))]
        collated = collate_batch(batch)
        assert isinstance(collated, tuple) and len(collated) == 2, "collate_batch should return a tuple (padded_texts, labels)."

        score += 2
        print("Data Preprocessing Test: Passed (2 points)")
    except AssertionError as e:
        print("Data Preprocessing Test: Failed -", e)

    # ------------------------------
    # Part 2: RNN Implementation (2 points)
    # ------------------------------
    try:
        # Use arbitrary parameters for testing.
        test_vocab_size = 2000
        test_embed_dim = 50
        test_hidden_dim = 64
        test_output_dim = 1
        test_num_layers = 1
        test_padding_idx = 0

        model = RNNClassifier(test_vocab_size, test_embed_dim, test_hidden_dim, test_output_dim, test_num_layers, test_padding_idx)
        assert isinstance(model, nn.Module), "Model is not an instance of nn.Module."
        # Verify required layers exist.
        assert hasattr(model, "embedding"), "Model is missing an embedding layer."
        assert hasattr(model, "lstm"), "Model is missing an LSTM layer."
        assert hasattr(model, "fc"), "Model is missing a fully connected layer."

        # Test a forward pass using dummy input.
        dummy_input = torch.randint(0, test_vocab_size, (4, 10))  # Batch size of 4, sequence length of 10.
        output = model(dummy_input)
        # Expect output to be 1D with length equal to batch size.
        assert output.dim() == 1, "Output of forward pass should be 1D."
        assert output.shape[0] == 4, f"Output batch size expected 4 but got {output.shape[0]}."

        score += 2
        print("RNN Implementation Test: Passed (2 points)")
    except AssertionError as e:
        print("RNN Implementation Test: Failed -", e)

    # ------------------------------
    # Part 3: Training & Evaluation (2 points)
    # ------------------------------
    try:
        # Check that a function train_and_evaluate() is defined.
        assert callable(train_and_evaluate), "train_and_evaluate() function is not defined or callable."

        # Call the function and capture its output.
        results = train_and_evaluate()
        # Expect results to be a dictionary containing at least 'predictions' and 'avg_loss'.
        assert isinstance(results, dict), "train_and_evaluate() should return a dictionary."
        assert "predictions" in results, "Results should contain the key 'predictions'."
        assert "avg_loss" in results, "Results should contain the key 'avg_loss'."
        # Check that predictions is a list and contains 4 elements (one for each sample review).
        predictions = results["predictions"]
        assert isinstance(predictions, list), "'predictions' should be a list."
        assert len(predictions) == 4, "Expected 4 predictions for the sample reviews."

        targetPreds = ['Positive', 'Negative', 'Negative', 'Positive']
        for idx, pred in enumerate(predictions):
            assert isinstance(pred, float), "Each prediction should be a float."
            assert 0.0 <= pred <= 1.0, "Each prediction should be between 0 and 1."
            sentiment = "Positive" if pred >= 0.5 else "Negative"
            assert sentiment == targetPreds[idx], f"Expected sentiment '{targetPreds[idx]}' but got '{sentiment}'."

        score += 2
        print("Training & Evaluation Test: Passed (2 points)")
    except AssertionError as e:
        print("Training & Evaluation Test: Failed -", e)

    print(f"Total Code Score: {score} / {total}")

# Run the custom test runner
run_tests_and_accumulate_score()


Data Preprocessing Test: Passed (2 points)
RNN Implementation Test: Passed (2 points)
