In [None]:
# Environment setup: installs the third-party packages imported further down in this notebook.
# Vietnamese NLP toolkit; `word_tokenize` from this package is used as the Vietnamese tokenizer.
!pip install underthesea
# Remove whatever torch/torchtext is already present (presumably the runtime's preinstalled build —
# TODO confirm) so the pinned torch/torchtext/torchvision trio below is the one that gets imported.
!pip uninstall torch torchtext -y
!pip install torch==2.0.1 torchtext==0.15.2 torchvision==0.15.2
!pip install gensim
!pip install numpy==1.25.0
# NOTE(review): gensim is installed twice (plain install above, `--upgrade` here) and this upgrade
# runs after the numpy pin, so the resolver may replace numpy==1.25.0 — verify the final versions.
# NOTE(review): underthesea, gensim and sacrebleu are unpinned; pin them for reproducible runs.
!pip install --upgrade gensim
!pip install sacrebleu

Collecting underthesea
  Downloading underthesea-6.8.4-py3-none-any.whl.metadata (15 kB)
Collecting python-crfsuite>=0.9.6 (from underthesea)
  Downloading python_crfsuite-0.9.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.3 kB)
Collecting underthesea-core==1.0.4 (from underthesea)
  Downloading underthesea_core-1.0.4-cp311-cp311-manylinux2010_x86_64.whl.metadata (1.7 kB)
