In [10]:
import Models
import numpy as np
from conllu import parse
from tensorflow.keras.preprocessing.sequence import pad_sequences
from transformers import AdamW
from transformers import get_linear_schedule_with_warmup
from seqeval.metrics import f1_score, accuracy_score
from tqdm import trange

import torch
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

In [11]:
# Label selector passed to the helpers below: when True each token's tag column is
# used as its label; when False the 'name' entry of the token's misc field is used.
pos_tag = False

In [12]:
class FormatData:
    """Indexable collection of sentences stored as ``(form, tag, misc)`` triples.

    Args:
        data: Iterable of sentences, each an iterable of token mappings that
            provide the keys ``'form'``, ``'misc'`` and ``tag_col``.
        tag_col: Key of the token field used as the tag. The value is
            converted with ``str`` (so a missing tag of ``None`` becomes the
            string ``'None'``).
    """

    def __init__(self, data, tag_col='upos'):
        formatted = []
        for sentence in data:
            triples = []
            for token in sentence:
                triples.append((token['form'], str(token[tag_col]), token['misc']))
            formatted.append(triples)
        self.formatted_data = formatted

    def __len__(self):
        """Number of sentences held."""
        return len(self.formatted_data)

    def __getitem__(self, index):
        """Return the sentence (list of triples) at ``index``."""
        return self.formatted_data[index]



def get_data():
    """Read and parse the Bokmål and Nynorsk CoNLL-U train/test files.

    For each language the parsed test sentences come first and the parsed
    training sentences are appended after them, so the returned length marks
    where the test portion ends.

    Returns:
        Tuple ``(sentences, test_len_nb, sentences_no, test_len_no)`` where
        ``sentences`` / ``sentences_no`` are the combined Bokmål / Nynorsk
        sentence lists and the two lengths are the number of leading test
        sentences in each.
    """
    def _read_text(path):
        # Return the full contents of a UTF-8 text file.
        with open(path, 'r', encoding="utf-8") as file:
            return file.read()

    nb_train = _read_text("../Data/pos_tagging/no_bokmaal-ud-train.conllu.txt")
    no_train = _read_text("../Data/pos_tagging/no_nynorsk-ud-train.conllu.txt")
    nb_test = _read_text("../Data/pos_tagging/no_bokmaal-ud-test.conllu.txt")
    no_test = _read_text("../Data/pos_tagging/no_nynorsk-ud-test.conllu.txt")

    # Bokmål: test sentences first, training sentences appended afterwards.
    sentences = parse(nb_test)
    test_len_nb = len(sentences)
    sentences.extend(parse(nb_train))

    # Nynorsk: same layout.
    sentences_no = parse(no_test)
    test_len_no = len(sentences_no)
    sentences_no.extend(parse(no_train))

    return sentences, test_len_nb, sentences_no, test_len_no

# Load both treebanks once. Each list holds the test sentences first, followed by
# the training sentences; the accompanying length is the number of leading test
# sentences. `test_len` is read as a module-level global by
# data_helper_torch_datasets when it splits the data.
sentences, test_len, sentences_no, test_len_no = get_data()


In [13]:
# Convert the parsed sentences into (form, tag, misc) triples.
# NOTE: both names are rebound to their formatted versions, so re-running this
# cell without re-running the loading cell feeds tuples back into FormatData,
# which indexes each token by string key.
sentences = FormatData(sentences, tag_col='upos')
sentences_no = FormatData(sentences_no)

In [14]:
def tokenize_and_preserve_labels(sentence, tokenizer, pos_tag=False):
    """Split every word into sub-word tokens and repeat its label per token.

    Args:
        sentence: Iterable of ``(word, tag, misc)`` triples, where ``misc``
            is a mapping with a ``'name'`` entry.
        tokenizer: Object exposing ``tokenize(word)`` that returns a list of
            sub-word tokens.
        pos_tag: If true the ``tag`` element is used as the label, otherwise
            ``misc['name']``.

    Returns:
        ``(tokens, labels)``: two parallel lists with one entry per sub-word
        token. A word that tokenizes to nothing contributes no entries.
    """
    pieces, piece_labels = [], []

    for word, tag, misc in sentence:
        word_label = tag if pos_tag else misc['name']

        # Every sub-word piece inherits the label of the word it came from.
        word_pieces = tokenizer.tokenize(word)
        pieces += word_pieces
        piece_labels += [word_label] * len(word_pieces)

    return pieces, piece_labels

