Transformer model with pytorch

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim                         # Adam optimizer
import torch.nn.functional as F                     # Softmax function
from torch.utils.data import DataLoader, Dataset    # Loading batches
import torch.nn.utils.rnn as rnn_utils              # Padding the sequence
from torch.optim.lr_scheduler import OneCycleLR                  # Learning rate scheduler
from transformers import AutoTokenizer              # BPE Tokenizer
import pandas as pd
import numpy as np
import math

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [3]:
# BPE TOKENIZER TEST
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# get number of tokens
encoding = tokenizer.encode_plus(
    slogans[0],
    max_length=10,
    padding="max_length",
    truncation=True,
    return_tensors="pt"
)

tokenizer.decode(encoding['input_ids'][0])

NameError: name 'slogans' is not defined

In [4]:
all_slogans = pd.read_csv('all_slogans.csv', sep=';')
slogans = all_slogans['slogan']
slogans = slogans.str.lower()

# reducing invaluable tokens
to_remove = ['\n', '\r', '>', '\x80', '\x93', '\x94', '\x99', '\x9d', '\xa0',
             '¦', '®', '°', 'º', '¼', '½','×', 'â', 'ã', 'è', 'é', 'ï', 'ñ', 'ú', 'ü',
             '⁄', '（', '）', '，', '·']

dict_to_remove = {"’" : "'", "‘" : "'", "“" : '"', "”" : '"',
                  "…" : '...', '—': '-', '–': '-'}


# removing useless toknes
for char in to_remove:
    slogans = slogans.str.replace(char, ' ')

# replacing tokens with normalised versions
for key, value in dict_to_remove.items():
    slogans = slogans.str.replace(key, value)


# getting the characters (tokens) set
characters = [char for slogan in slogans for char in slogan]
characters = sorted((set(characters)))


# adding in the end of every slogan 'E' end token
slogans = slogans + 'E'
characters = ['E'] + characters

# adding the start of sequence token 'S'
slogans = slogans.apply(lambda x: 'S' + x)
characters = ['S'] + characters

# Add padding token at 0 index
characters = ['P'] + characters


print(characters)
len(characters)

['P', 'S', 'E', ' ', '!', '"', '#', '$', '%', '&', "'", '*', '+', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '=', '?', '[', ']', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '£']


60

In [5]:
# encoding string to integers sequence
# decoding integers to string sequence
to_int = {char: idx for idx, char in enumerate(characters)}
to_str = {idx: char for idx, char in enumerate(characters)}

encode = lambda sentence: [to_int[char] for char in sentence]
decode = lambda sentence: [to_str[char] for char in sentence]

encoded_slogans = [encode(slogan) for slogan in slogans]

In [6]:
# define hyperparameters
vocab_size = len(characters)
d_model = 32 # dim of the embedding vector               # TO CHANGE
nhead = 8 # number of attention heads
num_decoder_layers = 3 # number of decoder layers
dim_feedforward = 2048 # feed-forward network dimension
max_seq_length = 100 
batch_size = 128
dropout = 0.1
PAD_TOKEN = 0


In [7]:
class SloganDataset(Dataset):
    def __init__(self, slogans, encode, max_seq_length=100):
        self.slogans = slogans
        self.encode = encode
        self.max_seq_length = max_seq_length
        
    def __len__(self):
        return len(self.slogans)
    
    def __getitem__(self, idx):
        slogan = self.slogans[idx]
        
        # Truncate if slogan is too long
        if len(slogan) > self.max_seq_length:
            slogan = slogan[:self.max_seq_length]     

        input_sequence = torch.tensor(self.encode(slogan[:-1]), dtype=torch.long)
        target_sequence = torch.tensor(self.encode(slogan[1:]), dtype=torch.long)
        return input_sequence, target_sequence
    

# padding the sequence (For the largest in batch)
def collate_fn(batch): 
    input_sequences, target_sequences = zip(*batch)
    input_sequences_padded = rnn_utils.pad_sequence(input_sequences, batch_first=True, padding_value=0)
    target_sequences_padded = rnn_utils.pad_sequence(target_sequences, batch_first=True, padding_value=0)
    return input_sequences_padded, target_sequences_padded


# Test with subset of slogans
subset_slogans = slogans
dataset = SloganDataset(subset_slogans, encode)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)




### Building Positional Encoding and masks

In [27]:
# Sinusoidal positional encoding
def positional_encoding(seq_len, embed_dim):
    pe = torch.zeros(seq_len, embed_dim)
    for pos in range(seq_len):
        for i in range(0, embed_dim, 2):
            pe[pos, i] = math.sin(pos / (10000 ** (2 * i / embed_dim)))
            pe[pos, i + 1] = math.cos(pos / (10000 ** (2 * i / embed_dim)))
    return pe # Watch the change

# Generate padding mask to prevent looking at not used tokens
def generate_padding_mask(sequence, pad_token=0):
    mask = (sequence == pad_token).float()
    mask = mask.masked_fill(mask == 1, float('-inf')).masked_fill(mask == 0, float(0.0))
    return mask

# Generate look ahead mask to prevent looking at future tokens
def generate_look_ahead_mask(size):
    mask = torch.triu(torch.ones(size, size) * float('-inf'), diagonal=1)
    return mask



In [33]:
### TESTING

# # Example of batch
# for batch in dataloader:
#     input_sequences, target_sequences = batch
#     src = input_sequences
#     break



