In [1]:
import random
import plotly.express as px
import plotly.io as pio
from itertools import combinations
from scipy.spatial.distance import euclidean, cdist
from sklearn.model_selection import KFold
import numpy as np
import pandas as pd
import statistics, math
import os
from tqdm import tqdm

from eval import evaluate, mean_pearsons, calc_pearsons

In [5]:
# The path follows the challenges output format: /MuSe-2024/results/prediction_muse/perception
PATH_RESULTS = "PATH_TO_THE_PERCEPTION_RESULT"

# List of model for emsemble
MODELS_LIST = ["CNN-ATTN_[vit-fer]_[0.0005_32]", "CRNN_[vit-fer]_[0.0005_32]", "CNN_[vit-fer]_[0.0005_32]",
               "CRNN-ATTN_[vit-fer]_[0.0005_32]", "RNN_[vit-fer]_[0.0005_32]"]

# MODELS_LIST = ["IAF_[egemaps_vit-fer_bert-base-uncased]_[0.0005_32]", "CNN-ATTN_[vit-fer]_[0.0005_32]", "CNN_[vit-fer]_[0.0005_32]", 
#                "CRNN_[vit-fer]_[0.0005_32]", "CRNN-ATTN_[vit-fer]_[0.0005_32]", "RNN_[vit-fer]_[0.0005_32]",
#                "IAF_[w2v-msp_avhubert-large-noise-pt-noise-ft-30h_vit-fer]_[0.0005_32]", 
#                "IAF_[w2v-msp_vit-fer_avhubert-large-noise-pt-noise-ft-30h]_[0.0005_32]",
#                "IAF_[vit-fer_vit-fer_faus]_[0.0005_32]",
#                "IAF_[faus+vit-fer+avhubert-large-noise-pt-noise-ft-30h]_[0.0005_32]"]

# MODELS_LIST = ["CNN-ATTN_[vit-fer]_[0.0005_32]", "CRNN_[vit-fer]_[0.0005_32]", "CNN_[vit-fer]_[0.0005_32]",
#                "CRNN-ATTN_[vit-fer]_[0.0005_32]", "RNN_[vit-fer]_[0.0005_32]", 
#                "IAF_[w2v-msp_vit-fer_gpt2]_[0.0005_32]", "IAF_[ds_vit-fer_xlm-roberta-large]_[0.0005_32]",
#                "IAF_[w2v-msp_vit-fer_xlm-roberta-large]_[0.0005_32]", "IAF_[egemaps_vit-fer_gpt2]_[0.0005_32]",
#                "IAF_[ds_vit-fer_gpt2]_[0.0005_32]"]

# Trait group dictionary
TRAITS_DICT = {1: ["aggressive", "arrogant", "dominant", "assertiv"],
               2: ["risk", "independent", "leader_like", "confident", "enthusiastic"],
               3: [],
               4: ["friendly", "collaborative", "sincere", "likeable", "kind", "warm", "good_natured"]
              }


In [6]:
all_traits = [item for sublist in TRAITS_DICT.values() for item in sublist]

# REMEMBER to set it to "test" for challenge evaluation
def load_data(prefer='dev'):
    # Choose file name based on preference    
    file_name = "predictions_devel.csv" if prefer == 'dev' else "predictions_test.csv"

    # Create an empty dataframe to stack all dataframes loaded from csvs
    final_df = pd.DataFrame()

    # Iterate over all models
    for model in MODELS_LIST:

        # Iterate over all trait categories and traits in the traits_dict
        for trait_category, traits in TRAITS_DICT.items():

            # Iterate over all traits
            for trait in traits:
                trait_path = os.path.join(PATH_RESULTS, trait, model, file_name)

                # Load data using pandas
                df = pd.read_csv(trait_path)

                # Add 'model' and 'trait' info to loaded df as new columns
                df['model'] = model
                df['trait'] = trait

                # Stack current df to final_df
                final_df = pd.concat([final_df, df])

    # Return the final dataframe
    return final_df

In [7]:
df = load_data(prefer='dev')  # load development data

traits = ['aggressive', 'arrogant', 'dominant', 'assertiv', 'risk', 'independent', 'leader_like', 'confident', 'enthusiastic', 'friendly', 'collaborative', 'sincere', 'likeable', 'kind', 'warm', 'good_natured']

