In [16]:
import math
import copy
import torch
import torch.nn as nn
import torch.nn.functional as F

In [17]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Download dataset
'''!wget https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
!tar -xzf aclImdb_v1.tar.gz'''

'''import wget
url = "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
wget.download(url, "aclImdb_v1.tar.gz")'''


# Load data
def load_imdb_data(data_dir):
    import os
    texts = []
    labels = []
    for label in ['pos', 'neg']:
        dir_name = os.path.join(data_dir, label)
        for fname in os.listdir(dir_name):
            if fname.endswith('.txt'):
                with open(os.path.join(dir_name, fname), encoding='utf-8') as f:
                    texts.append(f.read())
                    labels.append(1 if label == 'pos' else 0)
    return texts, labels

train_texts, train_labels = load_imdb_data('attention/train')
test_texts, test_labels = load_imdb_data('attention/test')


In [18]:
class TransformerEncoder(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, dropout=0.1):
        super().__init__()
        self.model_dim = model_dim
        self.embedding = nn.Embedding(input_dim, model_dim)
        self.pos_encoder = PositionalEncoding(model_dim, dropout)
        encoder_layers = nn.TransformerEncoderLayer(d_model=model_dim, nhead=num_heads, dropout=dropout, batch_first = True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
        self.dropout = nn.Dropout(dropout)
        self.fc_out = nn.Linear(model_dim, 1)  # For binary classification

    def forward(self, src, src_mask=None):
        src = self.embedding(src) * math.sqrt(self.model_dim)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_key_padding_mask=src_mask)
        output = output.mean(dim=0)  # Global average pooling
        output = self.fc_out(self.dropout(output))
        return output.squeeze()


In [19]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
        )
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position.float() * div_term)
        pe[:, 1::2] = torch.cos(position.float() * div_term)
        pe = pe.unsqueeze(1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


In [20]:
from collections import Counter
import re

def tokenize(text):
    text = text.lower()
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return text.strip().split()

# Build vocabulary
counter = Counter()
for text in train_texts:
    tokens = tokenize(text)
    counter.update(tokens)

# Keep most common words
vocab_size = 10000
most_common = counter.most_common(vocab_size - 2)  # Reserve 2 for PAD and UNK tokens
word2idx = {word: idx + 2 for idx, (word, _) in enumerate(most_common)}
word2idx['<PAD>'] = 0
word2idx['<UNK>'] = 1

# Create inverse mapping
idx2word = {idx: word for word, idx in word2idx.items()}

# Update INPUT_DIM
INPUT_DIM = vocab_size # To be defined after building the vocabulary


In [21]:
MODEL_DIM = 128
NUM_HEADS = 4
NUM_LAYERS = 2
DROPOUT = 0.1
BATCH_SIZE = 16  # Adjust based on available memory
MAX_SEQ_LEN = 128  # Truncate or pad sequences to this length

In [22]:
model = TransformerEncoder(
    input_dim=INPUT_DIM,
    model_dim=MODEL_DIM,
    num_heads=NUM_HEADS,
    num_layers=NUM_LAYERS,
    dropout=DROPOUT
)

In [23]:
def encode(text):
    tokens = tokenize(text)
    indices = [word2idx.get(token, word2idx['<UNK>']) for token in tokens]
    if len(indices) > MAX_SEQ_LEN:
        indices = indices[:MAX_SEQ_LEN]
    else:
        indices += [word2idx['<PAD>']] * (MAX_SEQ_LEN - len(indices))
    return indices

train_sequences = [encode(text) for text in train_texts]
test_sequences = [encode(text) for text in test_texts]


In [24]:
from torch.utils.data import DataLoader, TensorDataset

train_inputs = torch.LongTensor(train_sequences)
train_labels = torch.FloatTensor(train_labels)
test_inputs = torch.LongTensor(test_sequences)
test_labels = torch.FloatTensor(test_labels)

train_dataset = TensorDataset(train_inputs, train_labels)
test_dataset = TensorDataset(test_inputs, test_labels)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)


In [25]:
import torch.optim as optim

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)


In [26]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model.to(device)

TransformerEncoder(
  (embedding): Embedding(10000, 128)
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=128, out_features=128, bias=True)
        )
        (linear1): Linear(in_features=128, out_features=2048, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=2048, out_features=128, bias=True)
        (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (dropout): Dropout(p=0.1, inplace=False)
  (fc_out): Linear(in_features=128, out_features=1, bias=True)
)

In [27]:
num_epochs = 40

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for inputs, labels in train_loader:
        inputs = inputs.transpose(0, 1).to(device)  # Transformer expects seq_len x batch_size
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")


Epoch 1/40, Loss: 0.6415
Epoch 2/40, Loss: 0.5528
Epoch 3/40, Loss: 0.5117
Epoch 4/40, Loss: 0.4854
Epoch 5/40, Loss: 0.4623
Epoch 6/40, Loss: 0.4415
Epoch 7/40, Loss: 0.4234
Epoch 8/40, Loss: 0.4091
Epoch 9/40, Loss: 0.3959
Epoch 10/40, Loss: 0.3839
Epoch 11/40, Loss: 0.3738
Epoch 12/40, Loss: 0.3662
Epoch 13/40, Loss: 0.3558
Epoch 14/40, Loss: 0.3488
Epoch 15/40, Loss: 0.3452
Epoch 16/40, Loss: 0.3363
Epoch 17/40, Loss: 0.3272
Epoch 18/40, Loss: 0.3246
Epoch 19/40, Loss: 0.3177
Epoch 20/40, Loss: 0.3113
Epoch 21/40, Loss: 0.3071
Epoch 22/40, Loss: 0.3017
Epoch 23/40, Loss: 0.2978
Epoch 24/40, Loss: 0.2937
Epoch 25/40, Loss: 0.2895
Epoch 26/40, Loss: 0.2839
Epoch 27/40, Loss: 0.2825
Epoch 28/40, Loss: 0.2773
Epoch 29/40, Loss: 0.2760
Epoch 30/40, Loss: 0.2703
Epoch 31/40, Loss: 0.2684
Epoch 32/40, Loss: 0.2666
Epoch 33/40, Loss: 0.2621
Epoch 34/40, Loss: 0.2602
Epoch 35/40, Loss: 0.2569
Epoch 36/40, Loss: 0.2553
Epoch 37/40, Loss: 0.2519
Epoch 38/40, Loss: 0.2488
Epoch 39/40, Loss: 0.

In [28]:
from sklearn.metrics import accuracy_score

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.transpose(0, 1).to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        preds = torch.round(torch.sigmoid(outputs))
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {accuracy:.4f}")


Test Accuracy: 0.8321


In [29]:
torch.save(model.state_dict(), 'attention_model.pt')