In [1]:
!pip install torchtext



In [2]:
from torchtext.legacy.data import Field, TabularDataset, BucketIterator

In [3]:
# tokenize = lambda x: x.split()

# informal = Field(sequential=True, use_vocab=True, tokenize=tokenize, lower=True)
# formal = Field(sequential=True, use_vocab=True, tokenize=tokenize, lower=True)

# fields = {'informal': ("i", informal), 'formal': ("f", formal)}

# train_data, test_data = TabularDataset.splits(path='data', 
#                                               train='train.csv', 
#                                               test='test.csv', 
#                                               format='csv', 
#                                               fields=fields)

In [4]:
# type(train_data)

In [5]:
# informal.build_vocab(train_data, max_size=10000, min_freq=2)

# train_iterator, test_iterator = BucketIterator.splits((train_data, test_data), 
#                                                       batch_size=2, 
#                                                       device='cuda')

In [6]:
# source: https://www.youtube.com/watch?v=EoGUlvhRYpk&list=PLhhyoLH6Ijfyl_VMCsi54UqGQafGkNOQH&index=1
# github: https://github.com/aladdinpersson/Machine-Learning-Collection/blob/ac5dcd03a40a08a8af7e1a67ade37f28cf88db43/ML/Pytorch/more_advanced/Seq2Seq/utils.py#L7

In [7]:
import torch
import spacy
from torchtext.data.metrics import bleu_score
import sys


_SPACY_EN_PIPELINE = None


def _get_spacy_en():
    """Return the English spaCy pipeline, loading it on first use only."""
    global _SPACY_EN_PIPELINE
    if _SPACY_EN_PIPELINE is None:
        _SPACY_EN_PIPELINE = spacy.load("en_core_web_sm")
    return _SPACY_EN_PIPELINE


def translate_sentence(model, sentence, informal, formal, device, max_length=50):
    """Greedily decode ``sentence`` with ``model`` and return the predicted tokens.

    Args:
        model: Object exposing ``encoder(tensor) -> (hidden, cell)`` and
            ``decoder(prev_token, hidden, cell) -> (output, hidden, cell)``,
            plus the ``training`` / ``eval()`` / ``train()`` API of ``nn.Module``.
        sentence: Either a raw string (tokenized with spaCy) or an iterable of
            token strings. Tokens are lower-cased in both cases.
        informal: Source-side field; its ``init_token``, ``eos_token`` and
            ``vocab.stoi`` are used to build the encoder input.
        formal: Target-side field; its ``vocab.stoi`` / ``vocab.itos`` map between
            decoder indices and tokens.
        device: Device the input tensors are moved to.
        max_length: Maximum number of decoder steps.

    Returns:
        The list of predicted target tokens without the leading ``<sos>``. The
        last element is ``<eos>`` unless ``max_length`` steps ran out first.
    """
    # The spaCy pipeline is only needed (and only loaded) for raw strings.
    if isinstance(sentence, str):
        tokens = [token.text.lower() for token in _get_spacy_en()(sentence)]
    else:
        tokens = [token.lower() for token in sentence]

    # Wrap the source tokens in the start / end markers of the source field.
    tokens = [informal.init_token, *tokens, informal.eos_token]

    # Map every source token to its vocabulary index.
    source_indices = [informal.vocab.stoi[token] for token in tokens]

    # Shape (seq_len, 1): a batch holding this single sentence.
    sentence_tensor = torch.LongTensor(source_indices).unsqueeze(1).to(device)

    sos_idx = formal.vocab.stoi["<sos>"]
    eos_idx = formal.vocab.stoi["<eos>"]

    # Dropout must be off while decoding, otherwise the output is random.
    # Remember the caller's mode and restore it afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            hidden, cell = model.encoder(sentence_tensor)

            outputs = [sos_idx]
            for _ in range(max_length):
                previous_word = torch.LongTensor([outputs[-1]]).to(device)
                output, hidden, cell = model.decoder(previous_word, hidden, cell)
                best_guess = output.argmax(1).item()
                outputs.append(best_guess)

                # Model predicts it's the end of the sentence
                if best_guess == eos_idx:
                    break
    finally:
        model.train(was_training)

    # Skip the start token when converting indices back to tokens.
    return [formal.vocab.itos[idx] for idx in outputs[1:]]


