***Install Transformers in order to get the bert model with its tokenizer***

In [None]:
!pip install transformers

***Import the python modules***

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torchtext.legacy import data
from torchtext.legacy import datasets

from transformers import BertTokenizer, BertModel
from transformers import FlaubertModel, FlaubertTokenizer

import numpy as np

import time
import random
import functools

***Set the seed value to ensure the reproducibility***

In [None]:
# Single seed value shared by every random number generator used below.
SEED = 1234

# Seed Python's RNG, NumPy's global RNG and PyTorch's RNG, in that order.
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed):
    _seed_rng(SEED)
del _seed_rng

# Turn on cuDNN's deterministic mode.
torch.backends.cudnn.deterministic = True

***Import the FlauBERT tokenizer to encode the input and provide the same vocabulary as the pretrained model.***

In [None]:
# Choose among ['flaubert/flaubert_small_cased', 'flaubert/flaubert_base_uncased', 
#               'flaubert/flaubert_base_cased', 'flaubert/flaubert_large_cased']

In [None]:
# NOTE(review): both names are also imported in the imports cell at the top of the notebook.
from transformers import FlaubertTokenizer, FlaubertModel
# Identifier of the pretrained checkpoint; the same variable is used later to load the model weights.
modelname = 'flaubert/flaubert_base_uncased' 
# Tokenizer that belongs to the selected checkpoint.
tokenizer = FlaubertTokenizer.from_pretrained(modelname)

***Match the special tokens (such as [CLS], [PAD] and [UNK]) to those used by the pretrained model***

In [None]:
# Special tokens of the pretrained tokenizer: the classification token is used
# as the sequence-initial token, alongside the padding and unknown tokens.
init_token, pad_token, unk_token = (
    tokenizer.cls_token,
    tokenizer.pad_token,
    tokenizer.unk_token,
)

print(init_token, pad_token, unk_token)

***integer representation of the specific tokens***

In [None]:
# Vocabulary ids of the three special tokens, looked up one by one in the
# same order as before: init, pad, unk.
init_token_idx, pad_token_idx, unk_token_idx = (
    tokenizer.convert_tokens_to_ids(special)
    for special in (init_token, pad_token, unk_token)
)

print(init_token_idx, pad_token_idx, unk_token_idx)

***Get the maximum length that the pretrained model was trained on***

In [None]:
# Look the limit up under the checkpoint selected in `modelname`, so the key
# stays correct when a different checkpoint is chosen above.
max_input_length = tokenizer.max_model_input_sizes[modelname]
print(max_input_length)

***we'll define two helper functions that make use of our vocabulary.***

In [None]:
def cut_and_convert_to_id(tokens, tokenizer, max_input_length):
    """Truncate a token sequence and convert it to vocabulary ids (TEXT).

    Keeps the first ``max_input_length - 1`` tokens and returns the result of
    ``tokenizer.convert_tokens_to_ids`` on them.
    """
    # Presumably one position is left free for the init token that the TEXT
    # field prepends -- confirm against the field definition.
    kept = tokens[:max_input_length - 1]
    return tokenizer.convert_tokens_to_ids(kept)
def cut_to_max_length(tokens, max_input_length):
    """Return the first ``max_input_length - 1`` items of ``tokens`` (TAGS)."""
    return tokens[:max_input_length - 1]

***Create abstraction functions by the help of TorchText***

In [None]:
# Bind the tokenizer and the length limit up front so that each preprocessor
# can be called with the token sequence as its only argument.
text_preprocessor = functools.partial(
    cut_and_convert_to_id,
    max_input_length=max_input_length,
    tokenizer=tokenizer,
)

tag_preprocessor = functools.partial(
    cut_to_max_length,
    max_input_length=max_input_length,
)

***We define the FIELDS***

In [None]:
# Text is turned into vocabulary ids by `text_preprocessor`, so the field does
# not build a vocabulary of its own and the special tokens are given as ids.
TEXT = data.Field(
    preprocessing=text_preprocessor,
    lower=True,
    use_vocab=False,
    init_token=init_token_idx,
    pad_token=pad_token_idx,
    unk_token=unk_token_idx,
)

# Tags have no unknown token. '<pad>' is used as the init token for the tag
# sequence, mirroring the init token added on the text side.
UD_TAGS = data.Field(
    preprocessing=tag_preprocessor,
    init_token='<pad>',
    unk_token=None,
)

***Match the fields to our data***

In [None]:
# Attribute name and Field for each part of an example.
# Presumably the order matches the columns of the data files -- confirm.
fields = (("text", TEXT), ("udtags", UD_TAGS))

