# Semantic Role Labeling

NLP - Spring Semester of 2024 at University of Tehran - CA3

In [None]:
import pandas as pd

import torch
import torch.utils.data as data
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from sklearn.metrics import f1_score

import matplotlib.pyplot as plt

import numpy as np

import json
from collections import Counter
from itertools import chain

## Q1

We will implement Semantic Role Labeling with LSTM, GRU, and Encoder-Decoder models. The role labels that we use are as follows:

- `Arg0`: Agent
- `Arg1`: Patient
- `Arg2`: Instrument
- `ArgM-LOC`: Location of the verb
- `ArgM-TMP`: Time of the verb

Let's take a look at the datasets before we proceed to the next parts.

### Dataset

We have three files: [`train.json`](./data/train.json), [`valid.json`](./data/valid.json), and [`test.json`](./data/test.json).

Each of these three files has four fields: `text`, which is the input sentence as a list of words; `verb_index`, which is the zero-based index of the verb in the sentence; `srl_frames`, which holds the label for each word in the sentence; and `word_indices`, which is the index of each word in the sentence.

### Part 1. Preparing the dataset

Let's first write a function that simply reads the JSON file and stores each list object as a column.

In [None]:
def read_json(file_name: str) -> pd.DataFrame:
    """Load a JSON file into a DataFrame.

    Args:
        file_name: Path of the JSON file to read.

    Returns:
        ``pd.DataFrame`` built directly from the decoded JSON object.
    """
    # JSON is UTF-8 by specification; do not depend on the platform's locale
    # encoding (which breaks non-ASCII tokens on some systems).
    with open(file_name, 'r', encoding='utf-8') as f:
        records = json.load(f)
    return pd.DataFrame(records)

In [None]:
# Load the training split into a DataFrame via read_json.
train_data = read_json('./data/train.json')

In the next step we need a way to transform the given `srl_frames` labels into numerical values. We don't really have to do anything but define the label-to-index dictionary below, which can then be applied with the `map` function from `pandas`.

In [None]:
# BIO tag -> class index. Every role has a B- (begin) and an I- (inside) tag.
# The inside tag of ARGM-LOC must be spelled 'I-ARGM-LOC' so that it pairs with
# 'B-ARGM-LOC'; a truncated key would make the real tag fall back to index 0.
SRL_TO_NUM = {'O': 0,  'B-ARG0': 1,  'I-ARG0': 2,  'B-ARG1': 3,  'I-ARG1': 4,  'B-ARG2': 5,
              'I-ARG2': 6,  'B-ARGM-LOC': 7,  'I-ARGM-LOC': 8,  'B-ARGM-TMP': 9, 'I-ARGM-TMP': 10}

Next we'll have to write a function that pads the sentences so that they all have the same length. For this purpose we'll append a pad token (`'[PAD]'` for the text) to the sentences until they reach the length of the longest sentence. We'll also fill the other columns with the proper values corresponding to the pad token.

In [None]:
def pad_data_sequence(sequences: list[list[str]], pad_token: str) -> list[list[str]]:
    """Right-pad every sequence with ``pad_token`` to the longest length.

    Args:
        sequences: Token lists of possibly different lengths.
        pad_token: Token appended to the shorter sequences.

    Returns:
        New lists, all of the same length; the inputs are not modified.
        An empty ``sequences`` yields an empty list.
    """
    # The length of the longest sequence is needed, not the sequence itself.
    max_len = max(map(len, sequences), default=0)
    return [list(seq) + [pad_token] * (max_len - len(seq)) for seq in sequences]

Now let's implement the `Vocab` class. This class is simply a place to hold the vocabulary and provide some tools to work with it. We'll add its methods along the way in different code cells by inheriting `Vocab` from itself.
The first method will be the constructor. The constructor takes an optional argument and sets `word2id`, which is just a map from words to indices. It'll also add four default tokens to the vocabulary if they are not already in the given `word2id`.

In [None]:
class Vocab:
    """Bidirectional mapping between tokens and integer indices.

    Attributes are plain dictionaries: ``word2id`` (token -> index) and
    ``id2word`` (index -> token). Four special tokens can be injected
    automatically at construction time.
    """

    PAD:   str = '[PAD]'
    START: str = '[START]'
    END:   str = '[END]'
    # NOTE: the spelling of this runtime token is kept as-is for compatibility.
    UNK:   str = '[UNKOWN]'

    def __init__(self, word2id: dict[str, int] = None, default_tokens: bool = True) -> None:
        """Create a vocabulary.

        Args:
            word2id: Optional initial token -> index mapping. It is copied, so
                later additions never modify the caller's dictionary.
            default_tokens: When true, make sure PAD/START/END/UNK exist.
        """
        self.word2id = dict(word2id) if word2id else {}

        if default_tokens:
            self._add_default_tokens()

        self.default_tokens = default_tokens
        self.id2word = {index: word for word, index in self.word2id.items()}

    def _add_default_tokens(self) -> None:
        """Insert the missing special tokens at the lowest indices.

        Existing entries are shifted up by the number of tokens inserted. The
        inserted tokens are numbered consecutively from 0 so they can never
        collide with a shifted entry, even when only some of the special
        tokens were already present.
        """
        missing_tokens = [
            token for token in (self.PAD, self.START, self.END, self.UNK)
            if token not in self.word2id
        ]

        shifted = {word: index + len(missing_tokens) for word, index in self.word2id.items()}
        shifted.update({token: index for index, token in enumerate(missing_tokens)})
        self.word2id = shifted