def bleu(data, model, informal, formal, device):
    """Compute the corpus BLEU score of ``model`` over ``data``.

    Args:
        data: Iterable of examples exposing ``src`` and ``trg`` attributes.
        model, informal, formal, device: Forwarded unchanged to
            ``translate_sentence``.

    Returns:
        The value returned by ``bleu_score`` for all predictions against their
        single reference each.
    """
    targets = []
    outputs = []

    for example in data:
        src = vars(example)["src"]
        trg = vars(example)["trg"]

        prediction = translate_sentence(model, src, informal, formal, device)
        # Strip the end marker only when it is really there: if decoding stopped
        # at max_length, the last token is a real word and must be kept.
        if prediction and prediction[-1] == "<eos>":
            prediction = prediction[:-1]

        # bleu_score expects a list of references per candidate.
        targets.append([trg])
        outputs.append(prediction)

    return bleu_score(outputs, targets)


def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
    """Announce the save on stdout, then write ``state`` to ``filename`` via ``torch.save``."""
    print("=> Saving checkpoint")
    torch.save(obj=state, f=filename)


def load_checkpoint(checkpoint, model, optimizer):
    """Restore ``model`` and ``optimizer`` from a checkpoint mapping.

    ``checkpoint["state_dict"]`` is handed to ``model.load_state_dict`` and
    ``checkpoint["optimizer"]`` to ``optimizer.load_state_dict``, in that order.
    """
    print("=> Loading checkpoint")
    restore_plan = ((model, "state_dict"), (optimizer, "optimizer"))
    for receiver, key in restore_plan:
        receiver.load_state_dict(checkpoint[key])

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import spacy
import random
from torch.utils.tensorboard import SummaryWriter

In [9]:
# Load the `en_core_web_sm` spaCy pipeline once at module level; `tokenizer_en`
# below reads this global on every call.
spacy_en = spacy.load('en_core_web_sm')

def tokenizer_en(text):
    """Tokenize ``text`` with the global ``spacy_en`` tokenizer and return the token strings."""
    pieces = []
    for token in spacy_en.tokenizer(text):
        pieces.append(token.text)
    return pieces

# The source (informal) and target (formal) fields share one configuration:
# same tokenizer, lower-casing, and <sos>/<eos> markers.
field_options = dict(tokenize=tokenizer_en,
                     lower=True,
                     init_token='<sos>',
                     eos_token='<eos>')
informal = Field(**field_options)
formal = Field(**field_options)

# CSV column name -> (attribute name on each example, field that processes it).
new_fields = {
    'informal': ("src", informal),
    'formal': ("trg", formal),
}

train_data, validation_data, test_data = TabularDataset.splits(
    path='data',
    train='train.csv',
    validation='valid.csv',
    test='test.csv',
    format='csv',
    fields=new_fields,
)

# Vocabularies are built from the training split only.
for field in (informal, formal):
    field.build_vocab(train_data, max_size=10000, min_freq=2)

In [10]:
class Encoder(nn.Module):
    """Embedding followed by a multi-layer LSTM; yields the LSTM's final states."""

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, drop_p):
        """Build the layers.

        ``input_size`` is the number of embedding rows, ``embedding_size`` the
        embedding width, ``hidden_size`` / ``num_layers`` configure the LSTM and
        ``drop_p`` is used both for the embedding dropout and inside the LSTM.
        """
        super().__init__()
        self.hidden_size, self.num_layers = hidden_size, num_layers

        self.dropout = nn.Dropout(p=drop_p)
        self.embedding = nn.Embedding(num_embeddings=input_size,
                                      embedding_dim=embedding_size)
        self.rnn = nn.LSTM(input_size=embedding_size,
                           hidden_size=hidden_size,
                           num_layers=num_layers,
                           dropout=drop_p)

    def forward(self, x):
        """Encode ``x`` (indices, shape ``(seq_length, N)``) and return ``(hidden, cell)``."""
        embedded = self.dropout(self.embedding(x))
        # The per-step outputs are not needed, only the final LSTM states.
        _, final_states = self.rnn(embedded)
        hidden, cell = final_states
        return hidden, cell

