In [None]:
!pip install transformers==4.30.0

Collecting transformers==4.30.0
  Downloading transformers-4.30.0-py3-none-any.whl (7.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.2/7.2 MB[0m [31m51.7 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.14.1 (from transformers==4.30.0)
  Downloading huggingface_hub-0.17.3-py3-none-any.whl (295 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m295.0/295.0 kB[0m [31m34.7 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1 (from transformers==4.30.0)
  Downloading tokenizers-0.13.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m102.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting safetensors>=0.3.1 (from transformers==4.30.0)
  Downloading safetensors-0.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m 

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report, f1_score, accuracy_score
from sklearn.model_selection import train_test_split
import collections
import time
import pathlib
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import random
import ast

import torch
from torch import nn, cuda
from torch.utils.data import DataLoader
import torch.nn.functional as F
from scipy.stats import spearmanr, pearsonr

import transformers
from transformers import BertModel, BertTokenizerFast, DistilBertModel, DistilBertTokenizerFast
from transformers import get_linear_schedule_with_warmup
from torch.utils.data import DataLoader, ConcatDataset
import copy

# Pick the GPU when one is visible to torch, otherwise run on the CPU.
if cuda.is_available():
    device = 'cuda'
    # Release cached GPU memory left over from earlier work in this session.
    torch.cuda.empty_cache()
else:
    device = 'cpu'
print(device)

cuda


In [None]:
"""
Dataset class
"""
class CustomDataset(torch.utils.data.Dataset):
    """Dataset over one or two parallel tokenizer encodings.

    Each item is a dict of tensors built from row ``idx`` of every field in
    ``encodings``. When ``encodings_2`` is given, its fields are added to the
    same dict under the field name suffixed with ``_2``.

    Args:
        encodings: Mapping from field name to an indexable of per-sample values.
        encodings_2: Optional second mapping with the same layout, or ``None``.
        len: Number of samples reported by ``__len__``.
    """

    def __init__(self, encodings, encodings_2, len):
        self.encodings = encodings
        self.encodings_2 = encodings_2
        self.len = len

    def __len__(self):
        return self.len

    def __getitem__(self, idx):
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])

        # Single-encoding datasets stop here.
        if self.encodings_2 is None:
            return sample

        for field, values in self.encodings_2.items():
            sample[f"{field}_2"] = torch.tensor(values[idx])
        return sample

"""
BERT model
"""
class BERTClassifier(nn.Module):
    """BERT encoder followed by a linear classification head.

    The encoder's pooled output goes through LeakyReLU and dropout (p=0.1)
    and is then projected to ``num_labels`` logits.

    Args:
        num_labels: Number of output classes.
        pretrained_name: Hugging Face checkpoint the encoder is loaded from.
            Defaults to the checkpoint this notebook was written for.
    """
    def __init__(self, num_labels = 2, pretrained_name = "bert-base-uncased"):
        super(BERTClassifier, self).__init__()
        self.num_labels = num_labels
        self.bert = BertModel.from_pretrained(pretrained_name)
        # Size the head from the encoder's own config rather than hard-coding
        # 768, so the head always matches the loaded checkpoint.
        self.classifier = nn.Linear(self.bert.config.hidden_size, self.num_labels)
        self.dropout = torch.nn.Dropout(0.1)

    def forward(self, input_ids, attention_mask, token_type_ids):
        """Return the classification logits for a batch of encoded inputs.

        Note the argument order: ``attention_mask`` comes before
        ``token_type_ids``, so both are passed to the encoder by keyword.
        """
        x = self.bert(input_ids,
                      token_type_ids = token_type_ids,
                      attention_mask = attention_mask)
        hidden = self.dropout(nn.LeakyReLU()(x.pooler_output))
        return self.classifier(hidden)

def prepare_input_for_test(argument_list, topic_list, tokenizer, argument_list_2 = None, bs = 32, shuffle = False, max_len = 256):
    """Tokenize (argument, topic) pairs and wrap them in a DataLoader.

    Args:
        argument_list: First argument text of every sample.
        topic_list: Topic paired with each argument (second tokenizer input).
        tokenizer: Callable applied as ``tokenizer(texts, topics, **options)``.
        argument_list_2: Optional second argument per sample; it is paired
            with the same ``topic_list`` and encoded separately.
        bs: Batch size of the returned loader.
        shuffle: Whether the loader shuffles samples.
        max_len: Length every sequence is padded / truncated to.

    Returns:
        A DataLoader over a CustomDataset with ``len(argument_list)`` samples.
    """
    tokenizer_options = dict(truncation=True,
                             padding='max_length',
                             max_length=max_len)

    first_encodings = tokenizer(argument_list, topic_list, **tokenizer_options)

    second_encodings = None
    if argument_list_2 is not None:
        second_encodings = tokenizer(argument_list_2, topic_list, **tokenizer_options)

    return DataLoader(
        CustomDataset(first_encodings, second_encodings, len(argument_list)),
        batch_size = bs,
        shuffle = shuffle,
    )

def get_batch_predictions(model, batch):
    """Run ``model`` on one mini-batch and return its raw outputs.

    The batch must contain 'input_ids', 'attention_mask' and 'token_type_ids'.
    If it also contains 'input_ids_2', the model is run a second time on the
    '*_2' tensors.

    Returns:
        ``(outputs_1, outputs_2)``; ``outputs_2`` is ``None`` when the batch
        has no second set of inputs.
    """
    def _forward(field_names):
        # Move the three tensors to the global `device`, then call the model
        # with them in (input_ids, attention_mask, token_type_ids) order.
        on_device = [batch[name].to(device) for name in field_names]
        return model(*on_device)

    first_outputs = _forward(('input_ids', 'attention_mask', 'token_type_ids'))

    if 'input_ids_2' not in batch.keys():
        return first_outputs, None

    second_outputs = _forward(('input_ids_2', 'attention_mask_2', 'token_type_ids_2'))
    return first_outputs, second_outputs

def test_model(model, dataloader, dual):

    model.eval()

    predictions = np.array([], int)
    predictions_2 = np.array([], int)
    logits = np.zeros((0, 2), dtype=np.float32)
    logits_2 = np.zeros((0, 2), dtype=np.float32)

    with torch.no_grad():
        for batch in dataloader:
            outputs1, outputs2 = get_batch_predictions(model, batch)

            logits = np.concatenate((logits, outputs1.cpu().numpy()), axis = 0)
            predictions = np.concatenate((predictions, torch.argmax(outputs1, dim = 1).int().cpu().numpy()), axis = 0)

            if outputs2 is not None:
                logits_2 = np.concatenate((logits_2, outputs2.cpu().numpy()), axis = 0)
                predictions_2 = np.concatenate((predictions_2, torch.argmax(outputs2, dim = 1).int().cpu().numpy()), axis = 0)

    if dual:
        return predictions, predictions_2, logits, logits_2

    return predictions, logits

In [None]:
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

# Checkpoints holding the state dicts of the two classifiers used below.
REL_BEST_MODEL = '/content/drive/MyDrive/Doctorado/Beca Doctoral Fede Schmidt/Proyectos/Proyecto Y - Unit Segmentation and Error Categorization/Error Categorization - Datos/trained_model_ukp_relnorel_exp1.pt'
STANCE_BEST_MODEL = '/content/drive/MyDrive/Doctorado/Beca Doctoral Fede Schmidt/Proyectos/Proyecto Y - Unit Segmentation and Error Categorization/Error Categorization - Datos/trained_model_ukp_attsup_exp1.pt'


def load_classifier(checkpoint_path):
    """Build a BERTClassifier and load the state dict stored at ``checkpoint_path``.

    The model is moved to the global ``device`` and switched to eval mode,
    since it is only used for inference in this notebook. The loaded state
    dict is local to this function, so it is not kept alive as a global.
    """
    # NOTE(review): torch.load unpickles the file; only load checkpoints that
    # come from a trusted source.
    state_dict = torch.load(checkpoint_path, map_location=torch.device(device))

    model = BERTClassifier()
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()
    return model


rel_model = load_classifier(REL_BEST_MODEL)
stance_model = load_classifier(STANCE_BEST_MODEL)

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Downloading model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight',

BERTClassifier(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_af

In [None]:
def get_results_df(dataframe, rel_predictions, stance_predictions, dual):
    """Return a copy of ``dataframe`` with the outputs of both models attached.

    Four columns are added: 'rel_preds', 'rel_logits', 'stance_preds' and
    'stance_logits'. Every cell is a tuple with one entry per argument of the
    row (two entries when ``dual``, one otherwise).

    Args:
        dataframe: Source frame; it is not modified.
        rel_predictions: ``(preds, logits)``, or
            ``(preds_1, preds_2, logits_1, logits_2)`` when ``dual``.
            The logits objects must provide ``tolist()``.
        stance_predictions: Same layout as ``rel_predictions``.
        dual: Whether each row carries two arguments.
    """
    results_df = dataframe.copy()

    column_plan = (
        ('rel_preds', 'rel_logits', rel_predictions),
        ('stance_preds', 'stance_logits', stance_predictions),
    )

    for preds_column, logits_column, model_output in column_plan:
        if dual:
            preds_first, preds_second, logits_first, logits_second = model_output
            pred_parts = (preds_first, preds_second)
            logit_parts = (logits_first.tolist(), logits_second.tolist())
        else:
            preds_first, logits_first = model_output
            pred_parts = (preds_first,)
            logit_parts = (logits_first.tolist(),)

        # zip turns the per-argument sequences into one tuple per row.
        results_df[preds_column] = list(zip(*pred_parts))
        results_df[logits_column] = list(zip(*logit_parts))

    return results_df

def _select_tags(tags, combination_type):
    """Pick the entries of ``tags`` that apply to a row's combination type.

    - 'C1' / 'C2': the entry at position 1.
    - 'C3': the entries at positions 1 and 2.
    - anything else: the entries at positions 1 and 3.

    The result is always a tuple, whether ``tags`` is a list or a tuple.
    """
    if combination_type in ('C1', 'C2'):
        return (tags[1],)
    if combination_type == 'C3':
        return tuple(tags[1:3])
    return (tags[1], tags[3])


def get_relTag(row):
    """Return the relevant entries of ``row.relTag`` for ``row.combinationType``."""
    return _select_tags(row.relTag, row.combinationType)


def get_stanceTag(row):
    """Return the relevant entries of ``row.stanceTag`` for ``row.combinationType``."""
    return _select_tags(row.stanceTag, row.combinationType)

In [None]:
# Datasets to evaluate. Selections previously used for this list:
# ['MISP'], ['MG'], ['SPLIT'], ['PM', 'MU'].
data_type_list = ['IMP']

result_df_info = []

# Columns stored as text in the CSV that are parsed back into Python objects.
LITERAL_COLUMNS = ['arg_component_indices', 'indices_with_error', 'relTag', 'stanceTag']


def extract_spans(sentences, spans):
    """Join the whitespace tokens ``[start:end]`` of each sentence into one string."""
    return [" ".join(sentence.split()[start:end]) for sentence, (start, end) in zip(sentences, spans)]


for data_type in data_type_list:

    dataframe = pd.read_csv(f'/content/drive/MyDrive/Doctorado/Beca Doctoral Fede Schmidt/Proyectos/Proyecto Y - Unit Segmentation and Error Categorization/Error Categorization - Datos/Synthetic Data/data_{data_type}.csv')

    # 'indices' is never read below, so it is dropped without being parsed first.
    dataframe = dataframe.drop(columns = ['indices'])
    for column in LITERAL_COLUMNS:
        dataframe[column] = dataframe[column].apply(ast.literal_eval)

    topic_list = list(dataframe.topic.values)
    sentences = dataframe.sentence.values

    # NOTE(review): the number of spans per sentence is taken from the first
    # row only; this assumes every row of a file has the same number — confirm
    # when adding new datasets.
    cant_arguments_per_sentence = len(dataframe['indices_with_error'].values[0])
    dual = cant_arguments_per_sentence == 2

    predicted_argument_1 = extract_spans(sentences, [x[0] for x in dataframe.indices_with_error])
    predicted_argument_2 = None
    if dual:
        predicted_argument_2 = extract_spans(sentences, [x[1] for x in dataframe.indices_with_error])

    test_loader = prepare_input_for_test(predicted_argument_1, topic_list, tokenizer,
                                         argument_list_2 = predicted_argument_2,
                                         bs = 32, shuffle = False)

    # Both models are evaluated on exactly the same batches.
    predictions_of_rel_model = test_model(rel_model, test_loader, dual)
    predictions_of_stance_model = test_model(stance_model, test_loader, dual)

    # Reduce the gold tags to the entries that apply to each row's combination type.
    dataframe['relTag'] = dataframe.apply(get_relTag, axis = 1)
    dataframe['stanceTag'] = dataframe.apply(get_stanceTag, axis = 1)
    results_df = get_results_df(dataframe, predictions_of_rel_model, predictions_of_stance_model, dual)

    results_df.to_csv(f'results_{data_type}.csv', index = False)