# Key Point Analysis

In [1]:
import pandas as pd
import numpy as np

from sentence_transformers import SentenceTransformer
from sentence_transformers.losses import ContrastiveLoss, CosineSimilarityLoss
from sentence_transformers.models import Transformer, Pooling
from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator

from torch.utils.data import DataLoader

import constants as c
import utilities as u

## 1. Data Preprocessing

In [2]:
# Build one flat CSV per split by joining its labels, arguments and key points.
for data_type in [c.TRAIN, c.TEST, c.DEV]:
    # load the labels, key points and arguments of the current split
    labels_df = pd.read_csv(c.DATA_DIR + "labels_" + data_type + ".csv")
    kp_df = pd.read_csv(c.DATA_DIR + "key_points_" + data_type + ".csv")
    arg_df = pd.read_csv(c.DATA_DIR + "arguments_" + data_type + ".csv")

    # no key is given, so pandas joins on the columns the two frames share
    result_df = pd.merge(labels_df, arg_df)
    result_df = pd.merge(result_df, kp_df)

    # "topic_key_point" column: topic and key point joined by the <SEP> token.
    # Column-wise concatenation avoids one Python call per row and also works
    # when the merge produced no rows.
    result_df[c.TOPIC_KP] = result_df[c.TOPIC] + " <SEP> " + result_df[c.KP]

    # keep only the columns used downstream and save without the index column
    result_df = result_df[[c.ARG, c.TOPIC_KP, c.LABEL, c.STANCE]]
    result_df.to_csv(c.DATA_DIR + data_type + ".csv", index=False)

## 2. Training the models

### RoBERTa-base with contrastive loss

In [3]:
def fit_model(model_folder_name, model_name=c.BASE, batch_size=c.DEF_BATCH_SIZE, loss=c.CONTRASTIVE_LOSS, epochs=c.DEF_EPOCHS, learning_rate=c.DEF_LEARNING_RATE):
    """Load a previously saved sentence-embedding model, or train and save a new one.

    Args:
        model_folder_name: sub-folder of ``c.MODELS_DIR`` the model is loaded
            from, or saved to after training.
        model_name: name of the pre-trained transformer handed to ``Transformer``.
        batch_size: batch size of the training ``DataLoader``.
        loss: ``c.CONTRASTIVE_LOSS`` or ``c.COSINE_SIMILARITY_LOSS``.
        epochs: number of training epochs.
        learning_rate: learning rate handed to the optimizer.

    Returns:
        The loaded or freshly trained ``SentenceTransformer``.

    Raises:
        ValueError: if a new model has to be trained and ``loss`` is not one of
            the two supported values.
    """
    if u.is_model_present(model_folder_name):
        # a saved model exists: load it; the training parameters are not used
        return SentenceTransformer(c.MODELS_DIR + model_folder_name)

    # reject an unsupported loss before the model folder is touched, instead of
    # failing later on an unbound `train_loss`
    if loss not in (c.CONTRASTIVE_LOSS, c.COSINE_SIMILARITY_LOSS):
        raise ValueError(f"Unsupported loss: {loss!r}")

    # prepare the output folder (presumably created, or emptied if it already
    # exists -- confirm against utilities.create_folder)
    u.create_folder(c.MODELS_DIR + model_folder_name)

    # pre-trained language model that produces the token embeddings
    word_embedding_model = Transformer(model_name)

    # add the <SEP> token to the tokenizer (used to concatenate topic and key point)
    word_embedding_model.tokenizer.add_tokens(["<SEP>"], special_tokens=True)
    # resize the embedding matrix so that it has a row for the new token
    word_embedding_model.auto_model.resize_token_embeddings(len(word_embedding_model.tokenizer))

    # pooling aggregates the token embeddings into one fixed-size vector
    pooling_model = Pooling(word_embedding_model.get_word_embedding_dimension())

    # the sentence transformer chains the word embedding model and the pooling model
    model = SentenceTransformer(modules=[word_embedding_model, pooling_model])

    # train samples: list of InputExample objects
    train_samples = u.create_samples("train.csv")

    # reshuffle every epoch so that batches do not follow the order in which
    # the samples were created
    train_dataloader = DataLoader(train_samples, shuffle=True, batch_size=batch_size)

    if loss == c.CONTRASTIVE_LOSS:
        # pulls the embeddings of pairs labelled 1 together and pushes the
        # embeddings of pairs labelled 0 apart
        train_loss = ContrastiveLoss(model)
    else:
        # fits the cosine similarity of each pair to its label
        train_loss = CosineSimilarityLoss(model)

    # default optimizer: AdamW (adaptive moment estimation + weight decay),
    # default weight decay: 0.01.
    # `fit` has no `learning_rate` argument: the rate is passed through
    # `optimizer_params`.
    model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        epochs=epochs,
        output_path=c.MODELS_DIR + model_folder_name,
        show_progress_bar=True,
        optimizer_params={"lr": learning_rate},
    )

    return model

