In [1]:
import os
import sys

sys.path.append(os.path.abspath("../../"))

%matplotlib inline
import matplotlib.pyplot as plt
import my_utils as utils
import numpy as np
import pandas as pd

# Result directories (relative to the notebook's working directory) of the two
# model variants. The dataset name and the prediction file name are appended
# to these paths when the prediction files are loaded.
BASE_PATH_CONCAT = os.path.join("results", "early_concat")
BASE_PATH_FUSION = os.path.join("results", "late_fusion")

## IEMOCAP



In [None]:
DATASET = "IEMOCAP"


def get_result_dataframe(dataset: str) -> pd.DataFrame:
    """Load and merge the four prediction files of one dataset.

    Reads the late-fusion and early-concat predictions, each in its regular
    and its audio-only variant, merges them under the names ``fusion``,
    ``concat``, ``fusion_audio`` and ``concat_audio`` and finally adds the
    dialogue information via ``utils.extract_dialogue_information``.

    Args:
        dataset: Name of the dataset sub-directory below the result paths.

    Returns:
        The merged result frame as produced by the ``utils`` helpers.

    Raises:
        AssertionError: If the fusion and concat frames differ in length.
    """
    fusion_dir = os.path.join(BASE_PATH_FUSION, dataset)
    concat_dir = os.path.join(BASE_PATH_CONCAT, dataset)

    load = utils.build_result_dataframe
    fusion_df = load(os.path.join(fusion_dir, "preds_test.json"))
    fusion_audio_df = load(os.path.join(fusion_dir, "preds_test_no_text.json"))
    concat_df = load(os.path.join(concat_dir, "preds_test_normal.json"))
    concat_audio_df = load(os.path.join(concat_dir, "preds_test_audio_only.json"))

    # Both models must have been evaluated on the same number of samples.
    assert len(fusion_df) == len(concat_df)

    frames = [fusion_df, concat_df, fusion_audio_df, concat_audio_df]
    frame_names = ["fusion", "concat", "fusion_audio", "concat_audio"]
    merged = utils.merge_result_dataframes(frames, frame_names)

    return utils.extract_dialogue_information(merged)


df = get_result_dataframe(DATASET)
df.head(3)

In [None]:
# Distinct target labels in the order returned by value_counts()
# (descending frequency by default).
target_counts = df["target"].value_counts()
labels = target_counts.index.to_list()
labels

In [None]:
from functools import partial


# Polarity groups used to collapse the emotion labels into a coarse sentiment.
iemocap_positive_emotions = ["happy", "excited"]
iemocap_negative_emotions = ["angry", "sad", "frustrated"]

# Sentiment classifier with the IEMOCAP polarity groups bound in.
iemocap_classify_sentiment = partial(
    utils.classify_sentiment,
    positive=iemocap_positive_emotions,
    negative=iemocap_negative_emotions,
)

df["emotion_sentiment"] = df["target"].apply(iemocap_classify_sentiment)

# Long format: the predictions of both models ("output" and "output_concat")
# are stacked into a single "output_long" column, tagged by "source".
prediction_columns = ["output", "output_concat"]
df_long = df.melt(
    id_vars=["emotion_sentiment", "target", "utterance"],
    value_vars=prediction_columns,
    var_name="source",
    value_name="output_long",
)
df_long["emotion_sentiment_prediction"] = df_long["output_long"].apply(
    iemocap_classify_sentiment
)

sentiment_cm_options = dict(
    target_labels=["positive", "negative"],
    output_column="emotion_sentiment_prediction",
    target_column="emotion_sentiment",
    xlab_name="Predicted Sentiment",
    ylab_name="True Sentiment",
    text_size=28,
    label_scaling_adjustment=4,
    name="images/sentiment_iemocap_cm.png",
)
utils.print_confusion_matrix(df_long, **sentiment_cm_options)

In [None]:
# Confusion matrix restricted to the negative emotion classes.
negative_cm_options = dict(
    target_labels=iemocap_negative_emotions,
    output_column="output_long",
    target_column="target",
    text_size=20,
    label_scaling_adjustment=2,
    name="images/negative_iemocap_cm.png",
)
utils.print_confusion_matrix(df_long, **negative_cm_options)

In [None]:
# Confusion matrix restricted to the positive emotion classes.
positive_cm_options = dict(
    target_labels=iemocap_positive_emotions,
    output_column="output_long",
    target_column="target",
    text_size=28,
    label_scaling_adjustment=4,
    name="images/positive_iemocap_cm.png",
)
utils.print_confusion_matrix(df_long, **positive_cm_options)

## Audio