Next step would be implementing the magic methods for this class. We'll only use the `__getitem__` and `__len__`.

In [None]:
class Vocab(Vocab):
    def __getitem__(self, word: str) -> int:
        """Return the index of ``word``.

        Unknown words resolve to the index of the UNK token when the default
        tokens are enabled, and to 0 otherwise.
        """
        fallback_index = self.word2id[self.UNK] if self.default_tokens else 0
        return self.word2id.get(word, fallback_index)

    def __len__(self) -> int:
        """Return the number of tokens stored in the vocabulary."""
        token_count = len(self.word2id)
        return token_count

The `add` method will simply add a new word to the vocabulary. It'll also return its index in the vocab, which is equal to the current length of it.

In [None]:
class Vocab(Vocab):
    def add(self, word: str) -> int:
        """Register a new word and return the index assigned to it.

        The word must not be in the vocabulary yet; the new index equals the
        vocabulary size before insertion.
        """
        assert word not in self.word2id

        next_index = len(self.word2id)
        # Keep both directions of the mapping in sync.
        self.id2word[next_index] = word
        self.word2id[word] = next_index
        return next_index

The `words2indices` takes a list of sentences, which are a list of tokens, and returns a list of indices (it's actually a list of list of numbers) corresponding to the indices of each word in each sentence.

In [None]:
class Vocab(Vocab):
    def words2indices(self, sentences: list[list[str]]) -> list[list[int]]:
        """Convert tokenised sentences to lists of vocabulary indices.

        Each word is looked up through ``self[word]``, so unknown words take
        the fallback index defined by ``__getitem__``.
        """
        encoded_sentences = []
        for sentence in sentences:
            encoded_sentences.append(list(map(self.__getitem__, sentence)))
        return encoded_sentences

Let's implement the reverse of `words2indices` as well. That would be `indices2words`.

In [None]:
class Vocab(Vocab):
    def indices2words(self, sentences_indices: list[list[int]]) -> list[list[str]]:
        """Convert lists of indices back to tokens.

        Indices missing from ``id2word`` are rendered as the UNK token.
        """
        decoded_sentences = []
        for indices in sentences_indices:
            words = []
            for index in indices:
                words.append(self.id2word.get(index, self.UNK))
            decoded_sentences.append(words)
        return decoded_sentences

The `to_input_tensor` method takes a list of sentences, each of which is a list of tokens, and pads them to have equal length. It then returns the word indices as a nested list, which can be turned into a tensor later.

In [None]:
class Vocab(Vocab):
    def to_input_tensor(self, sentences: list[list[str]]) -> list[list[int]]:
        """Pad the sentences to a common length and encode them as indices.

        Args:
            sentences: Tokenised sentences (lists of words).

        Returns:
            A rectangular nested list of indices; shorter sentences are
            right-padded with ``PAD``. An empty input yields an empty list
            instead of raising from ``max()``.
        """
        max_len = max((len(sentence) for sentence in sentences), default=0)
        padded_sentences = [sentence + [self.PAD] * (max_len - len(sentence)) for sentence in sentences]
        return self.words2indices(padded_sentences)

It's time for the static function, `from_corpus`. This function will take a corpus, which is a list of list of words, and then create the vocabulary class based on it. It also takes some arguments which are described below:

- `corpus`: This is a list of sentences to create the corpus from.
- `size`: This is the maximum number of unique words allowed in the vocabulary.
- `remove_frac`: It tells us what fraction of the less frequent words should be removed.
- `freq_cutoff`: Words that occur `freq_cutoff` times or fewer are ignored (only words with a strictly higher count are kept).

In [None]:
class Vocab(Vocab):
    @staticmethod
    def from_corpus(corpus: list[list[str]], size: int, remove_frac: float, freq_cutoff: float) -> Vocab:
        """Build a vocabulary from a tokenised corpus.

        Args:
            corpus: Sentences, each one a list of words.
            size: Maximum number of distinct corpus words considered; the most
                frequent ones are kept.
            remove_frac: Fraction (0..1) of the least frequent remaining words
                to drop.
            freq_cutoff: Words whose count is not strictly greater than this
                value are dropped.

        Returns:
            A ``Vocab`` holding the default tokens followed by the selected
            words, added from least to most frequent.
        """
        # Count over the whole corpus first: stopping early would leave partial
        # counts and select words by first appearance instead of frequency.
        word_freq = Counter(chain.from_iterable(corpus))

        selected = word_freq.most_common(size)
        selected = [(word, count) for word, count in selected if count > freq_cutoff]
        selected.sort(key=lambda item: item[1])
        selected = selected[int(len(selected) * remove_frac):]

        vocab = Vocab()
        for word, _ in selected:
            # A corpus word may coincide with a default token; add() asserts
            # that the word is new, so skip anything already present.
            if word not in vocab.word2id:
                vocab.add(word)

        return vocab

### Part 2. LSTM Encoder Model

In this section we'll implement an LSTM model using the PyTorch library. Our model will have an embedding layer connected to the LSTM. We will then concatenate the hidden state of each step with the hidden state at the verb position and pass the result to a linear layer.

In [None]:
# Vocabulary construction settings, passed to Vocab.from_corpus below.
VOCAB_SIZE = 2000
REMOVE_FRAC = 0.3

# Model and optimisation hyper-parameters used by the encoder models.
# EMBEDDING_DIM is rebound to 50 later, in the GloVe loading cell.
EMBEDDING_DIM = 64
HIDDEN_DIM = 64
LEARNING_RATE = 0.1
EPOCH_COUNT = 50
BATCH_SIZE = 64

In [None]:
class LSTMSemanticRoleLabeler(nn.Module):
    """Token-level tagger: embedding -> LSTM -> linear over [token state ; verb state]."""

    def __init__(self, embedding_dim, hidden_dim, vocab_size, labels_count):
        """Create the layers.

        Args:
            embedding_dim: Size of the word embeddings.
            hidden_dim: Size of the LSTM hidden state.
            vocab_size: Number of rows of the embedding table.
            labels_count: Number of output label classes.
        """
        super(LSTMSemanticRoleLabeler, self).__init__()

        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        # Input is the token state concatenated with the verb state.
        self.hidden2label = nn.Linear(hidden_dim * 2, labels_count)

    def forward(self, sentence, verb_indices):
        """Score every token of every sentence.

        Args:
            sentence: Integer tensor of word indices, (batch, seq_len).
            verb_indices: Integer tensor with one verb position per sentence, (batch,).

        Returns:
            Log-probabilities over the labels, (batch, seq_len, labels_count).
        """
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds)

        # Pick the hidden state at the verb position of each sentence ...
        batch_positions = torch.arange(lstm_out.size(0), device=lstm_out.device)
        verb_hidden_states = lstm_out[batch_positions, verb_indices]
        # ... and repeat it along the sequence so it can be joined to every token.
        verb_hidden_states_expanded = verb_hidden_states.unsqueeze(1).expand(-1, lstm_out.size(1), -1)
        concatenated_states = torch.cat((lstm_out, verb_hidden_states_expanded), dim=2)

        label_space = self.hidden2label(concatenated_states)
        # Normalise over the label dimension (last), not over sequence positions.
        label_scores = F.log_softmax(label_space, dim=-1)
        return label_scores

