In [None]:
import torch

if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: mps


In [159]:
from pathlib import Path

# Define base path to the dataset
BASE_DATA_PATH = Path("../exercise_01/aclImdb")

TRAIN_PATH = BASE_DATA_PATH / "train"
TEST_PATH = BASE_DATA_PATH / "test"

TRAIN_POS_PATH = TRAIN_PATH / "pos"
TRAIN_NEG_PATH = TRAIN_PATH / "neg"
TEST_POS_PATH = TEST_PATH / "pos"
TEST_NEG_PATH = TEST_PATH / "neg"

assert (
    TRAIN_POS_PATH.exists()
    and TRAIN_NEG_PATH.exists()
    and TEST_POS_PATH.exists()
    and TEST_NEG_PATH.exists()
)

In [160]:
import random


def load_imdb_data(
    data_path: Path, subset_size: int | None = None, random_seed: int = 42
) -> tuple[list[str], list[int]]:
    """
    Loads movie reviews and their sentiments from the specified path.
    If subset_size is provided, returns a random subset of that size (with fixed seed).
    """
    texts: list[str] = []
    labels: list[int] = []

    for sentiment, folder_path in [
        ("pos", data_path / "pos"),
        ("neg", data_path / "neg"),
    ]:
        if not folder_path.exists():
            raise FileNotFoundError(f"Warning: Path {folder_path} does not exist.")

        binary_label = 1 if sentiment == "pos" else 0

        for file_path in folder_path.glob("*.txt"):
            texts.append(file_path.read_text(encoding="utf-8"))
            labels.append(binary_label)  # Use binary label, not score

    if subset_size is not None and subset_size < len(texts):
        random.seed(random_seed)
        indices = list(range(len(texts)))
        random.shuffle(indices)
        selected_indices = indices[:subset_size]
        texts = [texts[i] for i in selected_indices]
        labels = [labels[i] for i in selected_indices]

    return texts, labels


# Set the subset size for training data (e.g., 5000 for a smaller subset)
TRAIN_SUBSET_SIZE = 5000  # Change as needed

# Load training data (random subset)
train_texts_raw, train_labels = load_imdb_data(
    TRAIN_PATH, subset_size=TRAIN_SUBSET_SIZE, random_seed=42
)
print(f"Loaded {len(train_texts_raw)} training reviews (subset).")
print(
    f"Training labels distribution: Positive (1): {sum(train_labels)}, Negative (0): {len(train_labels) - sum(train_labels)}"
)

# Load test data (full set)
test_texts_raw, test_labels = load_imdb_data(TEST_PATH)
print(f"Loaded {len(test_texts_raw)} test reviews.")
print(
    f"Test labels distribution: Positive (1): {sum(test_labels)}, Negative (0): {len(test_labels) - sum(test_labels)}"
)


Loaded 5000 training reviews (subset).
Training labels distribution: Positive (1): 2511, Negative (0): 2489
Loaded 25000 test reviews.
Test labels distribution: Positive (1): 12500, Negative (0): 12500


In [161]:
train_texts_raw[:2], train_labels[:2]

