<a id="plan"></a>

# Dashboard for the libNLP library

This dashboard serves two purposes:

1/ A _tutorial_ function: the library is documented in full, with an explanation and an illustration for each module and model.

2/ An _editing_ function: every change made in a cell of this notebook is written to the corresponding module in the library.

## Table of contents

1. [Models](#modeles)

    1.1 [Document classifiers](#ClassifieursDeDocuments)
    
    1.2 [Text-to-distribution converters](#Convertisseurs)
    

2. [Modules](#modules)

    2.1 [Text encoders](#encodeursDeTexte)
        2.1.1 Word encoders
        2.1.2 Text encoder
        
    2.2 [Simple attention modules](#attentionSimple)
        2.2.1 Additive attention
        2.2.2 Multi-head additive attention
        2.2.3 Multi-hop additive attention
        
    2.3 [Hierarchical attention modules](#attentionHierarchique)
    
    2.4 [Decoders](#decodeurs)
        2.4.1 Selective decoders
        2.4.2 Generative decoders




Create the main directory that holds the library, move into it, and generate a README.txt file with a brief presentation of the library:

In [21]:
%mkdir libNLP_v2

A subdirectory or file libNLP_v2 already exists.


In [22]:
%cd libNLP_v2

C:\Users\Jb\Desktop\Scripts\notebooks\libNLP_v2


In [23]:
%%writefile README.txt


Inspiration for the design of the library:
https://github.com/pytorch/fairseq
https://github.com/allenai/allennlp
https://www.dabeaz.com/modulepackage/ModulePackage.pdf


The complete tutorial should include:

- as an introduction, a graph representation of lib_NLP
- a table of contents with hyperlinks. Structure:
    1° Models:
        - document classifier
    2° Modules:
        - encoding
        - attention
        - hierarchical encoding
        - decoding

- an option to write each module to the corresponding .py file in the library

- a Setup for data import

Overwriting README.txt


Turn the current directory into a Python package:

In [24]:
%%writefile __init__.py

#import libNLP.modules
#import libNLP.models

Overwriting __init__.py
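
As a minimal sketch of what this step achieves: an `__init__.py` file makes a directory a regular Python package and decides which names are exposed at its top level. The package name `demo_pkg`, the module `greeting.py` and the function `hello` are hypothetical. They are created in a temporary directory so the real libNLP_v2 files are left untouched.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway package in a temporary directory (hypothetical names).
root = Path(tempfile.mkdtemp())
pkg_dir = root / "demo_pkg"
pkg_dir.mkdir()

# A submodule, playing the role of a file written with %%writefile.
(pkg_dir / "greeting.py").write_text("def hello():\n    return 'hello from demo_pkg'\n")

# __init__.py marks the directory as a package and re-exports the function.
(pkg_dir / "__init__.py").write_text("from .greeting import hello\n")

# Make the parent directory importable, then import the package by name.
sys.path.insert(0, str(root))
importlib.invalidate_caches()
demo_pkg = importlib.import_module("demo_pkg")

print(demo_pkg.hello())
```

The same pattern is what lets `models/__init__.py` and `modules/__init__.py` expose classes directly at the sub-package level.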


<a id="modeles"></a>


# 1 Models

[Back to table of contents](#plan)

Create the _libNLP.models_ subdirectory, which contains all the deep learning models developed in this library:

In [25]:
%mkdir models

A subdirectory or file models already exists.


In [26]:
%%writefile models/__init__.py


from .Text_Classifier import (TextClassifier, 
                              TextClassifierCreator, 
                              TextClassifierTrainer)
from .Root_Converter import RootConverter

Overwriting models/__init__.py


<a id="ClassifieursDeDocuments"></a>

## 1.1 Document classifiers

[Back to table of contents](#plan)

The main document classification model is the **TextClassifier** module, based on the _Hierarchical Attention Network_ model and made up of three sub-modules (stored in _libNLP.modules_):

- A _Text Encoder_ that turns each word of a text into a contextualized vector
- A _Hierarchical Attention_ that extracts the information contained in the encoded text
- A _Class Decoder_ that produces an output class from the result of the attention

This model comes with a **TextClassifierCreator** function that builds a classifier based on the TextClassifier class, and a **TextClassifierTrainer** class that trains this classifier on a dataset.

![Multilabel_Classifier](figs/Attention_multiple_residuelle.png)
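
The three-stage pipeline described above (encode, attend, decode) can be sketched in plain Python. This is a toy, single-level illustration under stated assumptions, not the library's implementation. The vocabulary, the random weights and the helper names (`encode`, `attend`, `decode`) are all hypothetical, the encoder has no recurrence, and the attention is a single additive head rather than a hierarchical one.

```python
import math
import random

random.seed(0)

def rand_matrix(rows, cols):
    """Small random matrix as a list of rows (stand-in for learned weights)."""
    return [[random.uniform(-0.5, 0.5) for _ in range(cols)] for _ in range(rows)]

def matvec(matrix, vector):
    return [sum(w * x for w, x in zip(row, vector)) for row in matrix]

def softmax(scores):
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

DIM, N_CLASSES = 4, 3
vocab = {"the": 0, "cat": 1, "sat": 2, "down": 3}

# 1) Text encoder stand-in: one vector per known word (no context here).
embedding = rand_matrix(len(vocab), DIM)
def encode(words):
    return [embedding[vocab[w]] for w in words if w in vocab]

# 2) Additive attention stand-in: score_i = v . tanh(W h_i), then softmax.
W, v = rand_matrix(DIM, DIM), rand_matrix(1, DIM)[0]
def attend(memory):
    scores = [sum(a * math.tanh(b) for a, b in zip(v, matvec(W, h))) for h in memory]
    weights = softmax(scores)
    pooled = [sum(w * h[k] for w, h in zip(weights, memory)) for k in range(DIM)]
    return pooled, weights

# 3) Class decoder stand-in: linear layer, softmax, then argmax.
out_layer = rand_matrix(N_CLASSES, DIM)
def decode(text_vector):
    probas = softmax(matvec(out_layer, text_vector))
    return probas.index(max(probas)), probas

memory = encode(["the", "cat", "sat", "down", "quietly"])  # unknown word is skipped
text_vector, attn_weights = attend(memory)
predicted_class, probas = decode(text_vector)
print(predicted_class, [round(w, 3) for w in attn_weights])
```

The attention weights sum to one over the known words, which is what makes them usable as a per-word heat map in a method such as `showAttention`.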

In [27]:
%%writefile models/Text_Classifier.py

import math
import time

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker #, FuncFormatter
#%matplotlib inline

import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.autograd import Variable
from libNLP_v2.modules import (
                            RecurrentWordsEncoder, 
                            TextEncoder, 
                            HierarchicalTextEncoder,
                            
                            AdditiveAttention,
                            MultiHeadAttention,
                            MultiHopedAttention,
                            RecurrentHierarchicalAttention, 
                            
                            ClassDecoder,
                            MultiTaskClassDecoder,
                            MultilabelDecoder,
                            MultilabelDecoderV2)




class TextClassifier(nn.Module):
    def __init__(self, device, text_encoder, attention, classes_decoder) :
        super(TextClassifier, self).__init__()
        
        self.device = device
        self.n_level = attention.n_level if attention is not None else 1
        # modules        
        self.text_encoder = text_encoder
        self.attention = attention
        self.classes_decoder = classes_decoder
    
    
    # ---------------------- Technical methods -----------------------------
    def loadSubModule(self, text_encoder = None, attention = None, classes_decoder = None) :
        if text_encoder is not None :
            self.text_encoder = text_encoder
        if attention is not None :
            self.attention = attention
        if classes_decoder is not None :
            self.classes_decoder = classes_decoder
        return
    
    
    def freezeSubModule(self, text_encoder = False, attention = False, classes_decoder = False) :
        for param in self.text_encoder.parameters():
            param.requires_grad = not text_encoder
        if self.attention is not None :   # attention is optional (see __init__)
            for param in self.attention.parameters():
                param.requires_grad = not attention
        for param in self.classes_decoder.parameters():
            param.requires_grad = not classes_decoder
        return
    
        
    def nbParametres(self) :
        count = 0
        for p in self.parameters():
            if p.requires_grad == True :
                count += p.data.nelement()
        return count
    
    
    def flatten(self, description) :
        '''Lowers the nesting depth of the description by one level'''
        flatten = []
        for line in description :
            flatten += line
        return flatten
        
        
    # ------------------------- Working modes --------------------------------
    def answerTrain(self, description, rand = 0):
        '''Applies text encoding and attention computation followed by a last linear layer 
           with output dimension the number of possible classes. Evaluation is directly 
           performed on this vector through the corresponding trainer module.
        '''
        if self.n_level == 1 :
            # 1) put description into lvl_1 format
            description = self.flatten(description)
            
            # 2) apply text encoding
            words_memory, sentences_memory = self.text_encoder(description, rand)
            
            # 3) perform attention when applicable
            if self.attention is not None :
                text_vector, attn1_weights = self.attention(words_memory)
            else :
                text_vector = sentences_memory
                attn1_weights = None
            attn2_weights = None
            
        elif self.n_level == 2 :
            # 1) apply text encoding
            words_memory, sentences_memory = self.text_encoder(description, rand)
            
            # 2) perform attention when applicable
            text_vector, attn1_weights, attn2_weights = self.attention(words_memory)
            
        classes_vector = self.classes_decoder(text_vector, train_mode = True)
        return classes_vector, attn1_weights, attn2_weights 

        
    def forward(self, description):
        '''Applies text encoding and attention computation followed by a linear layer
           and a softmax transformation. When a class decoder is loaded, returns the
           index of most probable class. When a multilabel class decoder is loaded, 
           returns the list of most probable 0 - 1 labels.
        '''
        if self.n_level == 1 :
            # 1) put description into lvl_1 format
            description = self.flatten(description)
            
            # 2) apply text encoding
            words_memory, sentences_memory = self.text_encoder(description, rand = 0)
            
            # 3) perform attention when applicable
            if self.attention is not None :
                text_vector, attn1_weights = self.attention(words_memory)
            else :
                text_vector = sentences_memory
                attn1_weights = None
            attn2_weights = None
            
        elif self.n_level == 2 :
            # 1) apply text encoding
            words_memory, sentences_memory = self.text_encoder(description, rand = 0)
            
            # 2) perform attention when applicable
            text_vector, attn1_weights, attn2_weights = self.attention(words_memory)
            
        classe, probas = self.classes_decoder(text_vector)
        return classe, probas, attn1_weights, attn2_weights 
    
    
    
    # ------------------------ Visualisation methods ---------------------------------
    def flattenWeights(self, weights) :
        '''Lowers the nesting depth of the attention weights by one level'''
        flatten = []
        for weight_layer in weights :
            flatten.append(torch.cat(tuple(weight_layer.values()), dim = 2))
        return flatten
    
    
    def formatWeights(self, description, attn1_weights, attn2_weights) :
        if self.n_level == 2 :
            attn1_weights = self.flattenWeights(attn1_weights)
        hops = self.attention.hops
        l, L = len(description), max([len(line) for line in description])
        Table = np.zeros((l, 1, L))
        Liste = np.zeros((l, 1)) if attn2_weights is not None else None
        count = 0
        count_line = 0
        for i, line in enumerate(description) :
            present = False
            for j, word in enumerate(line) :
                if word in self.text_encoder.lang.word2index.keys():
                    present = True
                    Table[i, 0, j] = sum([attn1_weights[k][0, 0, count].data for k in range(hops)])
                    count += 1
            if present and Liste is not None :
                Liste[i] = sum([attn2_weights[k][count_line].data for k in range(hops)])
                count_line += 1
        return Table, Liste
    
    
    def showAttention(self, description, target, liste, fig_size = 'auto', maxi = None):
        classe, probas, attn1_weights, attn2_weights = self.forward(description)
        if target is not None :
            print('target : ', liste[int(target[0])])
        print('predic : ', liste[int(classe)])
        table, liste = self.formatWeights(description, attn1_weights, attn2_weights)
        l = table.shape[0] if fig_size == 'auto' else fig_size[0]
        L = table.shape[2] if fig_size == 'auto' else fig_size[1]
        fig = plt.figure(figsize = (l, L))
        for i, line in enumerate(description):
            ax = fig.add_subplot(l, 1, i+1)
            vals = table[i]
            text = [' '] + line + [' ' for k in range(L-len(line))] if L>len(line) else [' '] + line
            if liste is not None :
                vals = np.concatenate((np.zeros((1, 1)) , vals), axis = 1)  
                vals = np.concatenate((np.reshape(liste[i], (1, 1)) , vals), axis = 1)
                text = [' '] + [' '] + text
                
            cax = ax.matshow(vals, vmin=0, vmax=maxi, cmap='YlOrBr')
            ax.set_xticklabels(text, ha='left')
            ax.set_yticklabels(' ')
            ax.tick_params(axis=u'both', which=u'both',length=0, labelrotation = 30, labelright  = True)
            # flag passed positionally: the `b` keyword was renamed `visible` in Matplotlib 3.5
            ax.grid(False, which="minor", color="w", linestyle='-', linewidth=1)
            ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
            plt.subplots_adjust(wspace = 0.5, top = 1.2, bottom = 0)
        plt.show()
        return
    

    

  

    
def TextClassifierCreator(lang,                     ###
                          embedding_dim,              #
                          hidden_dim,                 # --- Encoder options
                          n_layers,                 ###

                          sentence_hidden_dim,      ###
                          hops,                       #
                          share,                      # --- Hierarchical encoder options
                          transf,                     #
                          dropout,                  ###

                          n_labels,                 ### --- Decoder options
                          decoder_version,            #
                          
                          device,                     #
                          weight_list = None          # per-task loss weights, only used by MultiTaskClassDecoder
                         ):
    '''Create an agent with specified dimensions and specificities
    '''
    contextualization = True if hidden_dim > 0 else False
    hierarchical = True if sentence_hidden_dim > 0 else False

    # 1) ----- encoding -----
    embedding = nn.Embedding(lang.n_words, embedding_dim)
    encoder = RecurrentWordsEncoder(device, 
                                    embedding, 
                                    hidden_dim, 
                                    n_layers, 
                                    dropout) # embedding, hidden_dim, n_layers = 1, dropout = 0
    text_encoder = HierarchicalTextEncoder(device, lang, encoder) if hierarchical else TextEncoder(device, lang, encoder)
    word_hidden_dim = encoder.output_dim

    
    # 2) ----- attention -----
    if decoder_version == 'class' :
        n_heads = 1
    elif decoder_version == 'multilabel_1' :
        n_heads = 1
    elif decoder_version == 'multilabel_2' :
        n_heads = n_labels
    if hierarchical :
        query_dim = word_hidden_dim if hops > 1 else 0
        attention = RecurrentHierarchicalAttention(device,
                                                   word_hidden_dim,
                                                   sentence_hidden_dim, 
                                                   query_dim = query_dim,
                                                   n_heads = n_heads,
                                                   n_layers = n_layers,
                                                   hops = hops,
                                                   share = share,
                                                   transf = transf,
                                                   dropout = dropout)
    else :
        attention = MultiHopedAttention(targets_dim = word_hidden_dim,
                                        base_query_dim = 0,
                                        hops = hops,
                                        share = share,
                                        transf = transf,
                                        dropout = dropout)
        
    # 3) ----- decoding -----
    text_dim = attention.output_dim
    if decoder_version == 'class' :
        if type(n_labels) == int :
            classes_decoder = ClassDecoder(text_dim, n_labels)
        elif type(n_labels) == list :
            classes_decoder = MultiTaskClassDecoder(text_dim, n_labels, weight_list)
    elif decoder_version == 'multilabel_1' :
        classes_decoder = MultilabelDecoder(text_dim, n_labels)
    elif decoder_version == 'multilabel_2' :
        classes_decoder = MultilabelDecoderV2(text_dim, n_labels)
    
    # 4) ----- model -----
    text_cl = TextClassifier(device, text_encoder, attention, classes_decoder)
    text_cl = text_cl.to(device)
    
    return text_cl




class TextClassifierTrainer(object):
    def __init__(self, 
                 criterion = nn.NLLLoss(), #nn.BCEWithLogitsLoss(), #nn.BCELoss(), 
                 optimizer = optim.SGD,
                 print_every=100):
        
        # relevant quantities
        self.criterion = criterion
        self.optimizer = optimizer
        self.print_every = print_every
        
        
    def asMinutes(self, s):
        m = math.floor(s / 60)
        s -= m * 60
        return '%dm %ds' % (m, s)


    def timeSince(self, since, percent):
        now = time.time()
        s = now - since
        es = s / (percent)
        rs = es - s
        return '%s (- %s)' % (self.asMinutes(s), self.asMinutes(rs))        
        
        
    def distance(self, agent, agent_output, target) :
        """ Compute cumulated error between predicted output and ground answer."""
        tar = Variable(torch.LongTensor(target.reshape(-1))).to(agent.device)
        # explicit dim: the implicit-dimension form of softmax / log_softmax is deprecated
        out = F.log_softmax(agent_output, dim = -1).view(1, -1) if agent.classes_decoder.version == 'class' else agent_output
        loss = self.criterion(out, tar)
        if agent.classes_decoder.version == 'class' :
            classes = F.softmax(agent_output, dim = -1)
            topv, topi = classes.data.topk(1)
            ni = topi[0][0]
            loss_diff = 1 if ni != target.item() else 0
        else :
            classes = torch.sigmoid(agent_output)
            loss_diff = 0
            for i in range(len(target)):
                loss_diff += 1 if abs(classes[i]-target[i]) >= 0.5 else 0
        return loss, loss_diff
    
    
    def distanceTot(self, agent, agent_output, target) :
        """ Compute cumulated error between predicted output and ground answer."""
        if agent.classes_decoder.version == 'class' and type(agent_output) == list :
            loss = 0
            for i in range(len(target)) :
                lossbis, loss_diffbis = self.distance(agent, agent_output[i], target[i])
                loss = loss + agent.classes_decoder.weight_list[i]*lossbis
                if i == 0:
                    loss_diff = loss_diffbis
            return loss, loss_diff
            
        else :
            return self.distance(agent, agent_output, target)
        
        
    def trainLoop(self, agent, description_batch, target_batch, optimizer, learning_rate, rand):
        """Performs a training loop, with forward pass and backward pass for gradient optimisation."""
        optimizer.zero_grad()
        target_length = len(target_batch[0])
        loss = 0
        loss_diff = 0
        for description, target in zip(description_batch, target_batch) :
            agent_output, attn1_weights_list, attn2_weights_list = agent.answerTrain(description, rand)
            loss2, loss_diff2 = self.distanceTot(agent, agent_output, target)
            loss = loss + loss2
            loss_diff += loss_diff2
        loss.backward()
        optimizer.step()
        return loss.item(), loss_diff   # .item() extracts the Python scalar from a 0-dim tensor
        
        
    def train(self, agent, 
              descriptions, 
              targets, 
              n_iters = 100, 
              n_epochs = None,
              mini_batch_size = 1,
              learning_rate=0.01,
              dic = None,
              rand = 0,
              random_state = 42
             ):
        """Performs training over a given dataset and along a specified amount of loops."""
        np.random.seed(random_state)
        start = time.time()
        optimizer = self.optimizer([param for param in agent.parameters() if param.requires_grad == True], lr=learning_rate)
        
        language = set(agent.text_encoder.lang.word2index.keys())
        print_loss_total = 0  
        print_loss_diff_mots_total = 0
        L = len(descriptions)
        if n_epochs is None :
            for iter in range(1, n_iters + 1):
                description_batch = []
                target_batch = []
                if dic is not None :
                    while len(description_batch) < mini_batch_size :
                        j = np.random.choice(range(len(dic)))
                        if dic[j] != [] :
                            i = np.random.choice(dic[j])
                            description = descriptions[i]
                            #if self.test(description, language) :
                            description_batch.append(description)
                            target_batch.append(targets[i]) 
                else :
                    while len(description_batch) < mini_batch_size:
                        i = np.random.choice(range(L))
                        description = descriptions[i]
                        #if self.test(description, language) :
                        description_batch.append(description)
                        target_batch.append(targets[i])

                loss, loss_diff_mots = self.trainLoop(agent, description_batch, target_batch, optimizer, learning_rate, rand)
                # number of errors on answer i
                print_loss_total += loss
                print_loss_diff_mots_total += loss_diff_mots       
                if iter % (self.print_every / mini_batch_size) == 0:
                    print_loss_avg = print_loss_total * mini_batch_size / self.print_every
                    print_loss_diff_mots_avg = print_loss_diff_mots_total / self.print_every
                    print_loss_total = 0
                    print_loss_diff_mots_total = 0
                    print('%s (%d %d%%) %.4f %.2f' % (self.timeSince(start, iter / n_iters),
                                                 iter * mini_batch_size, iter / n_iters * 100, 
                                                      print_loss_avg, print_loss_diff_mots_avg))
        
        else :
            Liste = [k for k in range(L)]
            for epoch in range(1, n_epochs + 1):
                print('epoch ' + str(epoch))
                np.random.shuffle(Liste)
                for i in range(int(L/mini_batch_size)) :
                    description_batch = []
                    target_batch = []
                    for j in Liste[i*mini_batch_size: (i+1)*mini_batch_size]:
                        description = descriptions[j]
                        #if self.test(description, language) :
                        description_batch.append(description)
                        target_batch.append(targets[j])
                        
                    loss, loss_diff_mots = self.trainLoop(agent, description_batch, target_batch, optimizer, learning_rate, rand)
                    # number of errors on answer i
                    print_loss_total += loss
                    print_loss_diff_mots_total += loss_diff_mots 
                    if i> 0 and i % (self.print_every / mini_batch_size) == 0:
                        print_loss_avg = print_loss_total * mini_batch_size / self.print_every
                        print_loss_diff_mots_avg = print_loss_diff_mots_total / self.print_every
                        print_loss_total = 0
                        print_loss_diff_mots_total = 0
                        print('%s (%d %d%%) %.4f %.2f' % (self.timeSince(start, i / int(L/mini_batch_size)),
                                                     i * mini_batch_size, i / int(L/mini_batch_size) * 100, 
                                                          print_loss_avg, print_loss_diff_mots_avg))   

Overwriting models/Text_Classifier.py
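
To make the epoch mode of `TextClassifierTrainer.train` easier to follow, here is a minimal sketch of its batching logic only. It assumes the standard-library `random` module in place of `np.random`, and the function name `epoch_batches` is hypothetical. No model or loss is involved.

```python
import random

def epoch_batches(n_examples, mini_batch_size, seed=42):
    """Yield lists of example indices for one epoch.

    Indices are shuffled once, then cut into consecutive slices of
    mini_batch_size; a trailing incomplete slice is dropped, as in the
    `range(int(L/mini_batch_size))` loop of the trainer.
    """
    rng = random.Random(seed)
    order = list(range(n_examples))
    rng.shuffle(order)
    for i in range(n_examples // mini_batch_size):
        yield order[i * mini_batch_size:(i + 1) * mini_batch_size]

batches = list(epoch_batches(n_examples=10, mini_batch_size=3))
for batch in batches:
    print(batch)
```

With 10 examples and a mini-batch size of 3, one example is left out of each epoch. Because the order is reshuffled every epoch, a different example is usually skipped each time.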


<a id="Convertisseurs"></a>

## 1.2 Text-to-distribution converters

[Back to table of contents](#plan)
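
As a rough illustration of what `RootConverter.convert` computes when `softmax` is disabled: each word of the description is scored by its best cosine similarity with any word of the query sentence, and the scores are then rescaled to sum to one. The sketch below uses plain Python and hypothetical 2-d embeddings (`toy_embedding`) in place of a trained `nn.Embedding`.

```python
import math

def normalize(vec):
    norm = math.sqrt(sum(x * x for x in vec)) or 1.0
    return [x / norm for x in vec]

def cosine(u, v):
    return sum(a * b for a, b in zip(normalize(u), normalize(v)))

# Hypothetical 2-d embeddings; a trained nn.Embedding would supply these.
toy_embedding = {
    "cat": [1.0, 0.1], "kitten": [0.9, 0.2],
    "car": [0.1, 1.0], "road": [0.2, 0.9],
}

def convert(description_words, query_words):
    """Weight each description word by its best cosine match in the query."""
    best = [max(cosine(toy_embedding[d], toy_embedding[q]) for q in query_words)
            for d in description_words]
    total = sum(best)
    return [b / total for b in best]

weights = convert(["kitten", "road", "car"], ["cat"])
print([round(w, 3) for w in weights])
```

Cosine similarities can be negative. Dividing by their sum gives a proper probability distribution only when all the best-match scores are positive, as they are in this toy example.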

In [28]:
%%writefile models/Root_Converter.py

# Pacific

import math
import time
import random

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker #, FuncFormatter
#%matplotlib inline
from termcolor import colored

import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable


class RootConverter(nn.Module):
    '''Transforms a sentence into a distribution over words of a given document'''
    def __init__(self, lang, embedding_dim, pre_entrainement = None, freeze = False, softmax = False) :
        super(RootConverter, self).__init__() 
        
        self.lang = lang
        self.lang_size = lang.n_words          # size of word vocabulary
        self.embedding_dim = embedding_dim     # dimension of embedded words
        self.softmax = F.softmax if softmax else None
        self.embedding = nn.Embedding(self.lang_size, embedding_dim)
        if pre_entrainement is not None :
            self.embedding.load_state_dict({'weight': torch.Tensor(pre_entrainement)})
        if freeze :
            self.embedding.weight.requires_grad = False

            
    def indexesFromSentence(self, sentence, max_length = None, rand = 0):
        '''Turn a given sentence into a list of indices according to a given language'''
        indexes=[]
        unknowns = 0
        for word in sentence:
            p = random.random()
            if word not in self.lang.word2index.keys() :
                pass
                #indexes.append(UNK_token) #pass
            elif p >= rand :
                indexes.append(self.lang.word2index[word])
            elif p < rand :
                e = random.choice([1, 2, 3])
                if e == 1 :  # doesn't put any word
                    pass
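                elif e == 2 :# hide word with UNK_Token
                    # NOTE: UNK_token is not defined or imported in this module; it must be provided before using rand > 0
                    indexes.append(UNK_token)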
                else :       # put word followed with randomly selected other word
                    indexes.append(self.lang.word2index[word])
                    indexes.append(random.choice(list(range(self.lang.n_words))))

        # remove excess words at random, except the first and the last word
        if max_length is not None :
            while len(indexes) > max_length:
                indexes.pop(random.randint(1,len(indexes)-2))
        return indexes


    def variableFromSentence(self, sentence, max_length = None, rand = 0):
        '''Turn a sentence into a torch variable, containing a list of indices according
           to a given language.
        '''
        indexes = self.indexesFromSentence(sentence, max_length, rand)                                 
        result = Variable(torch.LongTensor(indexes).view(-1, 1)) if len(indexes) != 0 else None
        return result


    def variableFromDescription(self, description, max_length = None, rand = 0):
        '''Turn a whole description (a list of sentences) into a list of torch
           variables, one per sentence that contains at least one known word'''
        sortie = []
        for line in description :
            variable = self.variableFromSentence(line, max_length, rand)
            if variable is not None :
                sortie.append(variable)
        return sortie
    
    
    def flatten(self, description) :
        for i, line in enumerate(description) :
            if i == 0:
                flatten = line
            else :
                flatten = torch.cat((flatten, line), dim = 0)
        return flatten


    def lendes(self, description) :
        length = 0
        for line in description :
            length += len(line)
        return length
    
    
    def complete(self, description, weights) :
        final_weights = []
        count = 0
        for i in range(len(description)) :
            line = description[i]
            w = torch.zeros(len(line))
            for j, word in enumerate(line) :
                if word in self.lang.word2index.keys():
                    w[j] = weights[0, count]
                    count += 1
            final_weights.append(w)
        return final_weights
                     
            
        
    def convert(self, description, sentence):
        
        # acquisition
        var = self.variableFromDescription(description)
        var = self.flatten(var)
        var = self.embedding(var)                                      #dim = (description_length, 1, embedding_dim)
        
        # compute the distribution per word of the summary
        if self.softmax is None :
            var = F.normalize(var, p = 2, dim = 2)                     # <---- normalize on last dimension
        var = torch.transpose(var, 0, 1)                               #dim = (1, description_length, embedding_dim)
        var = torch.transpose(var, 1, 2)                               #dim = (1, embedding_dim, description_length)
        
        query = self.embedding(self.variableFromSentence(sentence))    #dim = (sentence_length, 1, embedding_dim)
        if self.softmax is None :
            query = F.normalize(query, p = 2, dim = 2)                 # <---- normalize on last dimension
        query = torch.transpose(query, 0, 1)                           #dim = (1, sentence_length, embedding_dim)
        
        weights = torch.bmm(query, var)                                # size (1, sentence_length, description_length)
        if self.softmax is not None :
            weights = self.softmax(weights, dim = 2)
            
        # compute the distribution per sentence of the summary
        weights, _ = torch.max(weights, dim = 1)                       # size (1, description_length)
        #weights = torch.sqrt(weights)                                 # size (1, description_length)
        # or torch.sum(weights, dim = 1)
        weights = weights/sum(weights.data[0]) # <--- probability distribution over words of the description

        final_weights = self.complete(description, weights)
        return final_weights
    
    
    def computeSentenceAttention(self, description, sentence):
        attn_weights = self.convert(description, sentence)
        l = len(description)
        sentence_attn = np.zeros((l))
        for i in range(l) :
            sentence_attn[i] = torch.sum(attn_weights[i]).item() #torch.mean
        return attn_weights, sentence_attn
            
    
    
    def showConvertion(self, description, sentence, maxi = None):
        attn_weights, sentence_attn = self.computeSentenceAttention(description, sentence)
        if maxi is None :
            maxi = max([torch.max(attn_weights[i]).item() for i in range(len(attn_weights))])
        count = 0
        l = len(description)
        L = max([len(line) for line in description])
        fig = plt.figure(figsize = [L+1, l]) #
        for i, line in enumerate(description):
            vals = np.zeros((1, L))
            for j in range(len(line)) :
                vals[0, j] = attn_weights[i][j].item()
            #vals = attn_weights[0, count : count + b].unsqueeze(0).numpy()
            ax = fig.add_subplot(l, 1, i+1)
            cax = ax.matshow(vals, vmin=0, vmax=maxi, cmap='YlOrBr') #, extent = (0, l, 0, 0.3)) # 'bone_r'
            #fig.colorbar(cax)
            text = [' '] + line + [' ' for k in range(L-len(line))] if L>len(line) else [' '] + line
            ax.set_xticklabels(text, ha='left')
            sc = str(sentence_attn[i]*100)[:5]
            ax.set_yticklabels(' ' + sc)
            ax.tick_params(axis = 'x', which = u'both',length = 0, labelrotation = 30, labelright = True)
            ax.tick_params(axis = 'y', which = u'both',length = 0, labelrotation = 0)
            # flag passed positionally: the `b` keyword was renamed `visible` in Matplotlib 3.5
            ax.grid(False, which="minor", color="w", linestyle='-', linewidth=1)
            ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
            plt.subplots_adjust(wspace = 0.5, top = 1.2, bottom = 0)
        plt.show()
        return
    
    
    def select(self, sentence_attn, liste_nwords, ratio = 3/4, percentage = None):
        '''Greedily pick sentence indices by decreasing attention weight.
           - if percentage is given, stop at the first sentence whose word share
             (in liste_nwords) would push the cumulated share above percentage ;
           - otherwise, stop once the cumulated attention weight reaches ratio.
        '''
        # sort indices rather than searching for values, so that tied weights
        # are never mapped to the same sentence twice
        order = sorted(range(len(sentence_attn)), key = lambda i : sentence_attn[i], reverse = True)
        retenus = []
        count = 0
        for i in order :
            if percentage is not None :
                if count + liste_nwords[i] > percentage :
                    break
                retenus.append(i)
                count += liste_nwords[i]
            else :
                if count >= ratio :
                    break
                retenus.append(i)
                count += sentence_attn[i]
        return retenus
    
    
    def showSelection(self, description, sentence, ratio = 3/4, percentage = None):
        attn_weights, sentence_attn = self.computeSentenceAttention(description, sentence)
        nwords = [len(line) for line in description]
        nwords = [el*100/sum(nwords) for el in nwords]
        retenus = self.select(sentence_attn, nwords, ratio, percentage)
        for i, line in enumerate(description) :
            if i in retenus :
                print(' '.join(line))
            else :
                print(colored(' '.join(line), 'blue'))
        return

Overwriting models/Root_Converter.py
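
The ratio-based branch of `select` can be illustrated outside the model. The sketch below is a standalone simplification, not the library code: the function name `select_by_ratio` and the attention weights are hypothetical, and plain Python floats stand in for the tensors used by the model.

```python
def select_by_ratio(sentence_attn, ratio=0.75):
    """Keep the highest-weighted sentences until their cumulated weight reaches `ratio`."""
    # indices sorted by decreasing attention weight
    order = sorted(range(len(sentence_attn)), key=lambda i: sentence_attn[i], reverse=True)
    kept, total = [], 0.0
    for i in order:
        if total >= ratio:
            break
        kept.append(i)
        total += sentence_attn[i]
    return kept


weights = [0.1, 0.5, 0.15, 0.25]   # hypothetical sentence-level attention weights
kept = select_by_ratio(weights)
print(kept)                         # indices of the retained sentences
```

Sentences whose index is not returned are the ones `showSelection` prints in blue.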


<a id="modules"></a>



# 2 Modules

[Back to the table of contents](#plan)

In [29]:
%mkdir modules

A subdirectory or file modules already exists.


In [30]:
%%writefile modules/__init__.py

from .Encoder_Words_Recurrent import RecurrentWordsEncoder
from .Encoder_Text import TextEncoder, HierarchicalTextEncoder

from .Attention_Additive import AdditiveAttention
from .Attention_MultiHead import MultiHeadAttention
from .Attention_MultiHoped import MultiHopedAttention
from .Attention_Hierarchical_Recurrent import RecurrentHierarchicalAttention

from .Decoder_Multilabel import MultilabelDecoder, MultilabelDecoderV2 
from .Decoder_Class import ClassDecoder, MultiTaskClassDecoder
from .Decoder_Words import WordsDecoder


__all__ = [
    'RecurrentWordsEncoder',
    'TextEncoder',
    'HierarchicalTextEncoder',
    
    'AdditiveAttention',
    'MultiHeadAttention',
    'MultiHopedAttention',
    'RecurrentHierarchicalAttention',
    
    'MultilabelDecoder',
    'MultilabelDecoderV2',
    'ClassDecoder',
    'MultiTaskClassDecoder',
    'WordsDecoder']

Overwriting modules/__init__.py


<a id="encodeursDeTexte"></a>




## 2.1 Text encoders


[Back to the table of contents](#plan)

<a id="encodeursDeMots"></a>

### 2.1.1 Word encoder

[Back to the table of contents](#plan)

The **RecurrentWordsEncoder** module encodes a sequence of words $w_1, ..., w_T$ into a sequence of vectors $h_1, ..., h_T$ by applying an embedding followed by a bidirectional GRU layer. Its operation is illustrated in the following figure:


![WordEncoder](figs/WordEncoder.png)
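
The tensor shapes involved can be sketched as follows. This is a minimal illustration rather than the library code: the sizes and word indices are hypothetical, and a plain `nn.Embedding` stands in for the embedding passed to the module.

```python
import torch
import torch.nn as nn

vocab_size, embedding_dim, hidden_dim = 50, 8, 16   # hypothetical sizes

embedding = nn.Embedding(vocab_size, embedding_dim)
bigru = nn.GRU(embedding_dim, hidden_dim, bidirectional=True)

# a sentence of T = 5 word indices, shaped (T, 1): sequence first, batch of one
utterance = torch.tensor([[3], [7], [1], [12], [4]])

embedded = embedding(utterance)        # (5, 1, embedding_dim)
outputs, hidden = bigru(embedded)      # outputs: (5, 1, 2 * hidden_dim), hidden: (2, 1, hidden_dim)
print(outputs.shape, hidden.shape)
```

Each row of `outputs` concatenates the forward and backward GRU states for one word, which is why the module reports `output_dim = hidden_dim * 2`.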

In [31]:
%%writefile modules/Encoder_Words_Recurrent.py

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable


class RecurrentWordsEncoder(nn.Module):
    def __init__(self, device, embedding, hidden_dim, n_layers = 1, dropout = 0): 
        super(RecurrentWordsEncoder, self).__init__()
        # relevant quantities
        self.device = device
        self.hidden_dim = hidden_dim           # dimension of hidden state of GRUs 
        self.dropout_p = dropout
        self.n_layers = n_layers               # number of stacked GRU layers
        self.output_dim = hidden_dim * 2       # dimension of the output representation of words and utterance
        # parameters
        self.embedding = embedding
        for p in embedding.parameters() :
            embedding_dim = p.data.size(1)
        self.dropout = nn.Dropout(p = dropout)
        self.bigru = nn.GRU(embedding_dim, 
                            hidden_dim, 
                            n_layers,
                            dropout=(0 if n_layers == 1 else dropout), 
                            bidirectional=True)

        
    def initHidden(self): 
        return Variable(torch.zeros(2 * self.n_layers, 1, self.hidden_dim)).to(self.device)

    def forward(self, utterance, hidden = None):
        embeddings = self.embedding(utterance)                          # dim = (input_length, 1, embedding_dim)
        embeddings = self.dropout(embeddings)                           # dim = (input_length, 1, embedding_dim)
        outputs, hidden = self.bigru(embeddings, hidden)
        outputs = self.dropout(outputs)
        hidden = self.dropout(hidden)
        return outputs, hidden                                          # dim = (input_length, 1, hidden_dim * 2)

Overwriting modules/Encoder_Words_Recurrent.py


<a id="encodeurDeTexte"></a>

### 2.1.2 Text encoder

[Back to the table of contents](#plan)

The text encoder applies a word encoder module to each sentence of a document, passing the hidden state of the bidirectional GRU at the end of one sentence on as the initial state for encoding the next sentence. In the code below, this sentence-by-sentence behaviour is implemented by **HierarchicalTextEncoder**, while **TextEncoder** encodes its input as a single sequence. The process can be represented by the figure:


![TextEncoder](figs/TextEncoder.png)
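
The hand-over of the hidden state from one sentence to the next can be sketched as follows. This is an illustration under simplifying assumptions, not the library code: the sizes are hypothetical and random tensors stand in for already-embedded sentences.

```python
import torch
import torch.nn as nn

embedding_dim, hidden_dim = 8, 16                      # hypothetical sizes
bigru = nn.GRU(embedding_dim, hidden_dim, bidirectional=True)

# three already-embedded sentences of different lengths, each shaped (T_i, 1, embedding_dim)
sentences = [torch.randn(t, 1, embedding_dim) for t in (4, 6, 3)]

hidden = torch.zeros(2, 1, hidden_dim)                 # initial state: zeros
words_memory, sentences_memory = {}, {}
for i, sentence in enumerate(sentences):
    # the final state of sentence i becomes the initial state of sentence i + 1
    outputs, hidden = bigru(sentence, hidden)
    words_memory[i] = outputs                          # (T_i, 1, 2 * hidden_dim)
    sentences_memory[i] = hidden                       # (2, 1, hidden_dim)
```

Both memories are dictionaries indexed by sentence position, matching what `HierarchicalTextEncoder.forward` returns.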

In [32]:
%%writefile modules/Encoder_Text.py

# Modins

import random

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable



class TextEncoder(nn.Module):
    
    def __init__(self,device, lang, word_encoder):
        super(TextEncoder, self).__init__()
    
        self.device = device
        self.words_memory_dim = word_encoder.output_dim
        self.lang = lang      
        self.word_encoder = word_encoder
        
        
    def indexesFromSentence(self, sentence, max_length = None, rand = 0):
        '''Turn a given sentence into a list of indices according to a given language'''
        indexes=[]
        unknowns = 0
        for word in sentence:
            p = random.random()
            if word not in self.lang.word2index.keys() :
                if 'UNK' in self.lang.word2index.keys() :
                    indexes.append(self.lang.word2index['UNK'])
                else :
                    pass
            elif p >= rand :
                indexes.append(self.lang.word2index[word])
            elif p < rand :
                e = random.choice([1, 2, 3])
                if e == 1 :  # doesn't put any word
                    pass
                elif e == 2 :# hide word with UNK_Token
                    indexes.append(self.lang.word2index['UNK'])
                else :       # put word followed with randomly selected other word
                    indexes.append(self.lang.word2index[word])
                    indexes.append(random.choice(list(range(self.lang.n_words))))

        # randomly drop excess words, keeping the first and the last ones
        if max_length is not None :
            #print(max_length)
            while len(indexes) > max_length:
                indexes.pop(random.randint(1,len(indexes)-2))
        return indexes


    def variableFromSentence(self, sentence, max_length = None, rand = 0):
        '''Turn a sentence into a torch variable, containing a list of indices according
           to a given language.
        '''
        indexes = self.indexesFromSentence(sentence, max_length, rand)
        if len(indexes) == 0 :       # no usable word: nothing to move to the device
            return None
        result = Variable(torch.LongTensor(indexes).view(-1, 1))
        return result.to(self.device)
        
        
    def forward(self, input, rand = 0):
        """Encode a single sentence, given as a list of words.
           Returns the word-level hidden states (with an extra dimension inserted at position 1)
           and the final hidden state of the word encoder.
        """
        description = self.variableFromSentence(input, rand = rand)
        words_memory = {}
        sentences_memory = {}
        query_hidden = self.word_encoder.initHidden()
        
        words_memory, query_hidden = self.word_encoder(description, query_hidden)
        sentences_memory = query_hidden
            
        return words_memory.unsqueeze(1), sentences_memory 




class HierarchicalTextEncoder(nn.Module):
    
    def __init__(self, device, lang, word_encoder):
        super(HierarchicalTextEncoder, self).__init__()
    
        self.device = device
        self.words_memory_dim = word_encoder.output_dim
        self.lang = lang      
        self.word_encoder = word_encoder
        
        
    def indexesFromSentence(self, sentence, max_length = None, rand = 0):
        '''Turn a given sentence into a list of indices according to a given language'''
        indexes=[]
        unknowns = 0
        for word in sentence:
            p = random.random()
            if word not in self.lang.word2index.keys() :
                if 'UNK' in self.lang.word2index.keys() :
                    indexes.append(self.lang.word2index['UNK'])
                else :
                    pass
            elif p >= rand :
                indexes.append(self.lang.word2index[word])
            elif p < rand :
                e = random.choice([1, 2, 3])
                if e == 1 :  # doesn't put any word
                    pass
                elif e == 2 :# hide word with UNK_Token
                    indexes.append(self.lang.word2index['UNK'])
                else :       # put word followed with randomly selected other word
                    indexes.append(self.lang.word2index[word])
                    indexes.append(random.choice(list(range(self.lang.n_words))))

        # randomly drop excess words, keeping the first and the last ones
        if max_length is not None :
            while len(indexes) > max_length:
                indexes.pop(random.randint(1,len(indexes)-2))
        return indexes


    def variableFromSentence(self, sentence, max_length = None, rand = 0):
        '''Turn a sentence into a torch variable, containing a list of indices according
           to a given language.
        '''
        indexes = self.indexesFromSentence(sentence, max_length, rand)                                 
        result = Variable(torch.LongTensor(indexes).view(-1, 1)) if len(indexes) != 0 else None
        return result


    def variablesFromDescription(self, description, max_length = None, rand = 0):
        '''Turn a whole description (a list of sentences) into a list of torch tensors,
           skipping sentences that yield no index'''
        sortie = []
        for line in description :
            variable = self.variableFromSentence(line, max_length, rand)
            if variable is not None :
                variable = variable.to(self.device)
                sortie.append(variable)
        return sortie
        
        
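    def forward(self, input, rand = 0):
        """Encode a description, given as a list of sentences (each a list of words).
           Sentences are encoded one after the other, the final hidden state of one sentence
           initialising the encoding of the next. Returns two dicts indexed by sentence position :
           the word-level hidden states and the hidden state reached at the end of each sentence.
        """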
        description = self.variablesFromDescription(input, rand = rand)
        words_memory = {}
        sentences_memory = {}
        query_hidden = self.word_encoder.initHidden()
        
        for i in range(len(description)) :
            utterance = description[i]
            words_hiddens, query_hidden = self.word_encoder(utterance, query_hidden)
            words_memory[i] = words_hiddens
            sentences_memory[i] = query_hidden
        
            
        return words_memory, sentences_memory  

Overwriting modules/Encoder_Text.py


<a id="modulesAttention"></a>


## 2.2 Simple attention modules

[Back to the table of contents](#plan)


<a id="attentionSimple"></a>


### 2.2.1 Additive attention module

[Back to the table of contents](#plan)

![AttentionAdditive](figs/Attention_Additive.png)

In [33]:
%%writefile modules/Attention_Additive.py

import torch
import torch.nn as nn
import torch.nn.functional as F


class AdditiveAttention(nn.Module):

    def __init__(self, query_dim, targets_dim, n_layers = 1): 
        super(AdditiveAttention, self).__init__()
        # relevant quantities
        self.n_level = 1
        self.query_dim = query_dim
        self.targets_dim = targets_dim
        self.output_dim = targets_dim
        self.n_layers = n_layers
        # parameters
        self.attn_layer = nn.Linear(query_dim + targets_dim, targets_dim) if n_layers >= 1 else None
        self.attn_layer2 = nn.Linear(targets_dim, targets_dim) if n_layers >= 2 else None
        self.attn_v = nn.Linear(targets_dim, 1, bias = False) if n_layers >= 1 else None
        self.act = F.softmax

        
    def forward(self, query = None, targets = None):
        '''takes as parameters : 
                a query tensor conditioning the attention,      size = (1, minibatch_size, query_dim)
                a tensor containing attention targets           size = (targets_length, minibatch_size, targets_dim)
           returns : 
                the resulting tensor of the attention process,  size = (1, minibatch_size, targets_dim)
                the attention weights,                          size = (minibatch_size, 1, targets_length)
        '''
        if targets is not None :
            # concat method 
            if self.n_layers >= 1 :
                poids = torch.cat((query.expand(targets.size(0), -1, -1), targets), 2) if query is not None else targets
                poids = self.attn_layer(poids).tanh()                 # size (targets_length, minibatch_size, targets_dim)
                if self.n_layers >= 2 :
                    poids = self.attn_layer2(poids).tanh()            # size (targets_length, minibatch_size, targets_dim)
                attn_weights = self.attn_v(poids)                     # size (targets_length, minibatch_size, 1)
                attn_weights = torch.transpose(attn_weights, 0,1)     # size (minibatch_size, targets_length, 1)
                targets = torch.transpose(targets, 0,1)               # size (minibatch_size, targets_length, targets_dim)
            # dot method
            else :
                targets = torch.transpose(targets, 0,1)               # size (minibatch_size, targets_length, targets_dim)
                query = torch.transpose(query, 0, 1)                  # size (minibatch_size, 1, query_dim)
                query = torch.transpose(query, 1, 2)                  # size (minibatch_size, query_dim, 1)
                attn_weights = torch.bmm(targets, query)              # size (minibatch_size, targets_length, 1)
                
            attn_weights = self.act(attn_weights, dim = 1)        # size (minibatch_size, targets_length, 1)
            attn_weights = torch.transpose(attn_weights, 1,2)     # size (minibatch_size, 1, targets_length)
            attn_applied = torch.bmm(attn_weights, targets)       # size (minibatch_size, 1, targets_dim)
            attn_applied = torch.transpose(attn_applied, 0,1)     # size (1, minibatch_size, targets_dim)

        else :
            attn_applied = query
            attn_weights = None
        return attn_applied, attn_weights

Overwriting modules/Attention_Additive.py
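
The "concat" scoring used when `n_layers >= 1` can be written out step by step. The sketch below is a simplified standalone version with hypothetical sizes and random inputs. It computes a score $v^\top \tanh(W[q; h_t])$ for each target $h_t$, normalises the scores with a softmax over the targets, and returns the weighted sum of the targets.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

query_dim, targets_dim, targets_length = 4, 6, 5        # hypothetical sizes

attn_layer = nn.Linear(query_dim + targets_dim, targets_dim)
attn_v = nn.Linear(targets_dim, 1, bias=False)

query = torch.randn(1, 1, query_dim)                    # (1, batch, query_dim)
targets = torch.randn(targets_length, 1, targets_dim)   # (targets_length, batch, targets_dim)

# score each target: v^T tanh(W [query ; target])
concat = torch.cat((query.expand(targets_length, -1, -1), targets), dim=2)
scores = attn_v(torch.tanh(attn_layer(concat)))         # (targets_length, 1, 1)

# normalise over the targets, then take the weighted sum of the targets
weights = F.softmax(scores, dim=0)                      # (targets_length, 1, 1)
context = (weights * targets).sum(dim=0, keepdim=True)  # (1, 1, targets_dim)
```

The module above obtains the same weighted sum with `torch.bmm` after transposing to a batch-first layout.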



<a id="attentionAdditiveMultitete"></a>


### 2.2.2 Multi-head additive attention module

[Back to the table of contents](#plan)

In [34]:
%%writefile modules/Attention_MultiHead.py

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from .Attention_Additive import AdditiveAttention


class MultiHeadAttention(nn.Module):
    '''Module performing additive attention over a sequence of vectors stored in
       a memory block, conditioned by some vector. At instantiation it takes as input :
       
                - query_dim : the dimension of the conditionning vector
                - targets_dim : the dimension of vectors stored in memory
                
      Other ideas on Multi head attention on 
      https://github.com/jadore801120/attention-is-all-you-need-pytorch/blob/master/transformer/SubLayers.py
      https://github.com/tlatkowski/multihead-siamese-nets/blob/master/layers/attention.py
    '''
    def __init__(self, device, n_heads, query_dim, targets_dim, n_layers = 2): 
        super(MultiHeadAttention, self).__init__()
        # relevant quantities
        self.device = device
        self.n_level = 1
        self.n_heads = n_heads
        self.n_layers = n_layers
        # parameters
        self.attn_modules_list = nn.ModuleList([AdditiveAttention(query_dim, targets_dim, n_layers) for i in range(n_heads)])

        
    def forward(self, query = None, targets = None):
        '''takes as parameters : 
                a query tensor conditioning the attention,      size = (1, n_heads, query_dim)
                a tensor containing attention targets           size = (targets_length, n_heads, targets_dim)
           returns : 
                the resulting tensor of the attention process,  size = (1, n_heads, targets_dim)
                the attention weights,                          size = (n_heads, 1, targets_length)
        '''
        targets_length = targets.size(0)
        targets_dim    = targets.size(2)
        attn_applied   = Variable(torch.zeros(1, self.n_heads, targets_dim)).to(self.device)
        attn_weights   = torch.zeros(self.n_heads, 1, targets_length).to(self.device)
        for i, attn in enumerate(self.attn_modules_list) :
            # each head attends over its own slice of the query and of the targets
            que = query[:, i, :] if query is not None else None
            tar = targets[:, i, :].unsqueeze(1)
            attn_appl, attn_wghts = attn(que, tar)
            attn_applied[:, i, :] = attn_appl.squeeze(1)
            attn_weights[i, :, :] = attn_wghts.squeeze(0)
        return attn_applied, attn_weights

Overwriting modules/Attention_MultiHead.py


<a id="attentionAdditiveMultihoped"></a>


### 2.2.3 Multi-hop additive attention module

[Back to the table of contents](#plan)

In [35]:
%%writefile modules/Attention_MultiHoped.py

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from .Attention_Additive import AdditiveAttention


class MultiHopedAttention(nn.Module):
    '''Module performing additive attention over a sequence of vectors stored in
       a memory block, conditioned by some vector. At instantiation it takes as input :
       
                - query_dim : the dimension of the conditionning vector
                - targets_dim : the dimension of vectors stored in memory
    '''
    def __init__(self, 
                 device,
                 targets_dim,
                 base_query_dim = 0,
                 hops = 1,
                 share = True,
                 transf = False,
                 dropout = 0
                ):
        super(MultiHopedAttention, self).__init__()
        
        # dimensions
        self.targets_dim = targets_dim
        self.output_dim = targets_dim
        self.hops_query_dim = self.output_dim if hops > 1 else 0
        self.query_dim = base_query_dim + self.hops_query_dim
        
        # structural coefficients
        self.device = device
        self.n_level = 1
        self.hops = hops
        self.share = share
        self.transf = transf
        self.dropout_p = dropout
        if dropout > 0 :
            self.dropout = nn.Dropout(p = dropout)
        
        # parameters
        self.attn = AdditiveAttention(self.query_dim, self.targets_dim) 
        self.transf = nn.GRU(self.targets_dim, self.targets_dim) if transf else None
        
        
    def initQuery(self): 
        if self.hops_query_dim > 0 :
            return Variable(torch.zeros(1, 1, self.hops_query_dim)).to(self.device)
        return None
    
    
    def update(self, hops_query, decision_vector):
        if self.transf is not None :
            _ , update = self.transf(decision_vector, hops_query)
        else :
            update = hops_query + decision_vector
        return update
    
    
    def forward(self, words_memory, base_query = None):
        attn_weights_list = []
        if self.hops > 1 and self.share :
            hops_query = self.initQuery()
        else :
            hops_query = None
            
        for hop in range(self.hops) :
            if base_query is not None and hops_query is not None :
                query = torch.cat((base_query, hops_query), 2) # size (1, self.n_heads, self.query_dim)
            elif base_query is not None :
                query = base_query
            elif hops_query is not None :
                query = hops_query
            else :
                query = None
            
            decision_vector, attn_weights = self.attn(query, words_memory)
            attn_weights_list.append(attn_weights)
            if self.hops > 1 :
                hops_query = self.update(hops_query, decision_vector) if hops_query is not None else decision_vector                          # size (L, n_classes, output_dim)
            else :
                hops_query = decision_vector
  
        
        # output decision vector
        return hops_query, attn_weights_list

Overwriting modules/Attention_MultiHoped.py



<a id="attentionHierarchique"></a>


## 2.3 Hierarchical attention

[Back to the table of contents](#plan)

![HierarchicalAttention](figs/Hierarchical_Attention.png)

In [36]:
%%writefile modules/Attention_Hierarchical_Recurrent.py

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from .Attention_Additive import AdditiveAttention
from .Attention_MultiHead import MultiHeadAttention


class RecurrentHierarchicalAttention(nn.Module):
    '''This attention module is :
    
    - hierarchical, with a bi-GRU between the two attention levels
    - multi-head at each attention level
    - multi-hop overall : several passes can be made to accumulate information
    '''

    def __init__(self, 
                 device,
                 word_hidden_dim, 
                 sentence_hidden_dim,
                 query_dim = 0, 
                 n_heads = 1,
                 n_layers = 1,
                 hops = 1,
                 share = True,
                 transf = False,
                 dropout = 0
                ):
        super(RecurrentHierarchicalAttention, self).__init__()
        
        # dimensions
        self.query_dim = query_dim
        self.word_hidden_dim = word_hidden_dim
        self.sentence_input_dim = self.word_hidden_dim
        self.sentence_hidden_dim = sentence_hidden_dim
        self.context_vector_dim = sentence_hidden_dim * 2
        self.output_dim = sentence_hidden_dim * 2
        
        # structural coefficients
        self.device = device
        self.n_level = 2
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.hops = hops
        self.share = share
        self.dropout_p = dropout
        self.dropout = nn.Dropout(p = dropout)
        
        # first attention module (over the words of each sentence)
        attn1_list = []
        if share :
            # a single module reused at every hop
            attn1 = MultiHeadAttention(device, n_heads, self.query_dim, self.word_hidden_dim) if n_heads > 1 else \
                    AdditiveAttention(self.query_dim, self.word_hidden_dim) 
            for hop in range(hops):
                attn1_list.append(attn1)
            self.attn1 = nn.ModuleList(attn1_list)
        else :
            # one independent module per hop
            for hop in range(hops):
                attn1 = MultiHeadAttention(device, n_heads, self.query_dim, self.word_hidden_dim) if n_heads > 1 else \
                        AdditiveAttention(self.query_dim, self.word_hidden_dim) 
                attn1_list.append(attn1)
            self.attn1 = nn.ModuleList(attn1_list)
        
        # intermediate encoder module
        self.bigru = nn.GRU(self.sentence_input_dim, 
                            self.sentence_hidden_dim, 
                            n_layers,
                            dropout=(0 if n_layers == 1 else dropout), 
                            bidirectional=True)
        
        # second attention module (over the sentence representations)
        attn2_list = []
        if share :
            # a single module reused at every hop
            attn2 = MultiHeadAttention(device, n_heads, self.query_dim, self.context_vector_dim) if n_heads > 1 else \
                    AdditiveAttention(self.query_dim, self.context_vector_dim) 
            for hop in range(hops):
                attn2_list.append(attn2)
            self.attn2 = nn.ModuleList(attn2_list)
        else :
            # one independent module per hop
            for hop in range(hops):
                attn2 = MultiHeadAttention(device, n_heads, self.query_dim, self.context_vector_dim) if n_heads > 1 else \
                        AdditiveAttention(self.query_dim, self.context_vector_dim) 
                attn2_list.append(attn2)
            self.attn2 = nn.ModuleList(attn2_list)
        
        # accumulation step
        self.transf = nn.Linear(self.output_dim, self.output_dim, bias = False) if transf \
                      or self.hops > 1 else None


    def initQuery(self): 
        if self.query_dim > 0 :
            return Variable(torch.zeros(1, self.n_heads, self.query_dim)).to(self.device)
        return None
        
                
    def initHidden(self): 
        return Variable(torch.zeros(2 * self.n_layers, self.n_heads, self.sentence_hidden_dim)).to(self.device)
        
        
    def singlePass(self, words_memory, query, attn1, attn2): 
        L = len(words_memory)
        attn1_weights = {}
        bigru_inputs = Variable(torch.zeros(L, self.n_heads, self.sentence_input_dim)).to(self.device)
        # first attention layer
        for i in range(L) :
            targets = words_memory[i]                              # size (N_i, 1, 2*word_hidden_dim)
            targets = targets.repeat(1, self.n_heads, 1)           # size (N_i, n_heads, 2*word_hidden_dim)
            attn1_output, attn1_wghts = attn1(query, targets)
            attn1_output = self.dropout(attn1_output)
            attn1_weights[i] = attn1_wghts
            bigru_inputs[i] = attn1_output.squeeze(0)              # size (n_heads, 2*word_hidden_dim)
        # intermediate biGRU
        bigru_hidden = self.initHidden()
        attn2_inputs, bigru_hidden = self.bigru(bigru_inputs, bigru_hidden)  # size (L, n_heads, 2*word_hidden_dim)
        # second attention layer
        attn2_inputs = self.dropout(attn2_inputs)
        decision_vector, attn2_weights = attn2(query = query, targets = attn2_inputs)
        attn2_weights = attn2_weights.view(-1)
        decision_vector = self.dropout(decision_vector)
        # output decision vector
        return decision_vector, attn1_weights, attn2_weights
    
    
    
    def update(self, query, decision_vector):
        if self.transf is not None :
            update = query + self.transf(decision_vector) if query is not None else self.transf(decision_vector)
        else :
            update = query + decision_vector if query is not None else decision_vector
        return update
        
        
    def forward(self, words_memory, query = None):
        '''takes as parameters : 
                a tensor containing words_memory vectors        dim = (words_memory_length, word_hidden_dim)
                a tensor containing past queries                dim = (words_memory_length, query_dim)
           returns : 
                the resulting decision vector                   dim = (1, 1, query_dim)
                the weights of first attention layer (dict)     
                the weights of second attention layer (dict)
        '''
        attn1_weights_list = []
        attn2_weights_list = []
        if len(words_memory) > 0 :
            if query is not None :
                query = query.repeat(1, self.n_heads, 1)
            elif self.hops > 1 :
                query = self.initQuery()
            
            for hop in range(self.hops) :
                decision_vector, attn1_weights, attn2_weights = self.singlePass(words_memory, 
                                                                                query, 
                                                                                self.attn1[hop], 
                                                                                self.attn2[hop])
                attn1_weights_list.append(attn1_weights)
                attn2_weights_list.append(attn2_weights)
                
                query = self.update(query, decision_vector)  # size (L, self.n_heads, self.output_dim)
                query = self.dropout(query)

        # output decision vector
        return query, attn1_weights_list, attn2_weights_list

Overwriting modules/Attention_Hierarchical_Recurrent.py


<a id="decodeurs"></a>


## 2.4 Decoding modules


[Back to the table of contents](#plan)

<a id="decodeursSelectifs"></a>


### 2.4.1 Selective decoders

[Back to the table of contents](#plan)

In [37]:
%%writefile modules/Decoder_Multilabel.py

# Modins

import torch
import torch.nn as nn
import torch.nn.functional as F


class MultilabelDecoder(nn.Module):
    
    def __init__(self, text_dim, n_labels) :
        super(MultilabelDecoder, self).__init__() 
        self.version = 'multilabel_1'
        self.n_labels = n_labels
        self.classes_decoder = nn.Linear(text_dim, n_labels)

    def forward(self, text_vector, train_mode = False):
        classes_vector = self.classes_decoder(text_vector).view(-1)
        if train_mode :
            return classes_vector
        else :
            classes = torch.sigmoid(classes_vector)    # F.sigmoid is deprecated
            result = [1 if el > 0.5 else 0 for el in classes.data]
            return result
        
        

class MultilabelDecoderV2(nn.Module):
    
    def __init__(self, text_dim, n_labels) :
        super(MultilabelDecoderV2, self).__init__() 
        self.version = 'multilabel_2'
        self.n_labels = n_labels
        self.classes_decoder = nn.ModuleList([nn.Linear(text_dim, 1) for i in range(n_labels)]) 

    def forward(self, text_vector, train_mode = False):
        # one linear scorer per label, applied to the i-th slice of text_vector
        classes_vector = torch.cat([dec(text_vector[:, i, :]).view(-1) 
                                    for i, dec in enumerate(self.classes_decoder)])
        if train_mode :
            return classes_vector
        else :
            classes = torch.sigmoid(classes_vector)
            result = [1 if el > 0.5 else 0 for el in classes.data]
            return result

Overwriting modules/Decoder_Multilabel.py
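
Outside training mode, both decoders above apply a sigmoid to each score and threshold the result at 0.5. The sketch below illustrates this decision rule in plain Python. The function names and the scores are hypothetical, and floats stand in for the tensors used by the decoders.

```python
import math

def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))

def decode_labels(logits, threshold=0.5):
    """Turn raw scores (logits) into 0/1 decisions, one per label."""
    return [1 if sigmoid(x) > threshold else 0 for x in logits]

logits = [2.3, -0.7, 0.1, -4.0]      # hypothetical scores for four labels
labels = decode_labels(logits)
print(labels)
```

With the default threshold of 0.5, a label is switched on exactly when its raw score is positive, since the sigmoid of 0 is 0.5.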


In [38]:
%%writefile modules/Decoder_Class.py

# Modins

import torch
import torch.nn as nn
import torch.nn.functional as F


class ClassDecoder(nn.Module):
    
    def __init__(self, text_dim, n_classes) :
        super(ClassDecoder, self).__init__() 
        self.version = 'class'
        self.n_classes = n_classes
        self.classes_decoder = nn.Linear(text_dim, n_classes)

    def forward(self, text_vector, train_mode = False):
        classes_vector = self.classes_decoder(text_vector).view(-1)
        if train_mode :
            return classes_vector
        else :
            probas = F.softmax(classes_vector, dim = 0)    # classes_vector is 1-D after view(-1)
            topv, topi = probas.data.topk(1)
            result = topi[0].cpu().numpy()
            return result, probas
        
        
        
class MultiTaskClassDecoder(nn.Module):
    
    def __init__(self, text_dim, n_class_list, weight_list = None) :
        super(MultiTaskClassDecoder, self).__init__() 
        self.version = 'class'
        self.decoder_list = nn.ModuleList([ClassDecoder(text_dim, N) for N in n_class_list])
        self.weight_list = weight_list if weight_list is not None else [1 for i in range(len(n_class_list))]

    def forward(self, text_vector, train_mode = False):
        classes_vector_list = [mod(text_vector, train_mode) for mod in self.decoder_list]
        return classes_vector_list 

Overwriting modules/Decoder_Class.py


<a id="decodeursGeneratifs"></a>


### 2.4.2 Generative decoders

[Back to the table of contents](#plan)

In [39]:
%%writefile modules/Decoder_Words.py

# Modins

import random

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable


class WordsDecoder(nn.Module):
    '''Transforms a vector into a sequence of words'''
    def __init__(self, text_dim, 
                 output_dim, 
                 lang, 
                 pre_entrainement = None, 
                 freeze = False,
                 teacher_forcing_ratio = 0.5,
                 dropout = 0.1):
        
        super(WordsDecoder, self).__init__()
        
        # relevant quantities
        self.lang = lang
        self.text_dim = text_dim
        self.teacher_forcing_ratio = teacher_forcing_ratio
        self.n_layers = 1
        
        # modules
        self.embedding = nn.Embedding(lang.n_words, output_dim)
        if pre_entrainement is not None : 
            self.embedding.load_state_dict({'weight': torch.Tensor(pre_entrainement)})
        if freeze :
            self.embedding.weight.requires_grad = False
        self.gru = nn.GRU(output_dim + text_dim, text_dim)
        self.out = nn.Linear(text_dim, lang.n_words)
        self.dropout = nn.Dropout(dropout)
        
        
    def indexesFromSentence(self, sentence, max_length = None, rand = 0):
        '''Turn a given sentence into a list of indices according to a given language'''
        indexes=[]
        unknowns = 0
        for word in sentence:
            p = random.random()
            if word not in self.lang.word2index.keys() :
                pass
                #indexes.append(UNK_token) #pass
            elif p >= rand :
                indexes.append(self.lang.word2index[word])
            elif p < rand :
                e = random.choice([1, 2, 3])
                if e == 1 :  # doesn't put any word
                    pass
                elif e == 2 :# hide word with the UNK token (assumes the vocabulary defines an 'UNK' entry)
                    indexes.append(self.lang.word2index['UNK'])
                else :       # put word followed with randomly selected other word
                    indexes.append(self.lang.word2index[word])
                    indexes.append(random.choice(list(range(self.lang.n_words))))
        indexes.append(self.lang.word2index['EOS'])

        # randomly drop words until max_length is met, keeping the first word and the final EOS symbol
        if max_length is not None :
            while len(indexes) > max_length:
                indexes.pop(random.randint(1,len(indexes)-2))
        return indexes


    def variableFromSentence(self, sentence, max_length = None, rand = 0):
        '''Turn a sentence into a torch variable, containing a list of indices according
           to a given language.
        '''
        indexes = self.indexesFromSentence(sentence, max_length, rand)                                 
        result = Variable(torch.LongTensor(indexes).view(-1, 1)) if len(indexes) != 0 else None
        return result
        
        
    def generateWord(self, text_vector, current_word_index, decoder_hidden_vector):
        '''takes parameters : 
        
                the text (query) vector                          dim = (1, 1, text_dim)
                the index of the last decoded word               dim = (1, 1)
                the last hidden decoder state                    dim = (1, 1, text_dim)
        
           returns : 
        
                the log-probabilities of the next word           dim = (1, n_words)
                the updated hidden decoder state                 dim = (1, 1, text_dim)
        '''
        current_word = self.embedding(current_word_index)
        embedded = torch.cat((current_word, text_vector), dim = 2)
        #embedded = self.dropout(embedded)
        for i in range(self.n_layers):
            output, decoder_hidden_vector = self.gru(embedded, decoder_hidden_vector)
        output = F.log_softmax(self.out(output[0]), dim = 1)
        return output, decoder_hidden_vector
    
    
    def forward(self, text_vector, target_answer = None) :
        """Generates an answer from a hidden state that initializes the decoder,
        using a target answer in a 'teacher forcing-like' mode when one is provided """
        bound = 50
        decoder_outputs = []
        answer = []
        di = 0
        SOS_token = self.lang.word2index['SOS']
        EOS_token = self.lang.word2index['EOS']
        if target_answer is not None :
            target_answer = target_answer[0]
            target_answer = self.variableFromSentence(target_answer)
        target_answer = target_answer if random.random() < self.teacher_forcing_ratio else None
        current_word_index = Variable(torch.LongTensor([[SOS_token]]))
        decoder_hidden_vector = text_vector
        decoder_hidden_vector = self.dropout(decoder_hidden_vector)
        for di in range(bound) :
            output, decoder_hidden_vector = self.generateWord(text_vector, 
                                                              current_word_index, 
                                                              decoder_hidden_vector)
            topv, topi = output.data.topk(1)
            decoder_outputs.append(output)
            ni = topi[0][0].item() # index of current generated word, as a plain int
            answer.append(ni)
            if ni == EOS_token :
                break
            elif target_answer is not None : # Teacher forcing
                if di < target_answer.size()[0] :
                    current_word_index = target_answer[di].view(1,-1)  
                else :
                    break
            else :
                current_word_index = Variable(torch.LongTensor([[ni]]))
                
        # transform 'answer' into a sentence with self.lang
 
        return answer, decoder_outputs


Overwriting modules/Decoder_Words.py
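
The `forward` method above decodes greedily: at each step it keeps the highest-scoring word, stops at `EOS`, and feeds back either the word it just produced or, under teacher forcing, the corresponding word of the target answer. The sketch below isolates that control flow in plain Python. `toy_step` is a hypothetical stand-in for `generateWord`, and the token indices, vocabulary size, and function names are assumptions made for illustration only.

```python
import random

SOS, EOS = 0, 4  # hypothetical special-token indices
VOCAB_SIZE = 5   # hypothetical vocabulary size


def toy_step(previous_index):
    """Hypothetical stand-in for generateWord: one score per vocabulary word."""
    likely_next = min(previous_index + 1, EOS)
    return [1.0 if i == likely_next else 0.0 for i in range(VOCAB_SIZE)]


def greedy_decode(step, target=None, teacher_forcing_ratio=0.5, bound=50):
    """Greedy decoding with an optional, per-sequence teacher forcing draw."""
    # as in forward(): the target is used with probability teacher_forcing_ratio
    if target is not None and random.random() >= teacher_forcing_ratio:
        target = None
    current = SOS
    answer = []
    for di in range(bound):
        scores = step(current)
        ni = max(range(len(scores)), key=scores.__getitem__)  # greedy choice
        answer.append(ni)
        if ni == EOS:
            break
        if target is not None:   # teacher forcing: feed the target word
            if di >= len(target):
                break
            current = target[di]
        else:                    # free running: feed the generated word
            current = ni
    return answer


print(greedy_decode(toy_step))  # [1, 2, 3, 4]
print(greedy_decode(toy_step, target=[3, 4], teacher_forcing_ratio=1.0))  # [1, 4]
```

In the second call the first prediction is still made from `SOS`, but the next input is the target word `3` rather than the predicted word `1`, which is why decoding reaches `EOS` one step later. The draw is made once per sequence, so a given answer is decoded either entirely with teacher forcing or entirely without it.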


Return to the dashboard's working directory:

In [40]:
%cd ..

C:\Users\Jb\Desktop\Scripts\notebooks


[Back to the table of contents](#plan)