Before proceeding to training and testing sections, we need to prepare our data. We will first read the vocabulary and then transform the train data into their corresponding indices. We will then change the tags to their mapped indexes.

In [None]:
class SRLDataSet(data.Dataset):
    """Dataset of (word indices, verb index, label indices) triples.

    Reads the ``text``, ``srl_frames`` and ``verb_index`` columns of the given
    DataFrame and encodes sentences and labels with the supplied vocabularies.
    """

    def __init__(self, df: pd.DataFrame, text_vocab: Vocab, labels_vocab: Vocab):
        """Encode and pad all sentences and label sequences up front.

        Args:
            df: Frame with ``text``, ``srl_frames`` and ``verb_index`` columns.
            text_vocab: Vocabulary used to encode the words.
            labels_vocab: Vocabulary used to encode the labels.
        """
        self.text_vocab: Vocab = text_vocab
        self.text = self.text_vocab.to_input_tensor(df['text'].tolist())

        self.labels_vocab: Vocab = labels_vocab
        # Label rows are padded with Vocab.PAD as well; the index the padding
        # receives depends on labels_vocab's lookup fallback (Vocab.__getitem__).
        self.labels = self.labels_vocab.to_input_tensor(df['srl_frames'].tolist())

        self.verb_indices = df['verb_index'].tolist()

    def __len__(self):
        """Return the number of sentences."""
        return len(self.text)

    def __getitem__(self, index) -> tuple[torch.Tensor, int, torch.Tensor]:
        """Return (word index tensor, verb index, label index tensor) for one sentence."""
        return torch.tensor(self.text[index]), self.verb_indices[index], torch.tensor(self.labels[index])