# Create a category type based on your ordered trait list
df['trait'] = pd.Categorical(df['trait'], categories=traits, ordered=True)

# Sort by trait (in the order specified by your 'traits' list), then by 'meta_col_0'
df_sorted = df.sort_values(by=['trait', 'meta_col_0'])

df = df_sorted.copy()

In [8]:
df_trait = df.copy()

def filter_by_trait(df, traits_dict, target_trait):
    for key in traits_dict:
        if target_trait in traits_dict[key]:
            group_traits = traits_dict[key]
            filtered_df = df[df['trait'].isin(group_traits)] # this will create a new df for each group
            return filtered_df

# call the function
#df_trait = filter_by_trait(df_trait, TRAITS_DICT, 'aggressive')
df_trait

Unnamed: 0,meta_col_0,prediction,label,model,trait
0,9,0.377227,0.3529,CNN-ATTN_[vit-fer]_[0.0005_32],aggressive
0,9,0.354732,0.3529,CRNN_[vit-fer]_[0.0005_32],aggressive
0,9,0.324465,0.3529,CNN_[vit-fer]_[0.0005_32],aggressive
0,9,0.380100,0.3529,CRNN-ATTN_[vit-fer]_[0.0005_32],aggressive
0,9,0.390050,0.3529,RNN_[vit-fer]_[0.0005_32],aggressive
...,...,...,...,...,...
57,227,0.634571,0.6979,CNN-ATTN_[vit-fer]_[0.0005_32],good_natured
57,227,0.537733,0.6979,CRNN_[vit-fer]_[0.0005_32],good_natured
57,227,0.645341,0.6979,CNN_[vit-fer]_[0.0005_32],good_natured
57,227,0.645373,0.6979,CRNN-ATTN_[vit-fer]_[0.0005_32],good_natured


In [9]:
D1 = 0.2
D2 = 0.1
W_INTER = 1.0

def measure_distance_for_model(df, group, model_name, d1=D1, d2=D2):
    # Measure distance for each model
    # TODO: normalize the distance per items in the list
    # the output is a list, so sum of all items in the list can be not good
    group_traits = TRAITS_DICT[group]
    filter_cond = (df['trait'].isin(group_traits)) & (df['model'] == model_name)
    filtered_df = df[filter_cond]
    
    trait_pairs = list(combinations(group_traits, 2))
    distances = []
    for pair in trait_pairs:
        predictions = filtered_df[filtered_df['trait'].isin(pair)]['prediction']

        if len(predictions) == 2:
            prediction1, prediction2 = predictions
            difference = abs(prediction1 - prediction2)

            
            acceptable_distance = min(prediction1, prediction2) * ((1-d1) if group == 1 else (1-d2))
            
            # if distance is within acceptable level, distance is 0
            if difference <= acceptable_distance:
                normalized_distance = 0
            else:
                min_possible_difference = acceptable_distance
                max_possible_difference = 1  # since predictions lie in the range 0 to 1
                normalized_distance = (difference - min_possible_difference) / (max_possible_difference - min_possible_difference)
                
            distances.append(normalized_distance)
    return distances

def measure_distance_for_model_centroid(df, group, model_name, radius):
    # Measure distance for each model
    # TODO: normalize the distance per items in the list
    # the output is a list, so sum of all items in the list can be not good
    group_traits = TRAITS_DICT[group]
    filter_cond = (df['trait'].isin(group_traits)) & (df['model'] == model_name)
    filtered_df = df[filter_cond]
    variable_list = filtered_df.prediction.tolist()
    
    # Get the center point
    center_point = np.mean(variable_list)    
  
    # Calculate distances from each point to boundary of circle
    dist_to_center = np.abs(variable_list - center_point)
    dist_to_boundary = np.where(dist_to_center <= radius, 0, dist_to_center - radius)
        
    return np.sum(dist_to_boundary)  


def calculate_distance_between_groups(df, group1, group2, model_name):
    group1_traits = TRAITS_DICT[group1]
    group2_traits = TRAITS_DICT[group2]

    group1_filtered = df[(df['trait'].isin(group1_traits)) & (df['model'] == model_name)]
    group2_filtered = df[(df['trait'].isin(group2_traits)) & (df['model'] == model_name)]
    
    distances = cdist([[x] for x in group1_filtered['prediction'].tolist()], [[x] for x in group2_filtered['prediction'].tolist()])
    avg_distances_group1_to_group2 = distances.mean(axis=1)
    
    return sum(avg_distances_group1_to_group2)

