In [7]:
import os
import time
import json
import logging
import warnings
import pandas as pd
import numpy as np
from glob import glob
from functools import reduce
from sklearn.metrics import f1_score
from datasets import Dataset, DatasetDict
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from transformers import Trainer, TrainingArguments, AutoTokenizer,AutoModelForSequenceClassification

%run Balance_Data.ipynb
%run Preprocessing_Functions.ipynb



# Data Preparation Functions

In [None]:
# MPres functions
def preprocess_data_binary(df, m):
    """
    Preprocesses the dataframe based on the binary task (MPres).

    Args:
    df (pd.DataFrame): DataFrame containing the data to preprocess.
    m (int): Moral identifier (1...5)

    Returns:
    pd.DataFrame: Preprocessed DataFrame.
    """
    if m == 1:
        df = labels_m1(df)
    elif m == 2:
        df = labels_m2(df)
    elif m == 3:
        df = labels_m3(df)
    elif m == 4:
        df = labels_m4(df)
    elif m == 5:
        df = labels_m5(df)
    return df


# MultiPres functions
def preprocess_data_polarity(df, mp):
    """
    Preprocesses the dataframe based on the polarity task (MultiPres).

    Args:
    df (pd.DataFrame): DataFrame containing the data to preprocess.
    mp (int): Moral trait identifier (1...5)

    Returns:
    pd.DataFrame: Preprocessed DataFrame.
    """
    if mp == 1:
        df=label_mp1(df)
        id2label, label2id = mp_label1()
    elif mp == 2:
        df=label_mp2(df)
        id2label, label2id = mp_label2()
    elif mp == 3:
        df=label_mp3(df)
        id2label, label2id = mp_label3()
    elif mp == 4:
        df=label_mp4(df)
        id2label, label2id = mp_label4()
    elif mp == 5:
        df=label_mp5(df)
        id2label, label2id = mp_label5()
    return df,id2label,label2id


# METRICS
def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, predictions, average='macro')
    acc = accuracy_score(labels, predictions)
    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall}

# TOKENIZER
def tokenize_function(examples):
    tokenized_inputs=tokenizer(examples["text"], truncation=True)
    return tokenized_inputs


In [3]:
#CREATE DATAFRAMES
def annotator_dataframes(folder_path, ending):
    """
    Reads CSV files from a specified folder and creates pandas DataFrames.

    Args:
    folder_path (str): Path to the folder containing CSV files.
    ending (str): File extension or ending of the CSV files to be read (e.g., '.csv').

    Returns:
    dict: A dictionary where keys are filenames (without `ending`) and values are corresponding pandas DataFrames.
    list: List of all filenames that matched the specified `ending`.
    list: List of filenames without the specified `ending`.
    """
    files = os.listdir(folder_path)
    dataframes = {}
    final_filenames = []
    names = []  
    for filename in files:
        if filename.endswith(ending):
            df = pd.read_csv(os.path.join(folder_path, filename))
            dataframes[filename.replace(ending, '')] = df
            final_filenames.append(filename)
            names.append(filename.replace(ending, ''))
    return dataframes, final_filenames, names



#LOAD DATA
def load_data(prompt_number,folder_path,lexicon,ending):
    """
    Loads datasets from CSV files in a specified folder, applies a prompt template based on `prompt_number` and `lexicon`,
    and returns modified datasets.

    Args:
    prompt_number (int): Number indicating the type of prompt template to apply (1, 2, 3, or 4).
    folder_path (str): Path to the folder containing CSV files.
    lexicon (str): Type of lexicon to use ('moralstrength', 'depechemood', or 'moralstrength+depechemood').
    ending (str): File extension or ending of the CSV files to be read (e.g., '.csv').

    Returns:
    dict: A dictionary where keys are dataset names and values are corresponding pandas DataFrames modified by `prompt_template`.
    list: List of all filenames that matched the specified `ending`.
    list: List of filenames without the specified `ending`.
    """
    #no prompt
    datasets,filenames,names = annotator_dataframes(folder_path,ending)
    for dataset in names:
        datasets[dataset]=  prompt_template(datasets[dataset], prompt_number,lexicon)
  
    return datasets,filenames,names


