In [None]:
!pip install pyvi

!pip install -U pip setuptools wheel
!pip install -U spacy
!python -m spacy download ja_core_news_sm
!pip install torchmetrics

# Prepare data

In [None]:
# Read the parallel corpus: one sentence per line in each file.
# Later cells pair sentence i of the Vietnamese list with sentence i of the
# Japanese list, so both files are expected to be line-aligned.
import string
import torch
from torch import nn
import numpy as np

# Vietnamese side: replace double spaces by single ones, lower-case, trim.
# The encoding is given explicitly so that reading the Vietnamese/Japanese text
# does not depend on the locale's default encoding (assumes the files are
# UTF-8 -- TODO confirm).
with open("/content/drive/MyDrive/Colab Notebooks/NLP/Data/VN.txt", encoding="utf-8") as f:
  vi_input = [line.replace('  ', ' ').lower().strip() for line in f]

# Japanese side: only surrounding whitespace is removed.
with open("/content/drive/MyDrive/Colab Notebooks/NLP/Data/JA.txt", encoding="utf-8") as f:
  ja_input = [line.strip() for line in f]

# Add tokens marking the beginning and the end of a sentence to every
# sentence of the target language (Vietnamese).
eos = '<eos>'
bos = '<bos>'

from pyvi import ViTokenizer
# Word-segment every Vietnamese sentence and wrap it in <bos> ... <eos>.
vi_input_tokenize = [
  [bos] + ViTokenizer.tokenize(sentence).split() + [eos]
  for sentence in vi_input
]

import spacy
nlp = spacy.load("ja_core_news_sm")
# Japanese (source) sentences are tokenised with spaCy; no <bos>/<eos> is added.
ja_input_tokenize = [
  [str(token) for token in nlp(sentence)]
  for sentence in ja_input
]

# Build the vocabularies
from tensorflow.keras.preprocessing.text import Tokenizer

# Source (Japanese) vocabulary, with an explicit out-of-vocabulary token.
ja_tokenizer = Tokenizer(oov_token = '<oov>')
ja_tokenizer.fit_on_texts(ja_input_tokenize)
ja_vocabulary = ja_tokenizer.word_index

# Target (Vietnamese) vocabulary.
vi_tokenizer = Tokenizer()
vi_tokenizer.fit_on_texts(vi_input_tokenize)
vi_vocabulary = vi_tokenizer.word_index

# id -> token lookups. Id 0 never appears in word_index; it is the value used
# for padding below, so it is mapped to the empty string.
ja_vocabulary_reverse = {index: word for word, index in ja_vocabulary.items()}
ja_vocabulary_reverse[0] = ''

vi_vocabulary_reverse = {index: word for word, index in vi_vocabulary.items()}
vi_vocabulary_reverse[0] = ''

# word_index ids run from 1 to len(word_index) and id 0 is the padding id, so
# the number of distinct ids the model has to handle is len(word_index) + 1.
# Using len(word_index) makes the largest id out of range for nn.Embedding and
# for one_hot(num_classes=...).
vi_vocab_size = len(vi_vocabulary) + 1
ja_vocab_size = len(ja_vocabulary) + 1


# Convert token lists to id sequences and right-pad them with zeros
from tensorflow.keras.preprocessing.sequence import pad_sequences

ja_sequence = ja_tokenizer.texts_to_sequences(ja_input_tokenize)
jamaxlen = max(len(seq) for seq in ja_sequence)
ja_sequence = pad_sequences(ja_sequence, maxlen = jamaxlen, padding = 'post')

vi_sequence = vi_tokenizer.texts_to_sequences(vi_input_tokenize)
vimaxlen = max(len(seq) for seq in vi_sequence)
vi_sequence = pad_sequences(vi_sequence, maxlen = vimaxlen, padding = 'post')

