This notebook can be used to evaluate the models that were trained on the task of labeling Telegram messages with misconceptions related to COVID-19. It was adapted from a [notebook created by Ronak Patel](https://towardsdatascience.com/transformers-for-multilabel-classification-71a1a0daf5e1).

In [None]:
# Choose the model to evaluate
# BASE_MODEL: pretrained checkpoint identifier the tokenizer is loaded from.
# TRAINED_MODEL_NAME: sub-directory of ../models/ that holds the fine-tuned model,
# its test split (test.csv) and the pickled label mappings.
BASE_MODEL = "distilbert-base-german-cased"
TRAINED_MODEL_NAME = "2021-12-20T17-54-59_MULT_DistilBERT"

In [None]:
import nltk
import numpy as np
import os
import pandas as pd
import tensorflow as tf
import torch
from sklearn.metrics import classification_report, f1_score, accuracy_score
from torch.utils.data import TensorDataset, DataLoader, SequentialSampler
from transformers import AutoTokenizer, AutoModelForSequenceClassification

In [None]:
# Check for GPU
# The model and every tensor in this notebook live in PyTorch, so PyTorch (not
# TensorFlow) is asked whether CUDA is usable: the two frameworks can disagree
# about GPU availability when they are built against different CUDA versions.
if not torch.cuda.is_available():
    raise SystemError("GPU device not found")
# Pin the device PyTorch currently selects; batches are moved to it later on.
device = torch.device("cuda", torch.cuda.current_device())
device_name = str(device)
print("Found GPU at: {}".format(device_name))

In [None]:
def _german_stop_words():
    """Return NLTK's German stop words as a frozenset, reading the corpus only once."""
    cached = getattr(_german_stop_words, "_cache", None)
    if cached is None:
        # A frozenset gives O(1) membership tests; the NLTK call returns a list.
        cached = frozenset(nltk.corpus.stopwords.words("german"))
        _german_stop_words._cache = cached
    return cached


def process_message(message, stop_words=None):
    """
    Process a message by removing stop words from it. Since the proportion of a
    message that can be taken into account by a model is often limited, it might make
    sense to do this to consider the more significant words of the message.

    The message is split on whitespace and every token that exactly matches a stop
    word is dropped. Matching is case-sensitive, so a token with different casing or
    attached punctuation is kept. The remaining tokens are joined with single spaces.

    Args:
        message (str): The message to process
        stop_words (collection of str, optional): Words to remove. Defaults to
            the German stop word list from NLTK, which is loaded once and cached.

    Returns:
        str: The processed message
    """
    if stop_words is None:
        stop_words = _german_stop_words()
    return " ".join(word for word in message.split() if word not in stop_words)

In [None]:
# Initialization

import pickle
from IPython.display import display

# TODO: Get this from model/json file
MAX_TOKEN_LENGTH = 280  # passed to the tokenizer as max_length (longer inputs are truncated)
BATCH_SIZE = 32  # number of messages per batch in the test dataloader

TRAINED_MODEL_DIR = f"../models/{TRAINED_MODEL_NAME}"

# Load test data
df_test = pd.read_csv(f"{TRAINED_MODEL_DIR}/test.csv")
display(df_test.head())

# The first BASE_COLUMNS_COUNT columns are treated as non-label columns; every
# column after them, except the last one, is treated as one misconception label.
BASE_COLUMNS_COUNT = 2
base_columns = list(df_test.columns[:BASE_COLUMNS_COUNT])
misconception_columns = list(
    df_test.columns[BASE_COLUMNS_COUNT:-1]
)  # exclude the last column containing the one-hot vector of labels
print("Misconceptions:", misconception_columns)

# Load model and tokenizer
model = AutoModelForSequenceClassification.from_pretrained(
    f"{TRAINED_MODEL_DIR}/model", num_labels=len(misconception_columns)
)
# Place the model on the same device the batches are moved to during prediction
model.to(device)
model.eval()  # Put model into evaluation mode
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, use_fast=True)

# Load mappings between labels and label ids
# NOTE: unpickling can execute arbitrary code; only load mapping files that were
# written by this project's own training run.
with open(f"{TRAINED_MODEL_DIR}/id2label.p", "rb") as mapping_file:
    id2label = pickle.load(mapping_file)
