# CNN

Les couches convolutives utilisent des filtres qui balaient une image et produisent une version traitée de l'image. Cette version traitée de l'image peut être introduite dans une autre couche convolutionnelle ou une couche linéaire. Chaque filtre a une forme, par ex. un filtre 3x3 couvre une zone de 3 pixels de large et 3 pixels de haut de l'image, et chaque élément du filtre a un poids qui lui est associé, le filtre 3x3 aurait 9 poids. Dans le traitement d'image traditionnel, ces poids ont été spécifiés à la main par des ingénieurs, mais le principal avantage des couches convolutives dans les réseaux de neurones est que ces poids sont appris par rétropropagation.

L'idée intuitive derrière l'apprentissage des poids est que les couches convolutives agissent comme des extracteurs de features, extrayant les parties de l'image les plus importantes pour l'objectif du CNN. Par exemple, si vous utilisez un CNN pour détecter des visages dans une image, le CNN peut rechercher des caractéristiques telles que l'existence d'un nez, d'une bouche ou d'une paire d'yeux dans l'image.

De la même manière qu'un filtre 3x3 peut regarder sur une image, un filtre 1x2 peut regarder sur 2 mots séquentiels dans un morceau de texte, c'est-à-dire un bi-gramme. Dans ce modèle CNN, nous utiliserons plusieurs filtres de tailles différentes qui examineront les bi-grammes (un 1x2 filter), tri-grammes (un filtre 1x3) et / ou n-grammes (un 1x $ n $ filtre) dans le texte.

L'intuition ici est que l'apparition de certains bi-grammes, tri-grammes et n-grammes dans la revue sera une bonne indication du sentiment final.

## Préparer les données

Comme les couches convolutionnelles s'attendent à ce que la dimension du batch soit la première, nous pouvons dire à TorchText de renvoyer les données déjà permutées en utilisant l'argument batch_first = True dans la méthode `Field`.


In [1]:
import pandas as pd
import torch
from torchtext import data

SEED = 1234
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

TEXT = data.Field(batch_first = True)
LABEL = data.LabelField(dtype = torch.float)

train_data, valid_data, test_data = data.TabularDataset.splits(
        path='./data/', train='train.csv',
        validation='valid.csv', test='test.csv', format='csv', skip_header=True,
        fields=[('text', TEXT), ('label', LABEL)])

MAX_VOCAB_SIZE = 25_000

TEXT.build_vocab(train_data, 
                 max_size = MAX_VOCAB_SIZE, 
                 vectors = "glove.6B.100d", 
                 unk_init = torch.Tensor.normal_)

LABEL.build_vocab(train_data)

## Itérateurs

In [2]:
BATCH_SIZE = 64

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = BATCH_SIZE, 
    device = device, sort = False)

## Construire le modèle

Le texte est en 1 dimension. Cependant, nous savons que la première étape de presque tous nos notebooks précédents consiste à convertir les mots en embedding. C'est ainsi que nous pouvons visualiser nos mots en 2 dimensions, chaque mot le long d'un axe et les éléments de vecteurs sur l'autre dimension.
On peut alors utiliser un filtre qui est **[n x emb_dim]**. Cela couvrira entièrement les mots séquentiels $ n $, car leur largeur sera de dimension emb_dim.

La prochaine étape de notre modèle consiste à utiliser le pooling (en particulier le pooling max) sur la sortie des couches convolutives afin de prendre la valeur maximale sur une dimension.

 - Nous implémentons les couches convolutives avec nn.Conv2d. L'argument in_channels est le nombre de "channels" nous n'avons qu'un seul "channel", le texte lui-même. Le out_channels est le nombre de filtres et le kernel_size est la taille des filtres. Chacun de nos kernel_sizes va être [n x emb_dim] où $ n $ est la taille des n-grammes.
 - En PyTorch, les RNN veulent l'entrée avec la dimension batch en second, tandis que les CNN veulent d'abord la dimension batch - nous n'avons pas besoin de permuter les données ici car nous avons déjà défini batch_first = True dans notre champ TEXT.
 - Nous passons ensuite les tenseurs à travers les couches convolutives et de pooling, en utilisant la fonction d'activation ReLU après les couches convolutives. La taille de la sortie de la couche convolutive dépend de la taille de l'entrée, et différents lots contiennent des phrases de différentes longueurs.
 - Enfin, nous effectuons des dropouts sur les sorties de filtre concaténées, puis nous les faisons passer à travers une couche linéaire pour faire nos prédictions.

