In [None]:
import math
import time
import string
import sys
import unidecode

import pandas as pd
import numpy as np
from collections import Counter
import nltk

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.autograd as autograd
from torch.utils.data import DataLoader, Dataset

from data import get_train_test_split, get_sentences

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model

### Positional Encoding
On ajoute à notre vecteur (qui représente une suite de caractères) un vecteur qui encode la position de chaque caractère.

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence of embeddings.

    A table of shape ``(max_len, 1, d_model)`` is precomputed once and stored
    as a (non-trainable) buffer. In ``forward`` its first ``x.size(0)`` rows
    are added to ``x``, so dimension 0 of ``x`` is the position axis and the
    table is broadcast over dimension 1.

    Args:
        d_model: size of the last dimension of the inputs (may be odd).
        dropout: dropout probability applied after the addition.
        max_len: number of positions precomputed in the table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so the cosine part must be truncated to d_model // 2 frequencies.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        # Shape (max_len, 1, d_model): broadcastable over dimension 1 of the input.
        pe = pe.unsqueeze(0).transpose(0, 1)
        # A buffer follows the module across devices and is saved in the
        # state dict, but is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:x.size(0)])``."""
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

### Model (Transformers)

Définition du Transformer au niveau des caractères.
Le modèle comprend :
- Positional Encoder - pour ajouter l'information de position aux vecteurs de représentation des caractères
- Encoder (Embeddings) - pour créer le vecteur de représentation de la chaîne de caractères
- Encoder Layers - Transformer (plusieurs couches)
- Decoder - couche linéaire qui retourne, pour chaque position, un vecteur de scores sur le vocabulaire (la chaîne de caractères prédite)

