# In[3]:
import os
import tarfile
import urllib.request
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.datasets import IMDB
from torchtext.data.utils import get_tokenizer
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, random_split


# Function to download and extract IMDB dataset
def download_extract_imdb(root="./imdb_data"):
    """Download the aclImdb_v1 archive into *root* and extract it there.

    The archive is saved as ``<root>/aclImdb_v1.tar.gz`` and unpacked into
    *root*. Errors raised by ``urllib`` or ``tarfile`` are not caught and
    propagate to the caller.

    Args:
        root: Directory that receives both the archive and its contents.
            It is created when missing.

    Raises:
        tarfile.TarError: If an archive member would be written outside
            *root* (only raised by the fallback check on Python versions
            without extraction filters).
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(root, exist_ok=True)

    # NOTE(review): the archive is fetched over plain HTTP, so its contents
    # are not authenticated; this is why extraction below is restricted.
    url = "http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
    filename = os.path.join(root, "aclImdb_v1.tar.gz")
    urllib.request.urlretrieve(url, filename)

    # Extract the tar.gz file, refusing members that escape the target folder.
    with tarfile.open(filename, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Extraction filters are available: let tarfile reject unsafe members.
            tar.extractall(root, filter="data")
        else:
            # Older Python: best-effort check that every member path stays in root.
            base = os.path.realpath(root)
            for member in tar.getmembers():
                target = os.path.realpath(os.path.join(base, member.name))
                if os.path.commonpath([base, target]) != base:
                    raise tarfile.TarError(
                        f"unsafe path in archive: {member.name!r}"
                    )
            tar.extractall(root)

# Download and unpack the corpus only when the extracted "aclImdb" folder is
# missing, so re-running the script does not fetch the archive again.
if not os.path.exists("./imdb_data/aclImdb"):
    download_extract_imdb()

# torchtext's "basic_english" tokenizer; applied to every review in load_imdb_data
tokenizer = get_tokenizer("basic_english")

# Load data
def load_imdb_data(root="./imdb_data/aclImdb"):
    """Read and tokenize the labelled training reviews under *root*.

    Every file in ``<root>/train/pos`` and then ``<root>/train/neg`` is read
    as UTF-8 text and passed through the module-level ``tokenizer``.

    Args:
        root: Path of the extracted ``aclImdb`` directory.

    Returns:
        A list of ``(tokens, label)`` tuples, where ``label`` is 1 for files
        from ``pos`` and 0 for files from ``neg``.
    """
    samples = []
    # Positive reviews are visited first, then negative ones.
    for sentiment, target in (("pos", 1), ("neg", 0)):
        folder = os.path.join(root, "train", sentiment)
        for entry in os.listdir(folder):
            review_path = os.path.join(folder, entry)
            with open(review_path, "r", encoding="utf-8") as handle:
                raw_text = handle.read()
            samples.append((tokenizer(raw_text), target))
    return samples

# Load training data: a list of (tokens, label) pairs read from the default
# "./imdb_data/aclImdb" root
train_data = load_imdb_data()

# Build vocabulary
def build_vocab(data):
    """Build a token -> index mapping from tokenized examples.

    Index 0 is always ``'<unk>'`` and index 1 is always ``'<pad>'``. The
    remaining tokens follow in sorted order, so the mapping is identical
    from run to run instead of depending on set iteration order.

    Args:
        data: Iterable of ``(tokens, label)`` pairs; only ``tokens`` is used.

    Returns:
        Dict mapping each distinct token (plus the two special tokens) to a
        unique, contiguous integer index.
    """
    specials = ['<unk>', '<pad>']

    seen = set()
    for tokens, _ in data:
        seen.update(tokens)

    # A literal '<unk>'/'<pad>' in the corpus must not claim a second slot,
    # otherwise the reserved indices 0 and 1 would be overwritten.
    seen.difference_update(specials)

    vocab = specials + sorted(seen)
    return {word: idx for idx, word in enumerate(vocab)}

# Token -> integer index lookup built from every review in train_data
vocab_to_idx = build_vocab(train_data)

# Model
class TransformerModel(nn.Module):
    """Transformer-encoder text classifier with mean pooling.

    Token ids are embedded, passed through a stack of
    ``nn.TransformerEncoderLayer`` blocks, averaged over the sequence
    dimension, and projected to ``num_classes`` logits.

    NOTE(review): no positional encoding is added to the embeddings, so the
    encoder receives no token-order information.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding size and encoder ``d_model``.
        num_heads: Number of attention heads per encoder layer.
        hidden_dim: Width of each encoder layer's feed-forward sublayer.
        num_layers: Number of stacked encoder layers.
        num_classes: Size of the output logit vector.
        dropout: Dropout probability used inside the encoder layers and on
            the pooled representation.
        padding_idx: Optional token id used for padding. When given, padded
            positions are hidden from attention and left out of the mean, so
            the logits do not depend on how much padding a batch contains.
            ``None`` (the default) treats every position as a real token.
    """

    def __init__(self, vocab_size, embed_dim, num_heads, hidden_dim, num_layers, num_classes, dropout=0.5,
                 padding_idx=None):
        super().__init__()
        self.padding_idx = padding_idx
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=padding_idx)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=hidden_dim, dropout=dropout),
            num_layers=num_layers,
        )
        self.fc = nn.Linear(embed_dim, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Return class logits for a batch of token ids.

        Args:
            text: Integer tensor of token ids laid out as (batch, seq).

        Returns:
            Tensor of shape (batch, num_classes) holding unnormalized logits.
        """
        # The encoder is built with the default sequence-first layout, so
        # move from (batch, seq, dim) to (seq, batch, dim).
        embedded = self.embedding(text).permute(1, 0, 2)

        if self.padding_idx is None:
            encoded = self.transformer_encoder(embedded)
            pooled_output = torch.mean(encoded, dim=0)
        else:
            pad_mask = text == self.padding_idx  # (batch, seq), True where padded
            encoded = self.transformer_encoder(embedded, src_key_padding_mask=pad_mask)
            drop = pad_mask.transpose(0, 1).unsqueeze(-1)  # (seq, batch, 1)
            # masked_fill (rather than multiplying by 0) also clears NaNs that
            # appear when a sequence consists of padding only.
            summed = encoded.masked_fill(drop, 0.0).sum(dim=0)
            counts = (~drop).sum(dim=0).clamp(min=1).to(summed.dtype)
            pooled_output = summed / counts

        pooled_output = self.dropout(pooled_output)
        return self.fc(pooled_output)

