# Support analysis

In [None]:
from libs.dataset_loader import MulTweEmoDataset
import pandas as pd
import numpy as np

In [None]:
# Label names as provided by the project's dataset loader.
labels = MulTweEmoDataset.get_labels()

def get_support(dataset, labels, bert=False):
    """Count how many rows of ``dataset`` carry each label.

    Args:
        dataset: DataFrame with one column per entry of ``labels``; a truthy
            cell means the row is tagged with that label.
        labels: Column names to count.
        bert: Not used by the function body; kept for existing callers.

    Returns:
        dict mapping every label to its number of truthy rows, plus a
        ``"total"`` entry holding the sum of all per-label counts (a row with
        several labels contributes once per label).
    """
    counts = dict.fromkeys(labels, 0)
    counts["total"] = 0
    for _, record in dataset.iterrows():
        active = [name for name in labels if record[name]]
        for name in active:
            counts[name] += 1
        counts["total"] += len(active)
    return counts

# Load the full dataset and the train/test/validation CSVs. load() returns a
# pair; the second element is discarded (test_split=None is passed each time).
dataset, _ = MulTweEmoDataset.load(csv_path="dataset/MulTweEmo.csv", test_split = None)
train, _ = MulTweEmoDataset.load(csv_path="dataset/train_MulTweEmo.csv", test_split = None)
test, _ = MulTweEmoDataset.load(csv_path="dataset/test_MulTweEmo.csv", test_split = None)
val, _ = MulTweEmoDataset.load(csv_path="dataset/val_MulTweEmo.csv", test_split = None)

In [None]:
# Mean of the per-row sums of the test split's "labels" entries
# (presumably the average number of active labels per sample — TODO confirm encoding).
np.array(test["labels"].to_list()).sum(axis=1).mean()

In [None]:
# When text_only is set, keep only the first row for each "id" in every split.
# NOTE(review): presumably an id repeats once per image of a tweet — confirm.
text_only = True
if text_only:
    dataset = dataset.drop_duplicates(subset=["id"])
    train = train.drop_duplicates(subset=["id"])
    test = test.drop_duplicates(subset=["id"])
    val = val.drop_duplicates(subset=["id"])


In [None]:
# Per-label counts for each split. This needs the (dataset, labels) version of
# get_support; a later cell in this notebook rebinds the name get_support to a
# function with a (threshold, gs_dataset) signature.
dataset_support = get_support(dataset, labels)
train_support = get_support(train, labels)
test_support = get_support(test, labels)
val_support = get_support(val, labels)

In [None]:
# One column per split, one row per label plus the "total" row.
support_dict = {"Train": train_support, "Test": test_support, "Val": val_support, "Dataset": dataset_support}
support_table = pd.DataFrame(support_dict)
# Capitalise the row names for presentation.
support_table.index = support_table.index.str.capitalize()

In [None]:
# Same layout as support_dict, with every entry divided by that split's
# "total" entry (so the "total" row itself becomes 1). A split whose total is 0
# raises ZeroDivisionError here.
percent_support_dict = {key: {label: value[label]/value["total"] for label in value.keys()} for key, value in support_dict.items()}
percent_support_table = pd.DataFrame(percent_support_dict)
percent_support_table.index = percent_support_table.index.str.capitalize()

In [None]:
# Emit the absolute-count table as LaTeX source.
print(support_table.to_latex())

# Gold and silver label comparison

In [None]:
def create_silver_dataset(raw_dataset_path="./dataset/MulTweEmo_raw.pkl",
                        csv_path="./dataset/silver_MulTweEmo.csv",
                        mode="label",
                        label_name="multi_label",
                        seed_threshold=0.81,
                        top_seeds:(int|dict)=None):
    """Build a frame holding gold (``M_*``) and silver (``T_*``) labels per tweet.

    Only rows whose ``M_Anger`` value is not null are kept. The silver ``T_*``
    columns are reset to 0 and then set to 1 according to ``mode``. The gold
    ``M_*`` columns are binarised at the end (1 when the stored value is >= 2).

    Args:
        raw_dataset_path: Path of the pickled raw DataFrame.
        csv_path: Not used by the function body; kept for existing callers.
        mode: ``"label"`` derives silver labels from the ``label_name`` column;
            ``"threshold"`` sets a silver label when the mean of that emotion's
            seed scores is strictly greater than ``seed_threshold``.
        label_name: ``"multi_label"`` (iterable of emotions per row) or
            ``"uni_label"`` (one emotion per row). Only read in ``"label"`` mode.
        seed_threshold: Cut-off used in ``"threshold"`` mode.
        top_seeds: If given, keep only the rows with the highest average seed
            score: an int keeps that many rows for every label, a dict maps
            label -> number of rows to keep for that label.

    Returns:
        DataFrame with columns ``id``, ``tweet``, ``img_count``, ``seeds``,
        the ``M_*`` and ``T_*`` label columns and ``avg_seeds``.

    Raises:
        ValueError: if ``mode`` is not supported, or if ``label_name`` is not
            supported when a row is processed in ``"label"`` mode.
    """
    if mode not in ("threshold", "label"):
        raise ValueError("mode must be either \"threshold\" or \"label\"")

    # NOTE(review): unpickling can execute arbitrary code; only load trusted files.
    with open(raw_dataset_path, 'rb') as file:
        dataset = pd.compat.pickle_compat.load(file)

    dataset = dataset[dataset["M_Anger"].notnull()].copy().reset_index(drop=True)

    dataset = dataset.drop(columns = ["M_gold_multi_label", "T_gold_multi_label"])
    dataset["img_count"] = dataset["path_photos"].apply(len)

    labels = MulTweEmoDataset.get_labels(drop_something_else=True)
    labels.remove("neutral")

    emotions_m = {emotion: "M_"+emotion.capitalize() for emotion in labels}
    emotions_t = {emotion: "T_"+emotion.capitalize() for emotion in labels}

    label_columns = list(emotions_m.values()) + list(emotions_t.values())
    columns = ["id", "tweet", "img_count", "seeds"] + label_columns

    # Start from an all-zero silver labelling.
    dataset[list(emotions_t.values())] = 0

    if mode=="label":
        def set_labels(row):
            if label_name == "multi_label":
                for label in row[label_name]:
                    row[emotions_t[label]] = 1
            elif label_name == "uni_label":
                label = row[label_name]
                row[emotions_t[label]] = 1
            else:
                raise ValueError("label_name must be \"multi_label\" or \"uni_label\"")
            return row
    else:
        def set_labels(row):
            for e, d in row["seeds"].items():
                avg = sum(d.values())/len(d.values())
                if avg > seed_threshold:
                    row[emotions_t[e]] = 1
            return row

    def seeds_avg(row):
        # Mean seed score per emotion, stored as a dict in "avg_seeds".
        avgs = {}
        for e, d in row["seeds"].items():
            avgs[e] = sum(d.values())/len(d.values())
        row["avg_seeds"] = avgs
        return row

    dataset = dataset.apply(set_labels, axis=1)

    dataset = dataset.apply(seeds_avg, axis=1)

    if top_seeds is not None:
        # "neutral" was already removed from `labels` above; removing it again
        # here (as the previous version did) raised ValueError on every call
        # that passed top_seeds.
        per_label = {label: top_seeds for label in labels} if isinstance(top_seeds, int) else top_seeds
        # The index was reset above, so this table's row numbers are valid
        # positions for iloc on `dataset`.
        avg_table = pd.DataFrame(dataset["avg_seeds"].to_list())
        indices = []
        for label, top_n in per_label.items():
            indices += avg_table.sort_values(by=label, ascending=False).head(top_n).index.to_list()
        dataset = dataset.iloc[indices].sort_index().drop_duplicates(subset="id")

    # Copy so the column assignments below write to an independent frame.
    dataset = dataset[columns+["avg_seeds"]].copy()

    # Binarise the gold columns: 1 when the stored value is at least 2.
    for label in emotions_m.values():
        dataset[label] = dataset[label].apply(lambda x: 1 if x>=2 else 0)

    return dataset

In [None]:
import numpy as np
import sklearn.metrics as skm

# Report row names: every label except "neutral".
labels = MulTweEmoDataset.get_labels()
labels.remove("neutral")

# Silver labels derived from the "uni_label" column (default mode="label").
gs_dataset = create_silver_dataset(
    label_name="uni_label",
    # mode="threshold",
    # seed_threshold=0.84
    )