In [None]:
class ANAIS(nn.Module):
    """Transformer-encoder language model.

    Pipeline: token embedding -> positional encoding -> stack of
    ``nn.TransformerEncoderLayer`` -> linear projection to one score per
    vocabulary entry.

    Args:
        vocab_size: number of distinct token ids.
        emb_dim: embedding size (``d_model`` of the encoder layers).
        num_heads: number of attention heads.
        hidden_size: feed-forward size inside each encoder layer.
        num_layers: number of stacked encoder layers.
        dropout: dropout probability for the positional encoding and layers.
    """

    def __init__(self, vocab_size, emb_dim, num_heads, hidden_size, num_layers, dropout=0.5):
        super(ANAIS, self).__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(emb_dim, dropout)
        encoder_layers = nn.TransformerEncoderLayer(emb_dim, num_heads, hidden_size, dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.encoder = nn.Embedding(vocab_size, emb_dim)
        self.emb_dim = emb_dim
        self.decoder = nn.Linear(emb_dim, vocab_size)
        self.init_weights()

    def generate_square_subsequent_mask(self, sz):
        """Return a ``(sz, sz)`` float mask: 0 on/below the diagonal, -inf above.

        Added to the attention scores, it prevents a position from attending
        to later positions.
        """
        return torch.triu(torch.full((sz, sz), float('-inf')), diagonal=1)

    def init_weights(self):
        """Uniformly initialise embedding/decoder weights in [-0.1, 0.1]; zero the decoder bias."""
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src, src_mask):
        """Map token ids ``src`` to vocabulary scores, using ``src_mask`` as attention mask."""
        # Scale embeddings by sqrt(emb_dim) before adding positional information.
        src = self.encoder(src) * math.sqrt(self.emb_dim)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_mask)
        output = self.decoder(output)
        return output

    @staticmethod
    def _get_batch(source, i, bptt):
        """Slice one (inputs, targets) pair of at most ``bptt`` rows starting at row ``i``.

        Targets are the inputs shifted by one row and flattened, i.e. the
        model learns to predict the next token.
        NOTE(review): assumes ``source`` is a ``(seq_len, batch)`` LongTensor of
        token ids (the usual "batchified" layout) - confirm against the data pipeline.
        """
        seq_len = min(bptt, source.size(0) - 1 - i)
        data = source[i:i + seq_len]
        targets = source[i + 1:i + 1 + seq_len].reshape(-1)
        return data, targets

    def fit(self, dataset, num_epochs=1, lr=5.0, bptt=35, log_interval=200):
        """Train the model with SGD on next-token prediction.

        Args:
            dataset: tensor of token ids, rows = positions (see ``_get_batch``).
                It must live on the same device as the model.
            num_epochs: number of passes over ``dataset``.
            lr: initial learning rate; multiplied by 0.95 after every epoch.
            bptt: maximum number of rows per training chunk.
            log_interval: print running statistics every this many batches.

        Raises:
            ValueError: if ``dataset`` has fewer than two rows (no target exists).
        """
        if dataset.size(0) < 2:
            raise ValueError("dataset needs at least two rows to build (input, target) pairs")

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(self.parameters(), lr=lr)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.95)
        # Read the vocabulary size from the model instead of a notebook global.
        vocab_size = self.decoder.out_features

        # Call the base implementation directly: ``self.train`` is overloaded below.
        super().train(True)
        for epoch in range(1, num_epochs + 1):
            total_loss = 0.
            start_time = time.time()
            src_mask = self.generate_square_subsequent_mask(bptt).to(dataset.device)

            for batch, i in enumerate(range(0, dataset.size(0) - 1, bptt)):
                data, targets = self._get_batch(dataset, i, bptt)
                # The last chunk may be shorter than bptt and needs a matching mask.
                if data.size(0) != src_mask.size(0):
                    src_mask = self.generate_square_subsequent_mask(data.size(0)).to(dataset.device)

                optimizer.zero_grad()
                output = self(data, src_mask)
                loss = criterion(output.reshape(-1, vocab_size), targets)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.parameters(), 0.5)
                optimizer.step()

                total_loss += loss.item()

                if batch % log_interval == 0 and batch > 0:
                    cur_loss = total_loss / log_interval
                    elapsed = time.time() - start_time
                    print('| epoch {:3d} | {:5d}/{:5d} batches | '
                          'lr {:02.2f} | ms/batch {:5.2f} | '
                          'loss {:5.2f} | ppl {:8.2f}'.format(
                            epoch, batch, len(dataset) // bptt,
                            optimizer.param_groups[0]['lr'],
                            elapsed * 1000 / log_interval,
                            cur_loss, math.exp(cur_loss)))
                    total_loss = 0
                    start_time = time.time()

            scheduler.step()

    def train(self, dataset=True, num_epochs=1, lr=5.0, bptt=35, log_interval=200):
        """Switch train/eval mode, or run the training loop.

        ``nn.Module.eval()`` is implemented as ``self.train(False)``, so this
        method must keep honouring the ``nn.Module.train(mode)`` contract:

        * ``model.train()`` / ``model.train(True|False)`` set the mode and
          return ``self`` (standard ``nn.Module`` behaviour);
        * ``model.train(dataset, num_epochs, lr)`` trains via :meth:`fit`.
        """
        if isinstance(dataset, bool):
            return super().train(dataset)
        self.fit(dataset, num_epochs=num_epochs, lr=lr, bptt=bptt, log_interval=log_interval)

    def evaluate(self, data_source, bptt=35):
        """Return the mean cross-entropy loss per row of ``data_source``.

        The model is left in evaluation mode afterwards.

        Raises:
            ValueError: if ``data_source`` has fewer than two rows.
        """
        if data_source.size(0) < 2:
            raise ValueError("data_source needs at least two rows to build (input, target) pairs")

        criterion = nn.CrossEntropyLoss()
        vocab_size = self.decoder.out_features
        self.eval()  # Turn on the evaluation mode
        total_loss = 0.
        src_mask = self.generate_square_subsequent_mask(bptt).to(data_source.device)
        with torch.no_grad():
            for i in range(0, data_source.size(0) - 1, bptt):
                data, targets = self._get_batch(data_source, i, bptt)
                if data.size(0) != src_mask.size(0):
                    src_mask = self.generate_square_subsequent_mask(data.size(0)).to(data_source.device)
                output_flat = self(data, src_mask).reshape(-1, vocab_size)
                # Weight each chunk by its length so the final value is a per-row mean.
                total_loss += len(data) * criterion(output_flat, targets).item()
        return total_loss / (len(data_source) - 1)

 # Custom Dataset and Data loaders

In [None]:
class AISDataset(Dataset):
    """Map-style dataset over the ``input``, ``target`` and ``code`` columns of a CSV file.

    Rows whose ``target`` is missing are dropped at load time.

    Args:
        filename: path of the CSV file to read.
        vocab: vocabulary object; stored on the instance as ``self.vocab``.
    """

    def __init__(self, filename, vocab):
        df = pd.read_csv(filename)
        df = df[['input', 'target', 'code']]
        # Filtering leaves gaps in the index; renumber so that positions
        # 0..len-1 are all valid.
        df = df[df['target'].notna()].reset_index(drop=True)

        self.vocab = vocab
        self.input = df['input']
        self.target = df['target']
        self.code = df['code']

    def __getitem__(self, idx):
        """Return the ``(input, target, code)`` triple at position ``idx``."""
        # Positional access: raises IndexError past the end, which is what
        # iteration and samplers expect.
        return self.input.iloc[idx], self.target.iloc[idx], self.code.iloc[idx]

    def __len__(self):
        """Number of rows kept after dropping missing targets."""
        return len(self.input)