# Special ids. pad_sequences pads with 0 by default.
PAD_IDX = 0
# The tokenizer assigns ids by token frequency, so the ids of <bos>/<eos> are
# looked up in the Vietnamese vocabulary instead of being assumed to be 1 and 2.
# (Only the Vietnamese sentences carry <bos>/<eos>.)
BOS_IDX = vi_vocabulary[bos]
EOS_IDX = vi_vocabulary[eos]

# Model

In [None]:
import math  # needed by __init__; imported here so this cell does not rely on a later cell


class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding followed by dropout.

    Modified version of
    https://pytorch.org/tutorials/beginner/transformer_tutorial.html

    The table is stored with shape ``(max_len, 1, dim_model)`` and ``forward``
    slices it by ``token_embedding.size(0)``, i.e. dimension 0 of the input is
    treated as the position axis. Callers must therefore pass sequence-first
    tensors ``(seq_len, batch, dim_model)``.

    Args:
        dim_model: Size of the embedding vectors (odd sizes are supported).
        dropout_p: Dropout probability applied after adding the encoding.
        max_len: Number of positions to precompute; inputs may not be longer.
    """

    def __init__(self, dim_model, dropout_p, max_len):
        super().__init__()

        self.dropout = nn.Dropout(dropout_p)

        # Column vector of positions: 0, 1, 2, ..., max_len - 1
        positions_list = torch.arange(0, max_len, dtype=torch.float).view(-1, 1)
        # 1 / 10000^(2i / dim_model) for every even column index 2i
        division_term = torch.exp(torch.arange(0, dim_model, 2).float() * (-math.log(10000.0)) / dim_model)
        angles = positions_list * division_term

        pos_encoding = torch.zeros(max_len, dim_model)
        # PE(pos, 2i) = sin(pos / 10000^(2i / dim_model))
        pos_encoding[:, 0::2] = torch.sin(angles)
        # PE(pos, 2i + 1) = cos(pos / 10000^(2i / dim_model))
        # With an odd dim_model there is one odd column fewer than even columns,
        # so the surplus angle column is dropped.
        pos_encoding[:, 1::2] = torch.cos(angles[:, : dim_model // 2])

        # Saved as a buffer: moves with the module (.to(device)) but is not trained.
        # Shape becomes (max_len, 1, dim_model) so it broadcasts over the batch axis.
        pos_encoding = pos_encoding.unsqueeze(0).transpose(0, 1)
        self.register_buffer("pos_encoding", pos_encoding)

    def forward(self, token_embedding: torch.Tensor) -> torch.Tensor:
        """Add the encoding of positions ``0..size(0)-1`` to the input and apply dropout."""
        return self.dropout(token_embedding + self.pos_encoding[:token_embedding.size(0), :])

In [None]:
# Display the Vietnamese vocabulary size; a later cell passes it to the model
# as `num_tokens_vi` and to the datasets as the one-hot width.
vi_vocab_size

1634

In [None]:
import math
from torchmetrics.classification import BinaryAccuracy
from torch import Tensor
class Transformer(nn.Module):
    """Japanese -> Vietnamese sequence-to-sequence model built on ``nn.Transformer``.

    Source ids are embedded with ``embedding_ja`` and target ids with
    ``embedding_vi``. The output head is ``Linear`` followed by ``Softmax``
    over the vocabulary axis, so ``forward`` returns probabilities, not logits.
    """

    # Constructor
    def __init__(
        self,
        num_tokens_vi,
        num_tokens_ja,
        dim_model,
        num_heads,
        num_encoder_layers,
        num_decoder_layers,
        dropout_p,
    ):
        """
        Args:
            num_tokens_vi: Number of target (Vietnamese) ids; also the output width.
            num_tokens_ja: Number of source (Japanese) ids.
            dim_model: Embedding / model dimension.
            num_heads: Number of attention heads.
            num_encoder_layers: Number of encoder layers.
            num_decoder_layers: Number of decoder layers.
            dropout_p: Dropout probability used by the positional encoders and
                by ``nn.Transformer``.
        """
        super().__init__()

        # INFO
        self.model_type = "Transformer"
        self.dim_model = dim_model

        # LAYERS
        # NOTE(review): the positional tables are sized with the vocabulary
        # sizes; they only need to be at least as long as the longest sequence.
        self.positional_encoder_vi = PositionalEncoding(
            dim_model=dim_model, dropout_p=dropout_p, max_len=num_tokens_vi
        )
        self.positional_encoder_ja = PositionalEncoding(
            dim_model=dim_model, dropout_p=dropout_p, max_len=num_tokens_ja
        )
        self.embedding_vi = nn.Embedding(num_tokens_vi, dim_model)
        self.embedding_ja = nn.Embedding(num_tokens_ja, dim_model)
        # Built with the default batch_first=False, i.e. it expects
        # (sequence length, batch_size, dim_model) tensors.
        self.transformer = nn.Transformer(
            d_model=dim_model,
            nhead=num_heads,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dropout=dropout_p,
        )
        # Applied to (sequence length, batch_size, dim_model): dim=2 is the vocabulary axis.
        self.out = nn.Sequential(
            nn.Linear(dim_model, num_tokens_vi),
            nn.Softmax(dim=2)
        )

    def forward(self, src, tgt, src_mask: Tensor,
                tgt_mask: Tensor, src_padding_mask: Tensor,
                tgt_padding_mask: Tensor, memory_key_padding_mask: Tensor):
        """Compute next-token probabilities for every target position.

        Args:
            src: Source ids, shape (batch_size, src sequence length).
            tgt: Target input ids, shape (batch_size, tgt sequence length).
            src_mask: Attention mask over source positions.
            tgt_mask: Attention mask over target positions (causal mask).
            src_padding_mask: Padding mask of ``src`` (True where padded).
            tgt_padding_mask: Padding mask of ``tgt`` (True where padded).
            memory_key_padding_mask: Padding mask of the encoder output, used
                by the decoder's attention over the source.

        Returns:
            Tensor of shape (batch_size, tgt sequence length, num_tokens_vi)
            holding a probability distribution per position.
        """
        # Embedding - Out size = (batch_size, sequence length, dim_model)
        src = self.embedding_ja(src) * math.sqrt(self.dim_model)
        tgt = self.embedding_vi(tgt) * math.sqrt(self.dim_model)

        # Switch to (sequence length, batch_size, dim_model) BEFORE adding the
        # positional encoding: the encoder indexes positions along dimension 0,
        # so applying it to batch-first tensors would give every token of a
        # sentence the same "position" (its index inside the batch).
        src = src.permute(1, 0, 2)
        tgt = tgt.permute(1, 0, 2)
        src = self.positional_encoder_ja(src)
        tgt = self.positional_encoder_vi(tgt)

        # Transformer blocks - Out size = (sequence length, batch_size, dim_model)
        transformer_out = self.transformer(
            src,
            tgt,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            # Forwarded so the decoder does not attend to padded source positions.
            memory_key_padding_mask=memory_key_padding_mask,
        )
        # Out size = (sequence length, batch_size, num_tokens_vi)
        out = self.out(transformer_out)
        # Back to batch-first for the caller.
        out = out.permute(1, 0, 2)
        return out

    def get_tgt_mask(self, size) -> torch.Tensor:
        """Return a (size, size) float mask where row i may see columns 0..i."""
        mask = torch.tril(torch.ones(size, size) == 1) # Lower triangular matrix
        mask = mask.float()
        mask = mask.masked_fill(mask == 0, float('-inf')) # Convert zeros to -inf
        mask = mask.masked_fill(mask == 1, float(0.0)) # Convert ones to 0

        # EX for size=5:
        # [[0., -inf, -inf, -inf, -inf],
        #  [0.,   0., -inf, -inf, -inf],
        #  [0.,   0.,   0., -inf, -inf],
        #  [0.,   0.,   0.,   0., -inf],
        #  [0.,   0.,   0.,   0.,   0.]]

        return mask

    def create_pad_mask(self, matrix: torch.Tensor, pad_token: int) -> torch.Tensor:
        """Return a boolean mask that is True wherever ``matrix`` equals ``pad_token``."""
        # If matrix = [1,2,3,0,0,0] where pad_token=0, the result mask is
        # [False, False, False, True, True, True]
        return (matrix == pad_token)
# Run on the GPU when one is available, otherwise on the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

# Translation model: 256-dim embeddings, 2 heads, 3 encoder and 3 decoder layers.
model = Transformer(
    num_tokens_vi=vi_vocab_size,
    num_tokens_ja=ja_vocab_size,
    dim_model=256,
    num_heads=2,
    num_encoder_layers=3,
    num_decoder_layers=3,
    dropout_p=0.1,
)
model = model.to(device)

opt = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

# Binary cross-entropy between the model's softmax probabilities and the
# one-hot targets produced by the dataset.
loss_fn = nn.BCELoss()


In [None]:
class Dataset():
  """Index-aligned (Japanese, Vietnamese) id sequences for teacher-forced training.

  Args:
    ja: Source id sequences, one row per sentence.
    vi: Target id sequences, one row per sentence (same row order as ``ja``).
    num_vocab: Width of the one-hot encoding of the target ids; every target
      id must be smaller than this value.
  """
  def __init__(self, ja, vi, num_vocab):
    # Convert straight to int64. Going through torch.Tensor(...) would build a
    # float32 tensor first, which cannot represent every integer exactly.
    self.ja = torch.as_tensor(ja, dtype=torch.int64)
    self.vi = torch.as_tensor(vi, dtype=torch.int64)
    self.num_vocab = num_vocab

  def __len__(self):
    """Number of sentence pairs."""
    return len(self.ja)

  def __getitem__(self, i):
    """Return ``[source ids, decoder input ids, one-hot decoder targets]`` for pair ``i``.

    The decoder input is the target sequence without its last id; the targets
    are the one-hot encoded target sequence without its first id, i.e. the
    input shifted left by one position.
    """
    target_ids = self.vi[i]
    target_onehot = nn.functional.one_hot(target_ids, self.num_vocab)

    return [self.ja[i], target_ids[:-1], target_onehot[1:]]
# The first 1700 sentence pairs are used for training, the rest for validation.
dataset = Dataset(ja=ja_sequence[:1700], vi=vi_sequence[:1700], num_vocab=vi_vocab_size)
val_dataset = Dataset(ja=ja_sequence[1700:], vi=vi_sequence[1700:], num_vocab=vi_vocab_size)

In [None]:
import tensorflow as tf
class Dataloader(tf.keras.utils.Sequence):
    """Serve fixed-size batches of an indexable dataset as stacked NumPy arrays.

    Args:
        dataset: Object whose ``dataset[j]`` returns a sequence of fields.
        batch_size: Number of samples per batch.
        size: Number of samples to draw from ``dataset``.
    """

    def __init__(self, dataset, batch_size, size):
        self.dataset = dataset
        self.batch_size = batch_size
        self.size = size

    def __getitem__(self, i):
        """Return batch ``i`` as a tuple with one stacked array per sample field."""
        first = i * self.batch_size
        samples = [self.dataset[j] for j in range(first, first + self.batch_size)]

        # zip(*samples) regroups the samples field by field; each group is
        # stacked along a new leading batch axis.
        return tuple(np.stack(field, axis=0) for field in zip(*samples))

    def __len__(self):
        """Number of full batches; a trailing partial batch is not served."""
        return self.size // self.batch_size
# Batches of 128 sentence pairs over the training and validation splits.
train_dataloader = Dataloader(dataset=dataset, batch_size=128, size=len(ja_sequence[:1700]))
val_dataloader = Dataloader(dataset=val_dataset, batch_size=128, size=len(vi_sequence[1700:]))

(128, 63)

In [None]:

# Mini-batch size.
# NOTE(review): nothing in the visible cells reads this name -- the Dataloader
# instances are created with the literal 128 -- confirm before relying on it.
batch_size = 128
from torchmetrics.classification import BinaryAccuracy
def generate_square_subsequent_mask(sz):
    """Build the (sz, sz) causal attention mask on the global ``device``.

    Entry (i, j) is 0.0 when j <= i (position i may attend to position j) and
    -inf when j > i.
    """
    visible = torch.tril(torch.ones((sz, sz), device=device)).bool()
    mask = torch.zeros((sz, sz), device=device)
    mask = mask.masked_fill(~visible, float('-inf'))
    return mask

def create_mask(src, tgt):
  """Create the attention and padding masks for one batch.

  Args:
    src: Source ids; dimension 1 is used as the source sequence length.
    tgt: Target input ids; dimension 1 is used as the target sequence length.

  Returns:
    ``(src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)`` where
    ``src_mask`` is an all-False boolean square mask, ``tgt_mask`` is the
    causal mask, and the padding masks are True where the ids equal PAD_IDX.
  """
  src_len, tgt_len = src.shape[1], tgt.shape[1]

  # All-False mask: no source position is hidden from any other.
  src_mask = torch.zeros((src_len, src_len), device=device).type(torch.bool)
  tgt_mask = generate_square_subsequent_mask(tgt_len)

  # Same (batch, length) layout as the inputs; no transpose is applied.
  return src_mask, tgt_mask, src == PAD_IDX, tgt == PAD_IDX
def train_loop(model, opt, loss_fn, dataloader):
  """Train ``model`` for one pass over ``dataloader``.

  Args:
    model: Called as ``model(src, tgt_in, src_mask, tgt_mask, src_padding_mask,
      tgt_padding_mask, memory_key_padding_mask)``.
    opt: Optimizer over the model parameters.
    loss_fn: Loss applied to ``(prediction, one-hot target)``.
    dataloader: Indexable batches of ``(source ids, target input ids,
      one-hot targets)``.

  Returns:
    Mean of the per-batch losses.
  """
  model.train()
  total_loss = 0
  for i in range(len(dataloader)):
    # Build the batch once: every dataloader[i] access re-collects and
    # re-stacks all samples of the batch.
    batch = dataloader[i]
    src = torch.as_tensor(batch[0], dtype=torch.int64, device=device)
    tgt_i = torch.as_tensor(batch[1], dtype=torch.int64, device=device)
    tgt_o = torch.as_tensor(batch[2], dtype=torch.float, device=device)

    #mask
    src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_i)

    # The source padding mask doubles as the memory padding mask.
    pre = model(src, tgt_i, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, src_padding_mask)
    loss = loss_fn(pre, tgt_o)

    opt.zero_grad()
    loss.backward()
    opt.step()
    total_loss += loss.detach().item()

  return total_loss / len(dataloader)

In [None]:
from torchmetrics.classification import BinaryAccuracy
def validation_loop(model, loss_fn, dataloader):
    """Evaluate ``model`` on every batch of ``dataloader`` without updating it.

    Args:
        model: Called with the same seven arguments as during training.
        loss_fn: Loss applied to ``(prediction, one-hot target)``.
        dataloader: Indexable batches of ``(source ids, target input ids,
            one-hot targets)``.

    Returns:
        Mean of the per-batch losses.
    """
    model.eval()
    total_loss = 0
    # No gradients are needed for evaluation.
    with torch.no_grad():
        # Each batch is evaluated exactly once. A nested loop over the
        # dataloader would re-evaluate batch i once per batch and inflate the
        # reported loss by a factor of len(dataloader).
        for i in range(len(dataloader)):
            batch = dataloader[i]
            src = torch.as_tensor(batch[0], dtype=torch.int64, device=device)
            tgt_i = torch.as_tensor(batch[1], dtype=torch.int64, device=device)
            tgt_o = torch.as_tensor(batch[2], dtype=torch.float, device=device)

            # Same masks as in training; the source padding mask doubles as
            # the memory padding mask.
            src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_i)

            pred = model(src, tgt_i, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, src_padding_mask)

            loss = loss_fn(pred, tgt_o)
            total_loss += loss.detach().item()

    return total_loss / len(dataloader)

In [None]:
# Train for a fixed number of epochs and record both losses after every epoch.
train_loss_list, validation_loss_list = [], []
epochs = 200
for epoch in range(epochs):
    print("-"*25, f"Epoch {epoch + 1}","-"*25)

    # train_loop / validation_loop build their own masks per batch, so no mask
    # is prepared here (constructing a default nn.Transformer() every epoch
    # just to derive an unused mask was pure overhead).
    train_loss = train_loop(model, opt, loss_fn, train_dataloader)
    train_loss_list.append(train_loss)

    validation_loss = validation_loop(model, loss_fn, val_dataloader)
    validation_loss_list.append(validation_loss)

    # Losses are scaled by 10000 for display only.
    print(f"Training loss: {10000*train_loss:.4f}")
    print(f"Validation loss: {10000*validation_loss:.4f}")
    print()

In [None]:
# Run the model on the first validation batch.
# NOTE: the decoder is fed the reference target tokens (tgt_input), so each
# position is predicted from the true previous tokens; this is not
# free-running (autoregressive) translation.
val_batch = val_dataloader[0]  # built once; each indexing re-stacks the whole batch
src_input = torch.as_tensor(val_batch[0], dtype=torch.int64, device=device)
tgt_input = torch.as_tensor(val_batch[1], dtype=torch.int64, device=device)
tgt_output = torch.as_tensor(val_batch[2], dtype=torch.int64, device=device)
src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src_input, tgt_input)
print(src_input.shape, tgt_input.shape, tgt_mask.shape, tgt_output.shape)
output = model(src_input, tgt_input, src_mask, tgt_mask,
               src_padding_mask, tgt_padding_mask, src_padding_mask)

torch.Size([128, 63]) torch.Size([128, 44]) torch.Size([44, 44]) torch.Size([128, 44, 1634])


In [None]:
# Sanity check: the model ends in a softmax over the vocabulary axis, so the
# probabilities at one position (first sentence, first position) should sum to 1.
sum(output[0][0])

tensor(1.0000, device='cuda:0', grad_fn=<AddBackward0>)

In [None]:
# Most probable vocabulary id at every position of the first sentence in the
# batch, next to the id encoded by the one-hot reference.
pre = list(np.argmax(output[0].detach().cpu().numpy(), axis=1))
tar = list(np.argmax(tgt_output[0].detach().cpu().numpy(), axis=1))

In [None]:
# Compare the first ten predicted ids with the first ten reference ids.
print(pre[0:10], tar[0:10])

[72, 241, 64, 4, 1299, 1300, 25, 40, 466, 3] [72, 241, 64, 4, 1299, 1300, 25, 40, 466, 3]


In [None]:
# Map ids back to Vietnamese tokens (id 0, the padding id, maps to '').
vi_sent_pred = list(map(vi_vocabulary_reverse.__getitem__, pre))
vi_sent = list(map(vi_vocabulary_reverse.__getitem__, tar))

In [None]:
# Show the predicted tokens followed by the reference tokens; padded positions
# appear as empty strings.
print(vi_sent_pred, vi_sent)

['con', 'chó', 'nhà', 'tôi', 'chôn', 'xương', 'ở', 'trong', 'vườn', '.', '<eos>', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', ''] ['con', 'chó', 'nhà', 'tôi', 'chôn', 'xương', 'ở', 'trong', 'vườn', '.', '<eos>', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
