# Sentiment Classifier (GRU)

Train a simple GRU-based model to classify IMDB reviews as positive/negative with beginner guidance.

> Beginner quick start

- Windows: skip the shell-based download cell and run the Python download cell instead.
- Start with a small subset and 1 epoch to validate the pipeline, then scale up.

In [None]:
import os
import re
import zipfile
import urllib.request
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from wordcloud import WordCloud
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

## Get IMDB dataset

In [None]:
# Non-Windows (optional): shell-based download/extract
# !wget -O IMDB.zip "https://github.com/SalvatoreRa/tutorial/blob/main/datasets/IMDB.zip?raw=true"
# !unzip IMDB.zip
# df = pd.read_csv('IMDB Dataset.csv')

In [None]:
def ensure_imdb_csv(csv_name='IMDB Dataset.csv', url='https://raw.githubusercontent.com/SalvatoreRa/tutorial/main/datasets/IMDB.zip'):
    """Return the path to the IMDB CSV, downloading and unzipping it if needed.

    If ``csv_name`` already exists nothing is downloaded. Otherwise the
    archive at ``url`` is saved as ``IMDB.zip`` in the current directory,
    extracted into the current directory, and the archive is deleted.

    Args:
        csv_name: Path at which the extracted CSV is expected.
        url: Location of the zip archive that contains the CSV.

    Returns:
        ``csv_name``, once that file exists on disk.

    Raises:
        FileNotFoundError: If ``csv_name`` is still missing after the
            archive has been extracted.
    """
    if os.path.exists(csv_name):
        return csv_name
    zip_path = 'IMDB.zip'
    urllib.request.urlretrieve(url, zip_path)
    try:
        with zipfile.ZipFile(zip_path, 'r') as zf:
            zf.extractall('.')
    finally:
        # Best-effort cleanup: the archive is removed even when extraction
        # fails, and a failure to delete it is not worth aborting for.
        try:
            os.remove(zip_path)
        except OSError:
            pass
    if not os.path.exists(csv_name):
        raise FileNotFoundError(
            f"'{csv_name}' was not found after extracting the archive "
            f"downloaded from {url}")
    return csv_name


# Make sure the CSV is available locally (ensure_imdb_csv returns its path),
# then load it into a DataFrame.
csv_path = ensure_imdb_csv()
df = pd.read_csv(csv_path)
# Trailing expression: previews the first rows when run as a notebook cell.
df.head()

## Quick EDA: word clouds and lengths

In [None]:
# Fetch the NLTK resources used below: the English stop-word list and the
# tokenizer models behind word_tokenize.
nltk.download('stopwords')
nltk.download('punkt')
# Recent NLTK releases load the tokenizer from 'punkt_tab' rather than
# 'punkt', and word_tokenize raises LookupError without it. On older releases
# that do not know this id, the call just reports the miss and returns False.
nltk.download('punkt_tab')
stop_words = set(stopwords.words('english'))


def get_words(series, stopword_set=None):
    """Collect the non-stop-word tokens of every review in *series*.

    Punctuation is replaced by spaces and digits are deleted before each
    review is split on whitespace. Tokens of length 1 are dropped, and so
    are stop words. The stop-word check is case-insensitive, so capitalised
    forms such as "The" are filtered too; the original casing of the
    surviving words is kept.

    Args:
        series: Iterable of review strings.
        stopword_set: Optional collection of lowercase stop words. When
            omitted, the module-level ``stop_words`` set is used.

    Returns:
        A flat list of the kept words, in order of appearance.
    """
    active_stops = stop_words if stopword_set is None else stopword_set
    all_words = []
    for review in series:
        r = re.sub(r'[^\w\s]', ' ', review)
        r = re.sub(r'\d', '', r)
        all_words.extend(
            w for w in r.split()
            if len(w) > 1 and w.lower() not in active_stops)
    return all_words


# Split the reviews by sentiment label and count the filtered words per class.
pos = df[df['sentiment'] == 'positive']['review']
neg = df[df['sentiment'] == 'negative']['review']
pos_counts = Counter(get_words(pos))
neg_counts = Counter(get_words(neg))

# One 400x400 word cloud per class, sized by word frequency.
# NOTE(review): a notebook cell only auto-displays the value of its last
# expression, so the positive image is probably never shown - confirm, and
# display it explicitly if both are wanted.
WordCloud(width=400, height=400, background_color='white').generate_from_frequencies(
    pos_counts).to_image()
WordCloud(width=400, height=400, background_color='white').generate_from_frequencies(
    neg_counts).to_image()

