In [5]:
import re
import numpy as np
import pandas as pd
import liwc
import pandas as pd
from collections import Counter
from sklearn.metrics import cohen_kappa_score
from itertools import combinations
import matplotlib.pyplot as plt
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer

In [None]:
#READ 

results_file = '../data/MIND/F1_results/llama-3.2-1b-moral'

# Leer JSON
with open(results_file, 'r') as f:
    data = json.load(f)

rows = []

for exp in data['experiments']:
    experiment_name = exp['experiment']
    dataset_name = exp['dataset']
    results = exp['results']
    
    row = {
        'Dataset': dataset_name,
        'Experiment': experiment_name
    }
    
    for i in range(6):
        i_str = str(i)
        if i_str in results:
            row[i_str] = round(results[i_str]['f1-score'] * 100, 2)  # Multiplicar *100 y redondear
        else:
            row[i_str] = 0.00

    if 'macro avg' in results:
        row['MacroF1'] = round(results['macro avg']['f1-score'] * 100, 2)
    else:
        row['MacroF1'] = 0.00

    rows.append(row)

df = pd.DataFrame(rows)
df = df[['Dataset', 'Experiment', '0', '1', '2', '3', '4', '5', 'MacroF1']]
df


# MoralBert

- Usar aplicación MoralBert y seleccionar el detalle de las anotaciones que coincida en los modelos.
- Este modelo devuelve las probabilidades de 10 valores morales (polaridad)
- Link: https://huggingface.co/spaces/vjosap/MoralBERTApp

In [None]:
df=pd.read_csv('../data/OMC/omc_morality.csv')
df

In [None]:
def moralbert_predictions(df,key=False):
    moral_columns = df.columns[5:] 
    #obtener la moral con mayor probabilidad
    df['moralbert'] = df[moral_columns].idxmax(axis=1)
    df['moral_value'] = df[moral_columns].max(axis=1)  # Valor máximo de esa columna
    df.loc[df['moral_value'] < 0.5, 'moralbert'] = 'nonmoral'
    df.drop(columns=moral_columns, inplace=True)
    df.drop('moral_value', axis=1 ,inplace=True)
    return df

df = moralbert_predictions(df,key=True)
df

# Modelos GSI
Se cargan los modelos de HF del GSI, hay dos opciones de modelo (con y sin polaridad)
- Usar API: https://moral-values-api.gsi.upm.es/predict"


In [None]:
import requests
import unicodedata
def to_title_case(name):
    """Convierte una anotación moral en mayúsculas a minúsculas, eliminando caracteres especiales"""
    try:
        normalized_name = unicodedata.normalize('NFD', name)
    except Exception as e:
        print(name)
        raise e
    
    # Eliminar acentos y caracteres especiales
    ascii_name = ''.join(c for c in normalized_name if unicodedata.category(c) != 'Mn')

    
    return re.sub(r'[^a-zA-Z0-9]', '', ascii_name).lower()



MORAL_API_URL = "https://moral-values-api.gsi.upm.es/predict"

def analyze_moral_polarity(text):
    request_payload = {
        "text": text,
        "model_name": "multimoralpolarity_model"}
    
    try:
        response = requests.post(MORAL_API_URL, json=request_payload)
        response.raise_for_status()
        prediction = response.json()

        if "Probabilities" in prediction:
            highest_moral = max(prediction["Probabilities"], key=prediction["Probabilities"].get)
            #highest_confidence = prediction["Probabilities"][highest_moral]
            return highest_moral
        else:
            return "No moral values detected.", 0.0
            
    except requests.exceptions.RequestException as e:
        print("Error analyzing moral values:", e)
        return "Error", 0.0


def analyze_moral_trait(text):
    request_payload = {
        "text": text,
        "model_name": "multimoral_model"}
    
    try:
        response = requests.post(MORAL_API_URL, json=request_payload)
        response.raise_for_status()
        prediction = response.json()

        if "Probabilities" in prediction:
            highest_moral = max(prediction["Probabilities"], key=prediction["Probabilities"].get)
            #highest_confidence = prediction["Probabilities"][highest_moral]
            return highest_moral
        else:
            return "No moral values detected.", 0.0
            
    except requests.exceptions.RequestException as e:
        print("Error analyzing moral values:", e)
        return "Error", 0.0