def get_tag(sentences, pos_tag=False):
    """Collect the distinct labels used in ``sentences`` plus a trailing ``"PAD"``.

    Labels are returned in order of first appearance. De-duplicating through a
    ``set`` would give an order that can change between interpreter runs
    (string hashing is randomized), which in turn would change the
    label-to-index mapping built from this list.

    Args:
        sentences: Iterable of sentences, each an iterable of
            ``(word, tag, misc)`` triples where ``misc`` has a ``'name'`` entry.
        pos_tag: If true collect the ``tag`` elements, otherwise
            ``misc['name']``.

    Returns:
        List of unique labels in first-seen order with ``"PAD"`` appended last.
    """
    # A dict keeps insertion order, giving a deterministic de-duplication.
    seen = {}
    for sentence in sentences:
        for _word, tag, misc in sentence:
            seen[tag if pos_tag else misc['name']] = None

    tag_values = list(seen)
    tag_values.append("PAD")
    return tag_values

def get_tag2idx(set_of_t):
    """Map each tag to its position in ``set_of_t``.

    Args:
        set_of_t: Iterable of tags. If a tag occurs more than once, the index
            of its last occurrence is kept.

    Returns:
        ``(tag2idx, set_of_t)``: the tag-to-index dict and the input object
        itself, unchanged.
    """
    tag2idx = {}
    for position, tag in enumerate(set_of_t):
        tag2idx[tag] = position
    return tag2idx, set_of_t

In [15]:
# Build the label vocabulary from the Bokmål sentences (test + train). "PAD" is the
# last entry of `tag_values`. Both names are read as globals by later helpers.
tag2idx, tag_values = get_tag2idx(get_tag(sentences, pos_tag=pos_tag))



In [16]:
def data_helper_tokenize_and_format(tokenizer, sentences, pos_tag=False):
    """Tokenize sentences and build fixed-length id, tag and mask sequences.

    Every sequence is padded or truncated at the end to 75 positions. Token
    ids are padded with 0 and tag ids with ``tag2idx["PAD"]``; ``tag2idx`` is
    read from the notebook's global scope.

    Args:
        tokenizer: Object providing ``tokenize`` and ``convert_tokens_to_ids``.
        sentences: Iterable of sentences of ``(word, tag, misc)`` triples.
        pos_tag: Forwarded to ``tokenize_and_preserve_labels`` to choose the
            label source.

    Returns:
        ``(input_ids, tags, attention_masks)``: the two padded arrays returned
        by ``pad_sequences`` and a list of per-row float lists holding 1.0
        where the token id is non-zero and 0.0 elsewhere.
    """
    MAX_LEN = 75

    token_sequences, label_sequences = [], []
    for sentence in sentences:
        tokens, labels = tokenize_and_preserve_labels(sentence, tokenizer, pos_tag=pos_tag)
        token_sequences.append(tokens)
        label_sequences.append(labels)

    id_sequences = [tokenizer.convert_tokens_to_ids(tokens) for tokens in token_sequences]
    input_ids = pad_sequences(id_sequences, maxlen=MAX_LEN, dtype='long', value=0.0,
                              truncating='post', padding='post')

    tag_id_sequences = [[tag2idx.get(label) for label in labels] for labels in label_sequences]
    tags = pad_sequences(tag_id_sequences, maxlen=MAX_LEN, value=tag2idx["PAD"],
                         padding="post", dtype="long", truncating="post")

    # A position is attended to exactly when its token id is non-zero.
    attention_masks = []
    for row in input_ids:
        attention_masks.append([float(token_id != 0.0) for token_id in row])

    return input_ids, tags, attention_masks