In [None]:
# Build the word vocabulary from the training sentences; the label vocabulary
# wraps SRL_TO_NUM directly, without the default special tokens.
train_text_vocab = Vocab.from_corpus(train_data['text'].tolist(), VOCAB_SIZE, REMOVE_FRAC, 0)
labels_vocab = Vocab(SRL_TO_NUM, False)

# No shuffle argument is passed to the loader (DataLoader defaults to shuffle=False).
dataset = SRLDataSet(train_data, train_text_vocab, labels_vocab)
data_loader = data.DataLoader(dataset, BATCH_SIZE)

Now let's take a look at a data batch. Note the unknown tokens (printed as `[UNKOWN]`, the value of `Vocab.UNK`) that appear because of `remove_frac`.

In [None]:
# Pull the first batch and print its shapes.
train_sentences, _, train_labels = next(iter(data_loader))
print(f'Sentences batch shape: {train_sentences.size()}')
print(f'Labels batch shape: {train_labels.size()}')

# Decode the first sentence and its labels back to strings as a sanity check.
print(dataset.text_vocab.indices2words([train_sentences[0].tolist()]))
print(dataset.labels_vocab.indices2words([train_labels[0].tolist()]))

Now that we have a function to transform data and labels into correct indexes, let's proceed to the next step.

In [None]:
# Fresh LSTM tagger with an unweighted cross-entropy loss and plain SGD.
model = LSTMSemanticRoleLabeler(EMBEDDING_DIM, HIDDEN_DIM, len(dataset.text_vocab), len(SRL_TO_NUM))
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE)

Let's see the scores before training the model. The output scores will be the log of the probability distribution given by the softmax layer. Note that there will be an output of the shape [50, 11]. We have 50 rows because this is the output of each time the model was run over the sequence of 50 words. Each row has 11 columns for the 11 label classes.

In [None]:
# Score only the first sentence of the first batch (as a batch of one) with
# the model as it currently stands; gradients are disabled for this check.
with torch.no_grad():
    x_input, verb_index, _ = next(iter(data_loader))
    labels_scores = model(x_input[0].unsqueeze(0), verb_index[0].unsqueeze(0))
    print(f'Shape of the scores: {labels_scores.size()}\n', labels_scores)

Train the model.

In [None]:
def train_model(model, loss_function, optimizer, data_loader, epoch_count=None) -> tuple[list, list]:
    """Train ``model`` and record the loss and token accuracy of every epoch.

    Args:
        model: Module called as ``model(x_batch, verb_index)`` that returns
            scores of shape (batch, seq_len, labels_count).
        loss_function: Callable taking (scores [N, labels_count], targets [N]).
        optimizer: Optimizer bound to the model's parameters.
        data_loader: Sized iterable of ``(x_batch, verb_index, y_batch)``.
        epoch_count: Number of epochs; defaults to the module-level
            ``EPOCH_COUNT`` when omitted.

    Returns:
        ``(train_losses, train_accuracies)``, one entry per epoch. The loss is
        averaged over batches; the accuracy is computed over every label
        position in ``y_batch``.
    """
    if epoch_count is None:
        epoch_count = EPOCH_COUNT

    train_losses = []
    train_accuracies = []

    model.train()
    for epoch in range(epoch_count):
        running_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        for x_batch, verb_index, y_batch in data_loader:
            optimizer.zero_grad()

            label_scores = model(x_batch, verb_index)

            # Flatten (batch, seq) into one axis; the class count is read from
            # the model output instead of being hard-coded.
            labels_count = label_scores.size(-1)
            loss = loss_function(label_scores.reshape(-1, labels_count), y_batch.reshape(-1))
            loss.backward()
            optimizer.step()

            predicted = torch.argmax(label_scores, dim=2)
            correct_predictions += (predicted == y_batch).sum().item()
            total_samples += y_batch.numel()

            running_loss += loss.item()

        epoch_loss = running_loss / len(data_loader)
        epoch_accuracy = correct_predictions / total_samples

        train_losses.append(epoch_loss)
        train_accuracies.append(epoch_accuracy)

        print(f'Epoch {epoch} loss: {epoch_loss}')
        print(f'Epoch {epoch} accuracy: {epoch_accuracy}')

    return train_losses, train_accuracies

In [None]:
# Train the LSTM tagger and keep the per-epoch curves for the plots below.
train_losses, train_accuracies = train_model(model, loss_function, optimizer, data_loader)