In [11]:
class Decoder(nn.Module):
    """Single-step decoder: embedding, multi-layer LSTM, then a linear projection."""

    def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers, drop_p):
        """Build the layers.

        ``input_size`` is the number of embedding rows, ``embedding_size`` the
        embedding width, ``hidden_size`` / ``num_layers`` configure the LSTM,
        ``output_size`` is the width of the final projection and ``drop_p`` is
        used both for the embedding dropout and inside the LSTM.
        """
        super().__init__()
        self.hidden_size, self.num_layers = hidden_size, num_layers

        self.dropout = nn.Dropout(p=drop_p)
        self.embedding = nn.Embedding(num_embeddings=input_size,
                                      embedding_dim=embedding_size)
        self.rnn = nn.LSTM(input_size=embedding_size,
                           hidden_size=hidden_size,
                           num_layers=num_layers,
                           dropout=drop_p)
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x, hidden, cell):
        """Run one decoding step.

        ``x`` holds one token index per batch element (shape ``(N,)``). Returns
        ``(predictions, hidden, cell)``, where the leading length-1 step
        dimension has been squeezed out of ``predictions``.
        """
        # Add a leading step dimension: (N,) -> (1, N), one word at a time.
        step_input = x.unsqueeze(0)

        embedded = self.dropout(self.embedding(step_input))
        rnn_out, (hidden, cell) = self.rnn(embedded, (hidden, cell))

        return self.fc(rnn_out).squeeze(0), hidden, cell

