In [1]:
import torch.nn as nn
import numpy as np
import torch
import unicodedata
import string
from torch import optim
import torch.nn.functional as F
from tqdm import tqdm_notebook

import re

device = torch.device("cuda")
device

device(type='cuda')

In [76]:
class Util():
    """
        Util class used in order to load the training text.
       The text is loaded from a file.
    """
    
    def __init__(self, file_path):
        # ammount of shit in the caesar cypher
        self.shift = 1
        # tuple of sentence -> target
        self.data = self.read_file(file_path)
        # Data dictionary (character based)
        self.word_2_index = self.create_dictionary(self.data)
        # Inverse dictionary (transform charcter code to the given character)
        self.index_2_word = {v: k for k, v in self.word_2_index.items()}
        
    # Turn a Unicode string to plain ASCII, thanks to
    # https://stackoverflow.com/a/518232/2809427
    def unicodeToAscii(self, s):
        return ''.join(
            c for c in unicodedata.normalize('NFD', s)
            if unicodedata.category(c) != 'Mn'
        )
    
    def read_file(self, file_path):
        """ 
            Read the training data file and append the sentence -> target
            to the data array.
        """
        
        f = open(file_path)
        data = []
        
        for line in f.readlines():
            line = line.split('\t')[0]
            sentence = line.split(' ') # split into words
            sentence = [word.lower() for word in sentence] # make all the words lowercase
            sentence = [word.translate(str.maketrans('', '', string.punctuation)) for word in sentence]
            # convert characters to asci and remove non letters
            sentence = [self.unicodeToAscii(word.strip()) for word in sentence]
            sentence = [re.sub(r"([.!?}{])", r" \1", word) for word in sentence]
            sentence = [re.sub(r"[^a-zA-Z.!?}{]+", r"", word) for word in sentence]
 
            target = []
            for word in sentence:
                stri = ""
                coded = [chr(ord(x) + self.shift) if x !='z' else x for x in word ]
                for x in coded:
                    stri += x
                target.append(stri)
            
            data.append((sentence,target))
            
        return data
    
    def create_dictionary(self, data):
        """ 
            Iterate over each sentence in the data and add the characters
            to the dictionary used. Based on it create an inverse dictionary
            that maps the numbers to character.
        """
        word_2_index = {}
        
        word_2_index["SOS"] = 0
        word_2_index["EOS"] = 1
        
        for sentence, target in data:
            for word in sentence:
                for char in word:
                    if char not in word_2_index:
                        word_2_index[char] = len(word_2_index)
                        
            for word in target:
                for char in word:
                    if char not in word_2_index:
                        word_2_index[char] = len(word_2_index)
                    
        return word_2_index
    
    def get_values(self, inp, output):
        """
            Get a normal sentence and transform it to a tensor of dictionary values.
        """
        input_tensor = [self.word_2_index[char] for word in inp for char in word]
        output_tensor = [self.word_2_index[char] for word in output for char in word]
        
        output_tensor.append(1)
        
        return torch.tensor(input_tensor).to(device), torch.tensor(output_tensor).to(device)
        
util = Util("ron.txt")
util.get_values(util.data[25][0], util.data[25][1])

(tensor([13,  7, 15, 15, 19, 14], device='cuda:0'),
 tensor([20,  5,  6,  7, 19,  7,  1], device='cuda:0'))

In [None]:
class Encoder(nn.Module):
    def __init__(self, hidden_size, emb_size, vocab_size):
        super(Encoder, self).__init__()
        # Define the layers size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.emb_size = emb_size
        
        # Transformations used
        # LSTM layer used to encode the embedding output
        self.lstm = nn.LSTM(self.hidden_size, self.hidden_size)
        # Embedding layer for the input of a sentence (char or word)
        self.embedding = nn.Embedding(self.vocab_size, self.emb_size)
        # Linear layer to map the embedding dimension to the hidden unit dimension
        self.lin = nn.Linear(self.emb_size, self.hidden_size)
        
        
    def init_hidden(self):
        return (torch.zeros(1, 1, self.hidden_size).to(device), torch.zeros(1, 1, self.hidden_size).to(device))
            
    
    def forward(self, inputs, hidden):
        # Embedd each character to a higher space
        x = self.embedding(inputs).view(1, 1, -1)
        # Transform the higher space to the lstm space (50 ->256)
        x = self.lin(x)
        # Non-linear activation
        x = F.relu(x)
        
        # Apply the sequential LSTM to each input
        out, hidden = self.lstm(x, hidden)
        
        return out, hidden