In [None]:
def review_lengths(series):
    """Return the whitespace-delimited word count of each review in *series*."""
    word_lists = (text.split() for text in series)
    return [len(words) for words in word_lists]


# Side-by-side histograms of review length (in words), one panel per class.
plt.figure(figsize=(10, 4))
for _slot, (_reviews, _colour, _heading) in enumerate(
        [(pos, 'green', 'Positive lengths'), (neg, 'red', 'Negative lengths')],
        start=1):
    plt.subplot(1, 2, _slot)
    plt.hist(review_lengths(_reviews), bins=30, color=_colour, alpha=.7)
    plt.title(_heading)
plt.tight_layout()
plt.show()

## Tokenize, vectorize, pad

In [None]:
def preprocess_review(review):
    """Normalise a review to lowercase words separated by single spaces.

    Punctuation is replaced by spaces and digits are deleted. Whitespace is
    collapsed last, so the gaps left behind by removed digits do not survive
    as doubled spaces in the result.

    Args:
        review: Raw review text.

    Returns:
        The cleaned, stripped, lowercased text.
    """
    review = re.sub(r'[^\w\s]', ' ', review)
    review = re.sub(r'\d', '', review)
    # Must run after the two substitutions above: both can create new runs
    # of whitespace.
    review = re.sub(r'\s+', ' ', review)
    return review.strip().lower()


def tokenize_and_build_vocab(reviews, max_vocab=1000):
    """Build a word -> index mapping from the most frequent words in *reviews*.

    Each review is cleaned with ``preprocess_review`` and tokenized with
    ``word_tokenize``; stop words and one-character tokens are not counted.
    The ``max_vocab`` most common words receive indices starting at 1, in
    order of decreasing frequency, so index 0 is never assigned to a word.
    """
    frequencies = Counter()
    for text in reviews:
        for token in word_tokenize(preprocess_review(text)):
            if len(token) > 1 and token not in stop_words:
                frequencies[token] += 1
    ranked = frequencies.most_common(max_vocab)
    return {word: rank for rank, (word, _count) in enumerate(ranked, start=1)}


def vectorize_reviews(reviews, vocab):
    """Encode each review as the list of vocabulary indices of its tokens.

    Reviews are cleaned with ``preprocess_review`` and tokenized with
    ``word_tokenize``; tokens that are not keys of *vocab* are dropped.
    """
    return [
        [vocab[token]
         for token in word_tokenize(preprocess_review(text))
         if token in vocab]
        for text in reviews
    ]


# Features are the raw review strings. Labels are encoded positive -> 0 and
# negative -> 1 (note that the positive class is 0 here).
X, y = df['review'].values, np.where(df['sentiment'] == 'positive', 0, 1)
# Hold out 20% of the data as the test set, stratified on the label.
x_train, x_test, y_train, y_test = train_test_split(
    X, y, stratify=y, test_size=.2, random_state=42)
# Carve 10% of the remaining training data off as the validation set.
x_train, x_val, y_train, y_val = train_test_split(
    x_train, y_train, stratify=y_train, test_size=.1, random_state=42)
# The vocabulary is built from the training split only.
vocab = tokenize_and_build_vocab(x_train, max_vocab=1000)


def pad(seqs, max_len):
    """Left-pad (and truncate) index sequences into a fixed-width matrix.

    Returns an integer array of shape ``(len(seqs), max_len)``. Sequences
    longer than ``max_len`` keep only their first ``max_len`` items; shorter
    ones are right-aligned with zeros filling the front of the row.
    """
    padded = np.zeros((len(seqs), max_len), dtype=int)
    for row, seq in zip(padded, seqs):
        kept = seq[:max_len]
        if len(kept) > 0:
            # `row` is a view into `padded`, so this writes in place.
            row[max_len - len(kept):] = kept
    return padded


# Encode every split as a fixed-width (500 columns) matrix of vocabulary
# indices, and make sure the label vectors are NumPy arrays.
x_train, x_val, x_test = (
    pad(vectorize_reviews(split, vocab), 500)
    for split in (x_train, x_val, x_test)
)
y_train, y_val, y_test = (
    np.array(labels) for labels in (y_train, y_val, y_test)
)

## Define model (GRU)

