# Moral Value Recognition
_Ben Heyderman_

This notebook presents the practical implementation of the paper _Predicting Lyric Morality Through Handcrafted Audio Features_, which was submitted as the final project for the Sound and Music Computing MSc course at Queen Mary University of London.

## Dataset Gathering

### Step 1 - Gather Spotify Previews and Lyrics

The minimum requirement for the project is a csv file with one column containing song names and song titles. This can be used to find song lyrics and previews.

1. Song lyrics can be scraped from Genius using the utils_get_lyrics script in the supporting scripts folder. Note: WASABI API includes lyrics (not GitHub version) so it could be quicker to access through there
2. Previews are obtained in a two step process: finding the preview url (utils_get_previews) and downloading the preview (utils_download_previews). I have included a class I wrote for scraping information from Spotify which has some useful functionality beyond what is used in this project.

### Step 2 - Extract Features and Construct Dataset

This is done using the utils_construct_dataset script which uses functions from the audiofeatureextractor class I created.
- A combination of bespoke (some origional and some adapted from other work - see class for details) and features from the Essentia library are extracted.
- The dataset is stored as a dictionary in which features are organised by category (allowing easy categorical elimination)
- The class includes a function to convert these dictionaries into Pandas dataframes ready for xgboost.

### Step 3 - Moral Value Extraction

- Moral values are extracted using a model created by Vjosa Preniqi. This cannot be included in this as it is not public/my own work.
- The remainder of this notebook assumes the moral values are extracted and added as additional columns to the the feature dataframe

## Train Models

### Set Up

In [None]:
import pandas as pd 
import xgboost as xgb
from sklearn.model_selection import train_test_split 
from sklearn.metrics import f1_score
from sklearn.model_selection import GridSearchCV # cross validation
import pickle
import matplotlib.pyplot as plt
import sage
import shap
import itertools
from imblearn.over_sampling import RandomOverSampler, SMOTE
import matplotlib.pyplot as plt
from sklearn.feature_selection import RFECV
from sklearn.model_selection import StratifiedKFold
from matplotlib.colors import LinearSegmentedColormap

In [None]:
# Load dataset
dataset_identifier = "WASABI"
df = pd.read_csv(f"dataset_{dataset_identifier}_MFT.csv")

In [None]:
# Plot distribution

# Get first 10 columns (mft columns)
selected_columns = ['care', 'harm', 'fairness', 'cheating', 'loyalty', 'betrayal', 'authority', 'subversion', 'purity', 'degradation']
selected_df = df[selected_columns]

# Create a figure and axes
fig, axes = plt.subplots(2, 5, figsize=(20, 10))
axes = axes.flatten()

# Plot a histogram for each mft
for i, column in enumerate(selected_columns):
    axes[i].hist(selected_df[column].dropna(), bins=30, color='skyblue')
    axes[i].set_title(column)

plt.tight_layout()
plt.show()

### Train

In [None]:
def prep_split_dataset_classifier(df, mft, columns_to_drop):
    """
    Prepare and split the dataset for classification.

    Parameters:
    df (DataFrame): The input dataframe containing the data.
    mft (str): The name of the main feature target column.
    columns_to_drop (list): List of column names to drop from the dataframe.

    Returns:
    X (DataFrame): The feature set.
    y (DataFrame): The target set.
    X_train (DataFrame): The training set features.
    X_test (DataFrame): The test set features.
    y_train (DataFrame): The training set target.
    y_test (DataFrame): The test set target.
    """
    
    # Filter the dataframe to only include low and high feature target values
    df_filtered = df[(df[mft] <= 0.2) | (df[mft] >= 0.6)]
    
    # Round the main feature target values to the nearest integer (binary classification)
    df_filtered.loc[:, mft] = df_filtered[mft].round(0)
    
    # Create the target set
    y = df_filtered[[mft]].copy()

    # Drop specified columns to create the feature set
    X = df_filtered.drop(columns_to_drop, axis=1).copy() 

    # Split the data into training and testing sets
    # Y stratified so equal distribution of y in training and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42, stratify=y)

    return X, y, X_train, X_test, y_train, y_test

