## Imports

In [17]:
import torch
torch.utils.data.datapipes.utils.common.DILL_AVAILABLE = torch.utils._import_utils.dill_available()
from torch import Tensor
import torch.nn as nn
from torch.nn import Transformer
import math

from typing import Iterable, List
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from sklearn.model_selection import train_test_split
from torchtext.datasets import multi30k, Multi30k

gpu check

In [18]:
print(torch.cuda.device_count())
print(torch.cuda.get_device_name(0))
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

1
NVIDIA GeForce RTX 4090 Laptop GPU


## Util functions for Data Loading

importing data

In [19]:
# We need to modify the URLs for the dataset since the links to the original dataset are broken
# Refer to https://github.com/pytorch/text/issues/1756#issuecomment-1163664163 for more info
multi30k.URL["train"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/training.tar.gz"
multi30k.URL["valid"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/validation.tar.gz"

TGT_LANGUAGE = 'de'
SRC_LANGUAGE = 'en'

In [71]:
token_transform = {}
vocab_transform = {}

# load tokenizers
token_transform[SRC_LANGUAGE] = get_tokenizer('spacy', language='de_core_news_sm')
token_transform[TGT_LANGUAGE] = get_tokenizer('spacy', language='en_core_web_sm')

# set special symbol indices
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']

# read dataset
def read_custom_dataset(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
    return [line.strip() for line in lines]

german_data = read_custom_dataset("./train/news-commentary-v9.de-en.de")
english_data = read_custom_dataset("./train/news-commentary-v9.de-en.de")

train_german, val_german, train_english, val_english = train_test_split(
    german_data, english_data, test_size=0.2, random_state=42)

# load dataset in pairs (german, english)
# for ln, data in zip([SRC_LANGUAGE, TGT_LANGUAGE], [english_data, german_data]):
#     vocab_transform[ln] = build_vocab_from_iterator(data,
#                                                     min_freq=1,
#                                                     specials=special_symbols,
#                                                     special_first=True)
    # vocab_transform[ln].set_default_index(UNK_IDX)

def yield_tokens(data_iter: Iterable, language: str) -> List[str]:
    language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}

    for data_sample in data_iter:
        yield token_transform[language](data_sample[language_index[language]])

for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
    # Training data Iterator
    train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    # Create torchtext's Vocab object
    vocab_transform[ln] = build_vocab_from_iterator(yield_tokens(train_iter, ln),
                                                    min_freq=1,
                                                    specials=special_symbols,
                                                    special_first=True)

# set default return value to <UNK> when encountered unknown tokens
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
  vocab_transform[ln].set_default_index(UNK_IDX)

In [72]:
vocab_transform

{'en': Vocab(), 'de': Vocab()}

In [73]:
# Iterate over vocabulary and print each token
for token in vocab_transform[SRC_LANGUAGE].get_itos():
    print(token)

# Convert vocabulary to list
vocab_list = list(vocab_transform[TGT_LANGUAGE].get_itos())
print(vocab_list)


<unk>
<pad>
<bos>
<eos>
a
.
A
in
the
on
is
and
man
of
with
,
woman
are
to
Two
at
wearing
people
white
shirt
young
black
his
an
while
blue
sitting
red
girl
dog
boy
standing
men
playing
group
street
down
walking
front
her
holding
water
by
The
An
up
green
one
women
for
looking
outside
Three
child
as
little
large
through
yellow
two
brown
from
their
ball
hat
into
person
next
children
other
small
dressed
out
over
building
riding
running
People
near
jacket
around
another
some
sidewalk
field
beach
orange
crowd
stands
sits
jumping
pink
behind
table
grass
background
snow
stand
bike
air
city
girls
player
Man
looks
wall
top
dogs
off
dress
that
camera
park
talking
something
older
along
walks
guitar
hair
play
soccer
together
lady
working
boys
food
smiling
has
game
gray
picture
plays
Asian
car
holds
hand
Four
him
road
bench
glasses
pants
shorts
stage
sit
carrying
walk
it
couple
them
baby
bicycle
face
"
Several
side
tree
pool
taking
each
rock
old
race
doing
across
watching
jeans
area
head
male
dirt
ju

## Transformer Architecture

Transformer Architecture

In [21]:
from torch import Tensor
import torch
import torch.nn as nn
from torch.nn import Transformer
import math

# Positional encoding
class PositionalEncoding(nn.Module):
    def __init__(self,
                 emb_size: int,
                 dropout: float,
                 maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: Tensor):
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

# encode tensor input indices to tensor of token embedding
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

# Transformer architecture
class Seq2SeqTransformer(nn.Module):
    def __init__(self,
                 num_encoder_layers: int,
                 num_decoder_layers: int,
                 emb_size: int,
                 nhead: int,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 dim_feedforward: int = 512,
                 dropout: float = 0.1):
        super(Seq2SeqTransformer, self).__init__()
        self.transformer = Transformer(d_model=emb_size,
                                       nhead=nhead,
                                       num_encoder_layers=num_encoder_layers,
                                       num_decoder_layers=num_decoder_layers,
                                       dim_feedforward=dim_feedforward,
                                       dropout=dropout)
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(
            emb_size, dropout=dropout)

    def forward(self,
                src: Tensor,
                trg: Tensor,
                src_mask: Tensor,
                tgt_mask: Tensor,
                src_padding_mask: Tensor,
                tgt_padding_mask: Tensor,
                memory_key_padding_mask: Tensor):
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
        outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None,
                                src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        return self.generator(outs)

    def encode(self, src: Tensor, src_mask: Tensor):
        return self.transformer.encoder(self.positional_encoding(
                            self.src_tok_emb(src)), src_mask)

    def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
        return self.transformer.decoder(self.positional_encoding(
                          self.tgt_tok_emb(tgt)), memory,
                          tgt_mask)

generating masks for sequences

In [22]:
def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask


def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len),device=DEVICE).type(torch.bool)

    src_padding_mask = (src == PAD_IDX).transpose(0, 1)
    tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