In [3]:
import torch.nn as nn
import torch.nn.functional as F

class CNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, 
                 dropout, pad_idx):
        
        super().__init__()
                
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)  
        self.convs = nn.ModuleList([
                                    nn.Conv2d(in_channels = 1, 
                                              out_channels = n_filters, 
                                              kernel_size = (fs, embedding_dim)) 
                                    for fs in filter_sizes
                                    ])
        
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)  
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, text):           
        
        embedded = self.embedding(text)  
        embedded = embedded.unsqueeze(1) 
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]        
        pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved] 
        cat = self.dropout(torch.cat(pooled, dim = 1))
            
        return self.fc(cat)
    
INPUT_DIM = len(TEXT.vocab)
EMBEDDING_DIM = 100
N_FILTERS = 100
FILTER_SIZES = [3,4,5]
OUTPUT_DIM = 1
DROPOUT = 0.5
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]

model = CNN(INPUT_DIM, EMBEDDING_DIM, N_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT, PAD_IDX)

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'Le modèle a {count_parameters(model):,} paramètres à entraîner')

Le modèle a 2,620,801 paramètres à entraîner


In [4]:
pretrained_embeddings = TEXT.vocab.vectors
model.embedding.weight.data.copy_(pretrained_embeddings)
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)

In [5]:
import torch.optim as optim

optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss()

model = model.to(device)
criterion = criterion.to(device)

def binary_accuracy(preds, y):
    """
    Retourne l'accuracy
    """

    #round predictions to the closest integer
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float() #convert into float for division 
    acc = correct.sum() / len(correct)
    return acc


def recall(preds, y):
    '''
    Retourne le recall
    '''
    y_pred = torch.round(torch.sigmoid(preds))
    y_true = (y_pred == y).float()       
    
    tp = (y_true * y_pred).sum().float()
    tn = ((1 - y_true) * (1 - y_pred)).sum().float()
    fp = ((1 - y_true) * y_pred).sum().float()
    fn = (y_true * (1 - y_pred)).sum().float()
    
    if (tp + fn) == 0:
        recall = torch.zeros(1)
        
    recall = tp / (tp + fn)
    return recall



def f1_loss(preds, y):
    '''
    Retourne le score F1
    '''  
    y_pred = torch.round(torch.sigmoid(preds))
    y_true = (y_pred == y).float() 
            
    tp = (y_true * y_pred).sum().float()
    tn = ((1 - y_true) * (1 - y_pred)).sum().float()
    fp = ((1 - y_true) * y_pred).sum().float()
    fn = (y_true * (1 - y_pred)).sum().float()
    
    recall = tp / (tp + fn)
    precision = tp / (tp + fp)
    
    if (tp + fn) == 0 or (tp + fp) == 0 or (recall + precision == 0):
        f1 = torch.zeros(1)
    else:
        f1 = 2* (precision*recall) / (precision + recall)
    
    return f1

In [6]:
def train(model, iterator, optimizer, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    epoch_rec = 0
    epoch_f1 = 0
    
    model.train()
    
    for batch in iterator:
        
        optimizer.zero_grad()  
        predictions = model(batch.text).squeeze(1)  
        loss = criterion(predictions, batch.label)
        
        acc = binary_accuracy(predictions, batch.label)
        rec = recall(predictions, batch.label)
        f1 = f1_loss(predictions, batch.label)
        
        loss.backward()    
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        epoch_rec += rec.item()
        epoch_f1 += f1.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator), epoch_rec / len(iterator), epoch_f1 / len(iterator)

