In [1]:
# 1D CNN for IMDB text classification

import re
import random
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print("Device:", device)

Device: cpu


In [2]:
# Load IMDB CSV from Data folder

csv_path = './Data/IMDB Dataset.csv'

# Expected columns: 'review' (raw text) and 'sentiment' (class name)
df = pd.read_csv(csv_path)
print(df.head())
print(df['sentiment'].value_counts())

# Encode the sentiment strings as integer class ids
label_map = dict(negative=0, positive=1)
df['label'] = df['sentiment'].map(label_map)

# Plain Python lists are what the tokenisation code further down iterates over
review_column = df['review'].astype(str)
label_column = df['label'].astype(int)
texts, labels = review_column.tolist(), label_column.tolist()

len(texts), len(labels)

                                              review sentiment
0  One of the other reviewers has mentioned that ...  positive
1  A wonderful little production. <br /><br />The...  positive
2  I thought this was a wonderful way to spend ti...  positive
3  Basically there's a family where a little boy ...  negative
4  Petter Mattei's "Love in the Time of Money" is...  positive
sentiment
positive    25000
negative    25000
Name: count, dtype: int64


(50000, 50000)

In [3]:
# Text cleaning and tokenization, vocabulary and sequence building

def clean_text(text: str) -> str:
    """Normalise a raw review into lowercase words separated by single spaces.

    Steps, in order: lowercase, drop HTML line breaks, replace every character
    that is not an ASCII letter or an apostrophe with a space, then collapse
    runs of whitespace and trim the ends.

    Args:
        text: Raw review text, possibly containing HTML ``<br>`` tags.

    Returns:
        The cleaned string; empty if nothing but removed characters remained.
    """
    # Lowercase
    text = text.lower()
    # Remove HTML breaks in any common spelling (<br>, <br/>, <br />); matching
    # only '<br />' would let the other forms survive as a stray "br" token
    text = re.sub(r"<br\s*/?>", ' ', text)
    # Keep letters and apostrophes only, replace everything else with a space
    # (the text is already lowercase, so a-z covers every remaining letter)
    text = re.sub(r"[^a-z']", ' ', text)
    # Collapse multiple spaces
    text = re.sub(r"\s+", ' ', text).strip()
    return text

def tokenize(text: str):
    """Clean ``text`` with ``clean_text`` and split it on whitespace into a list of tokens."""
    cleaned = clean_text(text)
    return cleaned.split()

# Count how often each token occurs across the corpus
# NOTE(review): `texts` holds every review, so the vocabulary is built before
# the train/validation split and also sees the validation reviews.
min_freq = 2
word_freq = {}
for review in texts:
    for token in tokenize(review):
        if token in word_freq:
            word_freq[token] += 1
        else:
            word_freq[token] = 1

# Index 0 is the padding token and index 1 the unknown-word token; tokens seen
# at least `min_freq` times get the following ids in first-seen order
word2idx = {'<PAD>': 0, '<UNK>': 1}
frequent_words = [w for w, count in word_freq.items() if count >= min_freq]
for next_id, w in enumerate(frequent_words, start=len(word2idx)):
    word2idx[w] = next_id

# Reverse lookup (id -> token)
idx2word = dict((i, w) for w, i in word2idx.items())
vocab_size = len(word2idx)
print('Vocab size:', vocab_size)

max_len = 200  # every encoded review is cut or padded to exactly this many ids

def encode(text):
    """Turn a raw review into a list of exactly ``max_len`` vocabulary ids.

    Tokens missing from ``word2idx`` map to the ``<UNK>`` id, sequences longer
    than ``max_len`` are cut at the end, and shorter ones are right-padded with
    the ``<PAD>`` id.
    """
    unk_id = word2idx['<UNK>']
    pad_id = word2idx['<PAD>']
    ids = [word2idx.get(tok, unk_id) for tok in tokenize(text)][:max_len]
    # A non-positive multiplier yields an empty list, so full-length inputs are untouched
    return ids + [pad_id] * (max_len - len(ids))

# Encode every review into a fixed-length id sequence
all_encoded = list(map(encode, texts))

len(all_encoded), len(all_encoded[0])

Vocab size: 70386


(50000, 200)

In [4]:
# Train/validation split and Dataset/DataLoader (no sklearn)

X = np.asarray(all_encoded, dtype=np.int64)
y = np.asarray(labels, dtype=np.int64)

# Seeded in-place shuffle of the row indices, then an 80/20 cut
num_samples = X.shape[0]
np.random.seed(42)
indices = np.arange(num_samples)
np.random.shuffle(indices)

split = int(0.8 * num_samples)
train_idx, val_idx = indices[:split], indices[split:]

# Fancy indexing copies the selected rows into separate train / validation arrays
X_train, y_train = X[train_idx], y[train_idx]
X_val, y_val = X[val_idx], y[val_idx]

class IMDBDataset(Dataset):
    """Map-style dataset that stores encoded reviews and their labels as long tensors."""

    def __init__(self, X, y):
        # torch.tensor copies its input, so the dataset does not alias the caller's arrays
        features = torch.tensor(X, dtype=torch.long)
        targets = torch.tensor(y, dtype=torch.long)
        self.X, self.y = features, targets

    def __len__(self):
        # Number of samples is the size of the label tensor's first dimension
        return self.y.shape[0]

    def __getitem__(self, idx):
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

batch_size = 128

train_dataset, val_dataset = IMDBDataset(X_train, y_train), IMDBDataset(X_val, y_val)

# Only the training loader shuffles; validation keeps a fixed order
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=batch_size)

len(train_dataset), len(val_dataset)

(40000, 10000)

In [5]:
# 1D CNN model definition