In [None]:
df['roberta_mmp'] = df['text'].apply(lambda x: pd.Series(analyze_moral_polarity(x)))
df['roberta_mmp'] = df['roberta_mmp'].apply(lambda x: pd.Series(to_title_case(x)))


df['roberta_mm'] = df['text'].apply(lambda x: pd.Series(analyze_moral_trait(x)))
df['roberta_mm']=df['roberta_mm'].replace({'NO-MORAL':'no moral','AUTHORITY/SUBVERSION':'authority','LOYALTY/BETRAYAL':'loyalty','CARE/HARM':'care','FAIRNESS/CHEATING':'fairness','PURITY/DEGRADATION':'purity'})
df.head()

# LIWC

In [None]:
df=pd.read_csv('../data/OMC/final_omc_morality.csv')
df

In [None]:
parse, category_names = liwc.load_token_parser('../data/mfd2.0.dic')

def tokenize(text):
    '''tokenizar texto'''
    for match in re.finditer(r'\w+', text, re.UNICODE):
        yield match.group(0)
        
def lexicon_analysis(text):
    '''aplicar liwic, si no se detecta categoría asignar valor no moral'''
    tokens = tokenize(text)
    category_counts = Counter()
    for token in tokens:
        categories = parse(token)
        if categories:
            category_counts.update(categories)    
    return category_counts


def get_max_category(liwc_dict):
    '''obtener categoría de mayor valor'''
    if not liwc_dict:  
        return 'no moral'
    return max(liwc_dict, key=liwc_dict.get)


In [None]:
df['liwc_mfd']=df['text'].apply(lexicon_analysis)
df['liwc_mfd'] = df['liwc_mfd'].apply(get_max_category)
df

# Perspectivism

In [None]:
model_path = "../models/perspectivist_bert_model"
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModelForSequenceClassification.from_pretrained(model_path)
model.eval()

In [None]:
def classify_text(text):
    inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
    outputs = model(**inputs)
    logits = outputs.logits
    probabilities = logits.softmax(dim=1)
    predicted_class = probabilities.argmax(dim=1).item()
    return predicted_class

df['persp_bert'] = df['text'].apply(lambda x: pd.Series(classify_text(x)))
df= df.replace({'persp_bert': {1:'care', 2: 'fairness', 3: 'loyalty', 4: 'authority', 5: 'purity', 0: 'nonmoral'}})
df


# Métricas de Anotación y Selección de Etiquetas

## Moralidad sin polaridad

In [12]:
import pandas as pd
df=pd.read_csv('../data/MIND/final_tweets_morality.csv')
hcr=pd.read_csv('../data/HCR/final_hcr_morality.csv')
omc=pd.read_csv('../data/OMC/final_omc_morality.csv')

MORAL_MAPPING= {'harm':'care', 'cheating':'fairness','betrayal':'loyalty','subversion':'authority','degradation':'purity','sanctity':'purity','no moral':'nonmoral','nomoral':'nonmoral'}

df['liwc_mfd'] = df['liwc_mfd'].apply(lambda x: x.split('.')[0] if isinstance(x, str) else x)
df.replace({"moralbert": MORAL_MAPPING, "roberta_mmp":MORAL_MAPPING, "roberta_mm":MORAL_MAPPING, "liwc_mfd":MORAL_MAPPING, "persp_bert":MORAL_MAPPING}, inplace=True)

hcr['liwc_mfd'] = hcr['liwc_mfd'].apply(lambda x: x.split('.')[0] if isinstance(x, str) else x)
hcr.replace({"moralbert": MORAL_MAPPING, "roberta_mmp":MORAL_MAPPING, "roberta_mm":MORAL_MAPPING, "liwc_mfd":MORAL_MAPPING, "persp_bert":MORAL_MAPPING}, inplace=True)

omc['liwc_mfd'] = omc['liwc_mfd'].apply(lambda x: x.split('.')[0] if isinstance(x, str) else x)
omc.replace({"moralbert": MORAL_MAPPING, "roberta_mmp":MORAL_MAPPING, "roberta_mm":MORAL_MAPPING, "liwc_mfd":MORAL_MAPPING, "persp_bert":MORAL_MAPPING}, inplace=True)


In [None]:
# SELECCIONAR ETIQUETA

#model_cols = ['moralbert', 'roberta_mmp', 'roberta_mm', 'liwc_mfd']
#model_cols=['moral_label2','human_annot']
def count_moral_votes(row):
    votes = {}
    for col in model_cols:
        label = row[col]
        votes[label] = votes.get(label, 0) + 1
    return votes