#PROMPT AND DATA FUNCTIONS
def prompt_template(df, prompt_number,lexicon):
    """
    Modifies the 'text' column of the dataframe based on the specified prompt number and lexicon.

    Args:
    df (pd.DataFrame): DataFrame containing the data to be processed.
    prompt_number (int): Number indicating the prompt template to apply (1 to 4).
    lexicon (str): Lexicon type ('moralstrength', 'depechemood', 'moralstrength+depechemood').

    Returns:
    pd.DataFrame: DataFrame with the 'text' column modified according to the specified prompt template.
    """

    if lexicon=='moralstrength':
        if prompt_number == 1:
            df["text"] = df["moralstrength"]  + " : " + df["text"]                       # PROMPT 1   
        elif prompt_number == 2:
            df["text"] = "the text '" + df["text"] + "' reflects the moral value "+ df["moralstrength"] # PROMPT 2
        elif prompt_number == 3:
            df["text"] = "the moral value " + df["moralstrength"]+ " is reflected in the text '" + df["text"] + " '"   # PROMPT 3
        elif prompt_number == 4:
            df["text"] = "The text '" + df["text"]+ "reflects varying intensities of morality such as: " +df['moralstrength_i']         # PROMPT 4

    elif lexicon=='depechemood':
        if prompt_number == 1:
            df["text"] = df["emotion_word"]  + " : " + df["text"]                       # PROMPT 1   
        elif prompt_number == 2:
            df["text"] = "the text '" + df["text"] + "' reflects the emotion "+ df["emotion_word"] # PROMPT 2
        elif prompt_number == 3:
            df["text"] = "the emotion " + df["emotion_word"]+ " is reflected in the text '" + df["text"] + " '"   # PROMPT 3
        elif prompt_number == 4:
            df["text"] = "The text '" + df["text"]+ "' reflects different emotions such as: " +df['depechemood']         # PROMPT 4
    
    elif lexicon=='moralstrength+depechemood':
        if prompt_number == 1:
            df["text"] = df["moralstrength"] +" and "+ df["emotion_word"]  + " : " + df["text"]                       # PROMPT 1   
        elif prompt_number == 2:
            df["text"] = "the text '" + df["text"] + "' reflects : the moral value " + df["moralstrength"]+ " and the emotion "+ df["emotion_word"] # PROMPT 2
        elif prompt_number == 3:
            df["text"] = "the moral value: " + df["moralstrength"] +" and the emotion " + df["emotion_word"]+ " is reflected in the text '" + df["text"] + " '"   # PROMPT 3
        elif prompt_number == 4:
            df["text"] = "The text '" + df["text"]+ "' reflects varying intensities of morality such as: " + df["moralstrength_i"] +" and different emotions such as: " +df['depechemood']         # PROMPT 4
        
    else:
        pass
    
    return df


# Train and Test Functions

In [None]:
# MPress task Train and Test Function

def train_test_binary(df, m, model_name, directory, name,prompt,lexicon,undersampling=True):
    #-----info-----
    print(f'Info: {name}')
    print(f'Moral to detect: {m}')
    print(f'Prompt: {prompt}')
    print(f'Directory: {directory}')
    print(f'Undersampling: {undersampling}')
    print('Data example:')
    print(df.loc[0].text)
    
    #-----data preparation-----
    df = preprocess_data_binary(df, m)
    id2label = {0: "NO-MORAL", 1: "MORAL"}
    label2id = {"NO-MORAL": 0, "MORAL": 1}
    
    if undersampling:
        df=undersampling_data(df)
    else:
        pass
        
    #-----info-----
    file_name = f"result_binary_{name}_{lexicon}_moral{m}_prompt{prompt}"
    print(f'File: {file_name}')
    print(f'Labels :{label2id}')

        
    #-----split data-----
    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
    train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)
    datasets = {'train': Dataset.from_pandas(train_df), 'val': Dataset.from_pandas(val_df), 'test': Dataset.from_pandas(test_df)}
    datasets = DatasetDict(datasets)

    #----model--------
    tokenizer = AutoTokenizer.from_pretrained(model_name, truncation=True)
    tokenized_datasets = datasets.map(tokenize_function, batched=True)
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2, id2label=id2label, label2id=label2id)
    
    #-----train-----
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["val"],
        tokenizer=tokenizer,
        compute_metrics=compute_metrics)
    
    #start_time = time.time()
    trainer.train()
    #end_time = time.time()
    #elapsed_time = (end_time - start_time) 
    #time_results = {"elapsed_time_train": elapsed_time}

    
    #-----inference-----
    #start_time_pred = time.time()
    predictions = trainer.predict(tokenized_datasets["test"])
    predicted_class_ids = predictions.predictions.argmax(axis=1)
    actual_labels = tokenized_datasets["test"]["label"]
    results = classification_report(actual_labels, predicted_class_ids, digits=5, output_dict=True)
    #end_time_pred = time.time()
    #elapsed_time_pred = (end_time_pred - start_time_pred)
    #time_results_pred = {"elapsed_time prediction": elapsed_time_pred}



    
    #-----save metrics-----   
    results_directory = os.path.join(directory, file_name)
    os.makedirs(results_directory, exist_ok=True)
    with open(os.path.join(results_directory, f'results_classification_{model_name}.json'), 'w') as f:
        json.dump(results, f)
        
    #-----save times-----  
    #time_file_path = os.path.join(results_directory, f'training_time_{model_name}.json')
    #with open(time_file_path, 'w') as f:
        #json.dump([time_results,time_results_pred], f)
    