def data_helper_torch_datasets(input_ids, tags, attention_masks, n_test=None, batch_size=8):
    """Split the formatted data into train/test ``DataLoader`` objects.

    The first ``n_test`` rows form the test set and the remaining rows the
    training set. Training batches are drawn in random order, test batches
    sequentially.

    Args:
        input_ids: Row-sliceable sequence of token-id rows.
        tags: Row-sliceable sequence of tag-id rows, parallel to ``input_ids``.
        attention_masks: Row-sliceable sequence of mask rows, parallel to
            ``input_ids``.
        n_test: Number of leading rows that belong to the test set. When
            ``None`` (the default) the notebook-level global ``test_len`` is
            used. Pass the matching length explicitly when the data does not
            come from the corpus that ``test_len`` was computed for, otherwise
            the split point is wrong and test rows end up in training.
        batch_size: Batch size for both loaders.

    Returns:
        ``(train_dataloader, test_dataloader)``; each batch is a tuple
        ``(input_ids, attention_mask, tags)``.
    """
    if n_test is None:
        # Backward-compatible fallback to the module-level split point.
        n_test = test_len

    test_input, train_input = input_ids[:n_test], input_ids[n_test:]
    test_tags, train_tags = tags[:n_test], tags[n_test:]
    test_masks, train_masks = attention_masks[:n_test], attention_masks[n_test:]

    train_inputs = torch.tensor(train_input)
    test_inputs = torch.tensor(test_input)
    train_tags = torch.tensor(train_tags).long()
    test_tags = torch.tensor(test_tags).long()
    train_masks = torch.tensor(train_masks)
    test_masks = torch.tensor(test_masks)

    train_data = TensorDataset(train_inputs, train_masks, train_tags)
    train_dataloader = DataLoader(train_data, sampler=RandomSampler(train_data),
                                  batch_size=batch_size)

    test_data = TensorDataset(test_inputs, test_masks, test_tags)
    test_dataloader = DataLoader(test_data, sampler=SequentialSampler(test_data),
                                 batch_size=batch_size)
    return train_dataloader, test_dataloader

