In [None]:
import os
import re
import tarfile
import urllib.request
from collections import Counter
from pathlib import Path

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# 1. Download and extract IMDB data
# Remote location of the gzipped tarball fetched by download_and_extract().
DATA_URL = 'https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz'
# Local directory whose presence makes download_and_extract() skip the
# download; load_imdb_data() also uses it as the root for the split folders.
# Presumably this is the top-level folder the archive unpacks to (confirm).
DATA_DIR = 'aclImdb'

def download_and_extract():
    """Download the IMDB archive and unpack it into the working directory.

    Does nothing when the DATA_DIR directory already exists. Otherwise the
    tarball at DATA_URL is saved as 'aclImdb_v1.tar.gz' in the current
    working directory and extracted there.
    """
    if os.path.isdir(DATA_DIR):
        return

    filename, _ = urllib.request.urlretrieve(DATA_URL, 'aclImdb_v1.tar.gz')
    with tarfile.open(filename, 'r:gz') as tar:
        if hasattr(tarfile, 'data_filter'):
            # The 'data' filter rejects members that would escape the
            # destination (absolute paths, '..', unsafe links) and avoids the
            # DeprecationWarning raised by an unfiltered extractall().
            tar.extractall(filter='data')
        else:
            # NOTE(review): this interpreter predates tarfile extraction
            # filters, so members are extracted without path validation.
            tar.extractall()

# Executed when the cell/module runs; returns immediately without downloading
# if the DATA_DIR directory is already present.
download_and_extract()

# 2. Preprocessing
def clean_text(text):
    """Normalize a raw review into lowercase words separated by spaces.

    The text is lowercased, HTML-style tags are removed, and every character
    other than a-z or an apostrophe is turned into a space.

    Args:
        text: Raw review text.

    Returns:
        The cleaned string. Runs of spaces and leading/trailing spaces are
        not collapsed; whitespace splitting downstream is expected to handle
        them.
    """
    text = text.lower()
    # Replace tags with a space rather than nothing, so words on either side
    # of a tag (e.g. "good<br />movie") are not glued into a single token.
    text = re.sub(r"<.*?>", " ", text)
    # The text is already lowercase, so only a-z and apostrophes are kept.
    text = re.sub(r"[^a-z']", " ", text)
    return text

def load_imdb_data(split='train'):
    """Read every review of one dataset split from disk.

    Files are read from DATA_DIR/<split>/pos and DATA_DIR/<split>/neg, decoded
    as UTF-8 and passed through clean_text().

    Args:
        split: Name of the sub-directory of DATA_DIR to read
            (the script uses 'train' and 'test').

    Returns:
        A (data, labels) tuple of equal-length lists: the cleaned review
        strings and their labels (1 for 'pos', 0 for 'neg'). All 'pos'
        reviews come first, each group in sorted file-name order.
    """
    data = []
    labels = []
    split_path = os.path.join(DATA_DIR, split)
    for label in ['pos', 'neg']:
        folder = os.path.join(split_path, label)
        target = 1 if label == 'pos' else 0
        # os.listdir() returns entries in arbitrary order; sorting makes the
        # sample order (and anything seeded on top of it) reproducible.
        for name in sorted(os.listdir(folder)):
            path = os.path.join(folder, name)
            if not os.path.isfile(path):
                # A stray sub-directory would otherwise make open() fail.
                continue
            with open(path, encoding='utf-8') as f:
                data.append(clean_text(f.read()))
            labels.append(target)
    return data, labels

# Read both splits fully into memory as lists of cleaned review strings with
# parallel lists of 0/1 labels.
print("Loading data...")
train_texts, train_labels = load_imdb_data('train')
test_texts, test_labels = load_imdb_data('test')

# 3. Tokenizer and Vocabulary
def tokenize(text):
    """Split *text* into a list of tokens on runs of whitespace."""
    tokens = text.split()
    return tokens

