In [20]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from collections import Counter
import re


## transformer

In [21]:
class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()
    
    def forward(self, Q, K, V, mask=None):
        d_k = Q.size(-1)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention = F.softmax(scores, dim=-1)
        output = torch.matmul(attention, V)
        return output, attention


In [22]:
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, model_dim):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.model_dim = model_dim
        
        assert model_dim % num_heads == 0
        
        self.depth = model_dim // num_heads
        self.WQ = nn.Linear(model_dim, model_dim)
        self.WK = nn.Linear(model_dim, model_dim)
        self.WV = nn.Linear(model_dim, model_dim)
        self.linear = nn.Linear(model_dim, model_dim)
        
        self.attention = ScaledDotProductAttention()
    
    def split_heads(self, x, batch_size):
        x = x.view(batch_size, -1, self.num_heads, self.depth)
        return x.transpose(1, 2)
    
    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        
        Q = self.split_heads(self.WQ(Q), batch_size)
        K = self.split_heads(self.WK(K), batch_size)
        V = self.split_heads(self.WV(V), batch_size)
        
        scaled_attention, attention_weights = self.attention(Q, K, V, mask)
        
        scaled_attention = scaled_attention.transpose(1, 2).contiguous()
        scaled_attention = scaled_attention.view(batch_size, -1, self.model_dim)
        
        output = self.linear(scaled_attention)
        return output, attention_weights


In [23]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, model_dim, ff_dim):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(model_dim, ff_dim)
        self.linear2 = nn.Linear(ff_dim, model_dim)
    
    def forward(self, x):
        return self.linear2(F.relu(self.linear1(x)))


In [24]:
class EncoderLayer(nn.Module):
    def __init__(self, model_dim, num_heads, ff_dim, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.mha = MultiHeadAttention(num_heads, model_dim)
        self.ffn = PositionwiseFeedForward(model_dim, ff_dim)
        
        self.layernorm1 = nn.LayerNorm(model_dim)
        self.layernorm2 = nn.LayerNorm(model_dim)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
    
    def forward(self, x, mask=None):
        attn_output, _ = self.mha(x, x, x, mask)
        out1 = self.layernorm1(x + self.dropout1(attn_output))
        
        ffn_output = self.ffn(out1)
        out2 = self.layernorm2(out1 + self.dropout2(ffn_output))
        
        return out2


In [25]:
class Encoder(nn.Module):
    def __init__(self, input_dim, model_dim, num_layers, num_heads, ff_dim, dropout=0.1):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, model_dim)
        self.pos_encoding = self.positional_encoding(max_len=5000, model_dim=model_dim)
        
        self.layers = nn.ModuleList([EncoderLayer(model_dim, num_heads, ff_dim, dropout) for _ in range(num_layers)])
        self.dropout = nn.Dropout(dropout)
    
    def positional_encoding(self, max_len, model_dim):
        pos_enc = torch.zeros(max_len, model_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, model_dim, 2).float() * (-math.log(10000.0) / model_dim))
        pos_enc[:, 0::2] = torch.sin(position * div_term)
        pos_enc[:, 1::2] = torch.cos(position * div_term)
        return pos_enc.unsqueeze(0)
    
    def forward(self, x, mask=None):
        seq_len = x.size(1)
        x = self.embedding(x) + self.pos_encoding[:, :seq_len, :].to(x.device)
        x = self.dropout(x)
        
        for layer in self.layers:
            x = layer(x, mask)
        
        return x


In [26]:
class DecoderLayer(nn.Module):
    def __init__(self, model_dim, num_heads, ff_dim, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.mha1 = MultiHeadAttention(num_heads, model_dim)
        self.mha2 = MultiHeadAttention(num_heads, model_dim)
        self.ffn = PositionwiseFeedForward(model_dim, ff_dim)
        
        self.layernorm1 = nn.LayerNorm(model_dim)
        self.layernorm2 = nn.LayerNorm(model_dim)
        self.layernorm3 = nn.LayerNorm(model_dim)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
    
    def forward(self, x, enc_output, look_ahead_mask=None, padding_mask=None):
        attn1, _ = self.mha1(x, x, x, look_ahead_mask)
        out1 = self.layernorm1(x + self.dropout1(attn1))
        
        attn2, _ = self.mha2(out1, enc_output, enc_output, padding_mask)
        out2 = self.layernorm2(out1 + self.dropout2(attn2))
        
        ffn_output = self.ffn(out2)
        out3 = self.layernorm3(out2 + self.dropout3(ffn_output))
        
        return out3


In [27]:
class Decoder(nn.Module):
    def __init__(self, output_dim, model_dim, num_layers, num_heads, ff_dim, dropout=0.1):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, model_dim)
        self.pos_encoding = self.positional_encoding(max_len=5000, model_dim=model_dim)
        
        self.layers = nn.ModuleList([DecoderLayer(model_dim, num_heads, ff_dim, dropout) for _ in range(num_layers)])
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(model_dim, output_dim)
    
    def positional_encoding(self, max_len, model_dim):
        pos_enc = torch.zeros(max_len, model_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, model_dim, 2).float() * (-math.log(10000.0) / model_dim))
        pos_enc[:, 0::2] = torch.sin(position * div_term)
        pos_enc[:, 1::2] = torch.cos(position * div_term)
        return pos_enc.unsqueeze(0)
    
    def forward(self, x, enc_output, look_ahead_mask=None, padding_mask=None):
        seq_len = x.size(1)
        x = self.embedding(x) + self.pos_encoding[:, :seq_len, :].to(x.device)
        x = self.dropout(x)
        
        for layer in self.layers:
            x = layer(x, enc_output, look_ahead_mask, padding_mask)
        
        x = self.linear(x)
        return x