In [None]:
# MPol task Train and Test Function

def train_test_model_polarity(df, task, model_name, directory, name,prompt,lexicon,undersampling=False):
    #-----info-----
    print(f'Info: {name}')
    print(f'Task: {task}')
    print(f'Prompt: {prompt}')
    print(f'Directory: {directory}')
    print(f'Undersampling: {undersampling}')
    print('Data example:')
    print(df.loc[0].text)
    
    #-----data preparation-----    
    df,id2label,label2id = preprocess_data_polarity(df, task)
    if undersampling:
        df=undersampling_data(df)
    else:
        pass

    #-----info-----
    file_name = f"result_polarity_{name}_{lexicon}_task{task}_prompt{prompt}"
    print(f'File: {file_name}')
    print(f'Labels :{label2id}')
    
        
    #-----split data-----
    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
    train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)
    datasets = {'train': Dataset.from_pandas(train_df), 'val': Dataset.from_pandas(val_df), 'test': Dataset.from_pandas(test_df)}
    datasets = DatasetDict(datasets)

    #-----model------
    tokenizer = AutoTokenizer.from_pretrained(model_name, truncation=True)
    tokenized_datasets = datasets.map(tokenize_function, batched=True)
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=3, id2label=id2label, label2id=label2id)
    
    #-----train-----
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["val"],
        tokenizer=tokenizer,
        compute_metrics=compute_metrics
    )

    #start_time = time.time()
    trainer.train()
    #end_time = time.time()
    #elapsed_time = (end_time - start_time) 
    #time_results = {"elapsed_time_train": elapsed_time}

    #-----inference-----
    #start_time_pred = time.time()
    predictions = trainer.predict(tokenized_datasets["test"])
    predicted_class_ids = predictions.predictions.argmax(axis=1)
    actual_labels = tokenized_datasets["test"]["label"]
    results = classification_report(actual_labels, predicted_class_ids, digits=5, output_dict=True)
    #end_time_pred = time.time()
    #elapsed_time_pred = (end_time_pred - start_time_pred)
    #time_results_pred = {"elapsed_time prediction": elapsed_time_pred}

    #-----save metrics-----
    results_directory = os.path.join(directory, file_name)
    os.makedirs(results_directory, exist_ok=True)
    with open(os.path.join(results_directory, f'results_classification_{model_name}.json'), 'w') as f:
        json.dump(results, f)
        
    #-----save times-----  
    #time_file_path = os.path.join(results_directory, f'training_time_{model_name}.json')
    #with open(time_file_path, 'w') as f:
        #json.dump([time_results,time_results_pred], f)



In [None]:
# MultiPres Task Train and Test Function