In [12]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes the target one step at a time."""

    def __init__(self, encoder, decoder):
        """Store the encoder and decoder sub-modules.

        The decoder must expose its output projection as ``fc`` (an
        ``nn.Linear``); its ``out_features`` is used as the target vocabulary size.
        """
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_force_ratio=0.5):
        """Return the decoder outputs for every target position.

        Args:
            source: Source index tensor; ``source.shape[1]`` is the batch size.
            target: Target index tensor; ``target.shape[0]`` is the target length
                and ``target[0]`` is fed to the decoder as the first input.
            teacher_force_ratio: Probability, per step, of feeding the
                ground-truth target token as the next decoder input. Otherwise
                the model's own best guess is fed. Mixing both exposes the model
                during training to its own (possibly wrong) predictions, which
                is what it will see at inference time.

        Returns:
            Tensor of shape ``(target_len, batch_size, target_vocab_size)``.
            Position 0 is never written and stays zero.
        """
        batch_size = source.shape[1]
        target_len = target.shape[0]
        # Taken from the decoder itself rather than from notebook globals, so
        # the module works wherever it is instantiated.
        target_vocab_size = self.decoder.fc.out_features

        # Allocate directly on the device the inputs live on.
        outputs = torch.zeros(target_len,
                              batch_size,
                              target_vocab_size,
                              device=source.device)

        # Encode the source once; the final states initialise the decoder.
        hidden, cell = self.encoder(source)

        # start token
        x = target[0]

        for t in range(1, target_len):
            output, hidden, cell = self.decoder(x, hidden, cell)

            outputs[t] = output

            best_guess = output.argmax(1)

            # Teacher forcing: ground truth with probability teacher_force_ratio.
            x = target[t] if random.random() < teacher_force_ratio else best_guess

        return outputs

In [13]:
# training hyperparam

num_epochs = 20
learning_rate = 0.001
batch_size = 64

# model hyperparam

load_model = True
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
input_size_encoder = len(informal.vocab)
input_size_decoder = len(formal.vocab)
output_size = len(formal.vocab)
encoder_embedding_size = 300
decoder_embedding_size = 300
hidden_size = 1024
num_layers = 2
enc_dropout = 0.5
dec_dropout = 0.5

# tensorboard
writer = SummaryWriter('runs/loss_plot')
step = 0

# Batches are bucketed by source length (sort_key) and sorted within each batch.
train_iterator, valid_iterator, test_iterator = BucketIterator.splits((train_data, 
                                                                      validation_data, 
                                                                      test_data), 
                                                                      batch_size=batch_size, 
                                                                      sort_within_batch=True, 
                                                                      sort_key= lambda x: len(x.src), 
                                                                      device=device)

encoder_net = Encoder(input_size_encoder, 
                      encoder_embedding_size, 
                      hidden_size, 
                      num_layers, 
                      enc_dropout).to(device)

decoder_net = Decoder(input_size_decoder, 
                      decoder_embedding_size, 
                      hidden_size, 
                      output_size, 
                      num_layers, 
                      dec_dropout).to(device)

model = Seq2Seq(encoder_net, decoder_net).to(device)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Padding positions in the target do not contribute to the loss.
pad_idx = formal.vocab.stoi['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

if load_model:
    # map_location remaps the stored tensors onto `device`, so a checkpoint saved
    # on a GPU still loads when this notebook falls back to the CPU.
    load_checkpoint(torch.load('my_checkpoint.pth.tar', map_location=device), model, optimizer)
    
# for epoch in range(num_epochs):
#     print(f'Epoch [{epoch}/{num_epochs}]')
    
#     checkpoint = {'state_dict':model.state_dict(), 'optimizer':optimizer.state_dict()}
#     save_checkpoint(checkpoint)
    
#     for batch_idx, batch in enumerate(train_iterator):
#         inp_data = batch.src.to(device)
#         target = batch.trg.to(device)
        
#         output = model(inp_data, target)
#         # output: (trg_len, batch_size, output_dim)
        
#         output = output[1:].reshape(-1, output.shape[2])
#         target = target[1:].reshape(-1)
        
#         optimizer.zero_grad()
#         loss = criterion(output, target)
        
#         loss.backward()
        
#         torch.nn.utils.clip_grad_norm(model.parameters(), max_norm=1)
#         optimizer.step()
        
#         writer.add_scalar('Training Loss', loss, global_step=step)
#         step += 1

=> Loading checkpoint


In [14]:
# Smoke test of the loaded model on one informal sentence; the call returns the
# predicted target tokens (start token already removed by translate_sentence).
translate_sentence(model, "Honey, I will be back.", informal, formal, device)

['i', 'am', 'going', 'to', 'be', '.', '.', '<eos>']

In [15]:
def to_formal(sentence, translate_fn=None):
    """Rewrite ``sentence`` with the model and return it as one readable string.

    Args:
        sentence: Input passed to the translator.
        translate_fn: Optional callable ``sentence -> list of tokens``. When
            omitted, ``translate_sentence`` is called with the notebook-level
            ``model``, ``informal``, ``formal`` and ``device``.

    Returns:
        The predicted tokens joined by single spaces, with a trailing ``<eos>``
        removed, closing punctuation attached to the preceding word, and the
        result passed through ``str.capitalize``. Empty predictions give ``""``.
    """
    if translate_fn is None:
        tokens = translate_sentence(model, sentence, informal, formal, device)
    else:
        tokens = translate_fn(sentence)

    tokens = list(tokens)
    # Drop the end marker only if present; decoding that stopped at the length
    # limit ends in a real word that must be kept.
    if tokens and tokens[-1] == "<eos>":
        tokens.pop()

    # Punctuation is glued to the previous word; every other token (including
    # the last one) is separated by a space.
    attach_to_previous = {".", ",", "!", "?", ";", ":"}
    pieces = []
    for token in tokens:
        if pieces and token in attach_to_previous:
            pieces[-1] += token
        else:
            pieces.append(token)

    return " ".join(pieces).strip().capitalize()

In [30]:
# Example: run one informal sentence through to_formal and display the result.
to_formal("How's life?")

'How is your life?'