In [28]:
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, model_dim, num_layers, num_heads, ff_dim, dropout=0.1):
        super(Transformer, self).__init__()
        self.encoder = Encoder(input_dim, model_dim, num_layers, num_heads, ff_dim, dropout)
        self.decoder = Decoder(output_dim, model_dim, num_layers, num_heads, ff_dim, dropout)
    
    def forward(self, src, tgt, src_mask=None, tgt_mask=None, src_tgt_mask=None):
        enc_output = self.encoder(src, src_mask)
        output = self.decoder(tgt, enc_output, tgt_mask, src_tgt_mask)
        return output


## example: classification

In [29]:
# some data
data = [
    ("I loved this movie, it was fantastic!", 1),
    ("Horrible film, would not recommend.", 0),
    ("The plot was very boring and predictable.", 0),
    ("Great acting and amazing cinematography.", 1),
    ("I really enjoyed this film.", 1),
    ("Waste of time, I hated it.", 0)
]

# 
def preprocess_text(text):
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    text = text.lower().strip().split()
    return text

vocab = Counter()
for sentence, label in data:
    vocab.update(preprocess_text(sentence))

vocab = {word: idx + 2 for idx, (word, _) in enumerate(vocab.items())}
vocab["<pad>"] = 0
vocab["<unk>"] = 1

def sentence_to_indices(sentence, vocab):
    return [vocab.get(word, vocab["<unk>"]) for word in preprocess_text(sentence)]

# index
indexed_data = [(sentence_to_indices(sentence, vocab), label) for sentence, label in data]


In [30]:
class TextClassificationDataset(Dataset):
    def __init__(self, data, vocab, max_len=20):
        self.data = data
        self.vocab = vocab
        self.max_len = max_len
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        sentence, label = self.data[idx]
        sentence = sentence[:self.max_len]
        sentence += [self.vocab["<pad>"]] * (self.max_len - len(sentence))
        return torch.tensor(sentence), torch.tensor(label)

dataset = TextClassificationDataset(indexed_data, vocab)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)


In [31]:
class TransformerForClassification(nn.Module):
    def __init__(self, input_dim, model_dim, num_layers, num_heads, ff_dim, num_classes, dropout=0.1):
        super(TransformerForClassification, self).__init__()
        self.encoder = Encoder(input_dim, model_dim, num_layers, num_heads, ff_dim, dropout)
        self.fc = nn.Linear(model_dim, num_classes)
    
    def forward(self, src, mask=None):
        enc_output = self.encoder(src, mask)
        #
        enc_output = enc_output[:, 0, :]
        output = self.fc(enc_output)
        return output


In [36]:
# model parameter
input_dim = len(vocab)
model_dim = 128
num_layers = 2
num_heads = 4
ff_dim = 512
num_classes = 2
dropout = 0.1

# model
model = TransformerForClassification(input_dim, model_dim, num_layers, num_heads, ff_dim, num_classes, dropout)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# train
def train(model, dataloader, criterion, optimizer, num_epochs=100):
    model.train()
    for epoch in range(num_epochs):
        for i, (src, labels) in enumerate(dataloader):
            optimizer.zero_grad()
            output = model(src)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}")

train(model, train_loader, criterion, optimizer, num_epochs=100)

# evaluate
def evaluate(model, dataloader):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for src, labels in dataloader:
            output = model(src)
            preds = torch.argmax(output, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    return accuracy_score(all_labels, all_preds)

accuracy = evaluate(model, test_loader)
print(f"Test Accuracy: {accuracy}")


Epoch 1/100, Loss: 3.441915273666382
Epoch 2/100, Loss: 0.3453487157821655
Epoch 3/100, Loss: 0.09848977625370026
Epoch 4/100, Loss: 0.007795997895300388
Epoch 5/100, Loss: 0.016726884990930557
Epoch 6/100, Loss: 0.007545831613242626
Epoch 7/100, Loss: 0.002078601624816656
Epoch 8/100, Loss: 0.0025863463524729013
Epoch 9/100, Loss: 0.000902361876796931
Epoch 10/100, Loss: 0.0008003484690561891
Epoch 11/100, Loss: 0.0011971123749390244
Epoch 12/100, Loss: 0.00074769341154024
Epoch 13/100, Loss: 0.0005101444548927248
Epoch 14/100, Loss: 0.0004277488333173096
Epoch 15/100, Loss: 0.0004083866660948843
Epoch 16/100, Loss: 0.000583829649258405
Epoch 17/100, Loss: 0.0004920220817439258
Epoch 18/100, Loss: 0.0003246132982894778
Epoch 19/100, Loss: 0.0005507726455107331
Epoch 20/100, Loss: 0.00041075178887695074
Epoch 21/100, Loss: 0.00027324981056153774
Epoch 22/100, Loss: 0.0003000017604790628
Epoch 23/100, Loss: 0.00024482584558427334
Epoch 24/100, Loss: 0.0002473861677572131
Epoch 25/100, L

In [39]:
#predict
def predict(model, sentence, vocab, max_len=20):
    model.eval()
    indices = sentence_to_indices(sentence, vocab)
    indices = indices[:max_len]
    indices += [vocab["<pad>"]] * (max_len - len(indices))
    src_tensor = torch.tensor(indices).unsqueeze(0)
    
    with torch.no_grad():
        output = model(src_tensor)
        predicted_class = output.argmax(1).item()
    
    return predicted_class

# example
sentence = "I really enjoyed this film"
predicted_class = predict(model, sentence, vocab)
print(f"'{sentence}' 's prediction type: {predicted_class}")


'I really enjoyed this film' 's prediction type: 1