In [4]:
# Train (or load from disk) the baseline and one variant per hyper-parameter.
# Each variant changes a single setting with respect to the baseline.
base_model = fit_model(c.BASE)
high_lr_model = fit_model(c.BASE + "_high-lr", learning_rate=3e-5)
low_lr_model = fit_model(c.BASE + "_low-lr", learning_rate=1e-5)
big_batch_model = fit_model(c.BASE + "_big-batch", batch_size=32)
small_batch_model = fit_model(c.BASE + "_small-batch", batch_size=4)
more_epochs_model = fit_model(c.BASE + "_more-epochs", epochs=12)
# folder name uses the same "_<variant>" suffix as the other variants;
# a model saved under c.BASE + "less-epochs" must be renamed to be reused
less_epochs_model = fit_model(c.BASE + "_less-epochs", epochs=4)
cs_loss_model = fit_model(c.BASE + "_cs-loss", loss=c.COSINE_SIMILARITY_LOSS)

models = [base_model, high_lr_model, low_lr_model, big_batch_model, small_batch_model, more_epochs_model, less_epochs_model, cs_loss_model]

Some weights of the model checkpoint at roberta-base were not used when initializing RobertaModel: ['lm_head.bias', 'lm_head.decoder.weight', 'lm_head.layer_norm.bias', 'lm_head.dense.weight', 'lm_head.layer_norm.weight', 'lm_head.dense.bias']
- This IS expected if you are initializing RobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


### Siamese BERT

## 3. Selecting the best model

In [4]:
# evaluate the baseline model with dev samples
# (`size=200` presumably caps the number of samples -- confirm in utilities.create_samples)
dev_samples = u.create_samples("dev.csv", size=200)
dev_evaluator = EmbeddingSimilarityEvaluator.from_input_examples(dev_samples, name="dev")
# `base_model` is the model trained in the c.BASE folder, which is also where the
# evaluation output is written
base_model.evaluate(dev_evaluator, output_path=c.MODELS_DIR + c.BASE + "/eval/")

0.8175020931854976

In [11]:
def evaluate(model, csv_file, output_file):
    """Compute a match score for every row of a dataset and save the result.

    The match score of a row is the value returned by ``u.cosine_similarity``
    for the embedding of its topic+key point and the embedding of its argument.

    Args:
        model: object whose ``encode`` method maps a list of texts to one
            embedding per text (e.g. a ``SentenceTransformer``).
        csv_file: name of the input CSV file inside ``c.DATA_DIR``.
        output_file: name of the output CSV file inside ``c.EVAL_DIR``.

    The output CSV has the columns ``c.ARG``, ``c.TOPIC_KP``, ``c.STANCE``
    and ``c.SCORE``.
    """
    # load the dataset
    df = pd.read_csv(c.DATA_DIR + csv_file)

    # encode each text column with one batched call; encoding row by row
    # issues one forward pass per sentence and is far slower
    kp_embeddings = model.encode(df[c.TOPIC_KP].tolist())
    arg_embeddings = model.encode(df[c.ARG].tolist())

    # match score of each row (cosine similarity of its two embeddings)
    df[c.SCORE] = [
        u.cosine_similarity(kp_embedding, arg_embedding)
        for kp_embedding, arg_embedding in zip(kp_embeddings, arg_embeddings)
    ]

    # keep only the columns of the report and save it without the index column
    df = df[[c.ARG, c.TOPIC_KP, c.STANCE, c.SCORE]]
    df.to_csv(c.EVAL_DIR + output_file, index=False)


# score the dev dataset with the baseline model
evaluate(base_model, "dev.csv", c.BASE + "_eval.csv")

KeyboardInterrupt: 

## 4. Testing the best model

In [None]:
# evaluate the model on the test set
# TODO: use the model selected in section 3; the baseline is used until then
best_model = base_model

# a pair is predicted as matching when its similarity is above this value
MATCH_THRESHOLD = 0.5

# presumably each sample can be indexed by column name when type=1 --
# confirm in utilities.create_samples
test_samples = u.create_samples("test.csv", type=1)

correct_predictions = 0
total_predictions = len(test_samples)

# encode all arguments and all topic+key points with two batched calls
arg_embeddings = best_model.encode([sample[c.ARG] for sample in test_samples])
key_embeddings = best_model.encode([sample[c.TOPIC_KP] for sample in test_samples])

for sample, arg_embedding, key_embedding in zip(test_samples, arg_embeddings, key_embeddings):
    # cosine similarity between the two embeddings
    # (np.cos is the element-wise trigonometric cosine, not a similarity)
    similarity = u.cosine_similarity(key_embedding, arg_embedding)

    # predict the label based on the similarity
    predicted_label = int(similarity > MATCH_THRESHOLD)

    # count the predictions that agree with the true label
    if predicted_label == sample[c.LABEL]:
        correct_predictions += 1

# compute the accuracy; an empty test set gives NaN instead of a division by zero
accuracy = correct_predictions / total_predictions if total_predictions else float("nan")

print(f"Accuracy: {accuracy:.2f}")