In [2]:
from lib.dataset_utils import *
import pickle
import numpy as np
import nltk
import string
nltk.download('punkt')
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
from lib.scores import *
from lib.plot_utils import *
from lib.models import bootstrap_test
from sklearn.metrics import f1_score
import matplotlib.pyplot as plt

In [None]:
# Model families and datasets compared in this notebook.
MODELS = ["Bayes", "DecisionTree", "RandomForest", "Bert", "Roberta", "Llama3"]
DATASETS = ["GoEmotions", "TwitterData", "GoEmotionsGrouped"]
BASE_MODELS_DIR = "./checkpoints/"

# Per-dataset checkpoint folders (both GoEmotions variants share one folder).
_GOEMOTIONS_DIR = BASE_MODELS_DIR + "GoEmotions/"
_TWITTER_DIR = BASE_MODELS_DIR + "TwitterData/"

# Checkpoint file for each model trained on GoEmotions.
# "Llama3" has no entry in any of these tables: its predictions are read from
# a CSV by `llama3_predict` instead of being produced from a checkpoint.
GOEMOTIONS_MODELS_PATH = {
    "Bayes": _GOEMOTIONS_DIR + "nb_classifier.pkl",
    "DecisionTree": _GOEMOTIONS_DIR + "dt_classifier.pkl",
    "RandomForest": _GOEMOTIONS_DIR + "rf_classifier.pkl",
    "Bert": _GOEMOTIONS_DIR + "bert_model.pth",
    "Roberta": _GOEMOTIONS_DIR + "GoEmotions_Roberta.pth",
}

# Checkpoint file for each model trained on the grouped GoEmotions labels.
# The "goruped" spelling is kept verbatim — presumably it matches the file
# names on disk; verify before "fixing" it.
GOEMOTIONS_GROUPED_MODELS_PATH = {
    "Bayes": _GOEMOTIONS_DIR + "GoEmotions_goruped_bayes.pkl",
    "DecisionTree": _GOEMOTIONS_DIR + "GoEmotions_goruped_dt.pkl",
    "RandomForest": _GOEMOTIONS_DIR + "GoEmotions_goruped_rf.pkl",
    "Bert": _GOEMOTIONS_DIR + "grouped_bert_model.pth",
    "Roberta": _GOEMOTIONS_DIR + "GoEmotions_grouped_Roberta.pth",
}

# Checkpoint file for each model trained on the Twitter dataset.
TWITTER_MODELS_PATH = {
    "Bayes": _TWITTER_DIR + "nb_classifier.pkl",
    "DecisionTree": _TWITTER_DIR + "dt_classifier.pkl",
    "RandomForest": _TWITTER_DIR + "rf_classifier.pkl",
    "Bert": _TWITTER_DIR + "bert_model.pth",
    "Roberta": _TWITTER_DIR + "Twitter_Roberta.pth",
}

# Dataset name -> table of checkpoint paths.
DATASET_TO_PATH_DICT = {
    "GoEmotions": GOEMOTIONS_MODELS_PATH,
    "TwitterData": TWITTER_MODELS_PATH,
    "GoEmotionsGrouped": GOEMOTIONS_GROUPED_MODELS_PATH,
}

# Dataset name -> enum value handed to `load_dataset`.
# NOTE(review): "GoEmotionsGrouped" loads the same enum as "GoEmotions"; if the
# grouped models emit a different label set, the evaluation columns will not
# line up — confirm whether a grouped dataset enum should be used here.
DATASET_TO_ENUM_DICT = {
    "GoEmotions": DatasetEnum.GoEmotionsCleaned,
    "TwitterData": DatasetEnum.TwitterDataCleaned,
    "GoEmotionsGrouped": DatasetEnum.GoEmotionsCleaned,
}