Downloading underthesea-6.8.4-py3-none-any.whl (20.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m20.9/20.9 MB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading underthesea_core-1.0.4-cp311-cp311-manylinux2010_x86_64.whl (657 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m657.8/657.8 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading python_crfsuite-0.9.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m15.1 MB/s[0m eta [36m



In [None]:
# Data processing libraries
import pandas as pd
import numpy as np
import re, string

# Vietnamese tokenizer
from underthesea import word_tokenize

# TorchText tokenizer and vocab
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# Type hints
from typing import Iterable, List

# Pretrained word vectors
from gensim.models import KeyedVectors

# PyTorch & Transformer
from torch import Tensor
import torch
import torch.nn as nn
from torch.nn import Transformer
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader

# Timer
from timeit import default_timer as timer

# Math
import math



# Ignore warnings
# NOTE(review): with no category or module filter this hides every warning raised after this cell,
# including deprecation notices from the libraries used here; consider narrowing the filter.
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Mount Google Drive at /content/drive; the dataset and checkpoint paths used later in this
# notebook are located under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Pick the compute device once: the GPU when CUDA is usable, the CPU otherwise.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')

# Positional encoding for tokens
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to token embeddings, then apply dropout.

    The table is precomputed for ``maxlen`` positions and stored as the non-trainable
    buffer ``pos_embedding`` with shape ``(maxlen, 1, emb_size)``, so it broadcasts over
    the second dimension of the input.

    Args:
        emb_size: Size of the embedding (last) dimension. Both even and odd sizes are
            supported.
        dropout: Dropout probability applied to the sum of embedding and encoding.
        maxlen: Number of positions precomputed; inputs whose first dimension exceeds
            this are not supported.
    """

    def __init__(self,
                 emb_size: int,
                 dropout: float = 0.1,
                 maxlen: int = 5000):
        super().__init__()

        # One frequency per sine column: ceil(emb_size / 2) values.
        den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        angles = pos * den  # (maxlen, ceil(emb_size / 2))

        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(angles)
        # For an odd emb_size there is one fewer cosine column than sine column;
        # trimming the angles avoids the shape mismatch the untrimmed assignment raises.
        pos_embedding[:, 1::2] = torch.cos(angles[:, :emb_size // 2])
        # Insert a singleton dimension in the middle so the table broadcasts.
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: Tensor):
        """Return ``dropout(token_embedding + encoding)``.

        The encoding rows are selected by ``token_embedding.size(0)``, i.e. the first
        dimension of the input is treated as the position axis.
        """
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

# Token lookup table whose output is scaled by sqrt(emb_size)
class TokenEmbedding(nn.Module):
    """Map token ids to embedding vectors multiplied by ``sqrt(emb_size)``."""

    def __init__(self, vocab_size: int, emb_size):
        super().__init__()
        self.emb_size = emb_size
        self.embedding = nn.Embedding(vocab_size, emb_size)

    def forward(self, tokens: Tensor):
        """Look up ``tokens`` (cast to long first) and apply the scale factor."""
        scale = math.sqrt(self.emb_size)
        vectors = self.embedding(tokens.long())
        return vectors * scale

# Encoder-decoder Transformer with token embeddings and an output projection
class Seq2SeqTransformer(nn.Module):
    """Sequence-to-sequence model built around ``torch.nn.Transformer``.

    Source and target tokens are embedded separately, share one positional-encoding
    module, pass through the transformer, and are projected to target-vocabulary
    logits by ``generator``.
    """

    def __init__(self,
                 num_encoder_layers: int,
                 num_decoder_layers: int,
                 emb_size: int,
                 nhead: int,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 dim_feedforward: int = 512,
                 dropout: float = 0.1):
        super().__init__()
        # Submodules are created in a fixed order so parameter initialisation and the
        # state-dict layout stay the same.
        self.transformer = Transformer(
            d_model=emb_size,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)

    def forward(self,
                src: Tensor,
                trg: Tensor,
                src_mask: Tensor,
                tgt_mask: Tensor,
                src_padding_mask: Tensor,
                tgt_padding_mask: Tensor,
                memory_key_padding_mask: Tensor):
        """Run the full encoder-decoder pass and return target-vocabulary logits."""
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
        outs = self.transformer(
            src=src_emb,
            tgt=tgt_emb,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            memory_mask=None,  # no attention mask on the encoder memory
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask,
        )
        return self.generator(outs)

    def encode(self, src: Tensor, src_mask: Tensor):
        """Embed ``src`` and run only the encoder stack."""
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        return self.transformer.encoder(src_emb, mask=src_mask)

    def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
        """Embed ``tgt`` and run only the decoder stack against ``memory``."""
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(tgt))
        return self.transformer.decoder(tgt_emb, memory, tgt_mask=tgt_mask)

In [None]:
# Dataset directory
data_dir = "/content/drive/MyDrive/Colab Notebooks/deep_learning/EnToVieTranfrom/dataset/"

# Load English and Vietnamese sentences (one sentence per line).
# Context managers close the file handles, and the explicit encoding keeps the Vietnamese
# diacritics from depending on the runtime's locale (assumes the files are UTF-8 — TODO confirm).
with open(data_dir + 'en_sents', "r", encoding="utf-8") as en_file:
    en_sents = en_file.read().splitlines()
with open(data_dir + 'vi_sents', "r", encoding="utf-8") as vi_file:
    vi_sents = vi_file.read().splitlines()

# The two files are used as a parallel corpus (row i of one pairs with row i of the other),
# so a length mismatch is reported explicitly instead of via the DataFrame constructor.
if len(en_sents) != len(vi_sents):
    raise ValueError(
        f"Parallel corpus size mismatch: {len(en_sents)} English vs {len(vi_sents)} Vietnamese sentences"
    )

# Create dictionary and convert to DataFrame
data = {
    "en": en_sents,
    "vi": vi_sents,
}
df = pd.DataFrame(data, columns=["en", "vi"])

# Show dataset size. A bare `df.head()` here would display nothing because it is not the
# last expression of the cell, so it has been dropped.
print(len(en_sents))

def preprocessing(df):
    """Normalise the "en" and "vi" text columns of ``df`` in place and return ``df``.

    Each cell is processed in this order: ASCII punctuation (``string.punctuation``) is
    removed, the text is lower-cased, leading/trailing whitespace is stripped, and every
    run of whitespace is collapsed to a single space.

    Args:
        df: DataFrame with string columns "en" and "vi".

    Returns:
        The same DataFrame object, with both columns overwritten.
    """
    # Built once instead of once per row.
    punctuation_table = str.maketrans('', '', string.punctuation)
    # Raw string: "\s" in a normal string literal is an invalid escape sequence.
    whitespace_run = re.compile(r"\s+")

    def _normalize(text):
        # Single pass per cell, same step order as documented above.
        cleaned = text.translate(punctuation_table).lower().strip()
        return whitespace_run.sub(" ", cleaned)

    for column in ("en", "vi"):
        df[column] = df[column].apply(_normalize)

    return df

# Run the text normalisation over the loaded corpus and preview the first rows
df = df.pipe(preprocessing)
df.head()

254090


Unnamed: 0,en,vi
0,please put the dustpan in the broom closet,xin vui lòng đặt người quét rác trong tủ chổi
1,be quiet for a moment,im lặng một lát
2,read this,đọc này
3,tom persuaded the store manager to give him ba...,tom thuyết phục người quản lý cửa hàng trả lại...
4,friendship consists of mutual understanding,tình bạn bao gồm sự hiểu biết lẫn nhau


In [None]:
# Column keys of the source (English) and target (Vietnamese) sides
SRC_LANGUAGE, TGT_LANGUAGE = 'en', 'vi'

# Per-language registries, filled in below: language key -> tokenizer / vocabulary
token_transform, vocab_transform = {}, {}

# Vietnamese tokenizer backed by underthesea
def vi_tokenizer(sentence):
    """Return the result of underthesea's ``word_tokenize`` for ``sentence``."""
    return word_tokenize(sentence)

# Register one tokenizer per language: torchtext's 'basic_english' for the source side,
# the underthesea-based wrapper for the target side.
token_transform.update({
    SRC_LANGUAGE: get_tokenizer('basic_english'),
    TGT_LANGUAGE: get_tokenizer(vi_tokenizer),
})

# Token generator for vocab building
def yield_tokens(data_iter: Iterable, language: str) -> Iterable[List[str]]:
    """Lazily tokenize one language column of an iterable of ``(index, row)`` pairs.

    Args:
        data_iter: Iterable yielding 2-tuples whose second element supports
            ``row[language]`` (e.g. the output of ``DataFrame.iterrows()``).
        language: Key used both to pick the tokenizer from ``token_transform`` and to
            read the text from each row.

    Yields:
        The tokenizer output for each row, in iteration order.
    """
    # The first tuple element (the row index) is not needed.
    for _, data_sample in data_iter:
        yield token_transform[language](data_sample[language])

# Special tokens, listed in the order of their vocabulary ids
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']
# Ids 0..3 follow the list positions above
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = range(len(special_symbols))

# Build one vocabulary per language from the tokenized corpus, with the special symbols
# placed first, and make out-of-vocabulary lookups resolve to UNK.
for ln in (SRC_LANGUAGE, TGT_LANGUAGE):
    # A fresh row iterator is needed for each language.
    train_iter = df.iterrows()
    vocab_transform[ln] = build_vocab_from_iterator(
        yield_tokens(train_iter, ln),
        min_freq=1,
        specials=special_symbols,
        special_first=True,
    )
    vocab_transform[ln].set_default_index(UNK_IDX)

In [None]:
# Causal (look-ahead) mask for the decoder
def generate_square_subsequent_mask(sz):
    """Return a float ``(sz, sz)`` mask on ``DEVICE``.

    Entries strictly above the main diagonal are ``-inf``; the diagonal and everything
    below it are ``0.0``.
    """
    blocked = torch.full((sz, sz), float('-inf'), device=DEVICE)
    # triu with diagonal=1 keeps the strict upper triangle and zeroes the rest.
    return torch.triu(blocked, diagonal=1)

# Attention and padding masks for a (src, tgt) batch
def create_mask(src, tgt):
    """Build the four masks for a source/target pair.

    The first dimension of ``src`` and ``tgt`` is used as the sequence length.

    Returns:
        ``(src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)`` where ``src_mask``
        is an all-False boolean square matrix, ``tgt_mask`` comes from
        ``generate_square_subsequent_mask``, and each padding mask is ``seq == PAD_IDX``
        with its first two dimensions swapped.
    """
    src_len, tgt_len = src.shape[0], tgt.shape[0]

    src_mask = torch.zeros((src_len, src_len), device=DEVICE).type(torch.bool)
    tgt_mask = generate_square_subsequent_mask(tgt_len)

    src_padding_mask, tgt_padding_mask = (
        (seq == PAD_IDX).transpose(0, 1) for seq in (src, tgt)
    )
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

In [None]:
# Compose several transforms (tokenization, vocab lookup, tensor conversion) into one callable
def sequential_transforms(*transforms):
    """Return a function that feeds its input through ``transforms`` from left to right."""
    def _pipeline(txt_input):
        result = txt_input
        for step in transforms:
            result = step(result)
        return result
    return _pipeline

# Convert token ids to tensor with BOS and EOS tokens
def tensor_transform(token_ids: List[int]):
    """Wrap ``token_ids`` in BOS/EOS markers and return them as a 1-D long tensor.

    The dtype is set explicitly: converting an empty list on its own with
    ``torch.tensor([])`` produces a float tensor, which would otherwise leak a
    non-integer dtype into the result for sentences that tokenize to nothing.

    Args:
        token_ids: Vocabulary ids of one sentence (may be empty).

    Returns:
        Tensor of shape ``(len(token_ids) + 2,)`` and dtype ``torch.long``.
    """
    return torch.tensor([BOS_IDX, *token_ids, EOS_IDX], dtype=torch.long)

# Per-language pipeline: raw string -> tokens -> vocabulary ids -> tensor with BOS/EOS
text_transform = {
    ln: sequential_transforms(token_transform[ln],
                              vocab_transform[ln],
                              tensor_transform)
    for ln in (SRC_LANGUAGE, TGT_LANGUAGE)
}

# Collate function: turn raw (source, target) string pairs into two padded tensors
def collate_fn(batch):
    """Encode every pair in ``batch`` and pad each side with ``PAD_IDX``.

    Trailing newlines are stripped from each string before it goes through the
    language's ``text_transform``. ``pad_sequence`` is called with its default layout.

    Returns:
        ``(src_batch, tgt_batch)`` padded tensors.
    """
    encoded_src = []
    encoded_tgt = []

    for raw_src, raw_tgt in batch:
        encoded_src.append(text_transform[SRC_LANGUAGE](raw_src.rstrip("\n")))
        encoded_tgt.append(text_transform[TGT_LANGUAGE](raw_tgt.rstrip("\n")))

    return (
        pad_sequence(encoded_src, padding_value=PAD_IDX),
        pad_sequence(encoded_tgt, padding_value=PAD_IDX),
    )


In [None]:
# Greedy Decoding function for translation
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    """Generate a target sequence one token at a time, always taking the arg-max token.

    Decoding runs under ``torch.no_grad()`` so no autograd graph is built during
    inference.

    Args:
        model: Object exposing ``encode(src, src_mask)``, ``decode(ys, memory, tgt_mask)``
            and ``generator(hidden)``.
        src: Source token ids; moved to ``DEVICE``.
        src_mask: Source attention mask; moved to ``DEVICE``.
        max_len: Upper bound on the length of the returned sequence, start symbol included.
        start_symbol: Token id used to seed the target sequence.

    Returns:
        Long tensor of shape ``(T, 1)`` on ``DEVICE`` beginning with ``start_symbol``;
        it ends with ``EOS_IDX`` if that token was produced before ``max_len`` was reached.
    """
    src = src.to(DEVICE)
    src_mask = src_mask.to(DEVICE)

    with torch.no_grad():
        # Encode once; the memory does not change between decoding steps.
        memory = model.encode(src, src_mask).to(DEVICE)

        # Target sequence seeded with the start symbol.
        ys = torch.full((1, 1), start_symbol, dtype=torch.long, device=DEVICE)

        for _ in range(max_len - 1):
            tgt_mask = generate_square_subsequent_mask(ys.size(0)).type(torch.bool).to(DEVICE)

            # Decode everything generated so far, then look only at the last position.
            out = model.decode(ys, memory, tgt_mask).transpose(0, 1)
            prob = model.generator(out[:, -1])
            _, next_word = torch.max(prob, dim=1)
            next_word = next_word.item()

            # Build the new token directly on the device and dtype of the running sequence.
            next_token = torch.full((1, 1), next_word, dtype=ys.dtype, device=ys.device)
            ys = torch.cat([ys, next_token], dim=0)

            # Stop once the end-of-sentence token has been emitted.
            if next_word == EOS_IDX:
                break

    return ys

# Translate a sentence from source to target language
def translate(model: torch.nn.Module, src_sentence: str) -> str:
    """Translate ``src_sentence`` with greedy decoding and return the target text.

    The model is switched to eval mode. BOS/EOS markers are dropped by token id before
    the words are joined, so the result carries no leading or trailing spaces (removing
    the markers from the joined string left the surrounding separators behind).

    NOTE(review): the sentence is passed to ``text_transform`` as given; the corpus
    clean-up applied elsewhere in this notebook (punctuation removal etc.) is not
    applied here — confirm whether callers are expected to normalise the input first.

    Args:
        model: Model accepted by ``greedy_decode``.
        src_sentence: Raw source-language sentence.

    Returns:
        Target-language tokens joined by single spaces.
    """
    model.eval()

    # Token ids as a column vector (one row per token).
    src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)
    num_tokens = src.shape[0]

    # All-False mask: nothing is masked on the source side.
    src_mask = torch.zeros(num_tokens, num_tokens).type(torch.bool)

    # The output may be at most 5 tokens longer than the input.
    tgt_tokens = greedy_decode(model, src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()

    # Drop sentence markers by id, then map the remaining ids back to words.
    word_ids = [idx for idx in tgt_tokens.tolist() if idx not in (BOS_IDX, EOS_IDX)]
    return " ".join(vocab_transform[TGT_LANGUAGE].lookup_tokens(word_ids))


In [None]:
# Seed torch's global RNG before the model is constructed
torch.manual_seed(0)

# Vocabulary sizes are taken from the vocabularies built in this notebook
# (presumably they must match the ones the checkpoint was trained with — TODO confirm).
SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])

# Architecture and batching hyperparameters
EMB_SIZE, NHEAD, FFN_HID_DIM = 512, 8, 512
BATCH_SIZE = 64
NUM_ENCODER_LAYERS = NUM_DECODER_LAYERS = 4
DROP_OUT = 0.1

# Build the model with explicitly named arguments
transformer = Seq2SeqTransformer(
    num_encoder_layers=NUM_ENCODER_LAYERS,
    num_decoder_layers=NUM_DECODER_LAYERS,
    emb_size=EMB_SIZE,
    nhead=NHEAD,
    src_vocab_size=SRC_VOCAB_SIZE,
    tgt_vocab_size=TGT_VOCAB_SIZE,
    dim_feedforward=FFN_HID_DIM,
    dropout=DROP_OUT,
)

# Restore the saved weights onto the selected device.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
state_dict = torch.load(
    "/content/drive/MyDrive/Colab Notebooks/deep_learning/EnToVieTranfrom/models/viEn_transformer.pth",
    map_location=DEVICE,
)
transformer.load_state_dict(state_dict)

# Move to the device and switch to inference mode; the last expression shows the module summary
model = transformer.to(DEVICE)
model.eval()

Seq2SeqTransformer(
  (transformer): Transformer(
    (encoder): TransformerEncoder(
      (layers): ModuleList(
        (0-3): 4 x TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
          )
          (linear1): Linear(in_features=512, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (linear2): Linear(in_features=512, out_features=512, bias=True)
          (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.1, inplace=False)
          (dropout2): Dropout(p=0.1, inplace=False)
        )
      )
      (norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    )
    (decoder): TransformerDecoder(
      (layers): ModuleList(
        (0-3): 4 x TransformerDecoderLayer(
          (self_attn): MultiheadAttent

In [None]:
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import sacrebleu

# NLTK smoothing (method1) keeps BLEU from collapsing to 0 on short sentences
smooth_fn = SmoothingFunction().method1

# Accumulators for references, translations and per-sentence scores
reference_sentences = []
translated_sentences = []
bleu_scores = []

# Example input and its expected translation.
# NOTE(review): consider replacing this pair with neutral example text before sharing the notebook.
en_sentence = "fucking this girl is so good"
vi_sentence = "chịch em này sướng ghê"
translated_sentence = translate(transformer, en_sentence)

# NLTK works on token lists: whitespace-split reference and candidate
reference = vi_sentence.split()
candidate = translated_sentence.split()

# NLTK score (range 0-1), appended to the running list
score_nltk = sentence_bleu([reference], candidate, smoothing_function=smooth_fn)
bleu_scores.append(score_nltk)

# SacreBLEU works on raw strings: a list of hypotheses and a list of reference streams
candidate_sacrebleu = [translated_sentence]
reference_sacrebleu = [vi_sentence]
score_sacrebleu = sacrebleu.corpus_bleu(candidate_sacrebleu, [reference_sacrebleu])

# Report inputs, prediction and both scores
print("Input English Sentence:", en_sentence)
print("Ground Truth Vietnamese:", vi_sentence)
print("Predicted Vietnamese    :", translated_sentence)
print(f"Sentence-level BLEU Score (NLTK): {score_nltk:.4f}")
print(f"Sentence-level BLEU Score (SacreBLEU): {score_sacrebleu.score:.4f}")


Input English Sentence: fucking this girl is so good
Ground Truth Vietnamese: chịch em này sướng ghê
Predicted Vietnamese    :  hiểu lầm cô gái này rất tốt 
Sentence-level BLEU Score (NLTK): 0.0330
Sentence-level BLEU Score (SacreBLEU): 6.5673


In [None]:
sentence = "She studied hard and passed the exam."

# Translate the sentence defined in this cell. Passing `en_sentence` here would reuse a
# variable left over from an earlier cell, so the score below would describe a different
# input than the one shown.
translated_sentence = translate(transformer, sentence)

# Reference translation(s) for the sentence above
reference_translations = ["Cô ấy học chăm chỉ và vượt qua kỳ thi."]

print(translated_sentence)
print(reference_translations)

# SacreBLEU sentence-level score: one hypothesis string against a list of reference strings
bleu_score = sacrebleu.sentence_bleu(translated_sentence, reference_translations)

print(f"BLEU score: {bleu_score.score}")

 anh ấy sẽ học tốt nếu anh ấy làm việc chăm chỉ 
['Cô ấy học chăm chỉ và vượt qua kỳ thi.']
BLEU score: 8.054496384843702