# One label entry per row, built from the T_* (silver) and M_* (gold) columns.
# NOTE(review): presumably _build_label_matrix returns a multi-hot vector per row — confirm.
gs_dataset["label_silver"] = MulTweEmoDataset._build_label_matrix(gs_dataset, gs_dataset.columns[gs_dataset.columns.str.startswith("T_")])
gs_dataset["label_gold"] = MulTweEmoDataset._build_label_matrix(gs_dataset, gs_dataset.columns[gs_dataset.columns.str.startswith("M_")])

# Drop rows that received no silver label at all.
emotions_t = {emotion: "T_"+emotion.capitalize() for emotion in labels}
gs_dataset = gs_dataset[(gs_dataset[emotions_t.values()].sum(axis=1))!=0]

# Gold labels are passed as y_true, silver labels as y_pred.
class_report = skm.classification_report(np.array(gs_dataset["label_gold"].to_list()), np.array(gs_dataset["label_silver"].to_list()), target_names=labels, zero_division=0, output_dict=True)
class_report = pd.DataFrame(class_report).T
class_report["support"] = class_report["support"].astype(int)
print(class_report.to_latex(float_format="%.3f"))

In [None]:
import numpy as np
import sklearn.metrics as skm
# Label names without "neutral"; read as a global by get_precision/get_support below.
labels = MulTweEmoDataset.get_labels()
labels.remove("neutral")

# Silver labels derived from the "multi_label" column; the frame also carries
# the "avg_seeds" column used for the threshold sweep.
gs_dataset = create_silver_dataset(
    label_name="multi_label",
    )

def get_precision(threshold, gs_dataset):
    """Return the samples-average precision of thresholded silver labels.

    The silver ``T_*`` columns are rebuilt from ``avg_seeds``: a label is set
    when its average seed score is strictly greater than ``threshold``. Rows
    left without any silver label are excluded before scoring against the gold
    labels.

    Args:
        threshold: Cut-off applied to each value in the row's ``avg_seeds`` dict.
        gs_dataset: Frame with ``avg_seeds`` and the ``M_*`` gold columns.
            It is not modified.

    Returns:
        The ``precision`` entry of the ``samples avg`` row of scikit-learn's
        classification report (gold as y_true, silver as y_pred).
    """
    # `labels` is the notebook-level list of label names.
    emotions_t = {emotion: "T_"+emotion.capitalize() for emotion in labels}

    def set_labels(row):
        for e, v in row["avg_seeds"].items():
            if v > threshold:
                row[emotions_t[e]] = 1
        return row

    # Work on a copy: zeroing the T_* columns directly would overwrite the
    # caller's silver labels as a side effect.
    gs_dataset = gs_dataset.copy()
    gs_dataset[list(emotions_t.values())] = 0
    gs_dataset = gs_dataset.apply(set_labels, axis=1)

    gs_dataset["label_silver"] = MulTweEmoDataset._build_label_matrix(gs_dataset, gs_dataset.columns[gs_dataset.columns.str.startswith("T_")])
    gs_dataset["label_gold"] = MulTweEmoDataset._build_label_matrix(gs_dataset, gs_dataset.columns[gs_dataset.columns.str.startswith("M_")])

    # Keep only rows that received at least one silver label.
    tmp_dataset = gs_dataset[(gs_dataset["label_silver"].apply(sum))!=0]

    class_report = skm.classification_report(np.array(tmp_dataset["label_gold"].to_list()), np.array(tmp_dataset["label_silver"].to_list()), target_names=labels, zero_division=0, output_dict=True)
    class_report = pd.DataFrame(class_report).T
    class_report["support"] = class_report["support"].astype(int)
    return class_report["precision"]["samples avg"]

def get_support(threshold, gs_dataset):
    """Return how many rows keep at least one silver label at ``threshold``.

    The silver ``T_*`` columns are rebuilt from ``avg_seeds``: a label is set
    when its average seed score is strictly greater than ``threshold``.

    NOTE: this rebinds the name ``get_support`` that an earlier cell uses for
    the ``(dataset, labels)`` per-label counter.

    Args:
        threshold: Cut-off applied to each value in the row's ``avg_seeds`` dict.
        gs_dataset: Frame with ``avg_seeds`` and the ``M_*`` gold columns.
            It is not modified.

    Returns:
        Number of rows whose rebuilt silver label entry has a non-zero sum.
    """
    # `labels` is the notebook-level list of label names.
    emotions_t = {emotion: "T_"+emotion.capitalize() for emotion in labels}

    def set_labels(row):
        for e, v in row["avg_seeds"].items():
            if v > threshold:
                row[emotions_t[e]] = 1
        return row

    # Work on a copy: zeroing the T_* columns directly would overwrite the
    # caller's silver labels as a side effect.
    gs_dataset = gs_dataset.copy()
    gs_dataset[list(emotions_t.values())] = 0
    gs_dataset = gs_dataset.apply(set_labels, axis=1)

    gs_dataset["label_silver"] = MulTweEmoDataset._build_label_matrix(gs_dataset, gs_dataset.columns[gs_dataset.columns.str.startswith("T_")])
    gs_dataset["label_gold"] = MulTweEmoDataset._build_label_matrix(gs_dataset, gs_dataset.columns[gs_dataset.columns.str.startswith("M_")])

    tmp_dataset = gs_dataset[(gs_dataset["label_silver"].apply(sum))!=0]

    return tmp_dataset.shape[0]


# Number of labels by emotion for each tweet (silver labels)

In [None]:
labels = MulTweEmoDataset.get_labels()
labels.remove("neutral")

# count_dict[e][k] = number of rows in gs_dataset whose silver column for e is
# set and that have exactly k other silver labels set.
# Relies on the notebook-level emotions_t mapping (label -> "T_" column name).
count_dict = {e: [0]*len(labels) for e in labels}
for e1 in labels:
    tmp_labels = labels.copy()
    tmp_labels.remove(e1)
    for i, row in gs_dataset.iterrows():
        e_count = 0
        if row[emotions_t[e1]]:
            for e2 in tmp_labels:
                if row[emotions_t[e2]]:
                    e_count += 1
            count_dict[e1][e_count] += 1
count_dict

In [None]:
# Index k of a count_dict list means "k other labels", i.e. k+1 labels in
# total, hence the weights 1..len(labels). The result is the weighted mean
# number of labels on rows carrying each emotion.
# An emotion with no rows at all (sum(v) == 0) raises ZeroDivisionError.
n_labels = list(range(1,len(labels)+1))
avg_count_dict = {e: sum([a*b for a,b in zip(n_labels, v)])/sum(v) for e,v in count_dict.items()}
avg_count_dict

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Sweep 100 thresholds between 0.4 and 0.95.
x = np.linspace(0.4, 0.95, 100)

# One full relabel + classification report per threshold.
y = [get_precision(v, gs_dataset) for v in x]
fig = plt.figure(figsize=(5,3))
ax = fig.gca()

ax.set_xlabel("Threshold")
ax.set_ylabel("Samples average precision")
plt.grid()

ax.axhline(0, color='#777777')
ax.axvline(0, color='#777777')
ax.set_xticks([x/10 for x in range(4, 11)])
# Only the 0.5-1 range is shown although the sweep starts at 0.4.
ax.set_xlim((0.5, 1))
plt.plot(x, y)
plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Sweep 100 thresholds between 0.4 and 0.95.
x = np.linspace(0.4, 0.95, 100)

# Needs the (threshold, gs_dataset) version of get_support defined in this section.
y = [get_support(v, gs_dataset) for v in x]
fig = plt.figure(figsize=(5,3))
ax = fig.gca()

ax.set_xlabel("Threshold")
ax.set_ylabel("Number of tweets")
plt.grid()

ax.axhline(0, color='#777777')
ax.axvline(0, color='#777777')
ax.set_xticks([x/10 for x in range(4, 11)])
# Only the 0.5-1 range is shown although the sweep starts at 0.4.
ax.set_xlim((0.5, 1))
plt.plot(x, y)
plt.show()

# Number of labels by emotion for each tweet

In [None]:
labels = MulTweEmoDataset.get_labels()
dataset, _ = MulTweEmoDataset.load(csv_path="dataset/MulTweEmo.csv", test_split = None)

# count_dict[e][k] = number of rows where column e equals 1 and exactly k of
# the other label columns are truthy.
count_dict = {e: [0]*len(labels) for e in labels}
for e1 in labels:
    tmp_labels = MulTweEmoDataset.get_labels()
    tmp_labels.remove(e1)
    for i, row in dataset.iterrows():
        e_count = 0
        if row[e1] == 1:
            for e2 in tmp_labels:
                if row[e2]:
                    e_count += 1
            count_dict[e1][e_count] += 1
count_dict