def get_best_model_classifier(X_train, y_train):
    """
    Train and select the best classifier model using GridSearchCV.

    Parameters:
    X_train (DataFrame): The training set features.
    y_train (DataFrame): The training set target.

    Returns:
    best_model (XGBClassifier): The best classifier model.
    """

    # Parameter grid for GridSearchCV
    param_grid = {
        'n_estimators': [100, 200, 300],
        'max_depth': [3, 5, 6],
        'learning_rate': [0.01, 0.1, 0.2],
        'subsample': [0.6, 0.8, 1.0],
        'reg_lambda': [0.5, 1.0, 1.5],
        'reg_alpha': [0.5, 1.0, 1.5]
    }

    # Calculate scale_pos_weight to handle class imbalance
    scale_pos_weight = (y_train.values == 0).sum() / (y_train.values == 1).sum()
    
    # Initialize the XGBClassifier
    xgb_class = xgb.XGBClassifier(objective='binary:logistic', 
                                  seed=42,
                                  subsample=0.9,
                                  eval_metric='auc',
                                  scale_pos_weight=scale_pos_weight,
                                  colsample_bytree=0.5)

    # Initialize GridSearchCV with 3-fold cross-validation
    grid_search = GridSearchCV(estimator=xgb_class, param_grid=param_grid, cv=3, n_jobs=-1, scoring='roc_auc', verbose=1)

    # Fit the model
    grid_search.fit(X_train, y_train)

    # Train the best model with the best parameters
    best_model = grid_search.best_estimator_
    best_model.fit(X_train, y_train)
    
    return best_model

def save_data(filename, data):
    """
    Save the data to a file.

    Parameters:
    filename (str): The name of the file to save the data.
    data: The data to be saved.

    Returns:
    None
    """

    with open(filename, 'wb') as fp:
        pickle.dump(data, fp)
        print('data saved successfully to file.\n')

In [None]:
# List of 10 mfts used in the study
mft_list = ['care','harm','fairness','cheating','loyalty','betrayal','authority','subversion','purity','degradation']

# Columns to be removed from the training set
columns_to_drop = mft_list + ["file_id"]

imbalance_method = ""


In [None]:
# Train model for each of the MFTs

# If blank the class imbalance is being addressed using weighting in the xgboost classifier

for mft in mft_list:
    print(f"Training model {mft}:")
    print("Splitting data...")
    
    # Use above function to prepare the dataset
    _, _, X_train, X_test, y_train, y_test = prep_split_dataset_classifier(df, mft, columns_to_drop)

    # Optional alternative methods for addressing class imbalance.
    # Uncomment one method at a time

    '''# ROS
    imbalance_method = "_ROS"
    ros = RandomOverSampler(random_state=42)
    # fit predictor and target variable
    X_train, y_train = ros.fit_resample(X_train, y_train)'''
    
    '''# SMOTE
    imbalance_method = "_SMOTE"
    smote = SMOTE(random_state=42)
    X_train, y_train = smote.fit_resample(X_train, y_train)'''

    # Perform gridsearch using the above function
    print("Grid search...")
    best_model = get_best_model_classifier(X_train, y_train)
    print("Best model found.")  
        
    # Predict on the test set
    y_pred = best_model.predict(X_test)

    # Calculate f1 to assess performance
    f1 = f1_score(y_test, y_pred)
    print(f"F1: {f1}")

    # Save model
    filename = f"best_model_{dataset_identifier}_{mft}{imbalance_method}.pkl"
    save_data(filename, best_model)


### Feature Elimination

In [None]:
# Cross validated Recurssive Feature Elimination (RFE)

f1_grid_searched = {}