setting up transformer instance on gpu

In [23]:
torch.manual_seed(0)

SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512
NHEAD = 8
FFN_HID_DIM = 512
BATCH_SIZE = 128
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

# EMB_SIZE = 16
# NHEAD = 1
# FFN_HID_DIM = 16
# BATCH_SIZE = 128
# NUM_ENCODER_LAYERS = 1
# NUM_DECODER_LAYERS = 1


transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                                 NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)

for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

transformer = transformer.to(DEVICE)

loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)

optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)



## Pre - Processing

pre-processing sequence data into tensors

In [24]:
from torch.nn.utils.rnn import pad_sequence

# helper function to club together sequential operations
def sequential_transforms(*transforms):
    def func(txt_input):
        for transform in transforms:
            txt_input = transform(txt_input)
        return txt_input
    return func

# add begin of sentence and end of sentence tensors to each sequence
def tensor_transform(token_ids: List[int]):
    return torch.cat((torch.tensor([BOS_IDX]),
                      torch.tensor(token_ids),
                      torch.tensor([EOS_IDX])))

# convert raw strings to tensor sequences with bos and eos tokens
text_transform = {}
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
    text_transform[ln] = sequential_transforms(token_transform[ln], #Tokenization
                                               vocab_transform[ln], #Numericalization
                                               tensor_transform) # Add BOS/EOS and create tensor


# convert data samples into batch tensors
def collate_fn(batch):
    src_batch, tgt_batch = [], []
    for src_sample, tgt_sample in batch:
        src_batch.append(text_transform[SRC_LANGUAGE](src_sample.rstrip("\n")))
        tgt_batch.append(text_transform[TGT_LANGUAGE](tgt_sample.rstrip("\n")))

    src_batch = pad_sequence(src_batch, padding_value=PAD_IDX)
    tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX)
    return src_batch, tgt_batch

## Training

In [26]:
from torch.utils.data import DataLoader

# train and get training score
def train_epoch(model, optimizer):
    model.train()
    losses = 0
    # train_dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn, shuffle=True)
    train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    train_dataloader = DataLoader(train_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)

    for src, tgt in train_dataloader:
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)

        tgt_input = tgt[:-1, :]

        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

        logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)

        optimizer.zero_grad()

        tgt_out = tgt[1:, :]
        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        loss.backward()

        optimizer.step()
        losses += loss.item()

    return losses / len(list(train_dataloader))