In [None]:
# Long format of the audio-only predictions of both models; relies on the
# "emotion_sentiment" column having been added to `df` beforehand.
audio_prediction_columns = ["output_fusion_audio", "output_concat_audio"]
df_long_audio = pd.melt(
    df,
    id_vars=["emotion_sentiment", "target", "utterance"],
    value_vars=audio_prediction_columns,
    var_name="source",
    value_name="output_long",
)
df_long_audio

In [None]:
def _plot_all_label_cm(frame, title):
    """Draw the confusion matrix over all labels (reversed order) for `frame`."""
    return utils.print_confusion_matrix(
        frame,
        target_labels=labels[::-1],
        output_column="output_long",
        target_column="target",
        title=title,
    )


# Audio-only predictions first, then the regular (combined) predictions.
_plot_all_label_cm(df_long_audio, "Confusion Matrix (Audio)")
_plot_all_label_cm(df_long, "Confusion Matrix (Combined)")

In [None]:
# Audio-only confusion between the two positive classes.
excited_happy_cm_options = dict(
    target_labels=["excited", "happy"],
    output_column="output_long",
    target_column="target",
    title="Confusion Matrix (Audio)",
)
utils.print_confusion_matrix(df_long_audio, **excited_happy_cm_options)

In [10]:
def process_text(text):
    """Parse the text of an emotion-evaluation file into per-utterance records.

    The text is split into blocks, each starting with a header line of the form
    ``[START_TIME - END_TIME] TURN_NAME EMOTION [V, A, D]`` and followed by
    evaluator lines. Only categorical evaluator lines (prefix ``C-``) are used;
    they look like ``C-E1:    Excited;    ()`` where the labels are separated by
    semicolons and the trailing parenthesised field is a free-text comment.

    Args:
        text: Full content of one evaluation file.

    Returns:
        A list of dicts ``{"target": <EMOTION from the header>,
        "annotations": [<label>, ...]}``, one per block whose header matches
        the expected format. Blocks with a non-matching header (for example
        the column-description line at the top of a file) are skipped.
    """
    import re

    header_pattern = re.compile(r'^\[.*?\]\s+(.*?)\s+(.*?)\s+\[.*?\]$')
    # Trailing evaluator comment: "()" as well as non-empty "(some remark)".
    trailing_comment = re.compile(r'\s*\([^()]*\)$')

    # Drop empty / whitespace-only lines and surrounding whitespace.
    lines = [line.strip() for line in text.strip().split('\n') if line.strip()]

    # Group the lines into blocks; a line starting with '[' opens a new block.
    blocks = []
    current_block = []
    for line in lines:
        if line.startswith('[') and current_block:
            blocks.append(current_block)
            current_block = []
        current_block.append(line)
    if current_block:
        blocks.append(current_block)

    results = []
    for block in blocks:
        header_match = header_pattern.match(block[0])
        if header_match is None:
            continue

        annotations = []
        for line in block[1:]:
            if not line.startswith('C-'):
                continue
            _, colon, label_part = line.partition(':')
            if not colon:
                # Malformed categorical line without a colon: nothing to extract.
                continue
            # Strip the comment field so that it is never mistaken for a label.
            label_part = trailing_comment.sub('', label_part.strip())
            annotations.extend(
                label.strip() for label in label_part.split(';') if label.strip()
            )

        # Group 1 is the turn name, group 2 the emotion label.
        results.append({"target": header_match.group(2), "annotations": annotations})
    return results


In [None]:
from glob import glob

# Sorted so that the row order of `data` is reproducible across runs
# (glob itself returns matches in arbitrary order).
emo_eval = sorted(
    glob("../../datasets/iemocap/IEMOCAP_full_release/Session*/dialog/EmoEvaluation/*.txt")
)

records = []
for emo_eval_file in emo_eval:
    with open(emo_eval_file) as f:
        text = f.read()
    # Dialogue id = file name up to the first dot; os.path.basename keeps this
    # correct regardless of the platform's path separator.
    dialogue = os.path.basename(emo_eval_file).split('.')[0]
    records.extend({**entry, "dialogue": dialogue} for entry in process_text(text))

# One row per parsed utterance: target label, evaluator annotations, dialogue id.
data = pd.DataFrame.from_records(records)
data

In [None]:
# Average number of "Excited" annotations per utterance labelled "hap" and of
# "Happiness" annotations per utterance labelled "exc". Both values are
# collected into one Series so that neither is silently dropped (a cell only
# displays its last expression).
hap_rows = data[data["target"] == "hap"]
exc_rows = data[data["target"] == "exc"]

pd.Series(
    {
        'hap annotated "Excited"': hap_rows["annotations"].apply(lambda x: x.count("Excited")).sum() / len(hap_rows),
        'exc annotated "Happiness"': exc_rows["annotations"].apply(lambda x: x.count("Happiness")).sum() / len(exc_rows),
    },
    name="annotations_per_utterance",
)