for mft in mft_list:
    print(f"Training model {mft}:")
    selected_features = {}
    f1 = {}

    # Use above function to prepare the dataset
    print("Splitting data...")
    _, _, X_train, X_test, y_train, y_test = prep_split_dataset_classifier(df, mft, columns_to_drop)

    print("Running RFE")

    # Calculate scale_pos_weight
    num_positives = y_train.sum().item()
    num_negatives = len(y_train) - num_positives
    scale_pos_weight = (num_negatives / num_positives)
    
    # Define the model with the calculated scale_pos_weight
    model = xgb.XGBClassifier(scale_pos_weight=scale_pos_weight)

    # perform RFE
    rfe = RFECV(estimator=model, cv=StratifiedKFold(5), min_features_to_select=1, step=1, scoring="f1")
    rfe = rfe.fit(X_train, y_train)
    selected_features = rfe.support_
    
    # Remove unused features from the dataset
    X_test = X_test.iloc[:, selected_features]
    X_train= X_train.iloc[:, selected_features]

    # Gridsearch on using reduced feature set
    print("Performing Grid Search")
    best_model = get_best_model_classifier(X_train, y_train)

    print("Best model found.")

    # Make predictions from best model
    y_pred = best_model.predict(X_test)

    # Calculate f1 score
    f1 = f1_score(y_test, y_pred)
    f1_grid_searched[mft] = f1
    print(f"Grid Searched F1: {f1}")

    # Save model and feature selection
    filename = f'reduced_feature_models/RFE_{dataset_identifier}_{mft}_best_model.pkl'
    save_data(filename, best_model)

    filename = f'reduced_feature_models/RFE_{dataset_identifier}_{mft}_best_featureset.pkl'
    save_data(filename, selected_features)

print("\nPrint F1s:")
print(f1_grid_searched)

In [None]:
from supporting_scripts.audiofeatureextractor import DatasetConstructor

def get_data_by_category(included_categories, full_dataset_path):
    """
    Load the dataset and filter it by the specified categories.

    Parameters:
    included_categories (list): List of categories to include in the filtered dataset.
    full_dataset_path (str): Path to the full dataset file.

    Returns:
    df (DataFrame): The filtered dataset as a pandas DataFrame.
    """

    # Open and load the dataset from the specified file path
    with open(full_dataset_path, 'rb') as f:
        dataset_dict = pickle.load(f)
    
    # Convert the dataset dictionary to a DataFrame and filter by the specified categories
    df = DatasetConstructor.dict2df(dataset_dict, included_categories=included_categories, save_dataset=False)

    return df


def get_unique_combinations(section_list):
    """
    Generate all unique combinations of the sections given.

    Parameters:
    section_list (list): List of sections to generate combinations from.

    Returns:
    list: List of unique combinations, each combination also retains the MFT features.
    """

    # Initialize an empty list to store all combinations
    all_combinations = []

    # Iterate over the range from 1 to the length of the section list
    for i in range(1, len(section_list) + 1):
        # Generate combinations of the current length
        combinations = itertools.combinations(section_list, i)
        # Extend the all_combinations list with the new combinations
        all_combinations.extend(combinations)
    
    # Convert each combination to a list and append "mft" to it
    return [list(comb) + ["mft"] for comb in all_combinations]

# Get unique combinations based on feature categories
sections = ['timbre','dynamics','rhythm','harmony','melody']
unique_combinations = get_unique_combinations(sections)

print(f"Number of models: {len(unique_combinations)}\n")


In [None]:
# Conduct category elemination feature selection

f1_values_feature_comparison = {}

#create blank nested dictionary for f1 scores
for mft in mft_list:
    f1_values_feature_comparison[mft] = {}