class TextCNN(nn.Module):
    """Text classifier built from parallel 1D convolutions over token embeddings.

    Each kernel size gets its own convolution branch; every branch is passed
    through ReLU and max-pooled over the sequence axis, the pooled features are
    concatenated, dropout is applied, and a linear layer produces the class
    scores.
    """

    def __init__(self, vocab_size, embed_dim, num_classes, kernel_sizes=(3,4,5), num_filters=100, dropout=0.5):
        super().__init__()
        # Token id 0 is declared as the padding index of the embedding table
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)

        # One convolution branch per kernel width, all with the same filter count
        branches = []
        for width in kernel_sizes:
            branches.append(nn.Conv1d(embed_dim, num_filters, width))
        self.convs = nn.ModuleList(branches)

        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(len(kernel_sizes) * num_filters, num_classes)

    def forward(self, x):
        """Map token ids of shape (batch, seq_len) to scores of shape (batch, num_classes)."""
        # Conv1d expects channels first: (batch, seq_len, embed_dim) -> (batch, embed_dim, seq_len)
        embedded = self.embedding(x).transpose(1, 2)

        # ReLU each branch, then keep the maximum over the time axis -> (batch, num_filters)
        pooled = [torch.relu(conv(embedded)).max(dim=2).values for conv in self.convs]

        features = torch.cat(pooled, dim=1)  # (batch, num_filters * len(kernel_sizes))
        return self.fc(self.dropout(features))

num_classes = 2
embed_dim = 128

# Seed torch's global RNG before any weights are created (same value as the
# NumPy seed used for the split). Without it the parameter initialisation, and
# everything later drawn from the global RNG, changes on every fresh run, so
# the reported metrics cannot be reproduced.
torch.manual_seed(42)

model = TextCNN(vocab_size=vocab_size,
                embed_dim=embed_dim,
                num_classes=num_classes).to(device)

# Two-class cross-entropy on the raw scores, optimised with Adam
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

model

TextCNN(
  (embedding): Embedding(70386, 128, padding_idx=0)
  (convs): ModuleList(
    (0): Conv1d(128, 100, kernel_size=(3,), stride=(1,))
    (1): Conv1d(128, 100, kernel_size=(4,), stride=(1,))
    (2): Conv1d(128, 100, kernel_size=(5,), stride=(1,))
  )
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=300, out_features=2, bias=True)
)

In [6]:
# Training and evaluation loop

def train_epoch(model, loader, criterion, optimizer):
    """Train ``model`` for one pass over ``loader`` and report mean loss and accuracy.

    Args:
        model: Network to optimise; switched to training mode here.
        loader: Iterable yielding ``(inputs, targets)`` tensor batches.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``. Its
            value is multiplied by the batch size when accumulating, so it is
            assumed to be a per-batch mean.
        optimizer: Optimizer that updates ``model``'s parameters.

    Returns:
        Tuple ``(mean_loss, accuracy)`` averaged over all samples seen.

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    model.train()

    # Move batches to wherever the model's weights live rather than relying on
    # a notebook-level `device` variable that may be missing or out of sync
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    correct = 0
    total = 0

    for X_batch, y_batch in loader:
        X_batch = X_batch.to(target_device)
        y_batch = y_batch.to(target_device)

        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so a smaller last batch is not over-counted
        batch_count = X_batch.size(0)
        total_loss += loss.item() * batch_count
        preds = outputs.argmax(dim=1)
        correct += (preds == y_batch).sum().item()
        total += batch_count

    return total_loss / total, correct / total


def eval_epoch(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` without gradient tracking.

    Args:
        model: Network to evaluate; switched to evaluation mode here.
        loader: Iterable yielding ``(inputs, targets)`` tensor batches.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``. Its
            value is multiplied by the batch size when accumulating, so it is
            assumed to be a per-batch mean.

    Returns:
        Tuple ``(mean_loss, accuracy)`` averaged over all samples seen.

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    model.eval()

    # Move batches to wherever the model's weights live rather than relying on
    # a notebook-level `device` variable that may be missing or out of sync
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for X_batch, y_batch in loader:
            X_batch = X_batch.to(target_device)
            y_batch = y_batch.to(target_device)

            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)

            # Weight the batch loss by batch size so a smaller last batch is not over-counted
            batch_count = X_batch.size(0)
            total_loss += loss.item() * batch_count
            preds = outputs.argmax(dim=1)
            correct += (preds == y_batch).sum().item()
            total += batch_count

    return total_loss / total, correct / total


num_epochs = 5

# One optimisation pass followed by one validation pass per epoch (1-based numbering)
epoch_numbers = range(1, num_epochs + 1)
for epoch in epoch_numbers:
    train_metrics = train_epoch(model, train_loader, criterion, optimizer)
    val_metrics = eval_epoch(model, val_loader, criterion)
    train_loss, train_acc = train_metrics
    val_loss, val_acc = val_metrics
    print(f"Epoch {epoch}: train_loss={train_loss:.4f}, train_acc={train_acc:.4f}, val_loss={val_loss:.4f}, val_acc={val_acc:.4f}")

Epoch 1: train_loss=0.6190, train_acc=0.6623, val_loss=0.4605, val_acc=0.7918
Epoch 2: train_loss=0.4693, train_acc=0.7718, val_loss=0.4157, val_acc=0.8020
Epoch 3: train_loss=0.4006, train_acc=0.8134, val_loss=0.3555, val_acc=0.8420
Epoch 4: train_loss=0.3385, train_acc=0.8492, val_loss=0.3370, val_acc=0.8526
Epoch 5: train_loss=0.2891, train_acc=0.8784, val_loss=0.3188, val_acc=0.8635