# Run on the GPU when PyTorch can see one, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Model hyperparameters
VOCAB_SIZE = len(vocab_to_idx)  # one embedding row per vocabulary entry
EMBED_DIM = 60
NUM_HEADS = 2
HIDDEN_DIM = 60
NUM_LAYERS = 1
NUM_CLASSES = 2

model = TransformerModel(
    vocab_size=VOCAB_SIZE,
    embed_dim=EMBED_DIM,
    num_heads=NUM_HEADS,
    hidden_dim=HIDDEN_DIM,
    num_layers=NUM_LAYERS,
    num_classes=NUM_CLASSES,
)
model = model.to(device)

# Optimizer (Adam with its default settings) and classification loss
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()



# Define the collate function to pad sequences
def collate_batch(batch):
    """Turn a list of ``(tokens, label)`` examples into padded tensors.

    Tokens are looked up in the module-level ``vocab_to_idx``. Tokens missing
    from the vocabulary map to the ``'<unk>'`` index instead of raising
    ``KeyError``, and shorter sequences are right-padded with the ``'<pad>'``
    index up to the longest sequence in the batch.

    Args:
        batch: Sequence of ``(tokens, label)`` pairs as produced by the dataset.

    Returns:
        Tuple ``(padded_text, labels)``: a long tensor of shape
        (batch, max_len) with token ids and a tensor with one label per example.
    """
    text, labels = zip(*batch)
    labels = torch.tensor(labels)

    # Fall back to 0 if the vocabulary has no special tokens.
    unk_idx = vocab_to_idx.get('<unk>', 0)
    pad_idx = vocab_to_idx.get('<pad>', 0)

    sequences = [
        torch.tensor([vocab_to_idx.get(token, unk_idx) for token in item], dtype=torch.long)
        for item in text
    ]
    # Pad with the dedicated padding id so padding is distinguishable from '<unk>'.
    padded_text = pad_sequence(sequences, batch_first=True, padding_value=pad_idx)
    return padded_text, labels

# Hold out 20% of the loaded examples for validation (80/20 random split)
num_examples = len(train_data)
train_size = int(0.8 * num_examples)
val_size = num_examples - train_size
train_dataset, val_dataset = random_split(train_data, [train_size, val_size])

# Number of reviews per batch
BATCH_SIZE = 6

# Batches are padded by collate_batch; only the training loader is shuffled
train_iterator = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_batch,
)
val_iterator = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_batch,
)

# Training loop
# Number of full passes over the training split
N_EPOCHS = 5

for epoch in range(N_EPOCHS):
    print("epoch", epoch)

    # ---- Training phase ----
    model.train()  # training mode (dropout active)
    epoch_train_loss = 0.0  # sum of per-example losses seen this epoch
    train_examples = 0

    for text, labels in train_iterator:
        text, labels = text.to(device), labels.to(device)  # move the batch to the model's device

        optimizer.zero_grad()
        predictions = model(text)
        train_loss = criterion(predictions, labels)
        train_loss.backward()
        optimizer.step()

        # The criterion yields a batch mean; weight it by the batch size so a
        # short final batch does not count as much as a full one.
        batch_examples = labels.size(0)
        epoch_train_loss += train_loss.item() * batch_examples
        train_examples += batch_examples

    # Per-example average; max() guards against an empty loader
    average_train_loss = epoch_train_loss / max(train_examples, 1)

    # ---- Validation phase ----
    model.eval()  # evaluation mode (dropout disabled)
    epoch_val_loss = 0.0
    val_examples = 0

    with torch.no_grad():  # no gradients are needed for evaluation
        for text, labels in val_iterator:
            text, labels = text.to(device), labels.to(device)

            predictions = model(text)
            val_loss = criterion(predictions, labels)

            batch_examples = labels.size(0)
            epoch_val_loss += val_loss.item() * batch_examples
            val_examples += batch_examples

    average_val_loss = epoch_val_loss / max(val_examples, 1)

    # Print epoch information
    print(f'Epoch: {epoch+1:02} | Train Loss: {average_train_loss:.3f} | Val. Loss: {average_val_loss:.3f}')