In [None]:
# Total number of labels over one row per id, divided by a hard-coded 804.
# NOTE(review): 804 is presumably the number of unique tweet ids — verify, or
# use the length of the de-duplicated frame instead.
(dataset.drop_duplicates(subset="id")[labels].sum(axis=1)).sum()/804

In [None]:
# Index k of a count_dict list means "k other labels", i.e. k+1 labels in
# total, hence the weights 1..len(labels).
# An emotion with no rows at all (sum(v) == 0) raises ZeroDivisionError.
n_labels = list(range(1,len(labels)+1))
avg_count_dict = {e: sum([a*b for a,b in zip(n_labels, v)])/sum(v) for e,v in count_dict.items()}
avg_count_dict

# Caption examples

In [None]:
from libs.dataset_loader import MulTweEmoDataset
import pandas as pd
import regex as re

In [None]:
# Label names and the full dataset (second return value of load() discarded).
labels = MulTweEmoDataset.get_labels()
dataset, _ = MulTweEmoDataset.load(csv_path="dataset/MulTweEmo.csv", test_split = None)

In [None]:
# One reproducibly sampled row (random_state=1) for every label.
sampled_dataset = pd.DataFrame()
for e in labels:
    sampled_dataset = pd.concat([sampled_dataset, dataset[dataset[e]==1].sample(random_state=1)])
# This line is outside the loop, so `e` is still the last label: three more
# rows of that label are appended.
# NOTE(review): presumably padding so the row count is a multiple of 4 for the table below — confirm.
sampled_dataset = pd.concat([sampled_dataset, dataset[dataset[e]==1].sample(n=3, random_state=1)])

In [None]:
def get_labels(row):
    """Return the capitalised names of the labels that are set on ``row``.

    ``row["labels"]`` is walked position by position; every truthy entry is
    translated to a name through ``MulTweEmoDataset.get_id2label()``.
    """
    id2label = MulTweEmoDataset.get_id2label()
    return [
        id2label[position].capitalize()
        for position, flag in enumerate(row["labels"])
        if flag
    ]