def get_top_morals(vote_dict):
    max_votes = max(vote_dict.values())
    top_morals = [moral for moral, count in vote_dict.items() if count == max_votes]
    
    if len(top_morals) > 1 and 'nonmoral' in top_morals:
        top_morals = [moral for moral in top_morals if moral != 'nonmoral']
    
    return top_morals[0]

    
df['top_morals'] = df.apply(count_moral_votes, axis=1)
df['moral_label2'] = df['top_morals'].apply(get_top_morals)
df.drop('top_morals',axis=1,inplace=True)



In [7]:
## Métrica Cohen Kappa
#label_cols = ["moralbert", "roberta_mm","roberta_mmp", "liwc_mfd"]
label_cols=['moral_label2','human_annot']

def compute_cohen_kappa(df,label_cols=label_cols):
    #label_cols = ["moralbert", "roberta_mm","roberta_mmp", "liwc_mfd"]
    label_cols=['moral_label2','human_annot']

    kappa_matrix = pd.DataFrame(index=label_cols, columns=label_cols)
    
    # Cohen's Kappa values
    for col1, col2 in combinations(label_cols, 2):
        kappa = cohen_kappa_score(df[col1], df[col2])
        kappa_matrix.loc[col1, col2] = kappa
        kappa_matrix.loc[col2, col1] = kappa  
    
    # Diagonal v
    for col in label_cols:
        kappa_matrix.loc[col, col] = 1.0
    
    kappa_matrix = kappa_matrix.astype(float)
    kappa_matrix = kappa_matrix.round(4)
    
    return kappa_matrix

kappa_matrix_df1= compute_cohen_kappa(df)
#kappa_matrix_omc= compute_cohen_kappa(omc)
#kappa_matrix_hcr= compute_cohen_kappa(hcr)
#df= pd.concat([kappa_matrix_df1,kappa_matrix_hcr,kappa_matrix_omc], keys= ['POZZI','HCR','OMC'])
#print(df.to_latex())

print(kappa_matrix_df1.to_latex())

Unnamed: 0,moral_label2,human_annot
moral_label2,1.0,0.4174
human_annot,0.4174,1.0


In [8]:
print(kappa_matrix_df1.to_latex())

\begin{tabular}{lrr}
\toprule
 & moral_label2 & human_annot \\
\midrule
moral_label2 & 1.000000 & 0.417400 \\
human_annot & 0.417400 & 1.000000 \\
\bottomrule
\end{tabular}



In [11]:
# Métrica Fleiss kappa

from torchmetrics.nominal import FleissKappa
import torch
#https://lightning.ai/docs/torchmetrics/stable/nominal/fleiss_kappa.html

categories = ['care', 'fairness', 'loyalty', 'authority', 'purity', 'nonmoral']

# Create a function to count occurrences per category
def count_category_occurrences(row, categories):
    #annotators = ["moralbert", "roberta_mm","roberta_mmp", "liwc_mfd"]
    annotators=['moral_label2','human_annot']
    counts = {category: 0 for category in categories}
    
    for annotator in annotators:
        if row[annotator] in counts:
            counts[row[annotator]] += 1
            
    return list(counts.values())


ratingsdf1 = df.apply(lambda row: count_category_occurrences(row, categories), axis=1)
#ratingsomc = omc.apply(lambda row: count_category_occurrences(row, categories), axis=1)
#ratingshcr = hcr.apply(lambda row: count_category_occurrences(row, categories), axis=1)

# Convertir los resultados a tensores
ratings_tensor_df1 = torch.tensor(ratingsdf1.tolist(), dtype=torch.int)
#ratings_tensor_omc = torch.tensor(ratingsomc.tolist(), dtype=torch.int)
#ratings_tensor_hcr = torch.tensor(ratingshcr.tolist(), dtype=torch.int)

fleiss_kappa = FleissKappa(mode='counts')

# Calcular Fleiss' 
fleiss_k_df1 = fleiss_kappa(ratings_tensor_df1)
#fleiss_k_omc = fleiss_kappa(ratings_tensor_omc)
#fleiss_k_hcr = fleiss_kappa(ratings_tensor_hcr)

#results = {'Dataset': ['df1', 'omc', 'hcr'],'Fleiss Kappa': [fleiss_k_df1.item(), fleiss_k_omc.item(), fleiss_k_hcr.item()]}