In [None]:
class SentimentRNN(nn.Module):
    """GRU-based binary classifier over sequences of token indices.

    Pipeline: embedding -> stacked GRU -> dropout -> linear -> sigmoid,
    evaluated on the GRU output of the last time step.

    Args:
        no_layers: Number of stacked GRU layers.
        vocab_size: Number of rows in the embedding table.
        hidden_dim: Size of the GRU hidden state.
        embedding_dim: Size of each token embedding.
        drop_prob: Dropout probability applied before the linear layer.
    """

    def __init__(self, no_layers, vocab_size, hidden_dim, embedding_dim, drop_prob=0.5):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.no_layers = no_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.GRU(input_size=embedding_dim, hidden_size=hidden_dim,
                          num_layers=no_layers, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(hidden_dim, 1)
        self.sig = nn.Sigmoid()

    def forward(self, x, hidden=None):
        """Score a batch of index sequences.

        Args:
            x: Integer tensor of token indices, shape ``(batch, seq_len)``
                (the GRU is built with ``batch_first=True``).
            hidden: Initial GRU state of shape
                ``(no_layers, batch, hidden_dim)``, or ``None`` to let
                ``nn.GRU`` start from zeros.

        Returns:
            ``(probs, hidden)``: ``probs`` has shape ``(batch,)`` and holds
            sigmoid outputs; ``hidden`` is the final GRU state.
        """
        embeds = self.embedding(x)
        rnn_out, hidden = self.rnn(embeds, hidden)
        # Only the last time step feeds the classifier, so select it before
        # the head instead of scoring every step and discarding the rest.
        # In training mode this draws a smaller dropout mask than before.
        last_step = rnn_out[:, -1, :]
        out = self.sig(self.fc(self.dropout(last_step)))
        return out.squeeze(-1), hidden

    def init_hidden(self, batch):
        """Return an all-zero GRU state of shape (no_layers, batch, hidden_dim).

        The tensor is created on the default device; callers move it as needed.
        """
        return torch.zeros((self.no_layers, batch, self.hidden_dim))

## Train (small, quick run)

In [None]:
# Model, loss and optimizer setup.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
no_layers, hidden_dim, embedding_dim = 2, 128, 100
# +1 because vocabulary indices start at 1 and index 0 is the padding value.
model = SentimentRNN(no_layers, len(vocab)+1, hidden_dim,
                     embedding_dim).to(device)
# BCELoss expects probabilities; the model already ends in a sigmoid.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
train_loader = DataLoader(TensorDataset(torch.from_numpy(
    x_train), torch.from_numpy(y_train)), batch_size=64, shuffle=True)
# Built here for convenience; this cell does not consume it.
valid_loader = DataLoader(TensorDataset(torch.from_numpy(
    x_val), torch.from_numpy(y_val)), batch_size=64)
epochs = 1
for epoch in range(epochs):
    model.train()
    tr_loss = 0
    tr_acc = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device).float()
        # Fresh zero state per batch, sized to the actual batch. Batches are
        # independent shuffled reviews, so there is no state to carry over,
        # and the final batch may hold fewer than 64 rows; a fixed-size
        # state would make the GRU raise a shape error there.
        h = model.init_hidden(xb.size(0)).to(device)
        model.zero_grad()
        out, h = model(xb, h)
        loss = criterion(out, yb)
        loss.backward()
        # Clip the gradient norm to keep recurrent updates stable.
        nn.utils.clip_grad_norm_(model.parameters(), 5)
        optimizer.step()
        # Weight by batch size so the epoch figure is a per-sample mean.
        tr_loss += loss.item()*xb.size(0)
        preds = (out > 0.5).float()
        tr_acc += (preds == yb).sum().item()
    print(f'Epoch {epoch+1} Train Loss: {tr_loss/len(train_loader.dataset):.4f} | Train Acc: {tr_acc/len(train_loader.dataset):.3f}')

## Evaluate

In [None]:
# Score the held-out test split and summarise the results.
test_loader = DataLoader(TensorDataset(torch.from_numpy(
    x_test), torch.from_numpy(y_test)), batch_size=256)
model.eval()
all_preds = []
all_true = []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        yb = yb.numpy()
        # Size the zero state to the actual batch: the last batch may hold
        # fewer than 256 rows, and reviews are independent of each other.
        h = model.init_hidden(xb.size(0)).to(device)
        out, h = model(xb, h)
        # `out` is already a probability (the model ends in a sigmoid).
        # Applying sigmoid again would push every value above 0.5 and label
        # every review as class 1.
        preds = (out > 0.5).long().cpu().numpy()
        all_preds.extend(preds)
        all_true.extend(yb)
acc = accuracy_score(all_true, all_preds)
cm = confusion_matrix(all_true, all_preds)
print('Test accuracy:', acc)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.show()