In [13]:
# Boolean masks: True where the respective model predicted the target label.
fusion_model = df["target"].eq(df["output"])
concat_model = df["target"].eq(df["output_concat"])

In [None]:
from sklearn.metrics import f1_score

# "any_correct" is the target wherever at least one of the two models is
# right; where both are wrong it falls back to the late-fusion prediction.
either_correct = fusion_model | concat_model
df["any_correct"] = df["target"].where(either_correct, df["output"])
f1_score(df["target"], df["any_correct"], labels=labels, average="weighted")


In [None]:
# Difference between the confusion matrices of "output" (late fusion) and
# "output_concat" (early concat).
utils.print_confusion_matrix_difference(
    df,
    target_labels=list(reversed(labels)),
    output_column1="output",
    output_column2="output_concat",
    name="images/discussion_iemocap_diff_cm.png",
    title="Confusion Matrix difference on IEMOCAP",
)

In [None]:
from plotnine import *


def iou(df, label_true, label_pred=None):
    """Measure how much the two models overlap on one confusion-matrix cell.

    Two sample sets are compared: the rows with ``target == label_true`` for
    which ``output`` equals ``label_pred`` and those for which
    ``output_concat`` equals ``label_pred``.

    Args:
        df: Frame with the columns ``target``, ``output`` and ``output_concat``.
        label_true: Target label selecting the rows.
        label_pred: Predicted label to look for; defaults to ``label_true``.

    Returns:
        A dict with
        ``numerator``   - size of the intersection of both sets,
        ``iou``         - intersection / union (NaN for an empty union),
        ``denominator`` - union size minus the absolute size difference of the
                          two sets,
        ``agreement``   - intersection / ``denominator`` (NaN if that is 0).
    """
    if label_pred is None:
        label_pred = label_true

    # Each selection is computed once instead of re-filtering the frame for
    # every quantity below.
    is_true = df["target"] == label_true
    fusion_idx = df.index[(is_true & (df["output"] == label_pred)).to_numpy()]
    concat_idx = df.index[(is_true & (df["output_concat"] == label_pred)).to_numpy()]

    numerator = len(fusion_idx.intersection(concat_idx))
    denominator = len(fusion_idx.union(concat_idx))
    # Remove the samples that cannot overlap because one set is larger.
    denominator_norm = denominator - abs(len(fusion_idx) - len(concat_idx))

    result_iou = numerator / denominator if denominator != 0 else np.nan
    result_norm = numerator / denominator_norm if denominator_norm != 0 else np.nan
    return {
        "iou": result_iou,
        "agreement": result_norm,
        "denominator": denominator_norm,
        "numerator": numerator,
    }


# Overlap statistics for every (true label, predicted label) pair.
pair_stats = [
    [iou(df, label_true, label_pred) for label_pred in labels]
    for label_true in labels
]
# Note: despite its name, cm_iou holds the normalised denominator of each cell
# (shown in brackets in the plot); cm_agreement holds the agreement ratio.
cm_iou = [[stats["denominator"] for stats in row] for row in pair_stats]
cm_agreement = [[stats["agreement"] for stats in row] for row in pair_stats]

iou_long = utils._prepare_confusion_matrix(cm_iou, labels)
agreement_long = utils._prepare_confusion_matrix(cm_agreement, labels)
melted_iou = iou_long.merge(
    agreement_long,
    on=["actual", "predicted"],
    suffixes=["_iou", "_agreement"],
)

# Tile text: agreement in percent with the denominator underneath in brackets.
agreement_pct = (melted_iou["count_agreement"] * 100).round(1).astype(str)
cell_support = (melted_iou["count_iou"]).round(1).astype(str)
melted_iou["label"] = agreement_pct + "\n(" + cell_support + ")"

fill_scale = scale_fill_gradient2(
    low="#DC3440", mid="#FFC685", high="#1fae08", limits=[0, 100], midpoint=60
)
text_theme = theme(
    title=element_text(size=18),
    axis_title=element_text(size=20),
    axis_text=element_text(size=16),
    axis_text_x=element_text(rotation=45),  # rotated x tick labels
)

p = (
    ggplot(melted_iou, aes("factor(predicted)", "factor(actual)", fill="count_agreement * 100"))
    + geom_tile(show_legend=True, width=0.98, height=0.98)
    + geom_text(aes(label="label"), size=16, show_legend=False, color="white")
    + ggtitle("Agreement Matrix IEMOCAP")
    + ylab("True Emotion")
    + xlab("Predicted Emotion")
    + scale_y_discrete(limits=labels[::-1])
    + fill_scale
    + theme_bw()
    + guides(fill="none")
    + text_theme
)
p.show()
p.save("images/discussion_iemocap_agreement.png", width=7, height=7, dpi=300)