In [None]:
def bayes_predict(dataset):
    """Predict test-set labels for `dataset` with the pickled Bayes classifier.

    Args:
        dataset: Key into DATASET_TO_PATH_DICT / DATASET_TO_ENUM_DICT.

    Returns:
        DataFrame of predictions whose columns are named after every column
        of the test split except the first one.
    """
    checkpoint = DATASET_TO_PATH_DICT[dataset]["Bayes"]
    # pickle.load can execute arbitrary code: only use trusted checkpoints.
    with open(checkpoint, 'rb') as handle:
        classifier = pickle.load(handle)
    # Only the third element returned by load_dataset (the test split) is used.
    _, _, test_split = load_dataset(DATASET_TO_ENUM_DICT[dataset])
    raw_predictions = classifier.predict(test_split['text'])
    label_columns = test_split.columns[1:]
    return pd.DataFrame(raw_predictions, columns=label_columns)

def clean_content(text):
    """Tokenize, lemmatize and strip punctuation tokens from `text`.

    Args:
        text: Raw input string.

    Returns:
        The surviving lemmas joined by single spaces.
    """
    # NOTE(review): nltk.pos_tag needs a tagger resource that the import cell
    # does not download (only 'punkt' and 'wordnet' are) — confirm it is
    # available in the target environment.
    # First letter of the POS tag -> WordNet POS; anything else is lemmatized as a noun.
    wordnet_pos = {'N': 'n', 'V': 'v', 'R': 'r', 'J': 'a'}
    lemmatizer = WordNetLemmatizer()
    kept_lemmas = []
    for token, pos_tag in nltk.pos_tag(nltk.word_tokenize(text)):
        lemma = lemmatizer.lemmatize(token, wordnet_pos.get(pos_tag[0], 'n'))
        # Substring test against string.punctuation: single punctuation marks
        # are dropped, but multi-character tokens such as "..." are kept.
        # Left unchanged on purpose — presumably it must match the cleaning
        # applied when the models were trained.
        if lemma not in string.punctuation:
            kept_lemmas.append(lemma)
    return ' '.join(kept_lemmas)

def clean_df(df, text_col, out_col):
    """Apply `clean_content` to one column of `df` and store the result.

    The frame is modified in place (column `out_col` is created or
    overwritten) and the same object is returned.

    Args:
        df: DataFrame to clean.
        text_col: Name of the column holding the raw text.
        out_col: Name of the column receiving the cleaned text.

    Returns:
        `df` itself, after the assignment.
    """
    cleaned_text = df[text_col].apply(clean_content)
    df[out_col] = cleaned_text
    return df

def decision_tree_predict(dataset):
    """Predict test-set labels for `dataset` with the pickled decision tree.

    The test text is passed through `clean_df` before prediction.

    Args:
        dataset: Key into DATASET_TO_PATH_DICT / DATASET_TO_ENUM_DICT.

    Returns:
        DataFrame of predictions whose columns are named after every column
        of the test split except the first one.
    """
    checkpoint = DATASET_TO_PATH_DICT[dataset]["DecisionTree"]
    # pickle.load can execute arbitrary code: only use trusted checkpoints.
    with open(checkpoint, 'rb') as handle:
        classifier = pickle.load(handle)
    _, _, test_split = load_dataset(DATASET_TO_ENUM_DICT[dataset])
    # Overwrites the 'text' column of the freshly loaded split with cleaned text.
    cleaned_split = clean_df(test_split, 'text', 'text')
    raw_predictions = classifier.predict(cleaned_split['text'])
    label_columns = cleaned_split.columns[1:]
    return pd.DataFrame(raw_predictions, columns=label_columns)

def random_forest_predict(dataset):
    """Predict test-set labels for `dataset` with the pickled random forest.

    The test text is passed through `clean_df` before prediction.

    Args:
        dataset: Key into DATASET_TO_PATH_DICT / DATASET_TO_ENUM_DICT.

    Returns:
        DataFrame of predictions whose columns are named after every column
        of the test split except the first one.
    """
    checkpoint = DATASET_TO_PATH_DICT[dataset]["RandomForest"]
    # pickle.load can execute arbitrary code: only use trusted checkpoints.
    with open(checkpoint, 'rb') as handle:
        forest = pickle.load(handle)
    _, _, test_split = load_dataset(DATASET_TO_ENUM_DICT[dataset])
    # Overwrites the 'text' column of the freshly loaded split with cleaned text.
    cleaned_split = clean_df(test_split, 'text', 'text')
    raw_predictions = forest.predict(cleaned_split['text'])
    label_columns = cleaned_split.columns[1:]
    return pd.DataFrame(raw_predictions, columns=label_columns)