***Load the data***

In [None]:
class UDPOSFR(datasets.SequenceTaggingDataset):
    """Universal Dependencies French treebank as a sequence-tagging dataset.

    Download original at http://universaldependencies.org/
    License: http://creativecommons.org/licenses/by-sa/4.0/
    """

    # Archive to download; change this to use another dataset.
    urls = ['https://github.com/Dahouabdelhalim/udpos2/raw/main/data/fr-gsd-ud-15032020.zip']
    # Must be changed together with `urls`.
    dirname = 'fr-gsd-ud'
    # Changing this one is optional.
    name = 'udpos'

    @classmethod
    def splits(cls, fields, root=".data",
               train="fr_gsd-ud-train.txt",
               validation="fr_gsd-ud-dev.txt",
               test="fr_gsd-ud-test.txt", **kwargs):
        """Forward the split file names to the parent class's ``splits``.

        The defaults name the train, dev and test files of the French
        treebank; any extra keyword arguments are passed through unchanged.
        """
        split_files = dict(train=train, validation=validation, test=test)
        return super().splits(fields=fields, root=root, **split_files, **kwargs)

# Build the three splits using the default file names declared on UDPOSFR.
train_data, valid_data, test_data = UDPOSFR.splits(fields=fields)

***Example from the data***

In [None]:
# Show the attributes stored on the first training example.
print(vars(train_data.examples[0]))

***Build the tag vocabulary with the help of the Field's build_vocab function***

In [None]:
# Build the tag vocabulary from the training split.
UD_TAGS.build_vocab(train_data)

# Print the tag -> index mapping; the final expression displays its size as the cell output.
print(UD_TAGS.vocab.stoi)
len(UD_TAGS.vocab.stoi)

***Define our iterators (batch sizes and Device)***

In [None]:
BATCH_SIZE = 8

# Run on the GPU when PyTorch reports one, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# One iterator per split, all with the same batch size and target device.
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    device=device,
    batch_size=BATCH_SIZE,
)

Build the model: the model is relatively simple, with all of the complicated parts contained inside the BERT module, which we do not have to worry about. We can think of BERT as an embedding layer; all we do is add a linear layer on top of these embeddings to predict the tag for each token in the input sequence.
ARCHITECUTRE.png

In [None]:
class BERTPoSTagger(nn.Module):
    """Token tagger: a pretrained encoder followed by dropout and a linear layer.

    Args:
        bert: encoder module. ``bert.config.to_dict()`` must contain the key
            ``'emb_dim'``, and calling the module on a ``[batch size, sent len]``
            tensor of token ids must return a sequence whose first item is the
            per-token representation ``[batch size, sent len, emb dim]``.
        output_dim: number of tags to score for every token.
        dropout: dropout probability applied to the encoder output.
    """

    def __init__(self,
                 bert,
                 output_dim,
                 dropout):

        super().__init__()

        self.bert = bert

        # Size of the vectors produced by the encoder.
        embedding_dim = bert.config.to_dict()['emb_dim']

        self.fc = nn.Linear(embedding_dim, output_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Return tag scores of shape ``[sent len, batch size, output dim]``.

        Args:
            text: tensor of token ids with shape ``[sent len, batch size]``.
        """
        # The encoder expects the batch dimension first.
        text = text.permute(1, 0)

        # text = [batch size, sent len]

        # Dropout is applied exactly once. Applying it a second time before
        # the linear layer would raise the effective drop rate to
        # 1 - (1 - p) ** 2 instead of the configured p.
        embedded = self.dropout(self.bert(text)[0])

        # embedded = [batch size, sent len, emb dim]

        embedded = embedded.permute(1, 0, 2)

        # embedded = [sent len, batch size, emb dim]

        predictions = self.fc(embedded)

        # predictions = [sent len, batch size, output dim]

        return predictions

***Load the Pretrained BERT model***

In [None]:
# Load the pretrained weights. Because output_loading_info=True, a second
# value (the loading report) is returned and stored in `log`.
flaubert, log = FlaubertModel.from_pretrained(
    modelname,
    output_hidden_states=True,
    output_loading_info=True,
)

In [None]:
def count_parameters(model):
    """Return the total number of elements in the parameters of ``model`` that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Trainable parameter count of the pretrained encoder on its own.
print(f'The model has {count_parameters(flaubert):,} trainable parameters')

***Freeze Model Parameters***

***Instantiate the Hyperparameters and the Model***

In [None]:
OUTPUT_DIM = len(UD_TAGS.vocab)  # one output score per tag in the vocabulary
DROPOUT = 0.25
# The whole pretrained encoder is updated during training, so the step size
# must be small: a rate such as 5e-3 overwrites the pretrained weights
# (catastrophic forgetting). 5e-5 is in the range commonly used for fine-tuning.
LEARNING_RATE = 5e-5
model = BERTPoSTagger(flaubert, OUTPUT_DIM, DROPOUT)

optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)  # optimizer over every model parameter
TAG_PAD_IDX = UD_TAGS.vocab.stoi[UD_TAGS.pad_token]
# Positions whose tag is the pad tag are excluded from the loss. Since '<pad>'
# is also the init token of the tag field, the first position is excluded too.
criterion = nn.CrossEntropyLoss(ignore_index=TAG_PAD_IDX)