In [None]:
# Spot check of a single cell: true label "frustrated", predicted "neutral".
iou(df, label_true="frustrated", label_pred="neutral")

In [13]:
# NOTE(review): these masks duplicate the definitions made earlier in the
# notebook; presumably repeated so this section can be re-run on its own.
concat_model = df["target"].eq(df["output_concat"])
fusion_model = df["target"].eq(df["output"])

In [None]:
# Samples that neither model classified correctly; the matrix compares the
# concat prediction (rows) against the fusion prediction (columns).
both_wrong = df[~(concat_model | fusion_model)]

utils.print_confusion_matrix(
    both_wrong,
    target_labels=list(reversed(labels)),
    output_column="output",
    target_column="output_concat",
    xlab_name="Fusion Prediction",
    ylab_name="Concat Prediction",
    title="False-Prediction Agreement",
    show_percentage=False,
    name="images/discussion_iemocap_false_prediction_agreement.png",
    text_size=20,
)

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

# One-vs-rest precision / recall / F1 (in percent) per label for both models.
for label in labels:
    y_true = (df["target"] == label).astype(int)
    y_pred_fusion = (df["output"] == label).astype(int)
    y_pred_concat = (df["output_concat"] == label).astype(int)

    pre_label = 100 * precision_score(y_true, y_pred_fusion)
    rec_label = 100 * recall_score(y_true, y_pred_fusion)
    pre_label_ierc = 100 * precision_score(y_true, y_pred_concat)
    rec_label_ierc = 100 * recall_score(y_true, y_pred_concat)
    f1_fusion = 100 * f1_score(y_true, y_pred_fusion)
    f1_concat = 100 * f1_score(y_true, y_pred_concat)

    print(f"F1-score\t für Label {label}: Fusion: {f1_fusion:.2f} - Concat: {f1_concat:.2f} - Different: {(f1_fusion - f1_concat):.2f}")
    print(f"Precision\t für Label {label}: Fusion: {pre_label:.2f} - Concat: {pre_label_ierc:.2f} - Differenz: {(pre_label - pre_label_ierc):.2f}")
    print(f"Recall\t\t für Label {label}: Fusion: {rec_label:.2f} - Concat: {rec_label_ierc:.2f} - Differenz: {(rec_label - rec_label_ierc):.2f}")
    print()

### Weitere Statistiken
Hier werden weitere Statistiken zum Klassifizierungsverhalten der einzelnen Teilmodelle aufgeführt.
Berechnet wurden jeweils die Ergebnisse von InstructERC, des Audio-Outputs des Late-Fusion-Modells und des kombinierten Modells.


In [None]:
# Masks of the samples each model misclassified. They are derived here from
# the "correct" masks because `not_fusion_model` / `not_concat_model` are not
# defined anywhere earlier in the notebook; without these two lines the cell
# raises a NameError on a fresh kernel.
not_fusion_model = ~fusion_model
not_concat_model = ~concat_model

# Late-fusion model correct (F)
df_F = df[fusion_model]
print(f"In total, {len(df_F)} samples were recognized by the late-fusion model.")

# Concat model correct (C)
df_C = df[concat_model]
print(f"In total, {len(df_C)} samples were recognized by the concat model.")


# Late-fusion model wrong (¬F)
df_not_F = df[not_fusion_model]
print(f"In total, {len(df_not_F)} samples were NOT recognized by the late-fusion model.")

# Concat model wrong (¬C)
df_not_C = df[not_concat_model]
print(f"In total, {len(df_not_C)} samples were NOT recognized by the concat model.")


# Both models correct (F ∧ C)
df_F_and_C = df[fusion_model & concat_model]
print(f"In total, {len(df_F_and_C)} samples were recognized by both the late-fusion and concat models.")


# Only the late-fusion model correct (F ∧ ¬C)
df_F_and_not_C = df[fusion_model & not_concat_model]
print(f"In total, {len(df_F_and_not_C)} samples were recognized by the late-fusion model but NOT by the concat model.")


# Only the concat model correct (C ∧ ¬F)
df_C_and_not_F = df[concat_model & not_fusion_model]
print(f"In total, {len(df_C_and_not_F)} samples were recognized by the concat model but NOT by the late-fusion model.")


# Both models wrong (¬F ∧ ¬C)
df_not_F_and_not_C = df[not_fusion_model & not_concat_model]
print(f"In total, {len(df_not_F_and_not_C)} samples were NOT recognized by both the late-fusion and concat models.")