In [17]:
def train_and_test_model_on_ner(pipeline, name, sentences, epochs=3, pos_tag=False):
    """Fine-tune ``pipeline.model`` for token classification and print test metrics.

    The sentences are tokenized and split by ``data_helper_tokenize_and_format``
    and ``data_helper_torch_datasets``. After every training epoch the model is
    evaluated on the test loader and the loss, accuracy and F1 score are
    printed. Label ids are mapped back to label strings through the
    notebook-level ``tag_values`` list, and positions whose gold label is
    ``"PAD"`` are excluded from the metrics.

    Args:
        pipeline: Object exposing ``model`` and ``tokenizer`` attributes. The
            model is called with ``labels`` and must return the loss at index
            0 and the logits at index 1 of its output.
        name: Model name used in the printed report.
        sentences: Sentences of ``(word, tag, misc)`` triples, test portion
            first.
        epochs: Number of passes over the training data.
        pos_tag: Forwarded to the tokenization helper to choose the label
            source.

    Returns:
        None. Results are printed.
    """
    model = pipeline.model

    input_ids, tags, attention_masks = data_helper_tokenize_and_format(pipeline.tokenizer, sentences, pos_tag=pos_tag)
    train_dataloader, test_dataloader = data_helper_torch_datasets(input_ids, tags, attention_masks)

    FULL_FINETUNING = True
    if FULL_FINETUNING:
        param_optimizer = list(model.named_parameters())
        # Parameters whose name contains one of these substrings get no weight
        # decay. 'LayerNorm.weight' is the name Hugging Face models give to the
        # layer-norm scale; 'gamma'/'beta' are kept for older checkpoints.
        no_decay = ['bias', 'gamma', 'beta', 'LayerNorm.weight']
        # The per-group option must be spelled 'weight_decay'. Any other key
        # (e.g. 'weight_decay_rate') is stored but ignored by the optimizer, so
        # no decay would be applied at all.
        optimizer_grouped_parameters = [
            {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
             'weight_decay': 0.01},
            {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)],
             'weight_decay': 0.0}
        ]
    else:
        param_optimizer = list(model.classifier.named_parameters())
        optimizer_grouped_parameters = [{"params": [p for n, p in param_optimizer]}]

    optimizer = AdamW(
        optimizer_grouped_parameters,
        lr=3e-5,
        eps=1e-8
    )

    max_grad_norm = 1.0

    # Total number of training steps is number of batches * number of epochs.
    total_steps = len(train_dataloader) * epochs

    # Linear decay of the learning rate to zero, without warm-up.
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=0,
        num_training_steps=total_steps
    )

    # Average loss per epoch, kept so the learning curve can be plotted.
    loss_values, testing_loss_values = [], []
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for _ in trange(epochs, desc="Epoch"):
        # ========================================
        #               Training
        # ========================================
        model.train()
        total_loss = 0

        for step, batch in enumerate(train_dataloader):
            batch = tuple(t.to(device) for t in batch)
            b_input_ids, b_input_mask, b_labels = batch
            # Clear gradients left over from the previous step.
            model.zero_grad()
            # Labels are provided, so the first output is the loss.
            outputs = model(b_input_ids, token_type_ids=None,
                            attention_mask=b_input_mask, labels=b_labels)
            loss = outputs[0]
            loss.backward()
            total_loss += loss.item()
            # Clip the gradient norm to guard against exploding gradients.
            torch.nn.utils.clip_grad_norm_(parameters=model.parameters(), max_norm=max_grad_norm)
            optimizer.step()
            scheduler.step()

        avg_train_loss = total_loss / len(train_dataloader)
        print("Average train loss: {}".format(avg_train_loss))
        loss_values.append(avg_train_loss)

        # ========================================
        #               testing
        # ========================================
        model.eval()
        eval_loss = 0
        predictions, true_labels = [], []
        for batch in test_dataloader:
            batch = tuple(t.to(device) for t in batch)
            b_input_ids, b_input_mask, b_labels = batch
            # No gradients are needed for evaluation.
            with torch.no_grad():
                # Labels are provided here as well, so the output holds the
                # loss at index 0 and the logits at index 1.
                outputs = model(b_input_ids, token_type_ids=None,
                                attention_mask=b_input_mask, labels=b_labels)
            logits = outputs[1].detach().cpu().numpy()
            label_ids = b_labels.to('cpu').numpy()

            eval_loss += outputs[0].mean().item()
            predictions.extend([list(p) for p in np.argmax(logits, axis=2)])
            true_labels.extend(label_ids)

        eval_loss = eval_loss / len(test_dataloader)
        testing_loss_values.append(eval_loss)
        print(f"Model: {name} scores")
        print("testing loss: {}".format(eval_loss))

        # Map ids back to label strings, skipping positions whose gold label is padding.
        pred_tags = []
        test_tags = []
        for p, l in zip(predictions, true_labels):
            curr_p = []
            curr_l = []
            for p_i, l_i in zip(p, l):
                if tag_values[l_i] != "PAD":
                    curr_p.append(tag_values[p_i])
                    curr_l.append(tag_values[l_i])
            pred_tags.append(curr_p)
            test_tags.append(curr_l)

        # seqeval expects (y_true, y_pred).
        print("testing Accuracy: {}".format(accuracy_score(test_tags, pred_tags)))
        print("testing F1-Score: {}".format(f1_score(test_tags, pred_tags)))


In [18]:
# Fine-tune and evaluate each pretrained model in turn on the same data. The
# factory is looked up and called inside the loop so that each model is only
# built right before it is trained.
for factory_name, model_label in (("get_nor_bert", "NOR-BERT"),
                                  ("get_nb_bert", "NB-BERT"),
                                  ("get_mbert", "mBert")):
    build_pipeline = getattr(Models, factory_name)
    train_and_test_model_on_ner(build_pipeline(len(tag2idx), task='ner'), model_label, sentences, pos_tag=pos_tag)


KeyboardInterrupt: 