def bert_predict(dataset):
    """Predict test-set labels for `dataset` with the saved Bert model.

    A decision threshold is tuned on the validation split (macro F1) and then
    applied to the model outputs on the test split.

    Args:
        dataset: Key into DATASET_TO_PATH_DICT / DATASET_TO_ENUM_DICT.

    Returns:
        DataFrame of 0/1 predictions whose columns are named after every
        column of the test split except the first one.
    """
    model_path = DATASET_TO_PATH_DICT[dataset]["Bert"]
    # torch.load unpickles the file: only use trusted checkpoints.
    # NOTE(review): recent PyTorch releases default to weights_only=True, which
    # rejects whole pickled model objects — confirm against the installed version.
    model = torch.load(model_path)
    _, val_df, test_df = load_dataset(DATASET_TO_ENUM_DICT[dataset])

    # Tune the threshold on the validation split.
    val_outputs = model.predict(val_df)
    # Select the label COLUMNS. The previous `val_df[1:]` sliced rows instead:
    # it dropped the first sample and kept the text column, so the targets
    # did not line up with the predictions.
    val_targets = val_df.iloc[:, 1:]
    thresh, _ = tune_sigmoid_threshold(
        val_targets,
        val_outputs,
        f1_score,
        metric_params={"average": "macro", "zero_division": 0},
    )

    # Apply the tuned threshold to the test outputs.
    test_outputs = model.predict(test_df)
    predictions = (test_outputs > thresh).astype(int)
    return pd.DataFrame(predictions, columns=test_df.columns[1:])

def roberta_predict(dataset):
    """Predict test-set labels for `dataset` with the saved Roberta model.

    A decision threshold is tuned on the validation split (macro F1) and then
    applied to the model outputs on the test split.

    Args:
        dataset: Key into DATASET_TO_PATH_DICT / DATASET_TO_ENUM_DICT.

    Returns:
        DataFrame of 0/1 predictions whose columns are named after every
        column of the test split except the first one.
    """
    model_path = DATASET_TO_PATH_DICT[dataset]["Roberta"]
    # torch.load unpickles the file: only use trusted checkpoints.
    # NOTE(review): recent PyTorch releases default to weights_only=True, which
    # rejects whole pickled model objects — confirm against the installed version.
    model = torch.load(model_path)
    _, val_df, test_df = load_dataset(DATASET_TO_ENUM_DICT[dataset])

    # Tune the threshold on the validation split.
    val_outputs = model.predict(val_df)
    # Select the label COLUMNS. The previous `val_df[1:]` sliced rows instead:
    # it dropped the first sample and kept the text column, so the targets
    # did not line up with the predictions.
    val_targets = val_df.iloc[:, 1:]
    thresh, _ = tune_sigmoid_threshold(
        val_targets,
        val_outputs,
        f1_score,
        metric_params={"average": "macro", "zero_division": 0},
    )

    # Apply the tuned threshold to the test outputs.
    test_outputs = model.predict(test_df)
    predictions = (test_outputs > thresh).astype(int)
    return pd.DataFrame(predictions, columns=test_df.columns[1:])

def llama3_predict(dataset):
    """Load the stored Llama3 predictions for `dataset`.

    No model is run here: the predictions are read from
    ./results/llama3_<dataset>_predictions.csv.

    Args:
        dataset: Dataset name used to build the CSV file name.

    Returns:
        DataFrame with the contents of the CSV file.
    """
    # NOTE(review): the frame is returned exactly as stored; presumably its
    # columns and row order match the test labels — verify against the writer.
    csv_path = f"./results/llama3_{dataset}_predictions.csv"
    return pd.read_csv(csv_path)