(['This is one of those movies that you and a bunch of friends sit around drinking beers, eating pizza, and laugh at. Unfortunately for me I found myself watching this one alone. My friends and I rented a big block of movies and never got around to seeing this one. It was due back and I figured that it was a waste not to watch it. So I did, and I was impressed at how absolutely terrible this movie is.<br /><br />Now, I love bad movies quite a bit, and I probably would have liked this one if the "hero" wasn\'t so utterly loathsome. The entire movie I was hoping that he\'d put that stupid sword down and let someone kill him! He does very little heroic things in the movie. He\'s a beefy, disgusting, stupid thing. He has less redeeming qualities than the villains do. And what was it with all the naked chicks? I mean, I love naked chicks just as much as the next guy, but this movie went a tad overboard in that department.<br /><br />Well, anyway, if you love bad movies and can stand a disgu

In [162]:
import re
import string
from collections import Counter
from typing import List, Set


def pre_process(
    reviews: List[str],
    tokenize_punct: bool = False,
    lowercase: bool = True,
    remove_punct: bool = True,
    remove_high_freq_terms: bool = False,
    high_freq_threshold: float = 0.5,
    replace_numbers: bool = True,
) -> List[List[str]]:
    """
    Pre-processes a list of text reviews by tokenizing, cleaning, and normalizing them.

    Args:
        reviews (List[str]): A list of raw review strings.
        tokenize_punct (bool): If True, punctuation marks are treated as separate tokens.
                               If False, punctuation is discarded during tokenization.
        lowercase (bool): If True, converts all text to lowercase.
        remove_punct (bool): If True, removes punctuation tokens.
                             Note: This is only effective if `tokenize_punct` is True.
        remove_high_freq_terms (bool): If True, removes words that appear in more than
                                       `high_freq_threshold` proportion of documents.
        high_freq_threshold (float): The document frequency threshold for removing terms.
        replace_numbers (bool): If True, replaces all sequences of digits with a 'num' token.

    Returns:
        List[List[str]]: A list of processed reviews, where each review is a list of tokens.
    """
    processed_reviews: List[List[str]] = []
    punct_set: Set[str] = set(string.punctuation)

    for review in reviews:
        text: str = review
        if lowercase:
            text = text.lower()
        if replace_numbers:
            # Use a simple token 'num' that is compatible with the tokenizer.
            text = re.sub(r"\d+", "num", text)

        # Tokenize based on the specified flag
        if tokenize_punct:
            # Keeps words and punctuation as separate tokens
            tokens: List[str] = re.findall(r"\w+|[^\w\s]", text)
        else:
            # Keeps only word characters, effectively discarding punctuation
            tokens: List[str] = re.findall(r"\w+", text)

        # This step is only effective if tokenize_punct=True
        if remove_punct and tokenize_punct:
            tokens = [token for token in tokens if token not in punct_set]

        processed_reviews.append(tokens)

    if remove_high_freq_terms:
        doc_freq = Counter()
        # Calculate document frequency (word appears in how many docs)
        for tokens_list in processed_reviews:
            doc_freq.update(set(tokens_list))

        num_docs: int = len(processed_reviews)
        # Identify terms that are too frequent
        high_freq_terms = {
            term
            for term, freq in doc_freq.items()
            if (freq / num_docs) > high_freq_threshold
        }

        # Filter out the high-frequency terms from all reviews
        processed_reviews = [
            [token for token in tokens_list if token not in high_freq_terms]
            for tokens_list in processed_reviews
        ]

    return processed_reviews


# ==============================================================================
# ==================== FINAL CORRECTED TEST SUITE ==============================
# ==============================================================================

# Define a controlled test corpus to check all behaviors
test_corpus = [
    "Hello common world! This is test 123.",
    "Another common sentence, with a unique word.",
    "Just a common sentence.",
]

# --- Test 1: Default behavior ---
print("Testing default behavior...")
processed = pre_process(test_corpus)
expected_default = [
    ["hello", "common", "world", "this", "is", "test", "num"],
    ["another", "common", "sentence", "with", "a", "unique", "word"],
    ["just", "a", "common", "sentence"],
]
assert processed == expected_default, f"Default test failed. Got: {processed}"

# --- Test 2: `lowercase=False` ---
print("Testing lowercase=False...")
processed = pre_process(test_corpus, lowercase=False)
expected_no_lower = [
    ["Hello", "common", "world", "This", "is", "test", "num"],
    ["Another", "common", "sentence", "with", "a", "unique", "word"],
    ["Just", "a", "common", "sentence"],
]
assert processed == expected_no_lower, f"lowercase=False test failed. Got: {processed}"

# --- Test 3: `replace_numbers=False` ---
print("Testing replace_numbers=False...")
processed = pre_process(test_corpus, replace_numbers=False)
expected_no_num_replace = [
    ["hello", "common", "world", "this", "is", "test", "123"],
    ["another", "common", "sentence", "with", "a", "unique", "word"],
    ["just", "a", "common", "sentence"],
]
assert processed == expected_no_num_replace, (
    f"replace_numbers=False test failed. Got: {processed}"
)

# --- Test 4: Punctuation Handling ---
print("Testing tokenize_punct=True, remove_punct=False...")
processed = pre_process(test_corpus, tokenize_punct=True, remove_punct=False)
expected_keep_punct = [
    ["hello", "common", "world", "!", "this", "is", "test", "num", "."],
    ["another", "common", "sentence", ",", "with", "a", "unique", "word", "."],
    ["just", "a", "common", "sentence", "."],
]
assert processed == expected_keep_punct, (
    f"Keep punctuation test failed. Got: {processed}"
)

print("Testing tokenize_punct=True, remove_punct=True...")
processed = pre_process(test_corpus, tokenize_punct=True, remove_punct=True)
assert processed == expected_default, (
    f"Tokenize then remove punctuation test failed. Got: {processed}"
)

print("Testing tokenize_punct=False, remove_punct=False...")
processed = pre_process(test_corpus, tokenize_punct=False, remove_punct=False)
assert processed == expected_default, (
    f"Discard punctuation test failed. Got: {processed}"
)

# --- Test 5: High-Frequency Term Removal ---
print("Testing high-frequency removal (threshold=0.7)...")
processed = pre_process(
    test_corpus, remove_high_freq_terms=True, high_freq_threshold=0.7
)
expected_remove_common = [
    ["hello", "world", "this", "is", "test", "num"],
    ["another", "sentence", "with", "a", "unique", "word"],
    ["just", "a", "sentence"],
]
assert processed == expected_remove_common, (
    f"High-freq removal (0.7) test failed. Got: {processed}"
)

print("Testing high-frequency removal (threshold=0.6)...")
processed = pre_process(
    test_corpus, remove_high_freq_terms=True, high_freq_threshold=0.6
)
# CORRECTED: The word 'a' should also be removed as its doc freq (0.667) > 0.6
expected_remove_common_and_sentence = [
    ["hello", "world", "this", "is", "test", "num"],
    ["another", "with", "unique", "word"],
    ["just"],
]
assert processed == expected_remove_common_and_sentence, (
    f"High-freq removal (0.6) test failed. Got: {processed}"
)

# --- Test 6: Edge Cases ---
print("Testing edge cases...")
assert pre_process([]) == [], "Edge case: Empty list failed."
assert pre_process([""]) == [[]], "Edge case: List with empty string failed."
assert pre_process(["!@#$"]) == [[]], (
    "Edge case: Punctuation-only string (default) failed."
)
assert pre_process(["!@#$"], tokenize_punct=True, remove_punct=False) == [
    ["!", "@", "#", "$"]
], "Edge case: Punctuation-only string (keep) failed."

print("\n✅ All tests for the pre_process function passed successfully!")

Testing default behavior...
Testing lowercase=False...
Testing replace_numbers=False...
Testing tokenize_punct=True, remove_punct=False...
Testing tokenize_punct=True, remove_punct=True...
Testing tokenize_punct=False, remove_punct=False...
Testing high-frequency removal (threshold=0.7)...
Testing high-frequency removal (threshold=0.6)...
Testing edge cases...

✅ All tests for the pre_process function passed successfully!


In [163]:
tokenized_corpus = pre_process(train_texts_raw)

In [164]:
tokenized_corpus[1][:10] + ["..."] + tokenized_corpus[1][-10:]

['this',
 'documentary',
 'on',
 'schlockmeister',
 'william',
 'castle',
 'takes',
 'a',
 'few',
 'cheap',
 '...',
 'it',
 'does',
 'make',
 'you',
 'nostalgic',
 'for',
 'simpler',
 'movie',
 'going',
 'days']

In [165]:
PAD_TOKEN = "<PAD>"
UNK_TOKEN = "<UNK>"

# Vocabulary
vocab = list(set(word for sentence in tokenized_corpus for word in sentence))

# Add special tokens
if PAD_TOKEN not in vocab:
    vocab.append(PAD_TOKEN)
if UNK_TOKEN not in vocab:
    vocab.append(UNK_TOKEN)

word_to_idx = {word: i for i, word in enumerate(vocab)}
idx_to_word = {i: word for word, i in word_to_idx.items()}
vocab_size = len(vocab)

print(vocab[:5] + ["..."] + vocab[-5:])

print("Vocab Size", vocab_size)
print(f"PAD token index: {word_to_idx[PAD_TOKEN]}")
print(f"UNK token index: {word_to_idx[UNK_TOKEN]}")

['hanged', 'etebari', 'ladder', 'sexists', 'buzzkill', '...', 'despot', 'p', 'fairview', '<PAD>', '<UNK>']
Vocab Size 38281
PAD token index: 38279
UNK token index: 38280


### RNN 

In [166]:
SEQUENCE_LENGTH = 50


def truncate_or_pad(
    sentence: list[str], pad_token=PAD_TOKEN, sequence_length=SEQUENCE_LENGTH
) -> list[str]:
    if len(sentence) >= sequence_length:
        output = sentence[:sequence_length]
    else:
        output = sentence + [pad_token] * (sequence_length - len(sentence))

    assert len(output) == sequence_length

    return output


training_inputs = [truncate_or_pad(sentence) for sentence in tokenized_corpus]


assert all(len(sentence) == SEQUENCE_LENGTH for sentence in training_inputs)

for sentence in [s for s in training_inputs if PAD_TOKEN in s][:5]:
    print(sentence[25:])


['s', 'great', 'student', 'bodies', 'styled', 'gags', 'br', 'br', 'too', 'bad', 'this', 'isn', 't', 'on', 'video', 'but', 'you', 'can', 'still', 'watch', 'it', 'on', 'flix', '<PAD>', '<PAD>']
['were', 'the', 'last', 'movie', 'on', 'earth', 'great', 'actors', 'but', 'bad', 'script', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']
['it', 's', 'just', 'basically', 'a', 'wonderfull', 'movie', 'for', 'all', 'ages', 'i', 'found', 'the', 'last', 'battle', 'scene', 'awesome', 'basically', 'this', 'was', 'a', 'great', 'flick', '<PAD>', '<PAD>']
['s', 'the', 'only', 'porn', 'movie', 'i', 'know', 'that', 'is', 'worth', 'watching', 'between', 'the', 'sex', 'scenes', 'br', 'br', 'bon', 'cinema', 'br', 'br', 'laurent', '<PAD>', '<PAD>', '<PAD>']
['still', 'a', 'very', 'good', 'movie', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD

In [167]:
import torch
from torch.utils.data import Dataset


class RNNDataset(Dataset):
    def __init__(
        self, inputs: list[list[str]], labels: list[int], word_to_idx: dict[str, int]
    ):
        self.inputs = inputs
        self.labels = labels
        self.word_to_idx = word_to_idx
        self.unk_idx = word_to_idx[UNK_TOKEN]

    def __len__(self) -> int:
        return len(self.inputs)

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        sentence = self.inputs[idx]
        # Handle unknown words by mapping them to UNK token
        indices = [self.word_to_idx.get(word, self.unk_idx) for word in sentence]
        label = self.labels[idx]
        return torch.tensor(indices, dtype=torch.long), torch.tensor(label)

In [None]:
from sympy import sequence
import torch
from torch import nn


class RNN(nn.Module):
    def __init__(self, vocab_size: int, embedding_dim: int, hidden_dim:int, dropout_prob:float=0.5) -> None:
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim

        self.embeddings = nn.Embedding(
            num_embeddings=vocab_size, embedding_dim=embedding_dim
        )

        self.input_to_hidden= nn.Linear(in_features=embedding_dim, out_features=hidden_dim)
        self.hidden_to_hidden = nn.Linear(in_features=hidden_dim, out_features=hidden_dim)

        self.regression = nn.Linear(in_features=hidden_dim, out_features=1)

        self.dropout = nn.Dropout(p=dropout_prob)


    def forward(self, sentences: torch.Tensor):
        batch_size, sequence_length = sentences.shape

        previous_memories = torch.zeros((batch_size, self.hidden_dim), device=sentences.device)

        for i in range(sequence_length):
            tokens = sentences[:, i]

            embeddings = self.embeddings(tokens)

            new_memories = torch.tanh(self.input_to_hidden(embeddings) + self.hidden_to_hidden(previous_memories))
            new_memories = self.dropout(new_memories)
            previous_memories = new_memories

        return self.regression(new_memories)

In [169]:
from torch.optim import Adam
from torch.utils.data import DataLoader


BATCH_SIZE: int = 32
LEARNING_RATE: float = 0.001

SEQUENCE_LENGTH: int = 100
training_inputs = [truncate_or_pad(sentence, sequence_length=SEQUENCE_LENGTH) for sentence in tokenized_corpus]

# Create training dataset (with UNK token handling)
train_dataset = RNNDataset(training_inputs, train_labels, word_to_idx=word_to_idx)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

EMBEDDING_DIM = 50
HIDDEN_DIM= 50


model = RNN(vocab_size=vocab_size, embedding_dim=EMBEDDING_DIM, hidden_dim=HIDDEN_DIM)
model.to(device)
optimizer = Adam(model.parameters(), lr=LEARNING_RATE)
loss_function = nn.BCEWithLogitsLoss()




EPOCHS: int = 20

# Lists to store metrics for plotting later
train_losses = []
test_losses = []
train_accuracies = []
test_accuracies = []

for epoch in range(EPOCHS):
    # --- Training Phase ---
    model.train() # Set the model to training mode
    total_train_loss = 0.0
    correct_train = 0
    total_train = 0
    
    for batch_inputs, batch_labels in train_loader:
        batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(batch_inputs)
        
        # Calculate training accuracy for the batch
        preds = (torch.sigmoid(outputs) > 0.5).int().squeeze()
        correct_train += (preds == batch_labels).sum().item()
        total_train += batch_labels.size(0)
        
        # Calculate loss and update weights
        batch_labels = batch_labels.float().unsqueeze(1)
        loss = loss_function(outputs, batch_labels)
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()

    # Calculate average training metrics for the epoch
    avg_train_loss = total_train_loss / len(train_loader)
    epoch_train_acc = correct_train / total_train
    
    train_losses.append(avg_train_loss)
    train_accuracies.append(epoch_train_acc)

    # --- Evaluation Phase ---
    model.eval() # Set the model to evaluation mode (IMPORTANT!)
    total_test_loss = 0.0
    correct_test = 0
    total_test = 0
    
    # Pre-process and load test data once
    test_tokenized = pre_process(test_texts_raw)
    test_inputs = [truncate_or_pad(s, sequence_length=SEQUENCE_LENGTH) for s in test_tokenized]
    test_dataset = RNNDataset(test_inputs, test_labels, word_to_idx=word_to_idx)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)
    
    with torch.no_grad(): # Disable gradient calculations (Saves memory and computation)
        for batch_inputs, batch_labels in test_loader:
            batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)
            
            outputs = model(batch_inputs)
            
            # Calculate test accuracy for the batch
            preds = (torch.sigmoid(outputs) > 0.5).int().squeeze()
            correct_test += (preds == batch_labels).sum().item()
            total_test += batch_labels.size(0)
            
            # Calculate loss
            batch_labels = batch_labels.float().unsqueeze(1)
            loss = loss_function(outputs, batch_labels)
            total_test_loss += loss.item()

    # Calculate average test metrics for the epoch
    avg_test_loss = total_test_loss / len(test_loader)
    epoch_test_acc = correct_test / total_test
    
    test_losses.append(avg_test_loss)
    test_accuracies.append(epoch_test_acc)

    print(f"Epoch [{epoch+1}/{EPOCHS}] | "
          f"Train Loss: {avg_train_loss:.4f} | Train Acc: {epoch_train_acc:.4f} | "
          f"Test Loss: {avg_test_loss:.4f} | Test Acc: {epoch_test_acc:.4f}")

Epoch [1/20] | Train Loss: 0.6977 | Train Acc: 0.5120 | Test Loss: 0.6967 | Test Acc: 0.5012
Epoch [2/20] | Train Loss: 0.6839 | Train Acc: 0.5556 | Test Loss: 0.7003 | Test Acc: 0.5135
Epoch [3/20] | Train Loss: 0.6665 | Train Acc: 0.5978 | Test Loss: 0.7094 | Test Acc: 0.5164
Epoch [4/20] | Train Loss: 0.6336 | Train Acc: 0.6380 | Test Loss: 0.7188 | Test Acc: 0.5425
Epoch [5/20] | Train Loss: 0.5836 | Train Acc: 0.6922 | Test Loss: 0.7396 | Test Acc: 0.5600
Epoch [6/20] | Train Loss: 0.5336 | Train Acc: 0.7246 | Test Loss: 0.7977 | Test Acc: 0.5714
Epoch [7/20] | Train Loss: 0.4651 | Train Acc: 0.7746 | Test Loss: 0.8279 | Test Acc: 0.5782
Epoch [8/20] | Train Loss: 0.3988 | Train Acc: 0.8126 | Test Loss: 0.8893 | Test Acc: 0.5677
Epoch [9/20] | Train Loss: 0.3473 | Train Acc: 0.8402 | Test Loss: 0.9470 | Test Acc: 0.5447
Epoch [10/20] | Train Loss: 0.2819 | Train Acc: 0.8730 | Test Loss: 1.0409 | Test Acc: 0.5852
Epoch [11/20] | Train Loss: 0.2793 | Train Acc: 0.8946 | Test Loss: 1

In [170]:
# Preprocess test data using the same pipeline as training
test_tokenized_corpus = pre_process(test_texts_raw)
test_inputs = [truncate_or_pad(sentence) for sentence in test_tokenized_corpus]

# Create test dataset (RNNDataset automatically handles OOV words with UNK token)
test_dataset = RNNDataset(test_inputs, test_labels, word_to_idx=word_to_idx)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Test dataset size: {len(test_dataset)}")
print(f"Test batches: {len(test_loader)}")

# Check how many OOV words we have in test data
all_test_words = set(word for sentence in test_tokenized_corpus for word in sentence)
train_vocab_words = set(word_to_idx.keys()) - {PAD_TOKEN, UNK_TOKEN}
oov_words = all_test_words - train_vocab_words
print(f"Out-of-vocabulary words in test data: {len(oov_words)}")
print(f"Total unique words in test data: {len(all_test_words)}")
print(f"OOV percentage: {len(oov_words) / len(all_test_words) * 100:.2f}%")


Test dataset size: 25000
Test batches: 782
Out-of-vocabulary words in test data: 41964
Total unique words in test data: 72915
OOV percentage: 57.55%


In [171]:
from collections import Counter

# Evaluate the model on test data
model.eval()
correct_predictions = 0
total_predictions = 0

all_predictions = []
all_labels = []

with torch.no_grad():
    for batch_inputs, batch_labels in test_loader:
        outputs = model(batch_inputs)
        
        # Apply sigmoid to get probabilities
        predictions = torch.sigmoid(outputs).squeeze()
        
        # Convert to binary predictions (0 or 1)
        binary_predictions = (predictions > 0.5).int()
        
        # Store predictions and labels for analysis
        all_predictions.extend(binary_predictions.tolist())
        all_labels.extend(batch_labels.tolist())
        
        # Count correct predictions
        correct_predictions += (binary_predictions == batch_labels).sum().item()
        total_predictions += batch_labels.size(0)

# Calculate accuracy
accuracy = correct_predictions / total_predictions
print(f"Test Accuracy: {accuracy:.4f} ({correct_predictions}/{total_predictions})")

# Additional metrics
prediction_counts = Counter(all_predictions)
label_counts = Counter(all_labels)

print(f"Prediction distribution: {prediction_counts}")
print(f"True label distribution: {label_counts}")

# Calculate precision, recall, and F1-score for positive class
true_positives = sum(1 for pred, true in zip(all_predictions, all_labels) if pred == 1 and true == 1)
false_positives = sum(1 for pred, true in zip(all_predictions, all_labels) if pred == 1 and true == 0)
false_negatives = sum(1 for pred, true in zip(all_predictions, all_labels) if pred == 0 and true == 1)

precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1_score:.4f}")


RuntimeError: Placeholder storage has not been allocated on MPS device!

In [None]:
# Show some example predictions
import random

def show_predictions(num_examples=5):
    indices = random.sample(range(len(all_predictions)), num_examples)
    
    print("Example predictions:")
    print("-" * 80)
    
    for i, idx in enumerate(indices):
        prediction = all_predictions[idx]
        true_label = all_labels[idx]
        
        # Get the original text (first 200 characters)
        original_text = test_texts_raw[idx][:200] + "..." if len(test_texts_raw[idx]) > 200 else test_texts_raw[idx]
        
        status = "✓" if prediction == true_label else "✗"
        sentiment_pred = "Positive" if prediction == 1 else "Negative"
        sentiment_true = "Positive" if true_label == 1 else "Negative"
        
        print(f"{i+1}. {status} Predicted: {sentiment_pred} | True: {sentiment_true}")
        print(f"   Text: {original_text}")
        print()

show_predictions()


Example predictions:
--------------------------------------------------------------------------------
1. ✗ Predicted: Positive | True: Negative
   Text: I would like to comment on how the girls are chosen. why is that their are always more white women chosen then their are black women. every episode their is always more white women then black one's. a...

2. ✗ Predicted: Negative | True: Positive
   Text: What a pleasure. This is really a parody. Only french people can do that kind of thing without being coarse. And as a result, you spend a really good time watching Jean Dujardin playing the dumb. Most...

3. ✓ Predicted: Positive | True: Positive
   Text: Not having seen the film in its commercial debut, we just caught with it via DVD. Expecting the worst, "Hitch" proved to be a pleasant experience because of the three principals in it. Thanks to Andy ...

4. ✓ Predicted: Positive | True: Positive
   Text: The '70s were a great time for horror movies. The Brotherhood of Satan is yet a