In [4]:
class AttentionDecoder(nn.Module):
    def __init__(self, vocab_size, emb_hidden, hidden_size):
        super(AttentionDecoder, self).__init__()
        # vocab dimension, embedding space dimenson, hidden space dimension
        self.vocab_size = vocab_size
        self.emb_hidden = emb_hidden
        self.hidden_size = hidden_size
        
        # create the nn layers for the decoding part
        self.embedding = nn.Embedding(self.vocab_size, self.emb_hidden)
        self.dense = nn.Linear(self.emb_hidden, self.hidden_size)
        self.attn = nn.Linear(3 * hidden_size, 1)
        self.lstm = nn.LSTM(self.hidden_size, self.hidden_size)
        self.input_combine = nn.Linear(2 * hidden_size, hidden_size)
        self.last = nn.Linear(self.hidden_size, self.vocab_size)
        
    def init_hidden(self):
        return (torch.zeros(1, 1, self.hidden_size).to(device), torch.zeros(1, 1, self.hidden_size).to(device))
    
    def forward(self, x, decoder_hidden, encoder_outputs):
        # Embedd each input to a higher space
        x = self.embedding(x).view(1, 1, -1)
        # Transform the higher space to the lstm space (50 ->256)
        x = self.dense(x)
        # Non-linear activation
        x = F.relu(x)
        attention_values = []
        
        for i in range(len(encoder_outputs)):
            # concatinate encoder_output at i with the decoder hidden state (tuple cause lstm has 2) -> 3 * hidden_size 
            enc_dec_concat = torch.cat((encoder_outputs[i].view(1, 1, -1), torch.cat((decoder_hidden[0], decoder_hidden[1]), 2)), 2)
            
            attn_value = self.attn(enc_dec_concat)
            attention_values.append(attn_value)
        alphas = torch.cat(attention_values, 1)
        alphas_norm = F.softmax(alphas, dim=1)
        # Weight multiplication for each encoder output to denote it's importance 
        c = torch.bmm(alphas_norm.view(1, 1, -1), encoder_outputs.view(1, -1, self.hidden_size))
        
        x = torch.cat((x.view(1, 1, -1), c.view(1, 1, -1)), 2)
        x = self.input_combine(x)
        x = F.relu(x)
        
        out, decoder_hidden = self.lstm(x, decoder_hidden)
        out = self.last(out[0])
        
        out = F.log_softmax(out, dim=1)
        
        return out, decoder_hidden
        
        

In [5]:
# Define the encoder and decoder architecture
encoder = Encoder(256, 50, len(util.word_2_index)).to(device)
decoder = AttentionDecoder(len(util.word_2_index), 50, 256).to(device)
learning_rate = 0.01
# Define optimizers for each architecture used
encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)

# Define a criterion to calculate the error
criterion = nn.NLLLoss()

In [63]:
def learn(input_tensor, target_tensor):
    """
        Method used to iterate and train over one sentence.
    """
    # Initiate the encoder hidden layer for the lstm
    encoder_hidden = encoder.init_hidden()
    # Define the attention encoder outputs
    encoder_outputs = torch.zeros([len(input_tensor), 1, 256]).cuda()
    
    loss = 0
    
    #clear the last gradients
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()
    
    # Encode the inputs in order to get the encoder outputs for the context vector
    for i in range(len(input_tensor)):
        encoder_out, encoder_hidden = encoder(input_tensor[i], encoder_hidden)
        encoder_outputs[i] = encoder_out
    
    out = torch.tensor([0]).cuda() # sos
    # Decoder lstm input gets its values from the encoder lstm output
    decoder_hidden = encoder_hidden
    
    for i, targ in enumerate(target_tensor):
        # decoder output softmax of vocab size
        decoder_out, decoder_hidden = decoder(out.cuda(), decoder_hidden, encoder_outputs)
        # get the max value and the max index
        topv, topi = decoder_out.topk(1)
        # transform the max index in a tensor and feed it as an input
        out = topi.detach().long().cuda()
        # calculate the loss [batch_size, output_softmax], long_target_value
        loss += criterion(decoder_out.squeeze().unsqueeze(0).to(device), target_tensor[i].unsqueeze(0).to(device))
        if out.item() == 1:
            break
        
    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()   
    
    
    return loss / len(target_tensor)


In [64]:
def train():
    for i in range(6):
        loss = 0
        for inp, target in tqdm_notebook(util.data):
            inp_tensor, output_tensor = util.get_values(inp, target)
            loss += learn(inp_tensor, output_tensor)
        
        print(loss)
        
train()

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  after removing the cwd from sys.path.


HBox(children=(FloatProgress(value=0.0, max=2165.0), HTML(value='')))