# val test and get testing score
def evaluate(model):
    model.eval()
    losses = 0
    # val_dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
    val_iter = Multi30k(split='valid', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    val_dataloader = DataLoader(val_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)

    for src, tgt in val_dataloader:
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)

        tgt_input = tgt[:-1, :]

        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

        logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)

        tgt_out = tgt[1:, :]
        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        losses += loss.item()

    return losses / len(list(val_dataloader))


## Evaluation / Testing

In [43]:
from timeit import default_timer as timer
NUM_EPOCHS = 6

for epoch in range(1, NUM_EPOCHS + 1):
    start_time = timer()
    train_loss = train_epoch(transformer, optimizer)
    end_time = timer()
    val_loss = evaluate(transformer)
    print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}, "f"Epoch time = {(end_time - start_time):.3f}s"))

#     torch.cuda.empty_cache()


# function to generate output sequence using greedy algorithm
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    src = src.to(DEVICE)
    src_mask = src_mask.to(DEVICE)

    memory = model.encode(src, src_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
    for i in range(max_len-1):
        memory = memory.to(DEVICE)
        tgt_mask = (generate_square_subsequent_mask(ys.size(0))
                    .type(torch.bool)).to(DEVICE)
        out = model.decode(ys, memory, tgt_mask)
        out = out.transpose(0, 1)
        prob = model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        ys = torch.cat([ys,
                        torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
        if next_word == EOS_IDX:
            break
    return ys


# actual function to translate input sentence into target language
def translate(model: torch.nn.Module, src_sentence: str):
    model.eval()
    src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1).to(DEVICE)  # Move src tensor to device
    num_tokens = src.shape[0]
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool).to(DEVICE)  # Move src_mask tensor to device
    tgt_tokens = greedy_decode(
        model,  src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()
    return " ".join(vocab_transform[TGT_LANGUAGE].lookup_tokens(list(tgt_tokens.cpu().numpy()))).replace("<bos>", "").replace("<eos>", "")




Epoch: 1, Train loss: 2.211, Val loss: 2.476, Epoch time = 24.938s
Epoch: 2, Train loss: 2.036, Val loss: 2.350, Epoch time = 23.019s
Epoch: 3, Train loss: 1.882, Val loss: 2.277, Epoch time = 23.055s
Epoch: 4, Train loss: 1.745, Val loss: 2.203, Epoch time = 23.081s
Epoch: 5, Train loss: 1.627, Val loss: 2.168, Epoch time = 23.003s
Epoch: 6, Train loss: 1.525, Val loss: 2.135, Epoch time = 22.937s


In [48]:
MODEL_SAVE_PATH = "transformer_model.pth"

torch.cuda.empty_cache()

torch.save(transformer.state_dict(), MODEL_SAVE_PATH)
print("Model saved!")

Model saved!


In [52]:
print(translate(transformer, "Outside a building, a uniformed security guard looks at the camera from behind a fence."))

 Ein Mann in einem orangen Oberteil schaut hinter einem Zaun , das hinter einem Zaun schaut . 


In [None]:
sentences_en = []
sentences_de = []
predicted_de = []

with open("./test/english.txt", "r", encoding="utf-8") as f:
    sentences_en = f.readlines()

with open("./test/german.txt", "r", encoding="utf-8") as f:
    sentences_de = f.readlines()

for sample in sentences_en:
    predicted_de.append(translate(transformer, sample))

with open("./test/pred.txt", "wb") as f:
    f.writelines([x.encode('utf-8') for x in '\n'.join(predicted_de)])

In [53]:
import torch
from torchtext.data.utils import get_tokenizer
from nltk.translate.bleu_score import corpus_bleu
from torchtext.data.utils import get_tokenizer

# Load tokenizer
tokenizer = get_tokenizer('spacy', language='en_core_web_sm')

# Load sentences from files
def load_sentences(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
    return [line.strip() for line in lines]

# Define file paths
reference_file = "./test/german.txt"
candidate_file = "./test/pred.txt"

# Load sentences
reference_sentences = load_sentences(reference_file)
candidate_sentences = load_sentences(candidate_file)

# Tokenize sentences
reference_tokenized = [tokenizer(sentence) for sentence in reference_sentences]
candidate_tokenized = [tokenizer(sentence) for sentence in candidate_sentences]

# Calculate BLEU score
bleu_score = corpus_bleu([[ref] for ref in reference_tokenized], candidate_tokenized)
print("BLEU Score:", bleu_score)


BLEU Score: 0.22655279066095582
