# Reconnaissance d'entités nommées avec un Bi-LSTM (pytorch)

Avant toute chose, n'oubliez pas de choisir un environnement GPU dans Colab (`Exécution` $\rightarrow$ `Modifier le type d'exécution`)

Xavier Tannier

In [1]:
# Colab & Drive libraries
# NOTE(review): `files` and `MediaIoBaseDownload` are not referenced in the
# cells visible here; only `drive` is used below.
from google.colab import files
from googleapiclient.http import MediaIoBaseDownload
from google.colab import drive
# Mount Google Drive under /content/drive (the dataset paths defined later
# point inside it). This will prompt for authorization.
# force_remount=True presumably re-mounts even if the drive is already mounted.
drive.mount('/content/drive', force_remount=True)

ModuleNotFoundError: No module named 'google'

In [None]:
# Load the TensorBoard notebook extension and open a dashboard that reads
# from `lightning_logs` (presumably the directory the Trainer writes to - confirm).
%load_ext tensorboard
%tensorboard --logdir lightning_logs

In [None]:
# Install the third-party packages imported in the next cell
# (--quiet reduces pip's console output).
!pip install pytorch-lightning --quiet
!pip install torchmetrics --quiet

In [None]:
from os.path import isfile, isdir, join

import torch
from torch import nn
from torch import autograd
from torch import optim
from torch.utils.data import Dataset
import torch.nn.functional as F

import torchmetrics

import pytorch_lightning as pl

from tqdm.auto import tqdm

from collections import Counter
import codecs 

# Seed PyTorch's random number generator so that runs are repeatable.
# NOTE(review): only torch is seeded here; any other source of randomness
# is not controlled by this call.
torch.manual_seed(1)

In [None]:
# Paths to the CoNLL-formatted dataset splits (inside the mounted Google Drive)
train_file = '/content/drive/My Drive/data/conll/eng/train.txt'
val_file = '/content/drive/My Drive/data/conll/eng/valid.txt'
test_file = '/content/drive/My Drive/data/conll/eng/test.txt'

# Frequency threshold for a word to get its own embedding: the vocabulary
# builder keeps only words occurring strictly MORE than this many times
min_word_freq = 2
# Batch size used by the DataLoaders
batch_size = 64

# how big is each word vector (if not preloaded)
embed_size = 50

# how many times to iterate over all samples
n_epochs = 15

# Number of DataLoader worker processes
workers = 1

# Sanity check: fail early, naming the path that is missing
assert isfile(train_file), f'Training file not found: {train_file}'
assert isfile(val_file), f'Validation file not found: {val_file}'
assert isfile(test_file), f'Test file not found: {test_file}'

In [None]:
def read_words_tags(file, tag_ind, caseless=False):
    """
    Reads raw data in the CoNLL 2003 format and returns word and tag sequences.

    Sentences are delimited by whitespace-only lines. Lines starting with
    ``-DOCSTART-`` are skipped and also close the sentence in progress.

    :param file: path of the file with raw data in the CoNLL 2003 format
    :param tag_ind: column index of the tag in each whitespace-split line
    :param caseless: lowercase words?
    :return: (words, tags), two parallel lists containing one list per sentence
    """
    with codecs.open(file, 'r', 'utf-8') as f:
        lines = f.readlines()

    words, tags = [], []
    sent_words, sent_tags = [], []
    for line in lines:
        # startswith() also catches a bare '-DOCSTART-' line without a
        # trailing newline (e.g. the very last line of the file), which a
        # "longer than 10 characters" test would treat as a regular token.
        if line.isspace() or line.startswith('-DOCSTART-'):
            if sent_words:
                words.append(sent_words)
                tags.append(sent_tags)
                sent_words, sent_tags = [], []
            continue
        # split() with no argument discards the line terminator as well
        feats = line.split()
        sent_words.append(feats[0].lower() if caseless else feats[0])
        sent_tags.append(feats[tag_ind])

    # last sentence (file not terminated by a blank line)
    if sent_words:
        words.append(sent_words)
        tags.append(sent_tags)

    # Sanity check
    assert len(words) == len(tags)

    return words, tags


In [None]:
class NERDataset(Dataset):
    """
    Map-style PyTorch Dataset over four parallel, index-aligned collections.

    Item ``i`` is the tuple
    ``(word_inputs[i], tag_inputs[i], sent_lengths[i], masks[i])``.
    """

    def __init__(self, word_inputs, tag_inputs, sent_lengths, masks):
        """
        :param word_inputs: padded word sequences
        :param tag_inputs: padded tag sequences
        :param sent_lengths: word sequence lengths
        :param masks: masks
        """
        self.word_inputs, self.tag_inputs = word_inputs, tag_inputs
        self.sent_lengths, self.masks = sent_lengths, masks

        # The number of examples is fixed at construction time
        self.data_size = len(word_inputs)

    def __len__(self):
        return self.data_size

    def __getitem__(self, i):
        columns = (self.word_inputs, self.tag_inputs,
                   self.sent_lengths, self.masks)
        return tuple(column[i] for column in columns)


In [None]:
def create_conll_dataloader(conll_file, train=False,
                            word_map=None, tag_map=None,
                            min_word_freq=1, debug=True):
    """
    Create a PyTorch DataLoader from a CoNLL file.

    Relies on the module-level ``batch_size`` and ``workers`` settings.

    :param conll_file: path of the CoNLL file to load
    :param train: True if the dataset is for training purpose; only a
              training dataset defines the vocabulary (and it is shuffled)
    :param word_map: word-to-id mapping (required for a non-training dataset)
    :param tag_map: tag-to-id mapping (required for a non-training dataset)
    :param min_word_freq: a word gets its own id only if it occurs strictly
              more than this many times (forced to 0 in debug mode)
    :param debug: True for a small debugging dataset (first 150 sentences)
    :return: (data_loader, word_map, tag_map)
    """
    assert train or (word_map is not None and tag_map is not None)
    # Read dataset file (the tag is the last column)
    tokens, tags = read_words_tags(conll_file, -1)
    if debug:
        tokens = tokens[:150]
        tags = tags[:150]
        min_word_freq = 0

    # Build vocabulary
    if train:
        word_freq = Counter()
        tag_set = set()
        for sentence_tokens, sentence_tags in zip(tokens, tags):
            word_freq.update(sentence_tokens)
            tag_set.update(sentence_tags)

        # Word ids start at 1: id 0 is reserved for '<pad>'
        kept_words = [w for w, freq in word_freq.items() if freq > min_word_freq]
        word_map = {w: i + 1 for i, w in enumerate(kept_words)}
        word_map['<pad>'] = 0
        word_map['<unk>'] = len(word_map)
        # Sort the tags: iterating over a set of strings gives an order that
        # can change from one interpreter run to the next, which would make
        # the tag ids (and everything derived from them) non-reproducible.
        tag_map = {t: i for i, t in enumerate(sorted(tag_set))}

    # Encode words as ids; out-of-vocabulary words map to '<unk>'
    unk_id = word_map['<unk>']
    word_inputs = [torch.LongTensor([word_map.get(w, unk_id) for w in sentence])
                   for sentence in tokens]
    # Encode tags as ids
    tag_inputs = [torch.LongTensor([tag_map[t] for t in sentence])
                  for sentence in tags]

    # Sentence lengths & masks (True on real tokens)
    sentence_lengths = [len(seq) for seq in word_inputs]
    sentence_masks = [torch.ones(length, dtype=torch.bool) for length in sentence_lengths]

    # Pad every sequence to the length of the longest sentence of the file.
    # Tags are padded with 0, which is also a real tag id: padded positions
    # are told apart by the masks (False on padding).
    pad_word_inputs = torch.nn.utils.rnn.pad_sequence(word_inputs, batch_first=True, padding_value=word_map['<pad>'])
    pad_tag_inputs = torch.nn.utils.rnn.pad_sequence(tag_inputs, batch_first=True, padding_value=0)
    pad_masks = torch.nn.utils.rnn.pad_sequence(sentence_masks, batch_first=True, padding_value=0)

    assert len(pad_word_inputs) == len(pad_tag_inputs) == len(sentence_lengths) == len(pad_masks)

    # Create DataLoader (only the training set is shuffled)
    dataset = NERDataset(pad_word_inputs, pad_tag_inputs, sentence_lengths, pad_masks)
    data_loader = torch.utils.data.DataLoader(dataset,
                                              batch_size=batch_size, shuffle=train,
                                              num_workers=workers, pin_memory=False)
    return data_loader, word_map, tag_map

In [None]:
class BiLSTM(pl.LightningModule):
    """
    Sequence labelling module: embedding -> LSTM -> dropout -> linear layer
    producing one score per label for every token.
    """
    def __init__(self, vocab_size, embed_size, label_number,
                 batch_size,
                 hidden_size=100, dropout=0.5, bidirectional=False):
        """
        :param vocab_size: number of rows of the embedding table
        :param embed_size: size of each word vector
        :param label_number: number of output labels
        :param batch_size: stored on the module (not used by the methods below)
        :param hidden_size: LSTM hidden size (per direction)
        :param dropout: dropout probability applied to the LSTM outputs
        :param bidirectional: run the LSTM in both directions.
               NOTE(review): defaults to False to keep the existing behaviour,
               although the class name suggests True was intended - confirm.
        """
        super().__init__()
        self.automatic_optimization = True
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, bidirectional=bidirectional,
                            batch_first=True)
        self.dropout = nn.Dropout(dropout)
        # A bidirectional LSTM concatenates both directions on the feature axis
        lstm_out_size = hidden_size * (2 if bidirectional else 1)
        self.classif = nn.Linear(lstm_out_size, label_number)

        # Applied to log-softmax scores in the step methods
        self.loss_fn = nn.NLLLoss(reduction='mean')

        self.train_metrics = torchmetrics.MetricCollection({
            'precision/train': torchmetrics.Precision(num_classes=label_number, average='macro'),
            'recall/train': torchmetrics.Recall(num_classes=label_number, average='macro'),
            'F1/train': torchmetrics.F1(num_classes=label_number, average='macro'),
            'accuracy/train': torchmetrics.Accuracy()
        })
        self.val_metrics = torchmetrics.MetricCollection({
            'precision/val': torchmetrics.Precision(num_classes=label_number, average='macro'),
            'recall/val': torchmetrics.Recall(num_classes=label_number, average='macro'),
            'F1/val': torchmetrics.F1(num_classes=label_number, average='macro'),
            'accuracy/val': torchmetrics.Accuracy()
        })
        self.hidden_size = hidden_size
        self.batch_size = batch_size

    def forward(self, x, lengths):
        """
        Compute per-token label scores (unnormalised).

        :param x: padded batch of word ids (batch first)
        :param lengths: tensor with the true length of each sequence of the batch
        :return: scores for every position up to the longest sequence of the
                 batch (the output is only as long as that sequence)
        """
        embedded = self.embedding(x)
        # Pack so that the LSTM does not run over padded positions
        packed = torch.nn.utils.rnn.pack_padded_sequence(embedded,
                                                         lengths.cpu().numpy(),
                                                         batch_first=True,
                                                         enforce_sorted=False)
        packed_out, _ = self.lstm(packed)
        output, _ = torch.nn.utils.rnn.pad_packed_sequence(packed_out, batch_first=True)
        return self.classif(self.dropout(output))

    def _masked_step(self, batch, metrics):
        """Shared train/validation step: loss over real tokens + metric update."""
        x, y, lengths, masks = batch
        out = self(x, lengths)

        # `out` stops at the longest sequence of the batch, while `y` and
        # `masks` may be padded further: trim them to the same length.
        max_len = out.size(1)
        y = y[:, :max_len]
        masks = masks[:, :max_len]

        # Keep real tokens only (flattened over the whole batch)
        masked_y = torch.masked_select(y, masks)
        masked_out = out[masks]
        score = F.log_softmax(masked_out, 1)
        loss = self.loss_fn(score, masked_y)
        _, preds = torch.max(score, 1)

        metrics(preds, masked_y)
        return loss

    def training_step(self, batch, batch_idx):
        # training_step defines the train loop; it is independent of forward
        return self._masked_step(batch, self.train_metrics)

    def training_epoch_end(self, outs):
        # log epoch metrics, then start the next epoch from a clean state
        m = self.train_metrics.compute()
        self.log_dict(m, on_step=False, on_epoch=True, prog_bar=True)
        print('train', m)
        self.train_metrics.reset()

    def validation_step(self, batch, batch_idx):
        # Same computation as training_step, accumulated in the validation metrics
        return self._masked_step(batch, self.val_metrics)

    def validation_epoch_end(self, outs):
        # log epoch metrics, then start the next epoch from a clean state
        m = self.val_metrics.compute()
        self.log_dict(m, on_step=False, on_epoch=True, prog_bar=True)
        print('val', m)
        self.val_metrics.reset()

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.015)
        return optimizer



In [None]:
# True restricts every split to its first 150 sentences
debug = False

# The training split defines the word and tag mappings...
train_loader, word_map, tag_map = create_conll_dataloader(
    conll_file=train_file,
    train=True,
    min_word_freq=min_word_freq,
    debug=debug,
)
# ...which are then reused to encode the validation split.
val_loader, _, _ = create_conll_dataloader(
    conll_file=val_file,
    word_map=word_map,
    tag_map=tag_map,
    debug=debug,
)
# Test split (disabled). A non-training loader must be given word_map and
# tag_map, otherwise the assertion at the top of create_conll_dataloader fails.
#test_loader, _, _ = create_conll_dataloader(test_file,
#                                            word_map=word_map,
#                                            tag_map=tag_map,
#                                            debug=debug)

In [None]:
# Display the tag-to-id mapping built from the training split
tag_map

In [None]:
# Size the model from the mappings built on the training split
model = BiLSTM(vocab_size=len(word_map), embed_size=embed_size,
               label_number=len(tag_map), batch_size=batch_size)

# Train for n_epochs epochs, running validation after every epoch
trainer = pl.Trainer(gpus=1, max_epochs=n_epochs, check_val_every_n_epoch=1)
trainer.fit(model, train_loader, val_loader)