def build_vocab(texts, max_size=20000, min_freq=2):
    """Map the most frequent tokens of *texts* to integer indices.

    Args:
        texts: Iterable of strings; each is split with tokenize().
        max_size: How many of the most common tokens are considered.
        min_freq: Considered tokens seen fewer times than this are dropped.

    Returns:
        A dict from token to index. "<unk>" is always index 0; kept tokens
        follow as 1, 2, ... in descending frequency order.
    """
    frequencies = Counter(token for doc in texts for token in tokenize(doc))
    kept_tokens = [
        token
        for token, count in frequencies.most_common(max_size)
        if count >= min_freq
    ]
    vocab = {"<unk>": 0}
    vocab.update((token, position) for position, token in enumerate(kept_tokens, start=1))
    return vocab

# The vocabulary is built from the training texts only, using build_vocab's
# defaults (max_size=20000, min_freq=2); the test texts are not used here.
vocab = build_vocab(train_texts)

# 4. Vectorization using Bag-of-Words
def text_to_bow_vector(text, vocab):
    """Encode *text* as a bag-of-words count vector.

    Args:
        text: String to encode; it is split with tokenize().
        vocab: Dict from token to index that contains the key "<unk>".
            Indices are assumed to lie in [0, len(vocab)), as produced by
            build_vocab().

    Returns:
        A 1-D float32 tensor of length len(vocab) whose entry i is the number
        of tokens mapped to index i. Tokens missing from *vocab* are counted
        under the "<unk>" index.
    """
    unk_index = vocab["<unk>"]  # looked up once instead of once per token
    indices = [vocab.get(word, unk_index) for word in tokenize(text)]
    # One vectorized count replaces a per-token in-place tensor update.
    # dtype is given explicitly so an empty token list still yields an
    # integer tensor, which bincount requires.
    counts = torch.bincount(
        torch.tensor(indices, dtype=torch.long), minlength=len(vocab)
    )
    return counts.to(torch.float32)

# 5. Create Dataset and DataLoader
class IMDBDataset(Dataset):
    """Dataset that turns stored review texts into bag-of-words vectors on access."""

    def __init__(self, texts, labels, vocab):
        # Keep references only; vectorization happens per item in __getitem__.
        self.texts, self.labels, self.vocab = texts, labels, vocab

    def __len__(self):
        """Return the number of samples, taken from the labels sequence."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return (bag-of-words vector, float32 label tensor) for sample *idx*."""
        raw_text = self.texts[idx]
        raw_label = self.labels[idx]
        features = text_to_bow_vector(raw_text, self.vocab)
        target = torch.tensor(raw_label, dtype=torch.float32)
        return features, target

# Both splits share one batch size and the vocabulary built from the
# training texts.
batch_size = 32
train_dataset = IMDBDataset(train_texts, train_labels, vocab)
test_dataset = IMDBDataset(test_texts, test_labels, vocab)

# Only the training loader is asked to shuffle; the test loader is created
# without a shuffle argument.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# 6. Define Single Layer Perceptron Model
class SLP(nn.Module):
    """Single-layer perceptron: one linear unit followed by a sigmoid."""

    def __init__(self, input_size):
        """Create the model.

        Args:
            input_size: Number of input features per sample.
        """
        super().__init__()
        self.fc = nn.Linear(input_size, 1)

    def forward(self, x):
        """Return one probability per sample.

        Args:
            x: Tensor whose last dimension has size ``input_size``.

        Returns:
            Sigmoid outputs with the trailing size-1 dimension removed, so a
            (batch, input_size) input yields a (batch,) tensor.
        """
        # Squeeze only the last dimension: a bare squeeze() would also drop
        # the batch dimension when the batch holds a single sample, and the
        # 0-d result would no longer match a (1,)-shaped label tensor.
        return torch.sigmoid(self.fc(x)).squeeze(-1)