# Creating a vocabulary out of words in dataset

In [None]:
class Vocab(object):
    """Simple vocabulary wrapper mapping tokens to integer ids and back.

    ``vocab(token)`` returns the id of a token, ``vocab[idx]`` returns the
    token stored under an id, and ``len(vocab)`` is the number of tokens.
    """

    def __init__(self):
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0  # next id to hand out

    @staticmethod
    def _normalize(word):
        """Canonical form used when building the vocabulary: lower-cased, no outer whitespace."""
        return str(word).lower().strip()

    def add_word(self, word):
        """Register ``word`` verbatim under the next free id (no-op if already present)."""
        if word not in self.word2idx:
            self.word2idx[word] = self.idx
            self.idx2word[self.idx] = word
            self.idx += 1

    def __call__(self, word):
        """Return the id of ``word``, or the id of ``'<unk>'`` when unknown.

        Raises:
            KeyError: if the word is unknown and ``'<unk>'`` was never added
                (``build_vocab`` adds it).
        """
        token = str(word).lower()
        # Exact (lower-cased) match first, so entries added verbatim through
        # ``add_word`` - including whitespace tokens - keep resolving.
        if token in self.word2idx:
            return self.word2idx[token]
        # Then the same normalization ``build_vocab`` applies, so a padded
        # spelling of a known word is not mistaken for an unknown one.
        stripped = self._normalize(word)
        if stripped in self.word2idx:
            return self.word2idx[stripped]
        return self.word2idx['<unk>']

    def __getitem__(self, idx):
        """Return the token stored under id ``idx``."""
        return self.idx2word[idx]

    def __len__(self):
        return len(self.word2idx)

    def build_vocab(self, words):
        """Add ``'<unk>'`` and then every distinct normalized, non-empty word.

        Ids are assigned in order of first appearance in ``words``.
        """
        # Create a vocab wrapper and add some special tokens.
        # self.add_word('<pad>')
        # self.add_word('<start>')
        # self.add_word('<end>')
        self.add_word('<unk>')

        # Add words to the vocabulary; ``add_word`` ignores duplicates.
        for word in words:
            token = self._normalize(word)
            if token != '':
                self.add_word(token)

In [None]:
def data_processing(filename, batch_size=1, shuffle=False):
    """Build the vocabulary and a DataLoader from a CSV file.

    The vocabulary is built from the distinct values of the ``input`` and
    ``target`` columns, using only rows whose ``target`` is present.

    Args:
        filename: path of the CSV file (needs ``input``, ``target`` and ``code`` columns).
        batch_size: forwarded to ``DataLoader`` (its default of 1 is kept).
        shuffle: forwarded to ``DataLoader`` (its default of False is kept).

    Returns:
        ``(vocab, dataloader)``: the ``Vocab`` and a ``DataLoader`` over an ``AISDataset``.
    """
    df = pd.read_csv(filename)
    df = df[['input', 'target', 'code']]
    df = df[df['target'].notna()]

    # Drop missing inputs so that NaN does not end up as a literal 'nan' token.
    words = df['input'].dropna().unique().tolist() + df['target'].unique().tolist()
    vocab = Vocab()
    vocab.build_vocab(words)

    dataset = AISDataset(filename, vocab)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    return vocab, dataloader

In [None]:
# __main__
vocab, dataloader = data_processing('./clean_dataset.csv')
# Only the last expression of a cell is displayed, so show both sizes as one tuple.
len(vocab), len(dataloader)

In [None]:
# Split the cleaned CSV into a train part and a test part using the helper from the project's `data` module.
# NOTE(review): presumably returns two DataFrames (going by the *_df names) - confirm in data.py.
train_df, test_df = get_train_test_split('./clean_dataset.csv')

In [None]:
# Character inventory: the standard library's set of printable ASCII characters.
all_characters = string.printable
# Number of characters in that inventory.
n_characters = len(string.printable)

# Model instance

In [None]:
# Hyperparameters, passed positionally to ANAIS in the order of its signature.
ntokens = len(vocab)  # vocabulary size (Vocab defines __len__; it has no `stoi` attribute)
emsize = 200  # embedding dimension
nhid = 200  # feed-forward size inside each encoder layer
nlayers = 2  # number of encoder layers
nhead = 2  # number of attention heads
dropout = 0.2
model = ANAIS(ntokens, emsize,
              nhead, nhid,
              nlayers, dropout).to(device)