#fleiss_kappa_df = pd.DataFrame(results)

#print(fleiss_kappa_df.to_latex())
fleiss_k_df1

tensor(0.4055)

In [13]:
##PABAK
from calcular_pabak import *
def run_pabak_multiclass(df, annotator_columns):
    #map values into int
    df_filtered = df[annotator_columns].replace({
        'care': 1, 'harm': 1,
        'fairness': 2, 'cheating': 2,
        'loyalty': 3, 'betrayal': 3,
        'authority': 4, 'subversion': 4,
        'purity': 5, 'degradation': 5,
        'nonmoral': 6, 'nomoral': 6,  'no moral': 6
    })


    data_matrix = df_filtered.values
    #print(data_matrix)
    observed_agreement_full = calculate_observed_agreement_full(data_matrix)
    PABAK_full = 2 * observed_agreement_full - 1

    print("PABAK:", PABAK_full)
    return PABAK_full

annotator_columns = ["moralbert", "roberta_mm", "roberta_mmp", "liwc_mfd"]
annotator_columns = ['moral_label2','human_annot']

df_pabak = run_pabak_multiclass(df, annotator_columns)
#hcr_pabak = run_pabak_multiclass(hcr, annotator_columns)
#omc_pabak = run_pabak_multiclass(omc, annotator_columns)
df_pabak

textos: 159 , anotadores:  2
PABAK: 0.320754716981132


np.float64(0.320754716981132)

## Moralidad con polaridad

In [None]:
df=pd.read_csv('../data/MIND/final_tweets_morality.csv')
hcr=pd.read_csv('../data/HCR/final_hcr_morality.csv')
omc=pd.read_csv('../data/OMC/final_omc_morality.csv')

LW_MORAL_MAPPING= {'care.virtue':'care', 'fairness.virtue':'fairness','loyalty.virtue':'loyalty','authority.virtue':'authority','sanctity.virtue':'purity','care.vice':'harm','fairness.vice':'cheating','loyalty.vice':'betrayal','authority.vice':'subversion','sanctity.vice':'degradation','no moral':'nonmoral','nomoral':'nonmoral'}

df.replace({"liwc_mfd": LW_MORAL_MAPPING,"moralbert":LW_MORAL_MAPPING,"roberta_mmp":LW_MORAL_MAPPING}, inplace=True)
hcr.replace({"liwc_mfd": LW_MORAL_MAPPING,"moralbert":LW_MORAL_MAPPING,"roberta_mmp":LW_MORAL_MAPPING}, inplace=True)
omc.replace({"liwc_mfd": LW_MORAL_MAPPING,"moralbert":LW_MORAL_MAPPING,"roberta_mmp":LW_MORAL_MAPPING}, inplace=True)


In [None]:
#SELECCIÓN ETIQUETA CON POLARIDAD
'''
model_cols = ['moralbert', 'roberta_mmp', 'liwc_mfd']

def count_moral_votes(row):
    votes = {}
    for col in model_cols:
        label = row[col]
        votes[label] = votes.get(label, 0) + 1
    return votes

def get_top_morals(vote_dict):
    max_votes = max(vote_dict.values())
    top_morals = [moral for moral, count in vote_dict.items() if count == max_votes]
    
    if len(top_morals) > 1 and 'nonmoral' in top_morals:
        top_morals = [moral for moral in top_morals if moral != 'nonmoral']
    
    return top_morals[0]

    
df['top_morals'] = df.apply(count_moral_votes, axis=1)
df['moral_label_polarity'] = df['top_morals'].apply(get_top_morals)
df.drop('top_morals',axis=1,inplace=True)
df
'''

In [None]:
def compute_cohen_kappa(df,label_cols=label_cols):
    label_cols = ["moralbert","roberta_mmp", "liwc_mfd"]
    kappa_matrix = pd.DataFrame(index=label_cols, columns=label_cols)
    
    # Fill in pairwise Cohen's Kappa values
    for col1, col2 in combinations(label_cols, 2):
        kappa = cohen_kappa_score(df[col1], df[col2])
        kappa_matrix.loc[col1, col2] = kappa
        kappa_matrix.loc[col2, col1] = kappa  # symmetric
    
    # Diagonal values (agreement with self = 1.0)
    for col in label_cols:
        kappa_matrix.loc[col, col] = 1.0
    
    kappa_matrix = kappa_matrix.astype(float)
    kappa_matrix = kappa_matrix.round(4)
    
    return kappa_matrix