model = model.to(device)  # move the model to the GPU when one is available
criterion = criterion.to(device)

In [None]:
# count_parameters is defined in an earlier cell; it is reused here instead of
# being redefined. The count now covers the encoder plus the linear tagging layer.
print(f'The model has {count_parameters(model):,} trainable parameters')

***Function that calculates the accuracy per batch***

In [None]:
def categorical_accuracy(preds, y, tag_pad_idx):
    """Return the accuracy over non-padding positions of one batch.

    If 8 of 10 non-padding positions are right, this returns 0.8, NOT 8.

    Args:
        preds: scores of shape ``[n positions, n tags]``.
        y: gold tag indices of shape ``[n positions]``.
        tag_pad_idx: tag index marking positions to leave out of the count.

    Returns:
        A one-element float tensor on the same device as the inputs. The value
        is NaN when ``y`` contains nothing but ``tag_pad_idx``.
    """
    predicted = preds.argmax(dim=1)  # index of the highest score per position
    keep = y != tag_pad_idx          # True where the position is not padding
    hits = (predicted[keep] == y[keep]).sum()
    # Everything is derived from the inputs, so the result lands on their
    # device without relying on a notebook-level `device` variable.
    return (hits.float() / keep.sum().float()).unsqueeze(0)

***We then define our train and evaluate functions to train and test our model.***

In [None]:
def train(model, iterator, optimizer, criterion, tag_pad_idx):
    """Train ``model`` for one pass over ``iterator``.

    Each batch must expose ``text`` and ``udtags`` of shape
    ``[sent len, batch size]``. Returns ``(mean loss, mean accuracy)``, both
    averaged over the number of batches.
    """
    model.train()

    loss_total, acc_total = 0, 0

    for batch in iterator:
        token_ids, gold_tags = batch.text, batch.udtags

        optimizer.zero_grad()

        # scores = [sent len, batch size, output dim]
        scores = model(token_ids)

        # Flatten so that every position becomes one row:
        # flat_scores = [sent len * batch size, output dim]
        # flat_tags   = [sent len * batch size]
        flat_scores = scores.view(-1, scores.shape[-1])
        flat_tags = gold_tags.view(-1)

        loss = criterion(flat_scores, flat_tags)
        acc = categorical_accuracy(flat_scores, flat_tags, tag_pad_idx)

        loss.backward()
        optimizer.step()

        loss_total += loss.item()
        acc_total += acc.item()

    n_batches = len(iterator)
    return loss_total / n_batches, acc_total / n_batches

In [None]:
def evaluate(model, iterator, criterion, tag_pad_idx):
    """Evaluate ``model`` on ``iterator`` without computing gradients.

    Each batch must expose ``text`` and ``udtags``. Returns
    ``(mean loss, mean accuracy)``, both averaged over the number of batches.
    """
    model.eval()

    loss_total, acc_total = 0, 0

    with torch.no_grad():
        for batch in iterator:
            token_ids, gold_tags = batch.text, batch.udtags

            scores = model(token_ids)

            # One row per position, as the loss and the accuracy expect.
            flat_scores = scores.view(-1, scores.shape[-1])
            flat_tags = gold_tags.view(-1)

            loss = criterion(flat_scores, flat_tags)
            acc = categorical_accuracy(flat_scores, flat_tags, tag_pad_idx)

            loss_total += loss.item()
            acc_total += acc.item()

    n_batches = len(iterator)
    return loss_total / n_batches, acc_total / n_batches

***Helper function to measure how long each epoch took***