# Dispatch table: model name (as listed in MODELS) -> function that returns a
# predictions DataFrame for a given dataset name.
PREDICTOR_DICT = dict(
    Bayes=bayes_predict,
    DecisionTree=decision_tree_predict,
    RandomForest=random_forest_predict,
    Bert=bert_predict,
    Roberta=roberta_predict,
    Llama3=llama3_predict,
)

def predict(model, dataset):
    """Run the predictor registered for `model` on `dataset`.

    Args:
        model: Key of PREDICTOR_DICT.
        dataset: Dataset name forwarded to the predictor.

    Returns:
        Whatever the selected predictor returns (a predictions DataFrame).

    Raises:
        KeyError: If `model` is not a key of PREDICTOR_DICT.
    """
    predictor = PREDICTOR_DICT[model]
    return predictor(dataset)

In [None]:
def custom_classification_report(scores_dict, labels_list):
    """Print a fixed-width classification report including Jaccard scores.

    Args:
        scores_dict: Mapping with one entry per label (keys 'precision',
            'recall', 'f1-score', 'jaccard', 'support'), the entries
            'macro avg', 'micro avg' and 'weighted avg' (same keys minus
            'jaccard'), a 'jaccard' entry with keys 'macro', 'micro',
            'weighted' and 'samples', and a 'membership' value.
        labels_list: Labels to print, in order.

    Returns:
        None. The report is written to standard output.
    """
    def as_row(*cells):
        # Each cell is left-aligned and padded to 20 characters.
        return ''.join(format(cell, '<20') for cell in cells)

    metric_keys = ('precision', 'recall', 'f1-score')

    print("Classification Report:")
    print(as_row('Label', 'Precision', 'Recall', 'F1-Score', 'Jaccard', 'Support'))

    # One row per label.
    for label in labels_list:
        entry = scores_dict[label]
        print(as_row(label, *[entry[key] for key in metric_keys], entry['jaccard'], entry['support']))

    # Aggregated rows: the Jaccard value comes from the separate 'jaccard' entry.
    aggregate_rows = (
        ('Macro avg', 'macro avg', 'macro'),
        ('Micro avg', 'micro avg', 'micro'),
        ('Weighted avg', 'weighted avg', 'weighted'),
    )
    for title, report_key, jaccard_key in aggregate_rows:
        entry = scores_dict[report_key]
        print(as_row(
            title,
            *[entry[key] for key in metric_keys],
            scores_dict['jaccard'][jaccard_key],
            entry['support'],
        ))

    # Scores that exist only as a single number.
    print(as_row('Membership Score', scores_dict['membership']))
    print(as_row('Jaccard Samples', scores_dict['jaccard']['samples']))

def get_model_scores(model, dataset):
    """Compute the evaluation scores of `model` on the test split of `dataset`.

    Args:
        model: Model name understood by `predict`.
        dataset: Key into DATASET_TO_ENUM_DICT.

    Returns:
        The `classification_report` dictionary, extended with:
          * a 'jaccard' value inside every per-label entry,
          * a top-level 'jaccard' entry holding the 'samples', 'macro',
            'micro' and 'weighted' averages,
          * a top-level 'membership' score.
    """
    predictions = predict(model, dataset)
    _, _, test_df = load_dataset(DATASET_TO_ENUM_DICT[dataset])
    labels_list = test_df.columns[1:]
    # Extract the arrays once instead of rebuilding them for every metric.
    y_true = test_df[labels_list].values
    y_pred = predictions.values

    scores = classification_report(y_true, y_pred, target_names=labels_list, output_dict=True)

    # Per-label Jaccard (average=None yields one value per label column).
    per_label_jaccard = jaccard_score(y_true, y_pred, zero_division=0, average=None)
    for label, value in zip(labels_list, per_label_jaccard):
        scores[label]['jaccard'] = value

    # The report dictionary has no 'jaccard' entry of its own, so it must be
    # created before the aggregated values are stored; writing to
    # scores['jaccard'][...] directly raised KeyError.
    aggregated_jaccard = scores.setdefault('jaccard', {})
    for average in ('samples', 'macro', 'micro', 'weighted'):
        aggregated_jaccard[average] = jaccard_score(y_true, y_pred, zero_division=0, average=average)

    scores['membership'] = membership_score(y_true, y_pred)
    return scores