# For each combination of the category
for unique_combination in unique_combinations:
    # Get the right columsn of the dataset
    df_by_category = get_data_by_category(unique_combination)

    print(f"Current combination: {', '.join(unique_combination[:-1])}")
    print(f'The number of columns in the DataFrame is: {df.shape[1]}\n')
    # For each mft find the best model
    for mft in mft_list:
        print(f"Training model {mft}:")
        print("Splitting data...")
        _, _, X_train, X_test, y_train, y_test = prep_split_dataset_classifier(df_by_category, mft, columns_to_drop)
        
        # Perform gridsearch
        print("Grid search...")
        best_model = get_best_model_classifier(X_train, y_train)
        
        # Save data
        filename = f"reduced_feature_models/category_elim_{dataset_identifier}_{mft}_{'_'.join(unique_combination[:-1])}"
        save_data(filename, best_model)

        # Calculate f1 score
        y_pred = best_model.predict(X_test)
        f1 = f1_score(y_test, y_pred)
        f1_values_feature_comparison[mft][', '.join(unique_combination[:-1])] = f1
        print(f"f1: {f1}\n")

# Convert f1 dictionary to DataFrame
df_f1 = pd.DataFrame(f1_values_feature_comparison)
print(df_f1)
df_f1.to_csv("f1_data.csv")

### Bi-polar model

In [None]:
# Load the datasets
df_bill = pd.read_csv("dataset_billboard_MFT.csv")
df_WASABI = pd.read_csv("dataset_WASABI_MFT.csv")

df_morality = pd.concat([df_bill, df_WASABI], ignore_index=True)

# Categorise moral and immoral MFTs
moral_headers = ['care','fairness','loyalty','authority','purity']
immoral_headers = ['harm','cheating','betrayal','subversion','degradation']

# Add new column for morality
df_morality['moral'] = None

# Check for the conditions and set 'moral' column values
for i, row in df_morality.iterrows():
    moral_condition = (row[moral_headers] > 0.6).any() and (row[immoral_headers] <= 0.2).all()
    immoral_condition = (row[immoral_headers] > 0.6).any() and (row[moral_headers] <= 0.2).all()
    
    if moral_condition:
        df_morality.at[i, 'moral'] = 1
    elif immoral_condition:
        df_morality.at[i, 'moral'] = 0

# Remove rows that are morally ambiguous
df_morality = df_morality.dropna(subset=['moral'])

# Make sure 'moral' column is of int type
df_morality['moral'] = df_morality['moral'].astype(int)

# Save data
df_morality.to_csv('dataset_with_morality.csv', index=False)
print(df_morality.head())

In [None]:
imbalance_method = ""

print("Splitting data...")

# Use above function to prepare the dataset
_, _, X_train, X_test, y_train, y_test = prep_split_dataset_classifier(df_morality, "moral", columns_to_drop+["moral"])

# Perform gridsearch using the above function
print("Grid search...")
best_model = get_best_model_classifier(X_train, y_train)
print("Best model found.")  
    
# Predict on the test set
y_pred = best_model.predict(X_test)

# Calculate f1 to test performance
f1 = f1_score(y_test, y_pred)
print(f"F1: {f1}")

# Save model
filename = f"best_model_combined_data_moral{imbalance_method}.pkl"
save_data(filename, best_model)

## Analysis

### Cross-Dataset validation

In [None]:
import numpy as np

# Initialize the dictionary
f1s = {"billboard": {}, "WASABI": {}}

# Load the datasets
df_bill = pd.read_csv("dataset_billboard_MFT.csv")
df_WASABI = pd.read_csv("dataset_WASABI_MFT.csv")

def calculate_f1_scores(df, dataset_identifier):
    """
    Load model and get f1 score.

    Parameters:
    df (Dataframe): Data
    dataset_identifier (str):  Name of the dataset being analysed

    Returns:
    None
    """
    for mft in mft_list:
        # Get data
        X, y, _, _, _, _ = prep_split_dataset_classifier(df, mft, columns_to_drop)

        # Load model
        filename = f"best_model_{dataset_identifier}_{mft}{imbalance_method}.pkl"
        with open(filename, 'rb') as f:
            model = pickle.load(f)

        y_pred = model.predict(X)
        # Calculate f1 to test performance
        f1 = f1_score(y, y_pred)

        f1s[dataset_identifier][mft] = f1