def train_test_model_multiclass_6(df, task, model_name, directory, name,prompt,lexicon,undersampling=False):
    #-----info-----
    print(f'Info: {name}')
    print(f'Pask: {task}')
    print(f'Prompt: {prompt}')
    print(f'Directory: {directory}')
    print(f'Undersampling: {undersampling}')
    print('Data example:')
    print(df.loc[0].text)
    
    #------data preparation-----
    df = label_multiclass6(df)
    id2label,label2id = multiclass_task_6()
    if undersampling:
        df=undersampling_data(df)
    else:
        pass
        
    #-----info-----
    file_name = f"result_multiclass_{name}_{lexicon}_prompt{prompt}"
    print(f'File: {file_name}')
    print(f'Labels :{label2id}')


    #-----split data-----
    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
    train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)
    datasets = {'train': Dataset.from_pandas(train_df), 'val': Dataset.from_pandas(val_df), 'test': Dataset.from_pandas(test_df)}
    datasets = DatasetDict(datasets)

    #-----model-----
    tokenizer = AutoTokenizer.from_pretrained(model_name, truncation=True)
    tokenized_datasets = datasets.map(tokenize_function, batched=True)
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=6, id2label=id2label, label2id=label2id)

    
    #-----train-----
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["val"],
        tokenizer=tokenizer,
        compute_metrics=compute_metrics
    )

    #start_time = time.time()
    trainer.train()
    #end_time = time.time()
    #elapsed_time = ( end_time - start_time) 
    #time_results = {"elapsed_time_train": elapsed_time}


    #-----inference-----

    #start_time_pred = time.time()
    predictions = trainer.predict(tokenized_datasets["test"])
    predicted_class_ids = predictions.predictions.argmax(axis=1)
    actual_labels = tokenized_datasets["test"]["label"]
    results = classification_report(actual_labels, predicted_class_ids, digits=5, output_dict=True)
    #end_time_pred = time.time()
    #elapsed_time_pred = (end_time_pred - start_time_pred) 
    #time_results_pred = {"elapsed_time prediction": elapsed_time_pred}

    #-----save metrics-----
    results_directory = os.path.join(directory, file_name)
    os.makedirs(results_directory, exist_ok=True)
    with open(os.path.join(results_directory, f'results_classification_{model_name}.json'), 'w') as f:
        json.dump(results, f)
        
    #-----save times-----  
    #time_file_path = os.path.join(results_directory, f'training_time_{model_name}.json')
    #with open(time_file_path, 'w') as f:
        #json.dump([time_results,time_results_pred], f)


In [None]:
# MultiPol Task Train and Test Function


def train_test_model_multiclass_11(df, task, model_name, directory, name,prompt,lexicon,undersampling=False):
    #-----info-----
    print(f'Info: {name}')
    print(f'Task: {task}')
    print(f'Prompt: {prompt}')
    print(f'Directory: {directory}')
    print(f'Undersampling: {undersampling}')
    print('Data example')
    print(df.loc[0].text)
    
    #-----data preparation-----
    df = label_multiclass11(df)
    id2label,label2id = multiclass_task_11()
    if undersampling:
        df=undersampling_data(df)
    else:
        pass
        
    #-----info-----
    file_name = f"result_multiclass_{name}_{lexicon}_prompt{prompt}"
    print(f'File: {file_name}')
    print(f'Labels :{label2id}')

    
    #-----split data-----
    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
    train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)
    datasets = {'train': Dataset.from_pandas(train_df), 'val': Dataset.from_pandas(val_df), 'test': Dataset.from_pandas(test_df)}
    datasets = DatasetDict(datasets)

    #------model-----
    tokenizer = AutoTokenizer.from_pretrained(model_name, truncation=True)
    tokenized_datasets = datasets.map(tokenize_function, batched=True) 
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=11, id2label=id2label, label2id=label2id)
    
    #-----train-----
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["val"],
        tokenizer=tokenizer,
        compute_metrics=compute_metrics
    )
    
    #start_time = time.time()
    trainer.train()
    #end_time = time.time()
    #elapsed_time = (end_time - start_time)
    #time_results = {"elapsed_time_train": elapsed_time}


    #-----inference-----
    start_time_pred = time.time()
    predictions = trainer.predict(tokenized_datasets["test"])
    predicted_class_ids = predictions.predictions.argmax(axis=1)
    actual_labels = tokenized_datasets["test"]["label"]
    results = classification_report(actual_labels, predicted_class_ids, digits=5, output_dict=True)
    #end_time_pred = time.time()
    #elapsed_time_pred = (end_time_pred - start_time_pred) / 60
    #time_results_pred = {"elapsed_time prediction": elapsed_time_pred}

    #-----save metrics-----
    results_directory = os.path.join(directory, file_name)
    os.makedirs(results_directory, exist_ok=True)
    with open(os.path.join(results_directory, f'results_classification_{model_name}.json'), 'w') as f:
        json.dump(results, f)
        
    #-----save times-----  
    #time_file_path = os.path.join(results_directory, f'training_time_{model_name}.json')
    #with open(time_file_path, 'w') as f:
        #json.dump([time_results,time_results_pred], f)