In [None]:
def epoch_time(start_time, end_time):
    """Split the span between two timestamps (in seconds) into whole minutes and whole seconds.

    Returns a ``(minutes, seconds)`` tuple of ints; fractions are truncated.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - 60 * whole_minutes)
    return whole_minutes, leftover_seconds

***Start the training and validation loop***

In [None]:
N_EPOCHS = 10

# Lowest validation loss seen so far; any finite loss improves on infinity.
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    start_time = time.time()

    train_loss, train_acc = train(model, train_iterator, optimizer, criterion, TAG_PAD_IDX)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion, TAG_PAD_IDX)

    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    # Keep only the weights of the epoch with the best validation loss.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut2-model.pt')

    print(
        f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s',
        f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%',
        f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%',
        sep='\n',
    )

***Load the trained model and test it on the test data***

In [None]:
# Restore the best checkpoint. map_location remaps the stored tensors onto the
# current device, so a checkpoint written on a GPU can also be loaded on a
# CPU-only runtime.
model.load_state_dict(torch.load('tut2-model.pt', map_location=device))

test_loss, test_acc = evaluate(model, test_iterator, criterion, TAG_PAD_IDX)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

In [None]:
# Turn off gradients for every encoder parameter whose name does not contain
# 'classifier'.
# NOTE(review): when the notebook is run top to bottom, this cell executes
# after training and testing have already finished.
# NOTE(review): the encoder's parameter names are not visible here; if none of
# them contains 'classifier', this freezes the whole encoder -- confirm.
for name, param in flaubert.named_parameters():
    if 'classifier' in name:
        continue
    param.requires_grad = False

***Inference Function***

In [None]:
def tag_sentence(model, device, sentence, tokenizer, text_field, tag_field):
    """Predict one tag for every token of ``sentence``.

    Args:
        model: tagger called on a ``[sent len, 1]`` LongTensor of token ids;
            its output is reduced with ``argmax`` over the last dimension.
        device: device the input tensor is moved to before calling ``model``.
        sentence: a raw string (split with ``tokenizer.tokenize``) or an
            already tokenized sequence of tokens, which is used as is.
        tokenizer: object providing ``tokenize`` and ``convert_tokens_to_ids``.
        text_field: field whose ``init_token`` id is prepended to the input
            and whose ``unk_token`` id identifies unknown tokens.
        tag_field: field whose ``vocab.itos`` maps tag indices to tag strings.

    Returns:
        ``(tokens, predicted_tags, unks)``: the tokens, one predicted tag per
        token, and the tokens that were mapped to the unknown id.
    """
    model.eval()

    # NOTE(review): strings go through tokenizer.tokenize here, whereas the
    # training preprocessor passes dataset tokens straight to
    # convert_tokens_to_ids -- confirm both yield the same kind of tokens.
    tokens = tokenizer.tokenize(sentence) if isinstance(sentence, str) else sentence

    token_ids = tokenizer.convert_tokens_to_ids(tokens)

    # Find unknown tokens BEFORE the init token is prepended: at this point
    # tokens and ids line up one-to-one. Zipping against the prefixed list
    # would compare every token with the id of its left neighbour.
    unk_idx = text_field.unk_token
    unks = [tok for tok, idx in zip(tokens, token_ids) if idx == unk_idx]

    numericalized_tokens = [text_field.init_token] + token_ids

    # [sent len + 1] -> [sent len + 1, 1]: a batch holding a single sentence.
    token_tensor = torch.LongTensor(numericalized_tokens).unsqueeze(-1).to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        predictions = model(token_tensor)

    top_predictions = predictions.argmax(-1)

    predicted_tags = [tag_field.vocab.itos[t.item()] for t in top_predictions]

    # Drop the prediction made for the prepended init token.
    predicted_tags = predicted_tags[1:]

    assert len(tokens) == len(predicted_tags)

    return tokens, predicted_tags, unks



***Run an example on a sentence***

In [None]:
# Tag a short French example with the trained model.
sentence = 'tu mange.'
tokens, tags, unks = tag_sentence(
    model=model,
    device=device,
    sentence=sentence,
    tokenizer=tokenizer,
    text_field=TEXT,
    tag_field=UD_TAGS,
)

In [None]:
# Header row, then one "tag<TAB><TAB>token" line per token.
print("Pred. Tag\tToken\n")

for token, tag in zip(tokens, tags):
    line = f"{tag}\t\t{token}"
    print(line)

Pred. Tag	Token

PRON		tu</w>
PRON		mange</w>
PRON		.</w>


***In the event that we want to freeze the parameters, the following loop is used.***

In [None]:
# Optional: stop gradient updates for every encoder parameter whose name does
# not contain 'classifier'.
# NOTE(review): to influence training, this has to run before the optimizer
# updates the model; the encoder's parameter names are not visible here, so
# confirm whether any of them actually contains 'classifier'.
for name, param in flaubert.named_parameters():
    is_classifier_param = 'classifier' in name
    if not is_classifier_param:
        param.requires_grad = False