In [None]:
# Build the body of a LaTeX table showing sampled images four per row group:
# an image line, a caption line and a labels line for each group.
table = ""
image_list = []
# Map "./dataset/images/<rest>" to "images/dataset/captions/<rest>".
# `re` is the third-party `regex` module here; the dots in the pattern are unescaped.
build_path = lambda x: "images/dataset/captions/" + re.search("(?<=./dataset/images/)(.*)", x).group(1)
includegraphics = lambda x: f"\\rowincludegraphics[width=0.2\\textwidth]{{{x}}}"
count = 0
# Integer division: rows beyond the last complete group of four are not emitted.
for i in range(len(sampled_dataset)//4):
    table += "\\textbf{Image}"
    for j, row in sampled_dataset[i*4: (i+1)*4].iterrows():
        table += " & " + includegraphics(build_path(row["img_path"]))
        # Remember the source path so the image can be copied afterwards.
        image_list.append(row["img_path"])
    table += "\\\\\n\\addlinespace \\hline \\addlinespace\n"
    
    table += "\\textbf{Caption}"
    for j, row in sampled_dataset[i*4: (i+1)*4].iterrows():
        table += " & " + row["caption"]
    table += "\\\\\n\\addlinespace \\hline \\addlinespace\n"
    
    table += "\\textbf{Labels}"
    for j, row in sampled_dataset[i*4: (i+1)*4].iterrows():
        table += " & " + ", ".join(get_labels(row))
    table += "\\\\\n\\addlinespace \\hline \\addlinespace\n"


In [None]:
# Show the generated LaTeX rows.
print(table)

In [None]:
import os
import shutil
# Copy every sampled image into the thesis image folder.
# NOTE(review): the destination is a machine-specific absolute path, and
# missing source files are skipped silently.
src_files = image_list
for file_name in src_files:
    # full_file_name = os.path.join(src, file_name)
    if os.path.isfile(file_name):
        shutil.copy(file_name, "C:/Users/Utente/Desktop/Multimodal-Sentiment-Analysis/Tesi/images/dataset/captions")

# Top 10 trials for each metric baseline

In [None]:
import pandas as pd

In [None]:
# Name of the study export to analyse; used in the CSV path and in the table caption.
file = "bert"
df = pd.read_csv(f"Report1/{file}_study.csv")
# Drop the bookkeeping columns before ranking trials.
df = df.drop(columns=["State", "Number"])
df

In [None]:
def remove_prefix(text, prefix):
    """Return ``text`` without a leading ``prefix``; unchanged if it does not start with it."""
    return text.removeprefix(prefix)
# Column headers with the leading "Param " stripped, used as table headers later.
columns = df.columns.to_list()
columns = [remove_prefix(column, "Param ") for column in columns]
columns

In [None]:
# Correlation matrix over the ten trials with the lowest loss.
df.sort_values("loss").iloc[:10].corr()

In [None]:
# Metric used to rank the trials; switch by (un)commenting.
# metric = "loss"
# metric = "f1_score"
metric = "exact_match"

# Loss is ranked ascending (lower is better); the other metrics descending.
# Underscores are escaped for LaTeX. "\\_" is written with an explicit
# backslash because "\_" is an invalid escape sequence in a normal string.
print((df.sort_values(by=metric, ascending=(metric == "loss")).head(10).to_latex(header=columns,
                                                                        caption=f"Top 10 {metric.replace('_', ' ')} results for {file.replace('_', ' ')}",
                                                                        # label=f"tab:{file}_top_10",
                                                                        longtable=False)).replace("_", "\\_"))

# Analysis of best trials

Select the study to analyse from a drop-down widget that lists the studies in the storage.

In [None]:
from ipywidgets import widgets
import optuna
from IPython.display import clear_output
import pandas as pd

In [None]:
# storage_name = "sqlite:///final_study_2.db"
storage_name = "sqlite:///MulTweEmo_study_new_split.db"

# Offer every study stored in the database; the dropdown's initial value is loaded first.
options = optuna.study.get_all_study_names(storage_name)
dropdown = widgets.Dropdown(options=options)
study_name=dropdown.value

# load_if_exists=True is passed so an existing study of that name is reused;
# the three directions correspond to (loss, F1-score, exact match).
study = optuna.create_study(study_name=study_name, storage=storage_name, load_if_exists=True, directions=["minimize", "maximize", "maximize"])
trials = study.best_trials

def on_change(change):
    """Dropdown observer: reload the study when the selected value changes.

    Events other than a ``'change'`` of the ``'value'`` trait are ignored.
    Otherwise the globals ``study_name``, ``study`` and ``trials`` are
    rebound to the newly selected study and its best trials, and the dropdown
    is redrawn.
    """
    global study_name
    global study
    global trials
    if not (change['type'] == 'change' and change['name'] == 'value'):
        return
    study_name = change['new']
    clear_output()
    display(dropdown)
    study = optuna.create_study(
        study_name=study_name,
        storage=storage_name,
        load_if_exists=True,
        directions=["minimize", "maximize", "maximize"],
    )
    trials = study.best_trials

# Register the callback and show the widget.
dropdown.observe(on_change)
display(dropdown)


In [None]:
# trials_dict = {model:studies[model].best_trials for model in models}

In [None]:
# trials = trials_dict["base"]
# trials.sort(key = lambda x, : x.user_attrs["samples avg"]["f1-score"], reverse=True)
# trials

In [None]:
# Column order of the results table: trial number, metric names, then the
# hyper-parameter names taken from the first best trial.
params = list(study.best_trials[0].params.keys())
keys = ["number"] + study.metric_names + params
# keys

In [None]:
def trials2dict(trials, keys):
    """Collect trials column-wise into a dict of lists.

    Args:
        trials: Iterable of trial objects exposing ``number``, ``values`` and
            ``params``.
        keys: Column names to create; must include ``"number"``, the study's
            metric names and every name in the global ``params``.

    Returns:
        dict mapping each key to a list with one entry per trial. Metric
        columns are filled from ``trial.values`` in the order of the global
        ``study.metric_names``.
    """
    table = {key: [] for key in keys}

    for trial in trials:
        table["number"].append(trial.number)
        for position, metric_name in enumerate(study.metric_names):
            table[metric_name].append(trial.values[position])
        for param_name in params:
            table[param_name].append(trial.params[param_name])

    return table
# Column-wise view of the study's best trials.
best_trials = trials2dict(study.best_trials, keys)
# best_trials

In [None]:
def format_eye(value):
    """Identity formatter: hand the value back untouched."""
    return value
def format_float(value):
    """Render a number in fixed-point notation with four decimals."""
    return format(value, '.4f')
def format_long_float(value):
    """Render a number in scientific notation with two decimals."""
    return format(value, '.2e')

# Per-column formatters for the Styler: identity by default, overridden for
# the columns that need a fixed number of digits.
formatters={key: format_eye for key in keys}
formatters["learning_rate"] = format_long_float
formatters["Loss"] = format_float
formatters["F1-score"] = format_float
formatters["exact_match"] = format_float
# Move the "exact_match" entry under its display name; after this line there
# is no "exact_match" key left in the dict.
formatters["Accuracy"] = formatters.pop("exact_match")
formatters["dropout"] = format_float
# formatters = list(formatters.values())

In [None]:
# Full best-trials table (metrics and hyper-parameters) as LaTeX, wrapped in
# an adjustbox so it fits the text width.
tmp_df = pd.DataFrame.from_dict(best_trials).rename(columns={"number": "Trial", "loss": "Loss", "f1_score": "F1-score", "exact_match": "Accuracy"})
print("\\begin{adjustbox}{width=\\textwidth,center=\\textwidth}")
# Italicise the best value of each metric column (max for scores, min for loss).
tmp_df = tmp_df.style.highlight_max(axis=0, props="textit:--rwrap;", subset=["F1-score", "Accuracy"])
tmp_df = tmp_df.highlight_min(axis=0, props="textit:--rwrap;", subset=["Loss"])
tmp_df = tmp_df.format(formatter=formatters)
tmp_df = tmp_df.hide(axis="index")
# Underscores are escaped for LaTeX. "\\_" is written with an explicit
# backslash because "\_" is an invalid escape sequence in a normal string.
print((tmp_df.to_latex(hrules=True, column_format="r|rrr|"+"r"*len(params))).replace("_", "\\_"))
print("\\end{adjustbox}")

In [None]:
# Compact variant of the table: trial number and the three metrics only.
tmp_df = pd.DataFrame.from_dict(best_trials).rename(columns={"exact_match": "accuracy", "number": "trial"})
tmp_df = tmp_df[["trial", "loss", "f1_score", "accuracy"]]
print("\\begin{adjustbox}{width=\\textwidth,center=\\textwidth}")
# Bold the best value of each metric column.
tmp_df = tmp_df.style.highlight_max(axis=0, props="textbf:--rwrap;", subset=["f1_score", "accuracy"])
tmp_df = tmp_df.highlight_min(axis=0, props="textbf:--rwrap;", subset=["loss"])
# NOTE(review): `formatters` has fixed-digit entries for "Loss"/"F1-score"/
# "Accuracy", but the columns here are lowercase — confirm the intended
# number formatting is actually applied.
tmp_df = tmp_df.format(formatter=formatters)
tmp_df = tmp_df.hide(axis="index")
# Underscores are replaced by spaces here rather than escaped.
print((tmp_df.to_latex(hrules=True)).replace("_", " "))
print("\\end{adjustbox}")

In [None]:
# Display the unformatted best-trials table.
pd.DataFrame.from_dict(best_trials)

# Individual emotion analysis

In [None]:
from ipywidgets import widgets
import optuna
from IPython.display import clear_output
import pandas as pd

In [None]:
# storage_name = "sqlite:///final_study.db"
storage_name = "sqlite:///MulTweEmo_study_new_split.db"

metric = "loss"

# Offer every study stored in the database; the dropdown's initial value is loaded first.
options = optuna.study.get_all_study_names(storage_name)
dropdown = widgets.Dropdown(options=options)
study_name=dropdown.value

study = optuna.create_study(study_name=study_name, storage=storage_name, load_if_exists=True, directions=["minimize", "maximize", "maximize"])
# All trials of the study, best second objective (values[1]) first.
# NOTE(review): a trial whose `values` is None would make this sort fail — confirm all trials completed.
trials = study.trials
trials.sort(key = lambda x, : x.values[1], reverse=True)

def on_change(change):
    """Dropdown observer: reload the study when the selected value changes.

    Events other than a ``'change'`` of the ``'value'`` trait are ignored.
    Otherwise the globals ``study_name``, ``study`` and ``trials`` are
    rebound; ``trials`` holds all trials of the study sorted by ``values[1]``
    in descending order. The dropdown is redrawn.
    """
    global study_name
    global study
    global trials
    if not (change['type'] == 'change' and change['name'] == 'value'):
        return
    study_name = change['new']
    clear_output()
    display(dropdown)
    study = optuna.create_study(
        study_name=study_name,
        storage=storage_name,
        load_if_exists=True,
        directions=["minimize", "maximize", "maximize"],
    )
    trials = study.trials
    trials.sort(key=lambda trial: trial.values[1], reverse=True)

# Register the callback and show the widget.
dropdown.observe(on_change)
display(dropdown)


In [None]:
from plotly.io import show

# Hyper-parameter importance plot for the selected study.
fig = optuna.visualization.plot_param_importances(study, target_name=["Loss", "F1-score", "Accuracy"])

fig.update_layout(
    autosize=False,
    width=1350,
    height=450,
    title=None,
    margin=dict(l=20, r=20, t=20, b=20),
    font_size=17,
)

# Rename the traces to display names; a trace whose name is not a key of
# `newnames` raises KeyError.
newnames = {"loss": "Loss", "f1_score": "F1-score", "exact_match" :"Accuracy"}
fig.for_each_trace(lambda t: t.update(name = newnames[t.name]))
show(fig)

In [None]:
import json
from plotly.io import from_json

# Build one parallel-coordinate plot with all three objectives as axes:
# start from the Loss plot, then splice in the first dimension of the plots
# generated for the other two objectives.
targets = ["Loss", "F1-score", "Accuracy"]
fig = optuna.visualization.plot_parallel_coordinate(study, target=lambda x: x.values[0], target_name="Loss")
fig_json = json.loads(fig.to_json())

for i in range(1, len(targets)):
    # The lambda reads `i` when it is called, which happens inside this call.
    tmp_fig = optuna.visualization.plot_parallel_coordinate(study, target=lambda x: x.values[i], target_name=targets[i])
    tmp_fig_json = json.loads(tmp_fig.to_json())
    # NOTE(review): assumes dimension 0 of each generated plot is the objective axis — confirm.
    fig_json["data"][0]["dimensions"].insert(i, tmp_fig_json["data"][0]["dimensions"][0])

fig = from_json(json.dumps(fig_json))
fig.update_layout(
    autosize=False,
    width=1200,
    height=450,
    title=None,
    # margin=dict(l=20, r=20, t=20, b=20),
    font_size=14,
)

import plotly.express as px
# print(px.colors.sequential.Reds)
fig.data[0].line.colorscale = px.colors.sequential.Reds
# fig.data[0].line.reversescale = not fig.data[0].line.reversescale
show(fig)

In [None]:
def trials2dict(trials):
    """Collect trial numbers and hyper-parameters column-wise.

    The parameter names come from the first best trial of the global
    ``study``; every trial in ``trials`` must define all of them.

    Returns:
        dict with a ``"number"`` list and one list per parameter name, each
        holding one entry per trial.
    """
    param_names = list(study.best_trials[0].params.keys())
    table = {key: [] for key in ["number", *param_names]}

    for trial in trials:
        table["number"].append(trial.number)
        for name in param_names:
            table[name].append(trial.params[name])

    return table

In [None]:
import pandas as pd
# trials.sort(key = lambda x, : x.values[0], reverse=True)
# top_trials = trials[:10]
# Union of the ten best trials by values[1] and the ten best by values[2].
trials.sort(key = lambda x, : x.values[1], reverse=True)
top_trials = trials[:10]
trials.sort(key = lambda x, : x.values[2], reverse=True)
top_trials += trials[:10]
top_trials = pd.DataFrame(trials2dict(top_trials))
# A trial present in both top-10 lists is kept once.
top_trials = top_trials.drop_duplicates()
summary = top_trials.describe()
summary = summary.drop(columns=["number"])
# Print every describe() row except "count".
row_list = summary.index.to_list()
row_list.remove("count")
print(top_trials.shape)
print(summary.loc[row_list].to_latex(float_format="%.4g", escape=True))
summary

In [None]:
# Display the de-duplicated top trials.
top_trials

In [None]:
from libs.dataset_loader import MulTweEmoDataset
labels = MulTweEmoDataset.get_labels()
# Best trial by values[1]; its user attributes hold one entry per label with a "support" field.
trials.sort(key = lambda x, : x.values[1], reverse=True)
trial_attr = trials[0].user_attrs.copy()
support = {label: int(trial_attr[label]["support"]) for label in labels}
support

In [None]:
# Copy, so the pop() in the next cell does not alter the trial's own attributes.
emotion_metrics = trials[0].user_attrs.copy()

In [None]:
# Remove the "no_prediction_samples" entry so only the per-label/aggregate
# entries remain. Running this cell twice raises KeyError.
no_pred_samples = emotion_metrics.pop("no_prediction_samples")
no_pred_samples

In [None]:
# One row per label/aggregate, one column per metric field.
emotion_metrics_table = pd.DataFrame(emotion_metrics).T
# Put the label rows first (in `labels` order), then every remaining row.
emotion_metrics_table = emotion_metrics_table.loc[labels + [metric for metric in emotion_metrics_table.index if metric not in labels]]
emotion_metrics_table["support"] = emotion_metrics_table["support"].astype(int)
print(emotion_metrics_table.to_latex(float_format="%.5f"))

In [None]:
# for emotion, metrics in emotion_metrics.items():
#     metrics["support"] /= sum(support.values())
# emotion_metrics

In [None]:
# Keep only the score columns for plotting and capitalise their names.
emotion_metrics_table = emotion_metrics_table.drop(columns=["support"])
emotion_metrics_table.columns = emotion_metrics_table.columns.map(str.capitalize)

In [None]:
# Bar chart of the metrics for every label followed by the aggregate rows.
f1_agg_list = ["macro avg", "micro avg", "weighted avg", "samples avg"]
ax = emotion_metrics_table.loc[labels + f1_agg_list].plot(kind="bar", figsize=(7,4), yticks=[x / 10 for x in range(0,11)])
# ax = emotion_metrics_table.loc[labels].plot(kind="bar", figsize=(6,4), yticks=[x / 10 for x in range(0,11)])
ax.set_xticklabels([l.get_text().capitalize() for l in ax.get_xticklabels()], rotation=45, ha='right');
# Draw the horizontal grid behind the bars.
ax.set_axisbelow(True)
ax.yaxis.grid(True)


In [None]:
from math import sqrt
# Mean and population standard deviation of "no_prediction_samples" over the
# first ten entries of `trials`, in whatever order the list was last sorted.
# Fewer than ten trials raises IndexError.
tmp = []
no_pred_samples_tot = 0
for i in range(10):
    no_pred_samples_tot += trials[i].user_attrs["no_prediction_samples"]
no_pred_samples_var = 0
for i in range(10):
    no_pred_samples_var += (trials[i].user_attrs["no_prediction_samples"]-no_pred_samples_tot/10)**2
# Standard deviation first, then the mean.
print(sqrt(no_pred_samples_var/10))
print(no_pred_samples_tot/10)

In [None]:
# Average the per-label metrics of the ten best trials by values[1].
tmp = []
# trials.sort(key = lambda x, : x.values[0], reverse=False)
trials.sort(key = lambda x, : x.values[1], reverse=True)
for i in range(10):
    # One frame per trial: rows are labels/aggregates, columns the score fields.
    tmp.append(pd.DataFrame(trials[i].user_attrs).drop(columns="no_prediction_samples").T.drop(columns=["support"]))
# Rows with the same name across the ten frames are averaged together.
best_avg_metrics = pd.concat(tmp).groupby(level=0).mean()
best_avg_metrics.columns = [c.capitalize() for c in best_avg_metrics.columns]
best_avg_metrics = best_avg_metrics.loc[labels]
ax = best_avg_metrics.plot(kind="bar", figsize=(6,4), yticks=[x / 10 for x in range(0,11)])
ax.set_xticklabels([l.get_text().capitalize() for l in ax.get_xticklabels()], rotation=45, ha='right');
ax.set_axisbelow(True)
ax.legend(fontsize=12, loc="upper right")
# NOTE(review): these rc settings are applied after the plot was created —
# confirm they affect this figure and not only later ones.
plt.rc('xtick', labelsize=14)
plt.rc('ytick', labelsize=14)
ax.yaxis.grid(True)


# Top 10 metrics average

In [None]:
from ipywidgets import widgets
import optuna
from IPython.display import clear_output
import pandas as pd

In [None]:
storage_name = "sqlite:///MulTweEmo_study_new_split.db"

# Display names of the three objectives, in the order of trial.values.
metrics = ["loss", "f1-score", "accuracy"]

options = optuna.study.get_all_study_names(storage_name)
dropdown = widgets.Dropdown(options=options)
study_name=dropdown.value

study = optuna.create_study(study_name=study_name, storage=storage_name, load_if_exists=True, directions=["minimize", "maximize", "maximize"])

# One trial list per metric, each sorted best-first for that metric
# (ascending for the loss, descending otherwise).
# NOTE(review): this relies on every access to study.trials returning a fresh
# list; if the same list were shared, all three entries would end up in the
# last sort order — confirm against the Optuna version in use.
trials = {metric: study.trials for metric in metrics}
for i, metric in enumerate(metrics):
    trials[metric].sort(key = lambda x: x.values[i], reverse=True if metric != "loss" else False)

def on_change(change):
    """Dropdown observer: reload the study when the selected value changes.

    Events other than a ``'change'`` of the ``'value'`` trait are ignored.
    Otherwise the globals ``study_name``, ``study`` and ``trials`` are
    rebound; ``trials`` maps every name in the global ``metrics`` to the
    study's trials sorted best-first for that metric (ascending for
    ``"loss"``, descending otherwise). The dropdown is redrawn.
    """
    global study_name
    global study
    global trials
    if not (change['type'] == 'change' and change['name'] == 'value'):
        return
    study_name = change['new']
    clear_output()
    display(dropdown)
    study = optuna.create_study(
        study_name=study_name,
        storage=storage_name,
        load_if_exists=True,
        directions=["minimize", "maximize", "maximize"],
    )
    trials = {metric_name: study.trials for metric_name in metrics}
    for position, metric_name in enumerate(metrics):
        trials[metric_name].sort(
            key=lambda trial: trial.values[position],
            reverse=metric_name != "loss",
        )

# Register the callback and show the widget.
dropdown.observe(on_change)
display(dropdown)

In [None]:
# avg[ranking][metric] = mean of `metric` over the top_n trials when trials
# are ranked by `ranking` (metric_2 is the ranking metric in the loops below).
avg = {metric: {metric_2: 0 for metric_2 in metrics} for metric in metrics}
top_n = 10
for metric_2 in metrics:
    for i, metric in enumerate(metrics):
        # Fewer than top_n trials raises IndexError.
        for j in range(top_n):
            avg[metric_2][metric] += trials[metric_2][j].values[i]
        avg[metric_2][metric] /= top_n
avg

In [None]:
import math
# std[ranking][metric] = population standard deviation (divides by top_n) of
# `metric` over the same top_n trials, using the means stored in `avg`.
std = {metric: {metric_2: 0 for metric_2 in metrics} for metric in metrics}

for metric_2 in metrics:
    for i, metric in enumerate(metrics):
        for j in range(top_n):
            std[metric_2][metric] += (trials[metric_2][j].values[i] - avg[metric_2][metric]) ** 2
        std[metric_2][metric] /= top_n
        std[metric_2][metric] = math.sqrt(std[metric_2][metric])
std

# Model comparison by emotion

In [None]:
import optuna
import pandas as pd
from libs.dataset_loader import MulTweEmoDataset
import matplotlib as plt

In [None]:
# storage_name = "sqlite:///final_study.db"
storage_name = "sqlite:///MulTweEmo_study_new_split.db"

# Studies to compare; "_study" is appended to each name when loading.
study_name_list = [
    "base", 
    "jina", 
    "large", 
    "siglip", 
    "blip2"
    ]

# study_name_list = [
#     "base",
#     # "base_text_only",
#     # "jina",
#     "base_freeze-weights",
#     "base_append-captions",
#     # "jina_append-captions",
#     # "base_freeze-weights_augment_0.84",
#     "base_process_emojis",
#     "base_append-captions_process_emojis",
#     "base_augment_0.82",
#     "base_augment_0.84",
#     # "base_append-captions_augment_0.84",
#     # "base_append-captions_freeze-weights_augment_0.84"
#     ]

# study_name_list = [
#     "bert_final",
#     "base_final",
#     "base_augment_final",
#     "base_captions_final"
#     ]


# study_name_list = ["base_augment_0.82", "base_augment_0.84"]

studies = {}
trials = {}

# Index into trial.values used for ranking; elsewhere in this notebook the
# objectives are ordered loss, F1-score, accuracy (exact match).
metric = 1

for name in study_name_list:
    studies[name] = optuna.create_study(study_name=name+"_study", storage=storage_name, load_if_exists=True, directions=["minimize", "maximize", "maximize"])
    trials[name] = studies[name].trials
    # The first two trials of this particular study are discarded.
    # NOTE(review): the reason is not visible here — document it.
    if name == "base_append-captions_freeze-weights_augment_0.84":
        trials[name].pop(1)
        trials[name].pop(0)
    # Best trial first: ascending for index 0 (loss), descending otherwise.
    trials[name].sort(key = lambda x: x.values[metric], reverse=False if metric==0 else True)
# trials

In [None]:
import optuna
from plotly.io import show
# Empirical distribution function of the selected objective for every study.
# The axis label is derived from `metric` (index into trial.values, ordered
# loss / F1-score / accuracy) instead of being fixed to "Accuracy", so the
# label always matches the values that are plotted.
fig = optuna.visualization.plot_edf(list(studies.values()), target=lambda x: x.values[metric], target_name=["Loss", "F1-score", "Accuracy"][metric])

fig.update_layout(
    autosize=False,
    width=750,
    height=450,
    title=None,
    margin=dict(l=20, r=20, t=20, b=20),
    font_size=17,
)

# names_list = [
#     "Base CLIP",
#     "Jina CLIP",
#     "Large CLIP",
#     ]

# names_list = [
#     "Baseline",
#     "Frozen weights",
#     "Captions",
#     "Emojis",
#     "Captions+Emojis",
#     "Augment (t=0.82)",
#     "Augment (t=0.84)",
#     ]
# newnames = {f"{s}_study": names_list[i] for i, s in enumerate(study_name_list)}
# fig.for_each_trace(lambda t: t.update(name = newnames[t.name]))
show(fig)

In [None]:
# Hyper-parameters of the top-ranked trial of the "base" study.
trials["base"][0].params

In [None]:
# Label names; read as a global by get_trial_f1_scores below.
labels = MulTweEmoDataset.get_labels()
def get_trial_f1_scores(trial):
    """Return the F1-score of every entry stored in a trial's user attributes.

    The ``"no_prediction_samples"`` entry is discarded (KeyError if missing).
    Entries named in the global ``labels`` come first, in that order,
    followed by every other entry in its stored order.
    """
    attrs = trial.user_attrs.copy()
    attrs.pop("no_prediction_samples")
    scores = {name: attrs[name]["f1-score"] for name in labels}
    scores.update(
        {key: report["f1-score"] for key, report in attrs.items() if key not in labels}
    )
    return scores

def get_no_pred_samples(trial):
    """Return the ``"no_prediction_samples"`` user attribute of ``trial``."""
    return trial.user_attrs["no_prediction_samples"]
# get_trial_f1_scores(trials["bert"][0])

In [None]:
# F1-scores of the top-ranked trial of every study: one column per study.
f1_scores = {name: get_trial_f1_scores(trials[name][0]) for name in study_name_list}
f1_scores = pd.DataFrame(f1_scores)
# f1_scores["support"] = pd.Series(val_support).astype(int)
f1_scores

In [None]:
f1_agg_list = ["macro avg", "micro avg", "weighted avg", "samples avg"]
# Legend names; they must be listed in the same order as study_name_list.
names = ["Base CLIP", "Jina CLIP", "Large CLIP", "SigLIP", "BLIP-2"]

# names = [
#     "Baseline",
#     "Frozen weights",
#     "Captions",
#     "Emojis",
#     "Captions+Emojis",
#     "Augment (t=0.82)",
#     "Augment (t=0.84)",
#     ]
# Grouped bars: one group per label/aggregate, one bar per study.
ax = f1_scores[study_name_list].loc[labels + f1_agg_list].plot(kind="bar", figsize=(12,4.5))
ax.legend(labels=names)
ax.set_xticklabels([l.get_text().capitalize() for l in ax.get_xticklabels()], rotation=45, ha='right');

ax.set_axisbelow(True)
ax.yaxis.grid(True)

In [None]:
# Emit the F1 table as LaTeX source.
print(f1_scores.to_latex())

In [None]:
# f1_scores = {name: get_trial_f1_scores(trials[name][0]) for name in study_name_list}
# f1_scores = pd.DataFrame(f1_scores)
# # f1_scores["support"] = pd.Series(val_support).astype(int)
# f1_scores


# Rank every study's trials by values[1], best first.
for name in study_name_list:

    trials[name].sort(key = lambda x: x.values[1], reverse=True)

# Average the F1-scores of the ten best trials of every study.
# A study with fewer than ten trials raises IndexError.
tmp = []
for i in range(10):
    tmp.append(pd.DataFrame({name: get_trial_f1_scores(trials[name][i]) for name in study_name_list}))
# Rows with the same name across the ten frames are averaged together.
best_avg_metrics = pd.concat(tmp).groupby(level=0).mean()
best_avg_metrics = best_avg_metrics.loc[labels + f1_agg_list]


ax = best_avg_metrics.plot(kind="bar", figsize=(16,5), yticks=[x / 10 for x in range(0,11)])
ax.legend(labels=names,loc="upper right", fontsize=16)

ax.set_xticklabels([l.get_text().capitalize() for l in ax.get_xticklabels()], rotation=45, ha='right');
ax.set_axisbelow(True)
# NOTE(review): this section's import cell binds `plt` to the top-level
# matplotlib package (`import matplotlib as plt`), not to pyplot, so these
# calls go to matplotlib.rc. Re-running earlier cells that use plt.figure or
# plt.show after that import would fail — consider importing matplotlib.pyplot.
plt.rc('xtick', labelsize=18)
plt.rc('ytick', labelsize=18)
ax.yaxis.grid(True)

In [None]:
# Best value (mean of the top_n trials) of every metric for every study.
# Display names of the three objectives, in the order of trial.values.
metrics = ["loss", "f1-score", "accuracy"]
results = {name: {metric_name: 0 for metric_name in metrics} for name in study_name_list}
top_n = 1

# The loop variables are deliberately not called `study` / `metric`: those
# names hold the Optuna study object and the ranking index used by other
# cells, and overwriting them with strings breaks those cells on re-run.
for name in study_name_list:
    for i, metric_name in enumerate(metrics):
        # sorted() leaves trials[name] in its current order, so cells that
        # read trials[name][0] still see the ranking they set up themselves.
        ranked = sorted(trials[name], key=lambda t, idx=i: t.values[idx], reverse=(metric_name != "loss"))
        # Fewer than top_n trials raises IndexError.
        for j in range(top_n):
            results[name][metric_name] += ranked[j].values[i]
        results[name][metric_name] /= top_n
print(pd.DataFrame(results).T.to_latex(float_format="%.4f").replace("_", " "))

# Zero shot CLIP

In [None]:
from libs.dataset_loader import MulTweEmoDataset
import numpy as np
import sklearn.metrics as skm

In [None]:
# Load the training split with the "something else" label dropped; the
# zero-shot CLIP predictions in the following cells are scored against it.
mode="M"
train, _ = MulTweEmoDataset.load(csv_path="./dataset/train_MulTweEmo.csv", mode=mode, drop_something_else=True, force_override=True, test_split=None, seed=123)
# Label names passed as `target_names` to the reports, without the dropped label.
# NOTE(review): if get_labels() returns a shared list rather than a copy,
# remove() mutates it for every later caller — confirm.
emotions = MulTweEmoDataset.get_labels()
emotions.remove("something else")

In [None]:
# Read the stored zero-shot prediction array; the cells below index its first
# axis once per entry of `models`.
with open("zero_shot_predictions", "rb") as f:
    predictions = np.load(f)

In [None]:
# Per-model classification report for the zero-shot scores binarised at `threshold`.
threshold = 0.1
models = ["base", "jina", "large"]
f1_scores = {}
for i in range(predictions.shape[0]):
    model_name = models[i]
    tmp = predictions[i] > threshold
    # Samples for which no label passed the threshold.
    count = sum(1 for sample in tmp if 1 not in sample)
    results = skm.classification_report(list(train["labels"]), tmp, zero_division=0, target_names=emotions, output_dict=True)
    f1_scores[model_name] = {emotion: results[emotion]["f1-score"] for emotion in emotions}
    print(model_name)
    display(pd.DataFrame(results).T)
    print(count, "samples with no label")
    print("\n\n\n")

In [None]:
# Per-emotion F1 of every CLIP variant with the zero-shot scores binarised at
# `threshold`, shown as a grouped bar chart.
threshold = 0.3
f1_scores = {}
models = ["base", "jina", "large"]
# The ground truth does not change between models, so build it once.
true_labels = list(train["labels"])
for i in range(predictions.shape[0]):
    tmp = predictions[i] > threshold
    results = skm.classification_report(true_labels, tmp, zero_division=0, target_names=emotions, output_dict=True)
    # Keep only the per-emotion rows; the aggregate rows of the report are not plotted.
    f1_scores[models[i]] = {emotion: results[emotion]["f1-score"] for emotion in emotions}
pd.DataFrame(f1_scores).plot(kind="bar", yticks=[x / 10 for x in range(0,11)], figsize=(10,5))

In [None]:
# Per-emotion F1 of a single CLIP variant (row `predictions_index` of
# `predictions`) for thresholds 0.1 … 0.5, shown as a grouped bar chart.
threshold_list = [x / 10 for x in range(1,6)]
f1_scores = {}
predictions_index = 1
# The ground truth does not change between thresholds, so build it once.
true_labels = list(train["labels"])
for threshold in threshold_list:
    tmp = predictions[predictions_index] > threshold
    results = skm.classification_report(true_labels, tmp, zero_division=0, target_names=emotions, output_dict=True)
    # Keep only the per-emotion rows; the aggregate rows of the report are not plotted.
    f1_scores[threshold] = {emotion: results[emotion]["f1-score"] for emotion in emotions}
# `models` comes from the previous cells and maps the row index to a variant name.
pd.DataFrame(f1_scores).plot(kind="bar", title=f"{models[predictions_index]} clip", yticks=[x / 10 for x in range(0,11)], figsize=(10,5))

# Zero-shot LLaVA

In [None]:
# Text report, exact-match accuracy and number of label-less samples for the
# predictions currently held in `predictions`, scored against `test`.
f1_scores = {}
count = sum(1 for sample in predictions if 1 not in sample)
true_labels = list(test["labels"])
results = skm.classification_report(true_labels, predictions, zero_division=0, target_names=emotions)
print(results)
print(skm.accuracy_score(true_labels, predictions))
print(count, "samples with no label")

In [None]:
# F1 of every row of the classification report (labels and averages) for the
# stored LLaVA results of prompts 0-3, shown as a grouped bar chart.
f1_scores = {}
true_labels = list(test["labels"])
for i in range(4):
    llava_results_path = f"./zero_shot_results/list/results_{i}.np"
    with open(llava_results_path, "rb") as f:
        predictions = np.load(f)
    results = skm.classification_report(true_labels, predictions, zero_division=0, target_names=emotions, output_dict=True)
    f1_scores[f"Prompt {i}"] = {row_name: row["f1-score"] for row_name, row in results.items()}
pd.DataFrame(f1_scores).plot(kind="bar", figsize=(10,5))

In [None]:
# Print the per-prompt F1 table as LaTeX source with three decimals.
print(pd.DataFrame(f1_scores).to_latex(float_format="%.3f"))

In [None]:
# Reload the stored results of prompt 3 and print the text report, the
# exact-match accuracy and the number of samples without any predicted label.
llava_results_path = "./zero_shot_results/list/results_3.np"
with open(llava_results_path, "rb") as f:
    predictions = np.load(f)

f1_scores = {}
count = sum(1 for sample in predictions if 1 not in sample)
true_labels = list(test["labels"])
results = skm.classification_report(true_labels, predictions, zero_division=0, target_names=emotions)
print(results)
print(skm.accuracy_score(true_labels, predictions))

print(count, "samples with no label")

In [None]:
# Precision / recall / F1 per report row (the support column is dropped) as a bar chart.
report_dict = skm.classification_report(
    list(test["labels"]),
    predictions,
    zero_division=0,
    target_names=emotions,
    output_dict=True,
)
results_table = pd.DataFrame(report_dict).T.drop(columns=["support"])
results_table.plot(kind="bar")

# Simple Baseline

In [None]:
import pandas as pd
from libs.dataset_loader import MulTweEmoDataset
def count_labels(dataset):
    """Count, for every dataset label, the rows whose label column is truthy.

    Args:
        dataset: DataFrame with one column per name returned by
            ``MulTweEmoDataset.get_labels()``.

    Returns:
        dict mapping each label name to its number of truthy rows.
    """
    label_names = MulTweEmoDataset.get_labels()
    totals = dict.fromkeys(label_names, 0)
    for _, record in dataset.iterrows():
        for name in label_names:
            if record[name]:
                totals[name] += 1
    return totals

In [None]:
# Data the simple baselines below are scored on, followed by its label counts.
# NOTE(review): the variable is called `test` but the path points at the
# validation CSV — confirm which split the baselines are meant to use.
test, _ = MulTweEmoDataset.load(csv_path="./dataset/val_MulTweEmo.csv",mode="M", drop_something_else=True, force_override=True, test_split=None)
count_labels(test)

### Random

In [None]:
import numpy as np
import sklearn.metrics as skm
# Baseline: every one of the 9 label columns is predicted independently by a
# fair coin flip (uniform draw compared with 0.5). The draw is not seeded.
n_samples = test.shape[0]
preds = np.random.rand(n_samples, 9) > 0.5
random_report = skm.classification_report(
    test["labels"].to_list(),
    preds,
    target_names=MulTweEmoDataset.get_labels(),
    zero_division=0,
)
print(random_report)

### All ones

In [None]:
# Baseline: every one of the 9 labels is predicted for every sample.
n_samples = test.shape[0]
preds = np.ones((n_samples, 9), dtype=int)
all_ones_report = skm.classification_report(
    test["labels"].to_list(),
    preds,
    target_names=MulTweEmoDataset.get_labels(),
)
print(all_ones_report)

### All Joy

In [None]:
# Baseline: only label column 4 is predicted, for every sample.
# NOTE(review): column 4 is presumably "joy" (per the section heading) — confirm
# against the order returned by get_labels().
preds = np.zeros((test.shape[0], 9), dtype=int)
preds[:, 4] = 1
all_joy_report = skm.classification_report(
    test["labels"].to_list(),
    preds,
    target_names=MulTweEmoDataset.get_labels(),
)
print(all_joy_report)

# Check number of tokens

In [None]:
from transformers import AutoProcessor
from libs.model import TweetMERConfig
from libs.dataset_loader import MulTweEmoDataset

# Feature-extractor variant whose tokenizer is inspected in the cells below.
model = "base"

# Processor matching the chosen variant, and the training split with emoji
# decoding turned off.
processor = AutoProcessor.from_pretrained(TweetMERConfig.get_feature_extractor_name(model), trust_remote_code=True)
dataset, _ = MulTweEmoDataset.load(csv_path="./dataset/train_MulTweEmo.csv",mode="M", drop_something_else=True, test_split=None, emoji_decoding=False)
# Measure the text as "<tweet> <caption>".
# NOTE(review): a missing caption would presumably turn the whole string into NaN — confirm captions are always present.
dataset["tweet"] = dataset["tweet"] + " " + dataset["caption"]

In [None]:
# Display the tokenizer's maximum sequence length; the statistics cell below
# compares every token count against this value.
processor.tokenizer.model_max_length

In [None]:
# Tokenize every text without padding and with truncation left disabled, so
# each entry keeps its real length for the statistics computed in the next cell.
processed_inputs = processor(
                            text = list(dataset["tweet"]), 
                            padding=False, 
                            # truncation=True, 
                            return_tensors="np"
                            )

In [None]:
# Statistics of the inputs whose token count exceeds the tokenizer limit,
# printed as one LaTeX table row:
#   min & max & mean & count & fraction of the dataset \\
token_limit = processor.tokenizer.model_max_length
overflow_lengths = [
    inputs.shape[0]
    for inputs in processed_inputs["input_ids"]
    if inputs.shape[0] > token_limit
]
count = len(overflow_lengths)

if count:
    # Dedicated names are used instead of rebinding the builtins sum/min/max:
    # shadowing them at notebook level breaks every later cell that calls
    # max(...) or min(...). Taking the real minimum also removes the old 1024
    # starting value, which was reported whenever every overflowing input was
    # longer than 1024 tokens.
    shortest = min(overflow_lengths)
    longest = max(overflow_lengths)
    mean_length = sum(overflow_lengths) / count
    print(" & ".join(str(x) for x in [shortest, longest, mean_length, count, count/len(dataset)]), "\\\\")
else:
    # Nothing overflows: there is no min/max/mean to report (the original
    # divided by zero here).
    print("No input exceeds the tokenizer limit of", token_limit, "tokens")

# Correlation between labels for dataset and predictions

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Annotated heatmap of the pairwise correlation between the label columns of `dataset`.
cols = MulTweEmoDataset.get_labels()
fig = plt.figure(figsize=(12, 7))
label_correlation = dataset[cols].corr()
sns.heatmap(label_correlation, annot=True, fmt=".3f")
plt.show()
plt.close()

In [None]:
from libs.dataset_loader import MulTweEmoDataset
from libs.utils.ModelWrappers import TweetMERWrapper
from datasets import Dataset
from libs.model import TweetMERModel
import sklearn.metrics as skm

class TweetMSAObjective(object):
    """Train one TweetMER model with fixed hyper-parameters and score it on the validation split.

    The constructor loads and preprocesses the train/validation data once;
    calling the instance fits a fresh model and returns its validation metrics.
    """

    def __init__(self, clip_version="jina", append_captions:bool=False, process_emojis:bool=False, data_augment:bool=False, mode="M", freeze_weights:bool=False, seed:int=123):
        """Load the train and validation splits and convert them to ``datasets.Dataset``.

        Args:
            clip_version: passed as ``model`` to ``TweetMERModel.preprocess_dataset``
                and as ``clip_version`` to ``TweetMERWrapper``.
            append_captions: when true, the training text becomes
                ``"<tweet> <caption>"``.
            process_emojis: passed to the loader as ``emoji_decoding``.
            data_augment: only read when ``append_captions`` is true; the
                caption-extended rows are then appended to the original
                training rows instead of replacing their text.
            mode: passed through to ``MulTweEmoDataset.load``.
            freeze_weights: passed through to ``TweetMERWrapper``.
            seed: passed through to ``MulTweEmoDataset.load``.
        """
        self.train, _ = MulTweEmoDataset.load(csv_path="./dataset/train_MulTweEmo.csv", mode=mode, drop_something_else=True,
                                               emoji_decoding=process_emojis, test_split=None, seed=seed)
        self.val, _ = MulTweEmoDataset.load(csv_path="./dataset/val_MulTweEmo.csv", mode=mode, drop_something_else=True,
                                               emoji_decoding=process_emojis, test_split=None, seed=seed)

        if append_captions:
            tweet_caption_data = self.train.apply(lambda x: x["tweet"] + " " + x["caption"], axis=1)
            if data_augment:
                tweet_caption_train = self.train.copy()
                tweet_caption_train["tweet"] = tweet_caption_data
                # pd.concat expects the frames as ONE list argument; passing them
                # as two positional arguments hands the second frame to `axis`
                # and raises a TypeError.
                self.train = pd.concat([self.train, tweet_caption_train])
            else:
                self.train["tweet"] = tweet_caption_data
        # NOTE: captions are appended to the training text only; the validation
        # text is left unchanged.

        self.train = Dataset.from_pandas(TweetMERModel.preprocess_dataset(dataset=self.train, model=clip_version, text_column="tweet", label_column="labels"))
        self.val = Dataset.from_pandas(TweetMERModel.preprocess_dataset(dataset=self.val, model=clip_version, text_column="tweet", label_column="labels"))
        self.clip_version = clip_version
        self.freeze_weights = freeze_weights

    def __call__(self):
        """Fit a new model on the training data and evaluate it on the validation data.

        Returns:
            A 5-tuple ``(count, loss, f1_score, exact_match, metrics)`` where
            ``count`` is the number of validation samples for which no label
            was predicted, the next three values are read from the result dict
            returned by ``model.score``, and ``metrics`` is the
            ``classification_report`` dictionary.
        """
        model = TweetMERWrapper(n_epochs=3, warmup_steps=30, learning_rate=6.263149136769504e-05, 
                                 batch_size=16, n_layers=4, n_units=76,
                                 dropout=0.16, clip_version=self.clip_version, freeze_weights=self.freeze_weights)
        model.fit(self.train, self.train["labels"])
        predictions, results =  model.score(self.val, self.val["labels"])
        label_names = MulTweEmoDataset.get_labels()
        metrics = skm.classification_report(self.val["labels"], predictions, output_dict=True, zero_division=0, target_names=label_names)
        # Samples whose prediction row contains no positive label at all.
        count = sum(1 for sample in predictions if 1 not in sample)
        # Drop the reference to the trained model before returning.
        del model
        return count, results["loss"], results["f1_score"], results["exact_match"], metrics

In [None]:
# Build the objective for the "base" variant, all other options at their defaults.
obj = TweetMSAObjective(clip_version="base")

In [None]:
# Run one fit/evaluate cycle; the returned tuple is shown as the cell output.
obj()

In [None]:
import pandas as pd
from libs.dataset_loader import MulTweEmoDataset

In [None]:
# Load the three splits stored under new_split.bak with identical options, so
# the following cells can check them for overlapping tweet ids.
train, _ = MulTweEmoDataset.load(csv_path="./dataset/new_split.bak/train_MulTweEmo.csv", mode="M", drop_something_else=True, test_split=None, seed=123)
val, _ = MulTweEmoDataset.load(csv_path="./dataset/new_split.bak/val_MulTweEmo.csv", mode="M", drop_something_else=True, test_split=None, seed=123)
test, _ = MulTweEmoDataset.load(csv_path="./dataset/new_split.bak/test_MulTweEmo.csv", mode="M", drop_something_else=True, test_split=None, seed=123)

In [None]:
# Display (rows, columns) of the training split.
train.shape

In [None]:
# Number of test rows whose id also occurs in the training split.
train_id = train.id.values
count = sum(1 for value in test.id.values if value in train_id)
count

In [None]:
# Boolean mask: True for validation rows whose id also occurs in the training split.
val["id"].isin(train_id)

In [None]:
# Validation rows whose id also occurs in the training split.
tmp = val[val["id"].isin(train_id)]

In [None]:
# Per-label number of rows in `tmp` whose label column is truthy.
labels = MulTweEmoDataset.get_labels()
count = dict.fromkeys(labels, 0)
for i, row in tmp.iterrows():
    for label in labels:
        count[label] += bool(row[label])
count

# Leaky ReLU plot

In [None]:
# Quick check of the element-wise wrapper around the scalar `relu`.
# NOTE(review): in the visible part of the notebook `relu` is only defined in
# later cells, so this presumably needs one of them to have been run first — confirm.
np.vectorize(relu)([3.2, 2.1])

In [None]:
# ReLU by masking: copy `x`, set the negative entries to zero, display the shape.
# NOTE(review): in the visible part of the notebook `x` is only assigned in
# later cells, so this presumably needs one of them to have been run first — confirm.
y = x.copy()
y[y<0] = 0
y.shape

In [None]:
import matplotlib.pyplot as plt
import torch
import numpy as np

def leaky_relu(x, alpha=0.1):
    """Return the larger of ``alpha * x`` and ``x`` for a scalar ``x``."""
    scaled = alpha * x
    return x if x > scaled else scaled

def relu(x:float):
    """Return ``x`` when it is strictly positive, otherwise the integer 0."""
    return x if x > 0 else 0

# Short alias; also used by the next cell.
sigmoid = torch.nn.functional.sigmoid

# Sample slightly beyond the displayed range of [-20, 20].
x = np.linspace(-21, 21, 1000)

# Curve to plot: ReLU via masking. The commented-out lines are the alternative
# curves (leaky ReLU, sigmoid).
# y = np.vectorize(leaky_relu)(x)
y = x.copy()
y[y<0] = 0
# y = sigmoid(torch.tensor(x))

fig = plt.figure(figsize=(5,3))
ax = fig.gca()

ax.set_xlim(-20, 20)

ax.set_xlabel("Input")
ax.set_ylabel("Output")
plt.grid()
# ax.set_aspect("equal")
# Grey lines marking the two coordinate axes.
ax.axhline(0, color='#777777')
ax.axvline(0, color='#777777')
plt.plot(x, y)
plt.show()

In [None]:
# Evaluate the sigmoid on the sampled inputs and display the resulting tensor.
sigmoid(torch.tensor(x))

In [None]:
def leaky_relu(x, alpha=0.1):
    """Scalar leaky ReLU: the maximum of ``alpha * x`` and ``x``."""
    candidates = (alpha * x, x)
    return max(candidates)

def relu(x):
    """Scalar ReLU: ``x`` when it is strictly positive, otherwise the integer 0."""
    if x > 0:
        return x
    return 0

# Sample slightly beyond the displayed range of [-20, 20] and apply the scalar
# `relu` element by element.
x = np.linspace(-21, 21, 1000)
y = np.vectorize(relu)(x)

fig = plt.figure()
ax = fig.gca()

ax.set_xlim(-20, 20)

ax.set_xlabel("Input")
ax.set_ylabel("Output")
plt.grid()
# Same scale on both axes.
ax.set_aspect("equal")
# Grey lines marking the two coordinate axes.
ax.axhline(0, color='#777777')
ax.axvline(0, color='#777777')
plt.plot(x, y)
plt.show()