Now let's visualize the training process.

In [None]:
# Per-epoch training loss returned by train_model.
plt.figure()
plt.plot(train_losses, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.legend()
plt.show()

In [None]:
# Per-epoch training accuracy returned by train_model.
plt.figure()
plt.plot(train_accuracies, label='Training Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training Accuracy')
plt.legend()
plt.show()

Evaluate the model.

In [None]:
def calc_f1_score(model, data_loader):
    """Compute the macro-averaged F1 score of ``model`` over ``data_loader``.

    Args:
        model: Module called as ``model(x, verb_index)`` that returns scores
            of shape (batch, seq_len, labels_count).
        data_loader: Iterable of ``(x, verb_index, y)`` batches.

    Returns:
        Macro F1 over every label position of every batch.
    """
    model.eval()
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for x, verb_index, y in data_loader:
            # Keep the predictions of the whole batch (not only its first
            # sample) so they line up with the flattened labels.
            current_predictions = torch.argmax(model(x, verb_index), dim=-1)

            all_predictions.extend(current_predictions.reshape(-1).tolist())
            all_labels.extend(y.reshape(-1).tolist())

    return f1_score(all_labels, all_predictions, average="macro")

In [None]:
# Evaluate on the held-out test split rather than on the data the model was
# trained on. The vocabularies are the ones built from the training data.
test_data = read_json('./data/test.json')
test_dataset = SRLDataSet(test_data, train_text_vocab, labels_vocab)
test_data_loader = data.DataLoader(test_dataset)

print(f'F1 score: {calc_f1_score(model, test_data_loader)}')

As we can see, the model performs poorly in terms of the F1 score. This is because the data is biased towards some classes that occur more frequently.

In [None]:
# Frequency of every SRL tag over all training sentences.
labels_count = Counter(chain.from_iterable(train_data['srl_frames'].tolist()))
print(labels_count)

The label 'O' has occurred way more than any other class. This causes the model to learn these more frequent labels better and act poorly on others. In order to make this situation better we can pass the inverse frequency as the class weights to the loss function. We'll continue to do this with the other methods in the next sections.

In [None]:
# Encode (and pad) every label sequence, then flatten to a single list of ids.
train_labels = labels_vocab.to_input_tensor(train_data['srl_frames'].tolist())
train_labels_flat = [label_id for label_row in train_labels for label_id in label_row]

# Occurrences per label id; ids that never occur count as 1 to avoid dividing by zero.
train_labels_freq = Counter(train_labels_flat)
train_labels_freq = {label: train_labels_freq.get(label, 1) for label in SRL_TO_NUM.values()}

# Inverse-frequency weights, ordered by label id.
class_weights = [1/train_labels_freq[label] for label in sorted(SRL_TO_NUM.values())]

In [None]:
# Re-initialise the LSTM tagger and train it with the inverse-frequency class weights.
model = LSTMSemanticRoleLabeler(EMBEDDING_DIM, HIDDEN_DIM, len(dataset.text_vocab), len(SRL_TO_NUM))
loss_function = nn.CrossEntropyLoss(weight=torch.tensor(class_weights))
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE)

# The returned loss/accuracy curves are discarded for this run.
_, _ = train_model(model, loss_function, optimizer, data_loader)

In [None]:
# Evaluate the class-weighted model on the held-out test split rather than on
# the data it was trained on.
test_data = read_json('./data/test.json')
test_dataset = SRLDataSet(test_data, train_text_vocab, labels_vocab)
test_data_loader = data.DataLoader(test_dataset)

print(f'F1 score: {calc_f1_score(model, test_data_loader)}')

### Part 3. GRU Encoder Model

We'll repeat the last part with replacing LSTM with GRU.

In [None]:
class GRUSemanticRoleLabeler(nn.Module):
    """Token-level tagger: embedding -> GRU -> linear over [token state ; verb state]."""

    def __init__(self, embedding_dim, hidden_dim, vocab_size, labels_count):
        """Create the layers.

        Args:
            embedding_dim: Size of the word embeddings.
            hidden_dim: Size of the GRU hidden state.
            vocab_size: Number of rows of the embedding table.
            labels_count: Number of output label classes.
        """
        super(GRUSemanticRoleLabeler, self).__init__()

        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
        # Input is the token state concatenated with the verb state.
        self.hidden2label = nn.Linear(hidden_dim * 2, labels_count)

    def forward(self, sentence, verb_indices):
        """Score every token of every sentence.

        Args:
            sentence: Integer tensor of word indices, (batch, seq_len).
            verb_indices: Integer tensor with one verb position per sentence, (batch,).

        Returns:
            Log-probabilities over the labels, (batch, seq_len, labels_count).
        """
        embeds = self.word_embeddings(sentence)
        gru_out, _ = self.gru(embeds)

        # Pick the hidden state at the verb position of each sentence and
        # repeat it along the sequence so it can be joined to every token.
        batch_positions = torch.arange(gru_out.size(0), device=gru_out.device)
        verb_hidden_states = gru_out[batch_positions, verb_indices]
        verb_hidden_states_expanded = verb_hidden_states.unsqueeze(1).expand(-1, gru_out.size(1), -1)
        concatenated_states = torch.cat((gru_out, verb_hidden_states_expanded), dim=2)

        label_space = self.hidden2label(concatenated_states)
        # Normalise over the label dimension (last), not over sequence positions.
        label_scores = F.log_softmax(label_space, dim=-1)
        return label_scores