pad_mask = generate_padding_mask(src)
pad_mask[3]

tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf,
        -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf])

In [55]:
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_decoder_layers, 
                 dim_feedforward, max_seq_length):
        super(TransformerModel, self).__init__()
        
        # Create the token embedding
        self.embedding = nn.Embedding(vocab_size, d_model)

        # Initialize weights with Xavier normal for stability
        nn.init.xavier_normal_(self.embedding.weight) 

        # Unsqueeze to add batch dimension
        self.pos_encoder = positional_encoding(max_seq_length, d_model).unsqueeze(0).to(device)

        # Transformer Decoder layers
        self.transformer_decoder_layer = nn.TransformerDecoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, batch_first=True)
        self.transformer_decoder = nn.TransformerDecoder(
            self.transformer_decoder_layer, num_layers=num_decoder_layers
        )

        self.fc_out = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(0.2)

    def forward(self, src):
        # Generate look ahead mask to prevent looking at future tokens
        tgt_mask = generate_look_ahead_mask(src.size(1)).to(device) # check the change to 1
        # Use padding mask to prevent looking at not used tokens
        src_pad_mask = generate_padding_mask(src).to(device)
        # sqrt for stabilization
        src = self.embedding(src) * math.sqrt(d_model) # (batch_size, seq_len, d_model)
        # add positional encoding 
        src = src + self.pos_encoder[:, :src.size(1), :] # src.size(1) = seq_len
        output = self.transformer_decoder(tgt=src, memory=src, tgt_mask=tgt_mask,
                                          memory_mask=tgt_mask, tgt_key_padding_mask=src_pad_mask) # Change the memory mask
        output = self.dropout(output)
        output = self.fc_out(output)
        
        return output
    
model = TransformerModel(vocab_size, d_model, nhead, 
                          num_decoder_layers, dim_feedforward, max_seq_length).to(device) # Watch out



criterion = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN)
optimizer = optim.Adam(model.parameters(), lr=0.0005)

# Warmup with LR scheduling (Cosine annealing)
scheduler = OneCycleLR(optimizer, max_lr=0.0001, epochs=20, steps_per_epoch=batch_size)

In [56]:
# Example training loop with dataloader
num_epochs = 20
for epoch in range(num_epochs):
    print(f'Epoch {epoch}')
    for batch in dataloader:
        # Move to GPU
        input_sequences, target_sequences = batch
        input_sequences = input_sequences.to(device)
        target_sequences = target_sequences.to(device)
        optimizer.zero_grad()
        output = model(input_sequences)
        loss = criterion(output.view(-1, vocab_size), target_sequences.view(-1))
        loss.backward()

        optimizer.step()
        scheduler.step()

        
    print(f'Epoch: {epoch}, Loss: {loss.item()}, LR: {scheduler.get_last_lr()[0]:.6f}')

Epoch 0
Epoch: 0, Loss: 4.124213695526123, LR: 0.000007
Epoch 1
Epoch: 1, Loss: 3.791106939315796, LR: 0.000017
Epoch 2
Epoch: 2, Loss: 3.581543207168579, LR: 0.000031
Epoch 3
Epoch: 3, Loss: 3.388166904449463, LR: 0.000048
Epoch 4
Epoch: 4, Loss: 3.2397501468658447, LR: 0.000066
Epoch 5
Epoch: 5, Loss: 3.1353073120117188, LR: 0.000082
Epoch 6
Epoch: 6, Loss: 2.985197067260742, LR: 0.000093
Epoch 7
Epoch: 7, Loss: 2.9241206645965576, LR: 0.000099
Epoch 8
Epoch: 8, Loss: 2.844715118408203, LR: 0.000100
Epoch 9
Epoch: 9, Loss: 2.7662816047668457, LR: 0.000098
Epoch 10
Epoch: 10, Loss: 2.7443525791168213, LR: 0.000096
Epoch 11
Epoch: 11, Loss: 2.6770524978637695, LR: 0.000092
Epoch 12
Epoch: 12, Loss: 2.647608757019043, LR: 0.000087
Epoch 13
Epoch: 13, Loss: 2.6566505432128906, LR: 0.000082
Epoch 14
Epoch: 14, Loss: 2.5873239040374756, LR: 0.000075
Epoch 15
Epoch: 15, Loss: 2.5613865852355957, LR: 0.000068
Epoch 16
Epoch: 16, Loss: 2.5430586338043213, LR: 0.000060
Epoch 17
Epoch: 17, Loss

In [63]:
def generate_slogan(model, start_sequence, max_lenght=100):
    model.eval()
    input_sequence = torch.tensor(encode(start_sequence), dtype=torch.long).unsqueeze(0)
    generated_sequence = input_sequence.tolist()[0]

    for _ in range(max_lenght - len(start_sequence)):   # Watch out
        input_tensor = torch.tensor(generated_sequence[-max_lenght:], dtype=torch.long).unsqueeze(0).to(device)
        with torch.no_grad():
            output = model(input_tensor)
        next_token = torch.argmax(F.softmax(output[0, -1, :], dim=0)).item()
        generated_sequence.append(next_token)
        if to_str[next_token] == 'E':
            break
    
    return ''.join([to_str[idx] for idx in generated_sequence])

start_sequence = "Syo"
generated_slogan = generate_slogan(model, start_sequence)
print(f"Generated slogan: {generated_slogan}")

Generated slogan: Syou the the the the the the the the the the the the the the the the theane the the the the the ane 