tensor(0.6894, device='cuda:0', grad_fn=<ThAddBackward>)


HBox(children=(FloatProgress(value=0.0, max=2165.0), HTML(value='')))


tensor(0.6745, device='cuda:0', grad_fn=<ThAddBackward>)


HBox(children=(FloatProgress(value=0.0, max=2165.0), HTML(value='')))


tensor(0.6603, device='cuda:0', grad_fn=<ThAddBackward>)


HBox(children=(FloatProgress(value=0.0, max=2165.0), HTML(value='')))


tensor(0.6467, device='cuda:0', grad_fn=<ThAddBackward>)


HBox(children=(FloatProgress(value=0.0, max=2165.0), HTML(value='')))


tensor(0.6336, device='cuda:0', grad_fn=<ThAddBackward>)


HBox(children=(FloatProgress(value=0.0, max=2165.0), HTML(value='')))


tensor(0.6210, device='cuda:0', grad_fn=<ThAddBackward>)


In [65]:
def transform_code_sentence(tensor_vec):
    
    sentence = [util.index_2_word[i.item()] for i in tensor_vec]
    
    return sentence

In [66]:
def validate_sentence(inp_tensor, target_tensor):
    
    with torch.no_grad():
        correct_predictions = 0
        encoder_outputs = torch.zeros([len(inp_tensor), 1, 256]).cuda()
        encoder_hidden = encoder.init_hidden()
            
        for i, inp in enumerate(inp_tensor):
            encoder_out, encoder_hidden = encoder(inp.cuda(), encoder_hidden)
            encoder_outputs[i] = encoder_out

        decoder_hidden = encoder_hidden
        out = torch.tensor([0]).cuda() # EOS
        
        for i, target in enumerate(target_tensor):
            # decoder output softmax of vocab size
            decoder_out, decoder_hidden = decoder(out.cuda(), decoder_hidden, encoder_outputs)
            # get the max value and the max index
            topv, topi = decoder_out.topk(1)
            # transform the max index in a tensor and feed it as an input
            out = topi.detach().long().cuda()

            if out.item() == target:
                correct_predictions += 1
            if out.item() == 1:
                break
    
    return correct_predictions, len(target_tensor)

In [67]:
util2 = Util("ron_validation.txt")

correct_predictions = 0
total_predictions = 0

for inp, target in tqdm_notebook(util2.data):
    inp, target = util.get_values(inp, target)
    correct_it, total_it = validate_sentence(inp, target)
    correct_predictions += correct_it
    total_predictions += total_it

print(correct_predictions)
print(total_predictions)
print((correct_predictions / total_predictions) * 100)

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  


HBox(children=(FloatProgress(value=0.0, max=185.0), HTML(value='')))


1290
4800
26.875


In [43]:
inpts, _ = util.get_values("imibagpl", "asae")
print(transform_code_sentence(inpts))
encoder_outputs = torch.zeros([len(inpts), 1, 256]).cuda()
encoder_hidden = encoder.init_hidden()

['i', 'm', 'i', 'b', 'a', 'g', 'p', 'l']


In [58]:
with torch.no_grad():
    for i, inp in enumerate(inpts):
        encoder_out, encoder_hidden = encoder(inp.cuda(), encoder_hidden)
        encoder_outputs[i] = encoder_out

    out = torch.tensor([0]).cuda() # EOS
    decoder_hidden = encoder_hidden
    result = []
    for i in range(15):
        # decoder output softmax of vocab size
        decoder_out, decoder_hidden = decoder(out.cuda(), decoder_hidden, encoder_outputs)
        # get the max value and the max index
        topv, topi = decoder_out.topk(1)
        # transform the max index in a tensor and feed it as an input
        out = topi.detach().long().cuda()
        result.append(out)
        print(out)
        if out.item() == 1:
            break
            
    print(transform_code_sentence(result))

tensor([[4]], device='cuda:0')
tensor([[7]], device='cuda:0')
tensor([[4]], device='cuda:0')
tensor([[23]], device='cuda:0')
tensor([[15]], device='cuda:0')
tensor([[2]], device='cuda:0')
tensor([[4]], device='cuda:0')
tensor([[14]], device='cuda:0')
tensor([[2]], device='cuda:0')
tensor([[23]], device='cuda:0')
tensor([[7]], device='cuda:0')
tensor([[4]], device='cuda:0')
tensor([[18]], device='cuda:0')
tensor([[23]], device='cuda:0')
tensor([[2]], device='cuda:0')
['j', 'n', 'j', 'b', 'e', 'h', 'j', 'f', 'h', 'b', 'n', 'j', 'm', 'b', 'h']