In [None]:
# GRU tagger set up with the class-weighted loss and the same SGD learning rate.
model = GRUSemanticRoleLabeler(EMBEDDING_DIM, HIDDEN_DIM, len(dataset.text_vocab), len(SRL_TO_NUM))
loss_function = nn.CrossEntropyLoss(weight=torch.tensor(class_weights))
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE)

In [None]:
# Train the GRU tagger and keep the per-epoch curves for the plots below.
train_losses, train_accuracies = train_model(model, loss_function, optimizer, data_loader)

In [None]:
# Per-epoch training loss of the GRU run.
plt.figure()
plt.plot(train_losses, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.legend()
plt.show()

In [None]:
# Per-epoch training accuracy of the GRU run.
plt.figure()
plt.plot(train_accuracies, label='Training Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training Accuracy')
plt.legend()
plt.show()

In [None]:
# Macro F1 of the GRU tagger on the loader built in the earlier evaluation cell.
print(f'F1 score: {calc_f1_score(model, test_data_loader)}')

### Part 4. Encoder-Decoder Model

For this part we are going to generate a series of questions and answers. Then we'll feed these questions and answers to an encoder-decoder model and use it to solve the SRL task. Let's first implement the functions that generate the questions and answers from the datasets. Generating a question requires extracting the predicate and adding the label to the end of the question. This is the general format of a question: \[Predicate\] \[SEPT\] \[Sentence\] \[Label\]

In [None]:
# Special tokens used when building the question/answer sequences.
SEPT_TOKEN = '[SEPT]'
START_TOKEN = '<s>'
END_TOKEN = '</s>'
# Role names without their B-/I- prefix ('O' is excluded by the length test).
# Kept as a sorted list: iterating a set of strings can yield a different order
# in every interpreter run, which would reorder the generated questions/answers.
labels = sorted({label[2:] for label in SRL_TO_NUM.keys() if len(label) > 2})

In [None]:
def generate_questions(sentence: list[str], verb_index: int) -> list[list[str]]:
    """Build one question per role label for the given sentence.

    Every question has the form [predicate, SEPT_TOKEN, *sentence, label],
    where the predicate is the word at ``verb_index``.
    """
    questions = []
    for role in labels:
        question_prefix = [sentence[verb_index], SEPT_TOKEN]
        questions.append(question_prefix + sentence + [role])
    return questions

print(generate_questions(['I', 'ran', 'a', 'code', 'printing', 'hello', 'world'], 1))

The next step is generating an answer for each question. We'll use the order in `labels` to generate the answers for every question in a parallel list. Note that we have to parse the `B-` and `I-` prefixed labels, so we write the `parse_label` function for this purpose.

In [None]:
def parse_label(sentence: list[str], sentence_labels: list[str], label: str) -> list[str]:
    """Return the words of the first span tagged with ``label``.

    Args:
        sentence: Words of the sentence.
        sentence_labels: BIO tags aligned with ``sentence`` (e.g. 'B-ARG1').
        label: Role name without its B-/I- prefix (e.g. 'ARG1').

    Returns:
        The words from the first token carrying ``label`` through the
        contiguous run of ``I-<label>`` tokens that follows it, or an empty
        list when the label does not occur.
    """
    simplified_labels = [tag[2:] for tag in sentence_labels]

    if label not in simplified_labels:
        return []

    span_start = simplified_labels.index(label)

    # Extend over the whole contiguous I- run: stopping at the next occurrence
    # would cut spans longer than two tokens and merge two separate spans.
    inside_tag = f'I-{label}'
    span_end = span_start + 1
    while span_end < len(sentence_labels) and sentence_labels[span_end] == inside_tag:
        span_end += 1

    return sentence[span_start:span_end]


test_sentence = ["In", "the", "summer", "of", "2005", ",", "a", "picture", "that", "people", "have", "long", "been", "looking",
                 "forward", "to", "started", "emerging", "with", "frequency", "in", "various", "major", "Hong", "Kong", "media", "."]
test_label = ["O", "O", "O", "O", "O", "O", "O", "O", "O", "B-ARG0", "O", "B-ARGM-TMP",
               "O", "O", "B-ARG1", "I-ARG1", "O", "O", "O", "O", "O", "O", "O", "O", "O", "O", "O"]

print(parse_label(test_sentence, test_label, 'ARG1'))

In [None]:
def generate_answers(sentence: list[str], sentence_labels: list[str]) -> list[list[str]]:
    """Build one answer per role label, in the iteration order of ``labels``.

    Every answer is the span returned by ``parse_label`` wrapped in
    START_TOKEN / END_TOKEN.
    """
    answers = []
    for role in labels:
        span_words = parse_label(sentence, sentence_labels, role)
        answers.append([START_TOKEN] + span_words + [END_TOKEN])
    return answers

print(generate_answers(test_sentence, test_label))

Now that we have functions to generate questions and answers, let's implement a dataset object that will return a question and an answer while being encoded as a tensor of their indexes. We'll do this with the help of `Vocab` class written in the part 1.

In [None]:
class QADataset(data.Dataset):
    """Dataset of (question, answer) index tensors for the encoder-decoder model.

    One question/answer pair is generated per role label for every row of the
    DataFrame, using ``generate_questions`` and ``generate_answers``.
    """

    def __init__(self, df: pd.DataFrame, vocab: Vocab):
        """Generate and encode all questions and answers.

        Args:
            df: Frame with ``text``, ``srl_frames`` and ``verb_index`` columns.
            vocab: Vocabulary used for encoding. The special tokens and role
                labels are added to it in place if they are missing.
        """
        padded_sentences = pad_data_sequence(df['text'].tolist(), Vocab.PAD)
        padded_labels = pad_data_sequence(df['srl_frames'].tolist(), 'O')
        verb_indices = df['verb_index'].tolist()

        questions = []
        answers = []
        # Walk the columns positionally: the DataFrame index labels are not
        # guaranteed to be 0..n-1 (e.g. after filtering or shuffling).
        for sentence, sentence_labels, verb_index in zip(padded_sentences, padded_labels, verb_indices):
            questions.extend(generate_questions(sentence, verb_index))
            answers.extend(generate_answers(sentence, sentence_labels))

        self.vocab = vocab
        QADataset._add_special_tokens_to_vocab(self.vocab)

        self.questions = self.vocab.to_input_tensor(questions)
        self.answers = self.vocab.to_input_tensor(answers)

    def __len__(self):
        """Return the number of question/answer pairs."""
        return len(self.questions)

    def __getitem__(self, index):
        """Return the (question, answer) pair at ``index`` as index tensors."""
        return torch.tensor(self.questions[index]), torch.tensor(self.answers[index])

    @staticmethod
    def _add_special_tokens_to_vocab(vocab: Vocab) -> None:
        """Make sure the QA special tokens and the role labels are in ``vocab``.

        Tokens already present are skipped, so the same vocabulary can be
        shared by several datasets without tripping the assertion in ``add``.
        """
        for token in (START_TOKEN, END_TOKEN, SEPT_TOKEN, *labels):
            if token not in vocab.word2id:
                vocab.add(token)

In [None]:
# Rebuild the word vocabulary, this time with remove_frac=0 and freq_cutoff=0.
train_text_vocab = Vocab.from_corpus(train_data['text'], VOCAB_SIZE, 0, 0)

# QADataset adds its special tokens and the role labels to this vocabulary.
# Note that `dataset` and `data_loader` are rebound here.
dataset = QADataset(train_data, train_text_vocab)
data_loader = data.DataLoader(dataset, BATCH_SIZE)

Let's take a look at our data.

In [None]:
# Pull the first batch and decode its first question/answer pair back to tokens.
questions_batch, answers_batch = next(iter(data_loader))

print(f'First question: {dataset.vocab.indices2words([questions_batch[0].tolist()])}')
print(f'First answer: {dataset.vocab.indices2words([answers_batch[0].tolist()])}')

Now let's load the GloVe vectors and create the embedding matrix so that we can use it in our embedding layer.

In [None]:
# Dimension of the GloVe vectors in the file read below.
EMBEDDING_DIM = 50

# word -> float32 vector, one entry per line of the GloVe text file.
glove_embeddings = {}
with open('./glove/glove.6B.50d.txt', encoding='utf8') as f:
    for line in f:
        # split() without arguments already discards surrounding whitespace.
        line_tokens = line.split()
        if not line_tokens:
            # Tolerate blank lines instead of failing on line_tokens[0].
            continue
        glove_embeddings[line_tokens[0]] = np.asarray(line_tokens[1:], 'float32')

In [None]:
# One row per vocabulary entry, filled from GloVe where the word is available.
embedding_weights_matrix = np.zeros((len(dataset.vocab), EMBEDDING_DIM))
words_found = 0

# NOTE(review): the lookup is case-sensitive; if the GloVe file only contains
# lower-cased words, capitalised vocabulary entries get random vectors — confirm.
for word, index in dataset.vocab.word2id.items():
    if word in glove_embeddings:
        embedding_weights_matrix[index] = glove_embeddings[word]
        words_found += 1
    else:
        # Words without a pre-trained vector start from random noise.
        embedding_weights_matrix[index] = np.random.normal(scale=0.6, size=(EMBEDDING_DIM, ))

print(f'{words_found} words found out of {len(dataset.vocab)} words in vocab')

Now let's create the encoder part of the model.

In [None]:
class LSTMEncoder(nn.Module):
    """Bidirectional LSTM encoder over embedded question tokens."""

    def __init__(self, embedding_dim, hidden_dim, vocab_size):
        """Create the embedding table and the bidirectional LSTM.

        Args:
            embedding_dim: Size of the word embeddings.
            hidden_dim: Hidden size of each LSTM direction.
            vocab_size: Number of rows of the embedding table.
        """
        super(LSTMEncoder, self).__init__()

        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)

    def forward(self, question):
        """Encode a batch of questions.

        Args:
            question: Integer tensor of token indices, (batch, seq_len).

        Returns:
            ``(lstm_out, hidden, cell)`` exactly as produced by ``nn.LSTM``.
        """
        embeds = self.word_embeddings(question)
        lstm_out, (hidden, cell) = self.lstm(embeds)

        return lstm_out, hidden, cell

    def init_embeddings(self, pre_trained_embeddings: np.ndarray):
        """Overwrite the embedding table with pre-trained vectors.

        Args:
            pre_trained_embeddings: Array (or tensor) of shape
                (vocab_size, embedding_dim); it is converted to the dtype and
                device of the embedding weight.

        Raises:
            RuntimeError: If the shape does not match the embedding table.
        """
        weight = self.word_embeddings.weight
        # load_state_dict() only accepts tensors, so convert the array first
        # (this also casts float64 matrices to the weight's dtype).
        pretrained = torch.as_tensor(pre_trained_embeddings, dtype=weight.dtype, device=weight.device)
        if pretrained.shape != weight.shape:
            raise RuntimeError(
                f'pre-trained embeddings have shape {tuple(pretrained.shape)}, '
                f'expected {tuple(weight.shape)}'
            )
        with torch.no_grad():
            weight.copy_(pretrained)

