In [17]:
import os
import glob
import random
import argparse
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

# Try to import torchtext. If unavailable, we'll fall back to our own tokenizer.
try:
    import torchtext
    from torchtext.data.utils import get_tokenizer
    from torchtext.vocab import build_vocab_from_iterator
    torchtext_available = True
except ImportError:
    torchtext_available = False

In [18]:
# Hyper-parameters shared by the dataset, vocabulary and model cells below.
max_length = 600       # Maximum sequence length (samples are padded/truncated to this; also sizes the positional embedding table)
max_tokens = 20000     # Maximum vocabulary size (includes the reserved <unk> entry)
batch_size = 32        # Mini-batch size passed to every DataLoader
embed_dim = 256        # Width of the token/position embeddings and of the encoder block
num_heads = 2          # Number of attention heads in nn.MultiheadAttention
dense_dim = 32         # Hidden width of the encoder's feed-forward sub-layer
num_classes = 5        # Output size of the final linear classification layer

In [19]:
class TextClassificationDataset(Dataset):
    """
    Dataset that reads ``*.txt`` files from a ``root_dir/<category>/`` layout.

    Every sub-directory of ``root_dir`` is one category. Categories are sorted
    by name and numbered from 0, so the label index of a category is its
    position in that sorted list (exposed through ``label2idx``/``idx2label``).
    All files are read into memory when the dataset is constructed and the
    resulting ``(text, label)`` pairs are shuffled once with ``random.shuffle``.

    Args:
        root_dir: Directory that contains one sub-directory per category.
        vocab: Mapping with a ``.get(token, default)`` method (e.g. a dict)
            from token to integer index.
        tokenizer: Callable turning a string into a list of tokens.
        max_length: Every sample is padded/truncated to exactly this length.
    """
    def __init__(self, root_dir, vocab, tokenizer, max_length=600):
        self.samples = []
        self.vocab = vocab
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.label2idx = {}
        self.idx2label = {}
        # Get categories sorted alphabetically
        categories = sorted(
            d for d in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, d))
        )
        for i, cat in enumerate(categories):
            self.label2idx[cat] = i
            self.idx2label[i] = cat
            cat_dir = os.path.join(root_dir, cat)
            # Escape the directory part so glob metacharacters in a path
            # (e.g. a category named "biz[1]") are matched literally, and sort
            # the matches because glob returns them in arbitrary order: with a
            # seeded `random` the shuffle below is then reproducible.
            pattern = os.path.join(glob.escape(cat_dir), "*.txt")
            for path in sorted(glob.glob(pattern)):
                with open(path, "r", encoding="utf-8") as fp:
                    text = fp.read().strip()
                self.samples.append((text, i))
        random.shuffle(self.samples)

    def __len__(self):
        """Return the number of (text, label) samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """
        Return sample ``idx`` as ``(token_ids, label)``.

        ``token_ids`` is a ``torch.long`` tensor of length ``max_length`` and
        ``label`` is a scalar ``torch.long`` tensor.
        """
        text, label = self.samples[idx]
        tokens = self.tokenizer(text)
        # Convert tokens to indices; if token not found, use index 0 (<unk>)
        indices = [self.vocab.get(token, 0) for token in tokens][:self.max_length]
        # Right-pad with 0 up to max_length. Note that padding shares index 0
        # with unknown tokens.
        indices += [0] * (self.max_length - len(indices))
        return torch.tensor(indices, dtype=torch.long), torch.tensor(label, dtype=torch.long)

In [20]:
def build_vocab(data_dir, tokenizer, use_torchtext=True, max_tokens=20000):
    """
    Build a token -> index dict from the ``*.txt`` files under ``data_dir``.

    ``data_dir`` is expected to contain one sub-directory per category (this
    is normally the training directory). Index 0 is always reserved for the
    ``"<unk>"`` token.

    Args:
        data_dir: Directory containing one sub-directory of text files per category.
        tokenizer: Callable turning a string into a list of tokens.
        use_torchtext: Use torchtext's ``build_vocab_from_iterator`` when
            torchtext was importable; otherwise fall back to a
            ``collections.Counter`` based implementation.
        max_tokens: Maximum vocabulary size including ``"<unk>"``; ``None``
            keeps every token.

    Returns:
        dict mapping each kept token to its integer index.
    """
    unk_token = "<unk>"
    texts = []
    categories = sorted(
        d for d in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, d))
    )
    for cat in categories:
        # Escape glob metacharacters in the directory path and sort the
        # matches: glob order is arbitrary, and the rank of equally frequent
        # tokens depends on the order in which files are counted.
        pattern = os.path.join(glob.escape(os.path.join(data_dir, cat)), "*.txt")
        for path in sorted(glob.glob(pattern)):
            with open(path, "r", encoding="utf-8") as fp:
                texts.append(fp.read().strip())

    if use_torchtext and torchtext_available:
        # Use torchtext's vocabulary builder
        vocab = build_vocab_from_iterator(
            (tokenizer(t) for t in texts), specials=[unk_token], max_tokens=max_tokens
        )
        vocab.set_default_index(vocab[unk_token])
        # Convert torchtext Vocab to a regular dict mapping token -> index
        return {token: vocab[token] for token in vocab.get_itos()}

    # Manually build vocabulary using collections.Counter
    counter = Counter()
    for t in texts:
        counter.update(tokenizer(t))
    # A literal "<unk>" in the corpus must not compete for a slot: it would
    # overwrite the reserved entry below and leave the largest index equal to
    # len(vocab), i.e. out of range for an embedding sized with len(vocab).
    counter.pop(unk_token, None)
    # Reserve index 0 for <unk>; None means "keep everything".
    limit = None if max_tokens is None else max(max_tokens - 1, 0)
    vocab = {unk_token: 0}
    for idx, (token, _count) in enumerate(counter.most_common(limit), start=1):
        vocab[token] = idx
    return vocab

In [21]:
class PositionalEmbedding(nn.Module):
    """
    Sum of a token embedding and a learned per-position embedding.

    Args:
        sequence_length: Number of rows in the position embedding table.
        vocab_size: Number of rows in the token embedding table.
        embed_dim: Size of each embedding vector.
    """
    def __init__(self, sequence_length, vocab_size, embed_dim):
        super().__init__()
        self.token_embeddings = nn.Embedding(vocab_size, embed_dim)
        self.position_embeddings = nn.Embedding(sequence_length, embed_dim)
        self.sequence_length = sequence_length

    def forward(self, x):
        # x holds token indices with shape (batch_size, seq_length)
        n_rows, n_steps = x.size()
        # Position ids 0..n_steps-1, repeated for every row of the batch
        position_ids = torch.arange(0, n_steps, device=x.device)
        position_grid = position_ids.unsqueeze(0).expand(n_rows, n_steps)
        # Both lookups yield (batch_size, seq_length, embed_dim)
        return self.token_embeddings(x) + self.position_embeddings(position_grid)

In [22]:
class TransformerEncoder(nn.Module):
    """
    One Transformer encoder block: multi-head self-attention followed by a
    two-layer feed-forward network, each wrapped in dropout, a residual
    connection and layer normalization (applied after the residual sum).

    Args:
        embed_dim: Feature size of the input and output.
        dense_dim: Hidden size of the feed-forward network.
        num_heads: Number of attention heads.
    """
    def __init__(self, embed_dim, dense_dim, num_heads):
        super().__init__()
        self.multihead_attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.linear1 = nn.Linear(embed_dim, dense_dim)
        self.linear2 = nn.Linear(dense_dim, embed_dim)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(0.1)
        self.activation = nn.ReLU()

    def forward(self, x, src_mask=None):
        # x: (batch_size, seq_length, embed_dim)
        # Self-attention sub-layer: query, key and value are all x.
        attended, _weights = self.multihead_attn(x, x, x, attn_mask=src_mask)
        after_attention = self.norm1(x + self.dropout(attended))

        # Feed-forward sub-layer: embed_dim -> dense_dim -> embed_dim.
        hidden = self.linear1(after_attention)
        hidden = self.activation(hidden)
        projected = self.linear2(hidden)
        return self.norm2(after_attention + self.dropout(projected))


In [23]:
class TextClassificationModel(nn.Module):
    """
    Full classifier: positional embedding -> one Transformer encoder block ->
    global max pooling over the sequence -> dropout -> linear head.

    The forward pass returns raw class scores (no softmax is applied).
    """
    def __init__(self, sequence_length, vocab_size, embed_dim, num_heads, dense_dim, num_classes):
        super().__init__()
        self.embedding = PositionalEmbedding(sequence_length, vocab_size, embed_dim)
        self.transformer = TransformerEncoder(embed_dim, dense_dim, num_heads)
        # Adaptive max pooling to a single output acts as global max pooling over time.
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        # x: (batch_size, seq_length) token indices
        embedded = self.embedding(x)             # (batch_size, seq_length, embed_dim)
        encoded = self.transformer(embedded)     # (batch_size, seq_length, embed_dim)
        # The pooling layer works on the last axis, so move time there first.
        channels_first = encoded.permute(0, 2, 1)        # (batch_size, embed_dim, seq_length)
        pooled = self.pool(channels_first).squeeze(-1)   # (batch_size, embed_dim)
        return self.fc(self.dropout(pooled))



In [24]:
# ------------------------
# Training and Evaluation
# ------------------------
def train_model(model, train_loader, val_loader, device, epochs=5, lr=1e-3,
                checkpoint_path="best_model.pth"):
    """
    Train ``model`` with RMSprop and cross-entropy loss, validating after every epoch.

    The weights of the epoch with the highest validation accuracy are saved to
    ``checkpoint_path``. A checkpoint is always written after the first epoch,
    even if its validation accuracy is 0, so the file on disk always belongs
    to the current run.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        device: Device on which the model and batches are placed.
        epochs: Number of passes over ``train_loader``.
        lr: RMSprop learning rate.
        checkpoint_path: Where the best ``state_dict`` is saved.

    Returns:
        ``(train_accuracy_array, val_accuracy_array)``: two lists holding one
        accuracy per epoch. An empty loader contributes an accuracy of 0.0.
    """
    train_accuracy_array = []
    val_accuracy_array = []
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.RMSprop(model.parameters(), lr=lr)
    model.to(device)
    best_val_acc = 0.0
    checkpoint_saved = False
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        running_corrects = 0
        total = 0
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # The criterion averages over the batch, so weight by batch size
            # to obtain a per-sample mean at the end of the epoch.
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels).item()
            total += inputs.size(0)
        # Guard against an empty loader instead of dividing by zero.
        epoch_loss = running_loss / total if total else 0.0
        epoch_acc = running_corrects / total if total else 0.0
        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")
        train_accuracy_array.append(epoch_acc)
        # Validate
        model.eval()
        val_corrects = 0
        val_total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                val_corrects += torch.sum(preds == labels).item()
                val_total += inputs.size(0)
        val_acc = val_corrects / val_total if val_total else 0.0
        val_accuracy_array.append(val_acc)
        print(f"Validation Accuracy: {val_acc:.4f}")
        # Save on the first epoch unconditionally, afterwards only on a strict
        # improvement; otherwise a run stuck at 0 accuracy would leave no
        # checkpoint (or a stale one from an earlier run) for the caller to load.
        if not checkpoint_saved or val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
    print("Training complete. Best validation accuracy: {:.4f}".format(best_val_acc))
    return train_accuracy_array, val_accuracy_array

def test_model(model, test_loader, device):
    model.to(device)
    model.eval()
    test_corrects = 0
    test_total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            test_corrects += torch.sum(preds == labels).item()
            test_total += inputs.size(0)
    test_acc = test_corrects / test_total
    print(f"Test Accuracy: {test_acc:.4f}")

In [26]:

# ------------------------
# Main script: build vocabulary, datasets and model, then train and evaluate
# ------------------------

# Seed every RNG used below (dataset shuffling, weight initialisation, dropout,
# DataLoader shuffling) so that runs with different tokenizers are comparable.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

use_torchtext = True
if use_torchtext and torchtext_available:
    print("Using torchtext tokenizer (basic_english)")
    tokenizer = get_tokenizer("basic_english")
else:
    print("Using basic Python tokenizer (split on whitespace)")
    tokenizer = lambda x: x.lower().split()

# The vocabulary is built from the training split only.
train_dir = os.path.join("Datasets", "train")
vocab = build_vocab(train_dir, tokenizer, use_torchtext=use_torchtext, max_tokens=max_tokens)
vocab_size = len(vocab)
print("Vocabulary size:", vocab_size)

# Create datasets and dataloaders
train_dataset = TextClassificationDataset(train_dir, vocab, tokenizer, max_length)
val_dir = os.path.join("Datasets", "val")
val_dataset = TextClassificationDataset(val_dir, vocab, tokenizer, max_length)
test_dir = os.path.join("Datasets", "test")
test_dataset = TextClassificationDataset(test_dir, vocab, tokenizer, max_length)

# Each split derives its label indices from its own sub-directories, so a
# category missing from one split would silently shift that split's labels.
for split_name, split_dataset in (("val", val_dataset), ("test", test_dataset)):
    if split_dataset.label2idx != train_dataset.label2idx:
        raise ValueError(
            f"Categories of the {split_name} split {sorted(split_dataset.label2idx)} "
            f"do not match the training split {sorted(train_dataset.label2idx)}"
        )
# The classification head must have one output per category found on disk.
if len(train_dataset.label2idx) > num_classes:
    raise ValueError(
        f"Found {len(train_dataset.label2idx)} categories in {train_dir!r} "
        f"but num_classes is {num_classes}"
    )

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the model
model = TextClassificationModel(sequence_length=max_length,
                                vocab_size=vocab_size,
                                embed_dim=embed_dim,
                                num_heads=num_heads,
                                dense_dim=dense_dim,
                                num_classes=num_classes)
print(model)

# Train and validate the model
nepochs=20
train_array, val_array = train_model(model, train_loader, val_loader, device, epochs=nepochs)

# Load the best model (saved by train_model as "best_model.pth") and evaluate on the test set
model.load_state_dict(torch.load("best_model.pth", map_location=device))
test_model(model, test_loader, device)

Using torchtext tokenizer (basic_english)
Vocabulary size: 20000
TextClassificationModel(
  (embedding): PositionalEmbedding(
    (token_embeddings): Embedding(20000, 256)
    (position_embeddings): Embedding(600, 256)
  )
  (transformer): TransformerEncoder(
    (multihead_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_features=256, bias=True)
    )
    (linear1): Linear(in_features=256, out_features=32, bias=True)
    (linear2): Linear(in_features=32, out_features=256, bias=True)
    (norm1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
    (dropout): Dropout(p=0.1, inplace=False)
    (activation): ReLU()
  )
  (pool): AdaptiveMaxPool1d(output_size=1)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=256, out_features=5, bias=True)
)
Epoch 1/20 - Train Loss: 2.4921 Acc: 0.2261
Validation Accuracy: 0.3818
Epoch 2/20 - Train Loss: 1.5657 Acc

In [None]:
import matplotlib.pyplot as plt

# train_array and val_array are the per-epoch accuracy lists returned by train_model.
epochs = range(1, len(train_array) + 1)

# Label the figure (and name the file) after the tokenizer that was actually
# used for this run, so a torchtext run is not reported as "basic tokenizer"
# and does not overwrite the basic-tokenizer plot.
if use_torchtext and torchtext_available:
    tokenizer_label = "torchtext tokenizer"
    plot_path = 'accuracy_plot_torchtext.png'
else:
    tokenizer_label = "basic tokenizer"
    plot_path = 'accuracy_plot_simple.png'

fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(epochs, train_array, 'b-o', label='Train Accuracy')
ax.plot(epochs, val_array, 'r-o', label='Validation Accuracy')
ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy')
ax.set_title(f'Training and Validation Accuracy using {tokenizer_label}')
ax.legend()
ax.grid(True)
fig.tight_layout()

# Save the plot to a file
fig.savefig(plot_path)

# Optionally, display the plot
plt.show()


In [None]:
# Recorded test accuracy with the basic (whitespace) tokenizer: 77.41%
# Recorded test accuracy with the torchtext (basic_english) tokenizer: 74.42%