<a href="https://colab.research.google.com/github/andreunifi/Bert-POS-Tagging-Thesis/blob/main/Main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

[BERT](https://arxiv.org/abs/1810.04805) is known to be good at Sequence tagging tasks like Named Entity Recognition. Let's see if it's true for POS-tagging.

In [None]:
__author__ = "kyubyong"
__address__ = "https://github.com/kyubyong/nlp_made_easy"
__email__ = "kbpark.linguist@gmail.com"

In [None]:
import os
from tqdm import tqdm_notebook as tqdm
import numpy as np
import torch
import torch.nn as nn
from torch.utils import data
import torch.optim as optim
from transformers import BertTokenizer

In [None]:
torch.__version__

'2.1.0+cu121'

# Data preparation

Thanks to the great NLTK, we don't have to worry about datasets: a sample of the Penn Treebank is included in it, and I believe it serves our purpose.

In [None]:
import nltk
# Fetch the 'treebank' corpus package through NLTK's downloader.
nltk.download('treebank')
# Tagged sentences; later cells read each item as a (word, tag) pair.
tagged_sents = nltk.corpus.treebank.tagged_sents()
# Number of sentences available.
len(tagged_sents)

[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Package treebank is already up-to-date!


3914

In [None]:
tagged_sents[1]

In [None]:
# Collect the distinct POS tags. They are sorted so that the tag -> index
# mapping built from this list is identical on every run; a bare set has no
# stable iteration order for strings across interpreter sessions.
tags = sorted({word_pos[1] for sent in tagged_sents for word_pos in sent})

In [None]:
",".join(tags)

"#,,,VB,FW,$,CC,-RRB-,.,WDT,``,NNPS,RB,LS,CD,VBP,POS,DT,PRP,VBG,'',WRB,WP$,NNS,VBN,NNP,:,JJR,EX,IN,TO,RP,SYM,-NONE-,MD,UH,RBS,-LRB-,NN,PDT,WP,VBZ,JJ,RBR,VBD,PRP$,JJS"

In [None]:
# Index 0 is kept for the padding tag, so it goes in front of the real tags.
tags = ["<pad>", *tags]

In [None]:
# Two-way lookup between tag strings and their integer ids
# (an id is the tag's position in `tags`).
idx2tag = dict(enumerate(tags))
tag2idx = {tag: idx for idx, tag in idx2tag.items()}

In [None]:
# Let's split the data into train and test (or eval).
# A fixed random_state keeps the split identical across runs, so results
# obtained with different tokenizers/models are measured on the same sentences.
from sklearn.model_selection import train_test_split
train_data, test_data = train_test_split(tagged_sents, test_size=.1, random_state=42)
len(train_data), len(test_data)

(3522, 392)

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

'cpu'

# Data loader


In [None]:
# Try swapping in other tokenizers and track how run time and accuracy change.
tokenizer = BertTokenizer.from_pretrained('bert-base-cased', do_lower_case=False)

# **PosDataset class extends PyTorch Dataset**

>
PosDataset is a custom dataset class for part-of-speech tagging with BERT. Its key components are:

Initialization (__init__ method):

tagged_sents: The input parameter representing a list of tagged sentences. Each sentence is a list of tuples, where each tuple contains a word and its corresponding part-of-speech tag.
Data Processing in Initialization:

sents and tags_li: Lists to store tokenized sentences and their corresponding part-of-speech tags. The special tokens [CLS] and [SEP] are added to the beginning and end of each sentence.
Tokenization is performed using the BERT tokenizer, and the tokenized sentences (sents) and part-of-speech tags (tags_li) are stored in the class.
Length Method (__len__):

Returns the number of sentences in the dataset.
Get Item Method (__getitem__):

Retrieves an item from the dataset by index.
words and tags: Original words and part-of-speech tags for the current sentence.
x, is_heads, and y: Lists for tokenized words, indicator of whether a token is the first piece of a word, and corresponding part-of-speech tag indices, respectively.
Tokenization and conversion to indices are performed using the BERT tokenizer and the provided tag2idx mapping.
The method returns the original words, tokenized word IDs (x), indicator for the first piece of each word (is_heads), original part-of-speech tags, part-of-speech tag IDs (y), and the sequence length.



In [None]:
class PosDataset(data.Dataset):
    """Dataset of POS-tagged sentences prepared for a BERT token classifier.

    Each stored sentence is wrapped in "[CLS]" / "[SEP]", and both special
    tokens are labelled with the "<pad>" tag.
    """

    def __init__(self, tagged_sents):
        """Store the words and tags of every sentence.

        Args:
            tagged_sents: iterable of sentences, each a sequence of
                (word, tag) pairs.
        """
        sents, tags_li = [], []  # list of lists
        for sent in tagged_sents:
            words = [word_pos[0] for word_pos in sent]
            tags = [word_pos[1] for word_pos in sent]
            sents.append(["[CLS]"] + words + ["[SEP]"])
            tags_li.append(["<pad>"] + tags + ["<pad>"])
        self.sents, self.tags_li = sents, tags_li

    def __len__(self):
        """Return the number of sentences."""
        return len(self.sents)

    def __getitem__(self, idx):
        """Return one sentence as model-ready lists.

        Returns:
            A tuple ``(words, x, is_heads, tags, y, seqlen)`` where
            ``words`` / ``tags`` are the space-joined original words and tags,
            ``x`` are the word-piece ids, ``is_heads`` marks with 1 the first
            piece of each word, ``y`` are the tag ids aligned with ``x``
            (continuation pieces get the "<pad>" id), and ``seqlen`` is the
            number of word pieces.
        """
        words, tags = self.sents[idx], self.tags_li[idx]  # words, tags: string list

        # We give credits only to the first piece.
        x, y = [], []  # list of ids
        is_heads = []  # list. 1: the token is the first piece of a word
        for w, t in zip(words, tags):
            tokens = tokenizer.tokenize(w) if w not in ("[CLS]", "[SEP]") else [w]
            if not tokens:
                # The tokenizer produced no pieces for this word; keep one
                # position for it so that x, y and is_heads stay aligned.
                tokens = [tokenizer.unk_token]
            xx = tokenizer.convert_tokens_to_ids(tokens)

            is_head = [1] + [0] * (len(tokens) - 1)

            t = [t] + ["<pad>"] * (len(tokens) - 1)  # <PAD>: no decision
            yy = [tag2idx[each] for each in t]  # (T,)

            x.extend(xx)
            is_heads.extend(is_head)
            y.extend(yy)

        assert len(x)==len(y)==len(is_heads), "len(x)={}, len(y)={}, len(is_heads)={}".format(len(x), len(y), len(is_heads))

        # seqlen
        seqlen = len(y)

        # to string
        words = " ".join(words)
        tags = " ".join(tags)
        return words, x, is_heads, tags, y, seqlen


In [None]:
# Quick smoke check: build the dataset over the whole corpus and look at one item.
dataset = PosDataset(tagged_sents)
dataset[4]

# **Commento:**

> Il tokenizer di Bert non mi è familiare, creo qui sotto una cella per effettuare, su una sentence di debug, una possibile esecuzione



In [None]:
# BertTokenizer is already imported, and the tokenizer
# (BertTokenizer.from_pretrained('bert-base-cased')) is already loaded
# in `tokenizer` by an earlier cell.

# Tokenize a sentence word by word. Splitting on whitespace first is what
# makes this a per-word demo; iterating the string directly would feed the
# tokenizer one character at a time.
text = "Hello, how does BERT tokenizer work?"
for word in text.split():
  print("Tokens:", tokenizer.tokenize(word))

# Convert the tokens of the whole sentence to IDs
tokens = tokenizer.tokenize(text)
token_ids = tokenizer.convert_tokens_to_ids(tokens)

# Print the results
print("Original Text:", text)
print("Token IDs:", token_ids)

Hello, -> ['Hello', ',']
how -> ['how']
does -> ['does']
BERT -> ['B', '##ER', '##T']
tokenizer -> ['token', '##izer']
work? -> ['work', '?']
Hello, how does BERT tokenizer work? -> ['[CLS]', 'Hello', ',', 'how', 'does', 'B', '##ER', '##T', 'token', '##izer', 'work', '?', '[SEP]']


# **Pad function**


> The pad function is the batch collation function: it pads every sample in a batch to the length of the longest sequence in that batch. Its key components are:

Input Parameters:

> batch: A batch of samples, where each sample is a tuple containing information about words, tokenized word IDs (x), indicator for the first piece of each word (is_heads), original part-of-speech tags, part-of-speech tag IDs (y), and sequence length.
Processing Steps:

>Extract relevant information from the batch using lambda functions (f). f(0), f(2), f(3), and f(-1) extract words, indicator for the first piece of each word, original part-of-speech tags, and sequence lengths, respectively.

Finds the maximum sequence length (maxlen) in the batch.

>Define a lambda function f that pads sequences to a specified length (seqlen). This function is used to pad both the tokenized word IDs (x) and part-of-speech tag IDs (y). Padding is done with zeros (0), which likely corresponds to the <pad> token.

Apply the padding function to the tokenized word IDs (x) and part-of-speech tag IDs (y) using the maximum sequence length (maxlen).

Convert the padded tokenized word IDs (x) and part-of-speech tag IDs (y) to PyTorch LongTensors using torch.LongTensor.

>Return the padded words, tokenized word IDs (x), indicator for the first piece of each word (is_heads), original part-of-speech tags, padded part-of-speech tag IDs (y), and sequence lengths.

>In summary, this function is used to pad a batch of sequences to the length of the longest sequence in the batch, making it suitable for input to a neural network where all sequences in a batch must have the same length.



In [None]:
def pad(batch):
    """Collate a batch, right-padding the id sequences to the longest sample.

    Each sample is indexed as (words, x, is_heads, tags, y, seqlen). The id
    lists at positions 1 and -2 are padded with 0 (the <pad> id) and returned
    as LongTensors; the other fields are returned as plain per-sample lists.
    """
    def column(index):
        # Gather one field from every sample in the batch.
        return [sample[index] for sample in batch]

    def padded(index, width):
        # Right-pad one id field of every sample with zeros up to `width`.
        rows = []
        for sample in batch:
            seq = sample[index]
            rows.append(seq + [0] * (width - len(seq)))
        return rows

    seqlens = column(-1)
    longest = int(np.max(seqlens))

    token_ids = torch.LongTensor(padded(1, longest))
    tag_ids = torch.LongTensor(padded(-2, longest))

    return column(0), token_ids, column(2), column(3), tag_ids, seqlens

# Model

In [None]:
from transformers import BertModel

In [None]:
%%time
class Net(nn.Module):
    """Pretrained BERT encoder followed by one linear layer that scores every tag per token."""

    def __init__(self, vocab_size=None):
        """Build the encoder and the output layer.

        Args:
            vocab_size: number of output classes (tags); must be given.
        """
        super().__init__()
        # Try swapping in other pretrained models and track how run time and
        # accuracy change.
        self.bert = BertModel.from_pretrained('bert-base-cased')
        # The output head is a single linear layer; this is the place to
        # experiment with a deeper one. Its input width is read from the
        # encoder config so that it follows whichever checkpoint is loaded.
        self.fc = nn.Linear(self.bert.config.hidden_size, vocab_size)
        self.device = device

    def forward(self, x):
        '''
        x: (N, T). int64 word-piece ids, padded with 0.

        Returns (logits, y_hat): logits is (N, T, vocab_size) and
        y_hat is (N, T), the argmax tag id per position.
        '''
        x = x.to(self.device)
        # Batches are padded with id 0; mask those positions so that real
        # tokens do not attend to padding.
        attention_mask = (x != 0).long()
        if self.training:
            self.bert.train()
            encoded_data = self.bert(x, attention_mask=attention_mask).last_hidden_state
        else:
            self.bert.eval()
            with torch.no_grad():
                encoded_data = self.bert(x, attention_mask=attention_mask).last_hidden_state
        logits = self.fc(encoded_data)
        y_hat = logits.argmax(-1)
        return logits, y_hat

CPU times: user 48 µs, sys: 0 ns, total: 48 µs
Wall time: 54.4 µs


# Train and evaluate

In [None]:
def train(model, iterator, optimizer, criterion):
    """Run one pass over `iterator`, taking an optimizer step after every batch.

    Args:
        model: callable module returning ``(logits, y_hat)`` for a batch of ids.
        iterator: yields ``(words, x, is_heads, tags, y, seqlens)`` batches.
        optimizer: optimizer over the model parameters.
        criterion: loss taking flattened logits and flattened targets.

    The loss is printed every 10 steps.
    """
    model.train()
    for i, batch in enumerate(iterator):
        _, x, _, _, y, _ = batch
        optimizer.zero_grad()
        logits, _ = model(x)  # logits: (N, T, VOCAB), y: (N, T)

        logits = logits.view(-1, logits.shape[-1])  # (N*T, VOCAB)
        # The batch arrives on the CPU while the logits live wherever the
        # model runs; the loss needs both on the same device.
        y = y.to(logits.device).view(-1)  # (N*T,)

        loss = criterion(logits, y)
        loss.backward()

        optimizer.step()

        if i%10==0: # monitoring
            print("step: {}, loss: {}".format(i, loss.item()))

In [None]:
def eval(model, iterator):
    """Tag every batch of `iterator`, write the predictions to "result", and report accuracy.

    The file "result" gets one "word gold_tag predicted_tag" line per word and
    a blank line after each sentence. Only the first word piece of each word
    counts, and the first and last positions of every sentence (the
    "[CLS]"/"[SEP]" slots) are left out.

    NOTE: the name shadows the builtin ``eval``; it is kept because callers
    use it.

    Args:
        model: callable module returning ``(logits, y_hat)`` for a batch of ids.
        iterator: yields ``(words, x, is_heads, tags, y, seqlens)`` batches.

    Returns:
        Word-level accuracy as a float (0.0 when there is nothing to score).
        The value is also printed.
    """
    model.eval()

    Words, Is_heads, Tags, Y_hat = [], [], [], []
    with torch.no_grad():
        for words, x, is_heads, tags, _y, _seqlens in iterator:
            _, y_hat = model(x)  # y_hat: (N, T)

            Words.extend(words)
            Is_heads.extend(is_heads)
            Tags.extend(tags)
            Y_hat.extend(y_hat.cpu().numpy().tolist())

    ## gets results and save; the counts for the metric are taken in the same
    ## pass instead of re-reading and re-parsing the file afterwards.
    n_total, n_correct = 0, 0
    with open("result", 'w') as fout:
        for words, is_heads, tags, y_hat in zip(Words, Is_heads, Tags, Y_hat):
            # Keep only the prediction made on the first piece of each word.
            preds = [idx2tag[hat] for head, hat in zip(is_heads, y_hat) if head == 1]
            word_list, tag_list = words.split(), tags.split()
            assert len(preds)==len(word_list)==len(tag_list)
            for w, t, p in zip(word_list[1:-1], tag_list[1:-1], preds[1:-1]):
                fout.write("{} {} {}\n".format(w, t, p))
                n_total += 1
                n_correct += int(t == p)
            fout.write("\n")

    ## calc metric
    acc = n_correct / n_total if n_total else 0.0

    print("acc=%.2f"%acc)
    return acc


## Load model and train

In [None]:
# One output class per entry of tag2idx (this includes the "<pad>" slot).
model = Net(vocab_size=len(tag2idx))
model.to(device)
# Wrap the model in nn.DataParallel.
model = nn.DataParallel(model)

In [None]:
train_dataset = PosDataset(train_data)
eval_dataset = PosDataset(test_data)

# Both loaders collate with pad(), so every batch is padded to its own longest sample.
train_iter = data.DataLoader(dataset=train_dataset,
                             batch_size=8,
                             shuffle=True,
                             num_workers=1,
                             collate_fn=pad)
# Evaluation order is kept fixed (no shuffling).
test_iter = data.DataLoader(dataset=eval_dataset,
                             batch_size=8,
                             shuffle=False,
                             num_workers=1,
                             collate_fn=pad)

optimizer = optim.Adam(model.parameters(), lr = 0.0001)

# Target id 0 is the "<pad>" tag (padding, [CLS]/[SEP], and non-initial word
# pieces); those positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=0)

In [None]:
# Sanity-check the forward pass on a single batch. One batch is enough to
# inspect the output, and no_grad avoids building autograd graphs for a
# pass that is only printed.
words, x, is_heads, tags, y, seqlen = next(iter(train_iter))
with torch.no_grad():
  print(model(x))

In [None]:
# One pass over the training loader, then score the model on the test loader.
train(model, train_iter, optimizer, criterion)
eval(model, test_iter)

step: 0, loss: 3.855541944503784
step: 10, loss: 3.8497655391693115
step: 20, loss: 3.848134994506836
step: 30, loss: 3.854391574859619
step: 40, loss: 3.859161138534546
step: 50, loss: 3.887922525405884
step: 60, loss: 3.899444818496704
step: 70, loss: 3.8609259128570557


KeyboardInterrupt: ignored

Check the result.

In [None]:
# Show the first 100 lines of the result file; the context manager closes
# the file handle once the lines are read.
with open('result', 'r') as fin:
    result_lines = fin.read().splitlines()
result_lines[:100]

['Bonds NNS NNS',
 'due JJ JJ',
 'in IN IN',
 '2005 CD CD',
 'have VBP VBP',
 'a DT DT',
 '7 CD CD',
 '1\\/2 CD CD',
 '% NN NN',
 'coupon NN NN',
 'and CC CC',
 'are VBP VBP',
 'priced VBN VBN',
 '*-1 -NONE- -NONE-',
 'at IN IN',
 'par NN NN',
 '. . .',
 '',
 'Mr. NNP NNP',
 'Sidak NNP NNP',
 'served VBD VBD',
 'as IN IN',
 'an DT DT',
 'attorney NN NN',
 'in IN IN',
 'the DT DT',
 'Reagan NNP NNP',
 'administration NN NN',
 '. . .',
 '',
 'Municipal NNP NNP',
 'Issues NNPS NNPS',
 '',
 'Viacom NNP NNP',
 'denies VBZ VBZ',
 '0 -NONE- -NONE-',
 'it PRP PRP',
 "'s VBZ VBZ",
 'using VBG VBG',
 'pressure NN NN',
 'tactics NNS NNS',
 '. . .',
 '',
 'Tokyo NNP NNP',
 "'s POS POS",
 'leading VBG VBG',
 'program NN NN',
 'traders NNS NNS',
 'are VBP VBP',
 'the DT DT',
 'big JJ JJ',
 'U.S. NNP NNP',
 'securities NNS NNS',
 'houses NNS NNS',
 ', , ,',
 'though IN IN',
 'the DT DT',
 'Japanese NNP NNS',
 'are VBP VBP',
 'playing VBG VBG',
 'catch-up NN JJ',
 '. . .',
 '',
 'That DT DT',
 "'s VBZ