Implement the decoder with attention mechanism.

In [None]:
class LSTMDecoderWithAttention(nn.Module):
    """LSTM decoder that pools its own outputs with dot-product attention.

    The LSTM's final hidden state is used as the query over the LSTM outputs.
    """

    def __init__(self, embedding_dim, hidden_dim, labels_count):
        super(LSTMDecoderWithAttention, self).__init__()

        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        # Maps a hidden_dim-sized vector to labels_count scores.
        self.linear = nn.Linear(hidden_dim, labels_count)

    def forward(self, seq_input, encoder_hidden_state):
        """Run the LSTM from the given initial state and return the attention output.

        Args:
            seq_input: Input fed directly to the LSTM — presumably already
                embedded, (batch, seq_len, embedding_dim); TODO confirm with the caller.
            encoder_hidden_state: Initial state handed to ``nn.LSTM`` as its
                second argument.

        Returns:
            The vector produced by ``attention`` (not the linear projection).
        """
        lstm_out, (hidden, cell) = self.lstm(seq_input, encoder_hidden_state)
        attn_out = self.attention(lstm_out, hidden)
        # NOTE(review): the projection below is computed and then discarded, so
        # self.linear never influences the returned value. This looks
        # unintended — confirm whether the projected scores should be returned.
        self.linear(attn_out.squeeze(0))

        return attn_out

    def attention(self, lstm_out, final_state):
        """Weight the LSTM outputs by their dot product with the final state.

        Shapes below assume lstm_out is (batch, seq_len, hidden) and
        final_state is (1, batch, hidden) — TODO confirm.
        """
        # Drop the leading dimension of the final state.
        hidden = final_state.squeeze(0)
        # Dot product of every time step with the final state.
        attn_weights = torch.bmm(lstm_out, hidden.unsqueeze(2)).squeeze(2)
        # Normalise the scores along dim 1.
        soft_attn_weights = F.softmax(attn_weights, dim=1)
        # Weighted sum of the LSTM outputs using the attention weights.
        new_hidden_state = torch.bmm(
            lstm_out.transpose(1, 2), soft_attn_weights.unsqueeze(2)).squeeze(2)
        
        return new_hidden_state