class MLP(nn.Module):
    """Binary classifier with one optional hidden layer and a sigmoid output."""

    def __init__(self, input_size, hidden_units=128):
        """Create the model.

        Args:
            input_size: Number of input features per sample.
            hidden_units: Width of the hidden ReLU layer. A value that is not
                greater than 0 builds a single linear layer instead.
        """
        super().__init__()
        if hidden_units > 0:
            self.fc1 = nn.Linear(input_size, hidden_units)
            self.relu = nn.ReLU()
            self.fc2 = nn.Linear(hidden_units, 1)
        else:
            self.fc1 = nn.Linear(input_size, 1)  # Single-layer perceptron (SLP)

    def forward(self, x):
        """Return one probability per sample.

        Args:
            x: Tensor whose last dimension has size ``input_size``.

        Returns:
            Sigmoid outputs with the trailing size-1 dimension removed, so a
            (batch, input_size) input yields a (batch,) tensor.
        """
        x = self.fc1(x)
        # The hidden path exists only when __init__ created the relu/fc2 layers.
        if hasattr(self, 'relu'):
            x = self.relu(x)
            x = self.fc2(x)
        # Squeeze only the last dimension: a bare squeeze() would also drop
        # the batch dimension when the batch holds a single sample, and the
        # 0-d result would no longer match a (1,)-shaped label tensor.
        return torch.sigmoid(x).squeeze(-1)

# One input feature per vocabulary entry, matching the length of the vectors
# produced by text_to_bow_vector().
input_size = len(vocab)
# Alternative model, kept for quick switching:
# model = SLP(input_size)
model = MLP(input_size, hidden_units=128)  # Change hidden_units to 0 for SLP

# 7. Loss and Optimizer
# BCELoss is applied to the models' outputs, which already pass through a
# sigmoid in their forward() methods.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# 8. Training Loop
def train_model(model, loader, optimizer, criterion, epochs=5):
    """Train *model* in place and print the mean batch loss after every epoch.

    Args:
        model: Module to optimize; it is switched to training mode.
        loader: Sized iterable yielding (features, labels) batches.
        optimizer: Optimizer that updates the model's parameters.
        criterion: Loss callable applied as criterion(outputs, labels).
        epochs: Number of full passes over *loader*.
    """
    model.train()
    for epoch_number in range(1, epochs + 1):
        running_loss = 0
        for batch_features, batch_labels in loader:
            optimizer.zero_grad()
            batch_loss = criterion(model(batch_features), batch_labels)
            batch_loss.backward()
            optimizer.step()
            running_loss += batch_loss.item()
        # Average over the number of batches, not the number of samples.
        avg_loss = running_loss / len(loader)
        print(f"Epoch {epoch_number}, Loss: {avg_loss:.4f}")

# 9. Evaluate
def evaluate_model(model, loader):
    """Print the percentage of samples in *loader* that *model* classifies correctly.

    The model is switched to evaluation mode and run without gradient
    tracking. An output above 0.5 counts as a prediction of 1, otherwise 0.

    Args:
        model: Module returning one value per sample.
        loader: Iterable yielding (features, labels) batches.
    """
    model.eval()
    num_correct, num_seen = 0, 0
    with torch.no_grad():
        for batch_features, batch_labels in loader:
            predicted = (model(batch_features) > 0.5).float()
            matches = predicted == batch_labels
            num_correct += matches.sum().item()
            num_seen += batch_labels.size(0)
    accuracy = 100.0 * num_correct / num_seen
    print(f"Accuracy: {accuracy:.2f}%")

# 10. Run training and evaluation
# Fit the module-level model on the training loader for 5 epochs; train_model
# prints the mean batch loss after each epoch.
print("Training...")
train_model(model, train_loader, optimizer, criterion, epochs=5)

# Score the trained model on the test loader; evaluate_model prints the
# accuracy and returns nothing.
print("Evaluating...")
evaluate_model(model, test_loader)

Loading data...
Training...
Epoch 1, Loss: 0.3342
Epoch 2, Loss: 0.1632
Epoch 3, Loss: 0.0991
Epoch 4, Loss: 0.0588
Epoch 5, Loss: 0.0314
Evaluating...
Accuracy: 85.60%