# Calculate F1 scores for each dataset/model combination
calculate_f1_scores(df_WASABI, "billboard")
calculate_f1_scores(df_bill, "WASABI")

# Create a list of keys (moral values) and indices
keys = list(f1s["WASABI"].keys())
indices = np.arange(len(keys))

# Width of the bars
bar_width = 0.35

# Plot the f1 values for both sets
plt.figure(figsize=(10, 6))

plt.bar(indices, f1s["billboard"].values(), bar_width, color='skyblue', label='Train: Billboard, Test: WASABI')
plt.bar(indices + bar_width, f1s["WASABI"].values(), bar_width, color='grey', label='Train: WASABI, Test: Billboard')

# Add labels, title, and legend
plt.xlabel('Moral Value')
plt.ylabel('F1 Score')
plt.title('Cross-Dataset Validation')
plt.xticks(indices + bar_width / 2, keys, rotation=45)

plt.legend()
plt.show()

### SAGE

In [None]:
# For each MFT, calculate SAGE values
for mft in ["care", "harm", "cheating", "subversion", "degradation"]:
    # Get data
    print(f"Getting data...")
    X, y, _,_,_,_ = prep_split_dataset_classifier(df, mft, columns_to_drop)
    print("Loading model...")

    # Load model
    filename = f"best_model_{dataset_identifier}_{mft}{imbalance_method}.pkl"
    with open(filename, 'rb') as f:
        model = pickle.load(f)

    print("Getting sage...")
    # Calculate SAGE
    imputer = sage.MarginalImputer(model, X[:128].to_numpy())
    estimator = sage.PermutationEstimator(imputer, "cross entropy")
    sage_values = estimator(X.to_numpy(), y.to_numpy())
    sage_values.plot(X.columns, max_features=10, orientation="horizontal", color="skyblue")
    print("Saving sage...")
    filename = f'sage_{dataset_identifier}_{mft}{imbalance_method}.pkl'
    with open(filename, 'wb') as fp:
        pickle.dump(sage_values, fp)
        print('Explanation saved successfully to file\n')

In [None]:
# For each MFT, plot sage values
for mft in mft_list:
    print(f"Current MFT: {mft}")
    filename = f'sage_{dataset_identifier}_{mft}{imbalance_method}.pkl'
    with open(filename, 'rb') as f:
        sage_values = pickle.load(f)
    X, y, _,_,_,_ = prep_split_dataset_classifier("subversion", columns_to_drop)
    sage_values.plot(X.columns, max_features=10, orientation="horizontal")

### SHAP

In [None]:
#Calculate SHAP
imbalance_method = ""

# For each MFT, calculate shap values
for mft in mft_list:
    # Get data
    print("Getting data...")
    X, _, _,_,_,_ = prep_split_dataset_classifier(df, mft, columns_to_drop)
    print("Loading model...")

    # Load the model
    filename = f"best_model_{dataset_identifier}_{mft}{imbalance_method}.pkl"
    with open(filename, 'rb') as f:
        model = pickle.load(f)

    print("Getting SHAP...")

    # Float 64 required for SHAP library
    X = X.astype('float64')

    # Compute SHAP values
    explainer = shap.Explainer(model, X)
    shap_values = explainer(X)    
    
    print("Saving SHAP...")
    filename = f'shap_{dataset_identifier}_{mft}{imbalance_method}.pkl'
    # save dictionary 
    with open(filename, 'wb') as fp:
        pickle.dump(shap_values, fp)
        print('Explanation saved successfully to file\n')

In [None]:
# Set up colours for Beeswarm plot
colors = ["grey","skyblue"]
cmap = LinearSegmentedColormap.from_list("custom_cmap", colors)

# Plot Beeswarm plot of SHAP values
for mft in mft_list:
    print(f"Current MFT: {mft}")
    filename = f'shap_{dataset_identifier}_{mft}{imbalance_method}.pkl'
    with open(filename, 'rb') as f:
        shap_values = pickle.load(f)
        shap.plots.beeswarm(shap_values, color=cmap)