# Parameters

In [8]:
directory='Results/Roberta_best_results'

training_args = TrainingArguments(
    output_dir = directory,
    learning_rate=2e-5,
    num_train_epochs=15,
    per_device_train_batch_size = 32,
    per_device_eval_batch_size=32,
    weight_decay=0.01,
    evaluation_strategy = "epoch",
    push_to_hub=False,
    save_strategy='no')


models=['bert-base-uncased','roberta-base']
#select model
model_name=models[1]

#select prompt
prompts=['noprompt',1,2,3,4]
prompt_number=prompts[2]

#select lexicon
lexicons=['baseline','moralstrength','depechemood','moralstrength+depechemood']
lexicon=lexicons[3]


#model tokenizer
if model_name=='bert-base-uncased':
    folder='Bert'
elif model_name=='roberta-base':
    folder='Roberta' 
tokenizer = AutoTokenizer.from_pretrained(model_name, truncation=True)



# Select datasets

In [9]:

#create datasets
folder_path='DATASETS'
datasets,filenames,names=load_data(prompt_number,folder_path,lexicon,'_dataset.csv')
names

['BALTIMORE', 'ALM', 'REDDIT', 'SANDY', 'BLM', 'ELECTION', 'DAVIDSON']

# Train and Test

In [None]:
#Select Moral Estimation Task : MPres, MPol, MultiPres, MultiPol 
case='MPres'

if case== 'MPres':
    directory = f"Results_{folder}_B/MFTD_Dataset_balanced"
    for task in range(1, 6):
        for f in names:
            df = datasets[f]
            print(f'An example: {df.text.loc[0]}')
            train_test_model_binary(df.copy(), task, model_name, directory, name= f,prompt=prompt_number,lexicon=lexicon, undersampling=True)
            
                                
elif case=='Mpol':
    directory = f"Results_{folder}_P/MFTD_Dataset_balanced"
    for task in range(1, 6):
        for f in names:
            df = datasets[f]
            df['label']=df['label_annotators']
            print(f'An example: {df.text.loc[0]}')
            df.label=df.label.replace({"hate": 'degradation','Care':'care','Harm':'harm','Fairness':'fairness','Cheating':'cheating','Loyalty':'loyalty','Betrayal':'betrayal','Authority':'authority','Subversion':'subversion','Purity':'purity','Degradation':'degradation','Non-moral':'non-moral','nm':'non-moral','Non-Moral':'non-moral'})
            train_test_model_polarity(df.copy(), task, model_name, directory, name=f,prompt=prompt_number,lexicon=lexicon,undersampling=True)


elif case=='MultiPres':
    directory = f"Results_{folder}_M6/MFTD_Dataset_balanced"
    for f in names:
        df = datasets[f]
        print(f'An example: {df.text.loc[0]}')
        train_test_model_multiclass_5(df.copy(), 1, model_name, directory, name=f,prompt=prompt_number,lexicon=lexicon,undersampling=True)


elif case=='MultiPol':
    directory = f"Results_{folder}_M11/MFTD_Dataset_balanced"
    for f in names:
        df = datasets[f]
        df['label']=df['label_annotators']
        df.label=df.label.replace({"hate": 'degradation','Care':'care','Harm':'harm','Fairness':'fairness','Cheating':'cheating','Loyalty':'loyalty','Betrayal':'betrayal','Authority':'authority','Subversion':'subversion','Purity':'purity','Degradation':'degradation','Non-moral':'non-moral','nm':'non-moral','Non-Moral':'non-moral'})
        train_test_model_multiclass_11(df.copy(), 1, model_name, directory, name=f,prompt=prompt_number,lexicon=lexicon,undersampling=True)