def print_results(model_name, dataset_name, predictions_df, targets_df):
    """Show plots and the textual report for one model on one dataset.

    Args:
        model_name: Model name, printed and forwarded to `get_model_scores`.
        dataset_name: Dataset name, printed and forwarded to `get_model_scores`.
        predictions_df: Predicted labels; only `.values` is used.
        targets_df: True labels; its columns provide the label names.

    Returns:
        The scores dictionary returned by `get_model_scores`.
    """
    print(f"Model: {model_name}")
    print(f"Dataset: {dataset_name}")

    label_names = targets_df.columns
    y_true = targets_df.values
    y_pred = predictions_df.values

    # Confusion heatmap, then the per-class bar plot.
    plot_multilabel_confusion_heatmap(y_true, y_pred, label_true=label_names, label_pred=label_names, normalize=True)
    plot_score_barplot(y_true, y_pred, label_names)

    # NOTE: get_model_scores runs `predict` again by name; the scores are not
    # derived from the `predictions_df` passed in.
    scores = get_model_scores(model_name, dataset_name)
    custom_classification_report(scores, label_names)
    return scores

def comparison_bar_plot(dataset):
    """Draw a bar chart of the macro-average F1 of every model on `dataset`.

    Args:
        dataset: Dataset name forwarded to `get_model_scores`.

    Returns:
        None. The figure is displayed with `plt.show()`.
    """
    # One bar per entry of MODELS, in that order.
    macro_f1_by_model = {
        model: get_model_scores(model, dataset)['macro avg']['f1-score']
        for model in MODELS
    }
    plt.bar(macro_f1_by_model.keys(), macro_f1_by_model.values())
    plt.xlabel("Model")
    plt.ylabel("F1-Score")
    plt.title(f"Comparison of models on {dataset}")
    plt.show()

def print_dataset_results(dataset):
    """Evaluate and report every model in MODELS on `dataset`.

    Args:
        dataset: Key into DATASET_TO_ENUM_DICT, also forwarded to `predict`.

    Returns:
        Dict mapping each model name to the scores dictionary returned by
        `print_results`.
    """
    # The targets do not depend on the model, so load them once.
    _, _, test_df = load_dataset(DATASET_TO_ENUM_DICT[dataset])
    # Keep only the label COLUMNS. The previous `targets_df[1:]` sliced rows:
    # it dropped the first sample and left the text column in the targets,
    # which then reached the plots as if it were a label.
    targets_df = test_df.iloc[:, 1:]

    scores = {}
    for model in MODELS:
        predictions_df = predict(model, dataset)
        scores[model] = print_results(model, dataset, predictions_df, targets_df)
    return scores

## Performance comparison on test data

In [None]:
# Evaluate every model on every dataset: per-model plots and reports first,
# then one bar chart comparing the models on that dataset.
all_scores = {}
for dataset in DATASETS:
    dataset_scores = print_dataset_results(dataset)
    all_scores[dataset] = dataset_scores
    comparison_bar_plot(dataset)

## Statistical testing

In [None]:
# For each dataset, pick the model with the highest macro-average F1 and run
# a bootstrap test between it and every other model.
best_models = {}
for dataset in DATASETS:
    dataset_scores = all_scores[dataset]
    # (model name, scores dict) pair with the highest macro-average F1.
    best_model = max(dataset_scores.items(), key=lambda item: item[1]['macro avg']['f1-score'])
    best_models[dataset] = best_model
    best_name = best_model[0]
    for model in MODELS:
        if model == best_name:
            continue
        print(f"Bootstrap test between {best_name} and {model} on {dataset}")
        # NOTE(review): the arguments are the two models' score dictionaries,
        # yet a metric function (f1_score) is also passed — presumably
        # bootstrap_test expects targets/predictions instead; confirm against
        # lib.models. The return value is discarded here.
        bootstrap_test(dataset_scores[best_name], dataset_scores[model], f1_score, metric_params={"average": "macro", "zero_division": 0})