with open(f"{TRAINED_MODEL_DIR}/label2id.p", "rb") as mapping_file:
    label2id = pickle.load(mapping_file)

model.config.id2label = id2label
model.config.label2id = label2id

In [None]:
# Store every row's label flags as a single integer vector in a new column
label_matrix = df_test[misconception_columns].astype("int").values
df_test["one_hot_labels"] = list(label_matrix)

one_hot_labels = list(df_test["one_hot_labels"].values)
# Apply the stop word removal to the raw message texts
messages_list = [
    process_message(raw_message) for raw_message in df_test["content"].values
]

# Tokenize all messages at once: pad to a common length and truncate messages
# that are longer than MAX_TOKEN_LENGTH tokens
encode_options = {
    "max_length": MAX_TOKEN_LENGTH,
    "padding": True,
    "truncation": True,
}
encodings = tokenizer.batch_encode_plus(messages_list, **encode_options)
input_ids, attention_masks = encodings["input_ids"], encodings["attention_mask"]

In [None]:
# Make tensors out of data
inputs = torch.tensor(input_ids)
# one_hot_labels is a list of NumPy arrays; stack it into one array first, because
# building a tensor directly from a list of arrays is very slow and raises a warning
labels = torch.tensor(np.array(one_hot_labels))
masks = torch.tensor(attention_masks)
# Create test dataloader (sequential, so predictions stay aligned with df_test rows)
test_data = TensorDataset(inputs, masks, labels)
test_sampler = SequentialSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=BATCH_SIZE)
# The dataloader can be persisted with torch.save(test_dataloader, "test_data_loader")
# if it should be reused later.

In [None]:
# Track variables: one entry per batch while predicting, flattened to one entry
# per message afterwards (logit_preds stays grouped by batch)
logit_preds, true_labels, pred_labels, tokenized_texts = [], [], [], []

# Predict
for batch in test_dataloader:
    # Move the batch to the model's device and unpack it
    b_input_ids, b_input_mask, b_labels = (t.to(device) for t in batch)
    with torch.no_grad():
        # Forward pass
        outs = model(b_input_ids, attention_mask=b_input_mask)
        b_logit_pred = outs[0]
        # Independent per-label probabilities (multi-label setup)
        pred_label = torch.sigmoid(b_logit_pred)

        b_logit_pred = b_logit_pred.detach().cpu().numpy()
        pred_label = pred_label.to("cpu").numpy()
        b_labels = b_labels.to("cpu").numpy()

    # Keep the token ids on the CPU so the GPU copy of each batch can be freed
    # instead of staying referenced for the whole test set
    tokenized_texts.append(b_input_ids.to("cpu"))
    logit_preds.append(b_logit_pred)
    true_labels.append(b_labels)
    pred_labels.append(pred_label)

# Flatten outputs
tokenized_texts = [item for sublist in tokenized_texts for item in sublist]
pred_labels = [item for sublist in pred_labels for item in sublist]
true_labels = [item for sublist in true_labels for item in sublist]
# Converting flattened binary values to boolean values
true_bools = [tl == 1 for tl in true_labels]

In [None]:
# Turn the predicted probabilities into boolean decisions with a fixed cut-off of 0.3
pred_bools = [pl > 0.3 for pl in pred_labels]

# Print the overall scores and build the per-label classification report
micro_f1 = f1_score(true_bools, pred_bools, average="micro")
print("F1-Score: ", micro_f1)
exact_match_accuracy = accuracy_score(true_bools, pred_bools)
print("Accuracy: ", exact_match_accuracy, "\n")
clf_report = classification_report(
    true_bools,
    pred_bools,
    target_names=misconception_columns,
)

# Write the report into the model's results folder, creating the folder if needed
os.makedirs(f"{TRAINED_MODEL_DIR}/results/", exist_ok=True)
with open(
    f"{TRAINED_MODEL_DIR}/results/classification_report.txt", "w"
) as report_file:
    report_file.write(clf_report)

print(clf_report)