def rank_models(df, model_names, d1=D1, d2=D2, w_inter=W_INTER):
    model_scores = []

    for model_name in model_names:

        group1_distances_model = measure_distance_for_model_centroid(df, 1, model_name, d1)
        group4_distances_model = measure_distance_for_model_centroid(df, 4, model_name, d2)
        
        distances_group1_to_group4 = calculate_distance_between_groups(df, 1, 4, model_name)

        ######################## NOTE #####################        
        # The smaller the intra_group_sum and the higher the inter_group_sum, the better the model.
        #print(intra_group_sum, inter_group_sum)
        score = -group1_distances_model -group4_distances_model + w_inter*distances_group1_to_group4 
        
        model_scores.append((model_name, score))

    # Sort the models by their scores in descending order
    model_scores.sort(key=lambda x: x[1], reverse=True)

    return model_scores

def get_best_models(df, d1=D1, d2=D2, w_inter=W_INTER):
    results = []
    
    temp_df = df.groupby('meta_col_0')[['meta_col_0', 'prediction', 'label', 'model', 'trait']].apply(lambda x: x.loc[x['model'] == rank_models(x, MODELS_LIST, d1, d2, w_inter)[0][0]])
    result_df = temp_df.reset_index(drop=True)
    return result_df

def calculate_score(best_df):
    # calculate the score 
    unique_traits = best_df['trait'].unique()
    traits_to_idx_map = {trait: idx for idx, trait in enumerate(unique_traits)}
    num_classes = len(unique_traits)

    # Convert df to list of dictionaries
    df_dict = best_df.to_dict('records')

    result_prediction, result_label = {}, {}
    
    for rec in df_dict:
        key = (rec['meta_col_0'], rec['model'])
        trait_position = traits_to_idx_map[rec['trait']]
        
        if key not in result_prediction:
            result_prediction[key] = np.zeros(num_classes)
            result_label[key] = np.zeros(num_classes)

        result_prediction[key][trait_position] = rec['prediction']
        result_label[key][trait_position] = rec['label'] if 'label' in best_df.columns else 0
            
    prediction_arrays = np.array(list(result_prediction.values()))
    label_arrays = np.array(list(result_label.values()))

    result, _data = mean_pearsons(prediction_arrays, label_arrays)    
    return result, _data

In [10]:
result_df = get_best_models(df_trait)
result_df.to_csv(f'test_sum_devel_{D1:.2f}_{D2:.2f}_{W_INTER:.2f}.csv', index=False)
result_df.head(16)

Unnamed: 0,meta_col_0,prediction,label,model,trait
0,9,0.377227,0.3529,CNN-ATTN_[vit-fer]_[0.0005_32],aggressive
1,9,0.35725,0.3382,CNN-ATTN_[vit-fer]_[0.0005_32],arrogant
2,9,0.502012,0.4902,CNN-ATTN_[vit-fer]_[0.0005_32],dominant
3,9,0.5552,0.6569,CNN-ATTN_[vit-fer]_[0.0005_32],assertiv
4,9,0.505092,0.5147,CNN-ATTN_[vit-fer]_[0.0005_32],risk
5,9,0.651515,0.652,CNN-ATTN_[vit-fer]_[0.0005_32],independent
6,9,0.631275,0.652,CNN-ATTN_[vit-fer]_[0.0005_32],leader_like
7,9,0.677542,0.7451,CNN-ATTN_[vit-fer]_[0.0005_32],confident
8,9,0.616529,0.4804,CNN-ATTN_[vit-fer]_[0.0005_32],enthusiastic
9,9,0.574478,0.5539,CNN-ATTN_[vit-fer]_[0.0005_32],friendly


In [11]:
result, _data = calculate_score(result_df)
result_df.to_csv(f'test_sum_devel_{D1:.2f}_{D2:.2f}_{W_INTER:.2f}_{np.round(result,4)}.csv', index=False)
result, _data

(0.49820454332930497,
 array([0.53447626, 0.58122181, 0.483436  , 0.22132553, 0.46534533,
        0.47060199, 0.51041311, 0.43339587, 0.2533439 , 0.60332572,
        0.56650368, 0.47097401, 0.6363766 , 0.55132877, 0.57362998,
        0.61557412]))