cols = ["moralbert", "roberta_mmp", "liwc_mfd"]

kappa_matrix_df1= compute_cohen_kappa(df,label_cols=cols)
kappa_matrix_omc= compute_cohen_kappa(omc, label_cols=cols)
kappa_matrix_hcr= compute_cohen_kappa(hcr, label_cols=cols)

df2= pd.concat([kappa_matrix_df1,kappa_matrix_hcr,kappa_matrix_omc], keys= ['POZZI','HCR','OMC'])
print(df2.to_latex())

In [None]:
# Métrica Fleiss kappa
from torchmetrics.nominal import FleissKappa
import torch
#https://lightning.ai/docs/torchmetrics/stable/nominal/fleiss_kappa.html

def count_category_occurrences(row, categories):
    annotators = ['moralbert','roberta_mmp', 'liwc_mfd']
    counts = {category: 0 for category in categories}
    
    for annotator in annotators:
        if row[annotator] in counts:
            counts[row[annotator]] += 1
            
    return list(counts.values())


ratingsdf1 = df.apply(lambda row: count_category_occurrences(row, categories), axis=1)
ratingsomc = omc.apply(lambda row: count_category_occurrences(row, categories), axis=1)
ratingshcr = hcr.apply(lambda row: count_category_occurrences(row, categories), axis=1)

# Convertir los resultados a tensores
ratings_tensor_df1 = torch.tensor(ratingsdf1.tolist(), dtype=torch.int)
ratings_tensor_omc = torch.tensor(ratingsomc.tolist(), dtype=torch.int)
ratings_tensor_hcr = torch.tensor(ratingshcr.tolist(), dtype=torch.int)

fleiss_kappa = FleissKappa(mode='counts')

# Calcular Fleiss' 
fleiss_k_df1 = fleiss_kappa(ratings_tensor_df1)
fleiss_k_omc = fleiss_kappa(ratings_tensor_omc)
fleiss_k_hcr = fleiss_kappa(ratings_tensor_hcr)

results = {
    'Dataset': ['df1', 'omc', 'hcr'],
    'Fleiss Kappa': [fleiss_k_df1.item(), fleiss_k_omc.item(), fleiss_k_hcr.item()]}

fleiss_kappa_df = pd.DataFrame(results)

print(fleiss_kappa_df.to_latex())

In [None]:
## PABAK

##PABAK
from calcular_pabak import *
def run_pabak_multiclass(df, annotator_columns):
    df_filtered = df[annotator_columns].replace({
        'care': 1, 'harm': 2,
        'fairness': 3, 'cheating': 4,
        'loyalty': 5, 'betrayal': 6,
        'authority': 7, 'subversion': 8,
        'purity': 9, 'degradation': 10,
        'nonmoral': 11, 'no moral': 11, 'nomoral': 11
    })

    data_matrix = df_filtered.values

    observed_agreement_full = calculate_observed_agreement_full(data_matrix)
    PABAK_full = 2 * observed_agreement_full - 1

    print("PABAK:", PABAK_full)
    return PABAK_full

annotator_columns = ["moralbert", "roberta_mmp", "liwc_mfd"]
pabak_df = run_pabak_multiclass(df, annotator_columns)
pabak_omc = run_pabak_multiclass(omc, annotator_columns)
pabak_hcr = run_pabak_multiclass(hcr, annotator_columns)

# Unión Datasets

In [None]:
embeddings= pd.read_pickle('../models/omc_tadw_df.pkl')
embeddings

df_merged = pd.merge(df_merged, embeddings[['tweet.id', 'extra_data']], on='tweet.id', how='inner')
df_merged.rename(columns={'extra_data':'tadw'},inplace=True)
df_merged

#df_merged.to_pickle('../data/OMC/final_omc_morality.pkl')


In [None]:
df = pd.read_pickle("../data/OMC/final_omc_morality.pkl")
df

embedding_columns = ['svd', 'deepwalk', 'node2vec', 'tadw']

combined_embeddings = np.array([
    np.concatenate([
        row['svd'],
        row['deepwalk'],
        row['node2vec'],
        row['tadw']
    ]) for _, row in df.iterrows()
])

#np.save("all_omc_embeddings.npy", combined_embeddings)


# Anotador Humano

In [None]:
tweets = pd.read_pickle("../data/MIND/final_tweets_morality.pkl")

tweets