In [None]:
# For every message, collect the positions at which its boolean label vector is
# True; these positions are the label ids that id2label translates into names
true_label_idxs = [
    np.where(label_flags)[0].flatten().tolist() for label_flags in true_bools
]
pred_label_idxs = [
    np.where(label_flags)[0].flatten().tolist() for label_flags in pred_bools
]

In [None]:
# Translate each message's label ids into label names via id2label; a message
# without any label keeps its (empty) id list unchanged
true_label_texts = [
    [id2label[label_id] for label_id in label_ids] if label_ids else label_ids
    for label_ids in true_label_idxs
]

pred_label_texts = [
    [id2label[label_id] for label_id in label_ids] if label_ids else label_ids
    for label_ids in pred_label_idxs
]

In [None]:
# Turn each message's token ids back into text, leaving out special tokens and
# keeping the tokenizer's spacing as it is
comment_texts = []
for token_ids in tokenized_texts:
    decoded_text = tokenizer.decode(
        token_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    comment_texts.append(decoded_text)

In [None]:
# Put the decoded text next to its true and predicted label names, one row per
# message, and store the table as CSV in the model's results folder
comparison_columns = {
    "comment_text": comment_texts,
    "true_labels": true_label_texts,
    "pred_labels": pred_label_texts,
}
comparisons_df = pd.DataFrame(comparison_columns)
comparison_csv_path = (
    f"{TRAINED_MODEL_DIR}/results/classification_model_true_predicted_comparison.csv"
)
comparisons_df.to_csv(comparison_csv_path)
comparisons_df

In [None]:
# Tune the decision threshold to maximize the micro-averaged F1 score.
# A coarse pass tries 0.1, 0.2, ..., 0.9 (steps of 10^-1, 'macro_thresholds');
# a fine pass then tries the best coarse value plus 0.00, 0.01, ..., 0.09
# (steps of 10^-2, 'micro_thresholds').
# NOTE(review): the fine pass only searches upwards from the best coarse value.
# NOTE(review): the threshold is selected on the same test data that the scores
# are reported on, so the optimized scores are optimistic.


def _score_thresholds(thresholds, y_true, y_prob):
    """
    Score every candidate threshold against the true labels.

    Uses only local variables, so the notebook-level pred_bools (the fixed-threshold
    predictions used by the comparison cells) is not overwritten by the search.

    Args:
        thresholds (iterable of float): Candidate cut-off values
        y_true (list): Boolean label vector per message
        y_prob (list): Predicted probability vector per message

    Returns:
        tuple: (micro F1 scores, accuracy scores), each a list aligned with thresholds
    """
    f1_scores, accuracies = [], []
    for th in thresholds:
        candidate_bools = [probs > th for probs in y_prob]
        f1_scores.append(f1_score(y_true, candidate_bools, average="micro"))
        accuracies.append(accuracy_score(y_true, candidate_bools))
    return f1_scores, accuracies


macro_thresholds = np.array(range(1, 10)) / 10
f1_results, flat_acc_results = _score_thresholds(
    macro_thresholds, true_bools, pred_labels
)
best_macro_th = macro_thresholds[np.argmax(f1_results)]  # best macro threshold value

micro_thresholds = (
    np.array(range(10)) / 100
) + best_macro_th  # calculating micro threshold values
f1_results, flat_acc_results = _score_thresholds(
    micro_thresholds, true_bools, pred_labels
)
best_f1_idx = np.argmax(f1_results)  # index of the best threshold value

# Printing and saving classification report
print("Best Threshold: ", micro_thresholds[best_f1_idx])
print("F1-Score: ", f1_results[best_f1_idx])
print("Accuracy: ", flat_acc_results[best_f1_idx], "\n")

best_pred_bools = [pl > micro_thresholds[best_f1_idx] for pl in pred_labels]
clf_report_optimized = classification_report(
    true_bools, best_pred_bools, target_names=misconception_columns
)
# Save report (the results folder is created if it does not exist yet)
os.makedirs(f"{TRAINED_MODEL_DIR}/results/", exist_ok=True)
with open(
    f"{TRAINED_MODEL_DIR}/results/classification_report_optimized.txt", "w"
) as report_file:
    report_file.write(clf_report_optimized)

print(clf_report_optimized)