def evaluate(model, iterator, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    epoch_rec = 0
    epoch_f1 = 0
    
    model.eval()
    
    with torch.no_grad():
    
        for batch in iterator:

            predictions = model(batch.text).squeeze(1)
            loss = criterion(predictions, batch.label)
            
            acc = binary_accuracy(predictions, batch.label)
            rec = recall(predictions, batch.label)
            f1 = f1_loss(predictions, batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
            epoch_rec += rec.item()
            epoch_f1 += f1.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator), epoch_rec / len(iterator), epoch_f1 / len(iterator)


import time

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

N_EPOCHS = 10

best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    start_time = time.time()
    
    train_loss, train_acc, train_rec, train_f1 = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc, valid_rec, valid_f1 = evaluate(model, valid_iterator, criterion)
    
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut4-model.pt')
    
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%| Train Recall: {train_rec*100:.2f}%| Train F1: {train_f1*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%| Val. Recall: {valid_rec*100:.2f}%| Val. F1: {valid_f1*100:.2f}%')

Epoch: 01 | Epoch Time: 0m 24s
	Train Loss: 0.666 | Train Acc: 59.94%| Train Recall: 35.68%| Train F1: 42.66%
	 Val. Loss: 0.583 |  Val. Acc: 71.54%| Val. Recall: 33.39%| Val. F1: 46.06%
Epoch: 02 | Epoch Time: 0m 23s
	Train Loss: 0.489 | Train Acc: 76.36%| Train Recall: 41.62%| Train F1: 53.28%
	 Val. Loss: 0.429 |  Val. Acc: 80.94%| Val. Recall: 48.35%| Val. F1: 58.55%
Epoch: 03 | Epoch Time: 0m 23s
	Train Loss: 0.353 | Train Acc: 84.67%| Train Recall: 43.80%| Train F1: 57.23%
	 Val. Loss: 0.374 |  Val. Acc: 84.61%| Val. Recall: 42.00%| Val. F1: 56.18%
Epoch: 04 | Epoch Time: 0m 23s
	Train Loss: 0.262 | Train Acc: 89.27%| Train Recall: 44.48%| Train F1: 58.95%
	 Val. Loss: 0.372 |  Val. Acc: 85.08%| Val. Recall: 44.52%| Val. F1: 57.72%
Epoch: 05 | Epoch Time: 0m 22s
	Train Loss: 0.187 | Train Acc: 92.85%| Train Recall: 44.78%| Train F1: 60.05%
	 Val. Loss: 0.389 |  Val. Acc: 84.48%| Val. Recall: 42.25%| Val. F1: 56.28%
Epoch: 06 | Epoch Time: 0m 23s
	Train Loss: 0.131 | Train Acc: 95

In [9]:
model.load_state_dict(torch.load('tut4-model.pt'))

test_loss, test_acc, test_rec, test_f1 = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%| Test Recall: {test_rec*100:.2f}%| Test F1: {test_f1*100:.2f}%')

Test Loss: 0.352 | Test Acc: 84.97%| Test Recall: 46.92%| Test F1: 60.15%


In [10]:
import spacy
nlp = spacy.load('en')

def predict_sentiment(model, sentence, min_len = 5):
    model.eval()
    tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
    if len(tokenized) < min_len:
        tokenized += ['<pad>'] * (min_len - len(tokenized))
    indexed = [TEXT.vocab.stoi[t] for t in tokenized]
    tensor = torch.LongTensor(indexed).to(device)
    tensor = tensor.unsqueeze(0)
    prediction = torch.sigmoid(model(tensor))
    
    return prediction.item()

In [11]:
predict_sentiment(model, "This film is terrible")

0.9583509564399719

In [12]:
predict_sentiment(model, "This film is great")

0.07189281284809113

## Références :

 - https://github.com/bentrevett/pytorch-sentiment-analysis/blob/master/4%20-%20Convolutional%20Sentiment%20Analysis.ipynb
 - https://arxiv.org/pdf/1408.5882.pdf