# Forest elephant rumble vocalisation analysis

This Jupyter notebook provides a step-by-step guide to using pre-trained CNNs via transfer learning to identify rumble vocalisation sub-types and to identify associations between rumble acoustic features and different contexts (age, sex, behaviour, distress). This builds on the call-types analysis notebook using the main techniques of transfer learning to automatically extract acoustic features and unsupervised ordination of these features into lower dimensional space. 

## Dataset Description

This dataset contains information on African forest elephant vocalisations recorded in Dzanga-Bai clearing, Central African Republic, between September 2018 and April 2019 by the Elephant Listening Project.

It includes:

1. `elephant_vocalisations.csv`: A table of 1254 annotated vocalisations, each with start time, end time, frequency range, call type (roar, rumble, or trumpet), and corresponding audio file.
2. `elephant_contextual_observations.csv`: A table containing 359 entries, each describing the context in which a specific audio recording was made. This includes information about the elephants vocalising (e.g., age, sex, behaviour) and the overall context of the recording (distress).  Importantly, each audio file may contain multiple individual vocalisations, but all the vocalisations within that file share the same contextual information.
3. `{model}_vocalisations_features.parquet`: Parquet files storing acoustic features extracted from the vocalisations using the workflow described in the `1_feature_extraction` notebook. Features are extracted using four different CNN models (VGGish, YAMNet, BirdNET, and Perch).


## Steps

1. **Set-up**: Load the vocalisationa and contextual data, as well as the pre-computed features.
2. **Data Filtering and Aggregation**: Select only rumble vocalisations and average the acoustic features across all rumbles within each audio file. This ensures that each data point represents the overall acoustic characteristics of rumbles within a specific context.
3. **Dimensionality reduction**: Project the acoustic features into 2D space as the basis for clustering and statistical analysis
4. **Unsupervised clustering: rumble sub-type identification**: Use affinity propagation to automatically cluster the rumble features and visually inspect age, sex, behaviour and distress composition of clusters
5. **Behavioural and demographic analysis**: Perform statistical tests to assess the signficance of changes in acoustic features and different contexts.

**Note:** Feature extraction is a computationally intensive process. 
To avoid re-computation, this notebook uses pre-computed features. 
Refer to the `1_feature_extraction_notebook` for details on the feature extraction workflow.

### 1. Set up

Here we import all the relevant data.

In [1]:
from pathlib import Path

import pandas as pd

Now we load the table containing information about each of the elephant vocalisations.

In [None]:
DATA_DIR = Path("data")
OUTPUTS_DIR = Path("outputs")

vocalisations = pd.read_csv(DATA_DIR / "elephant_vocalisations.csv")
context = pd.read_csv(DATA_DIR / "elephant_contextual_observations.csv")

vocalisations.head()

In [3]:
MODELS = ["VGGish", "YAMNet", "BirdNET", "Perch"]

features = {
    model: pd.read_parquet(
        OUTPUTS_DIR / f"{model.lower()}_vocalisation_features.parquet"
    )
    for model in MODELS
}

## 2. Data Filtering and Aggregation

In [4]:
# Merge contextual information to full vocalisation dataset
df = pd.merge(
    left=vocalisations,
    right=context,
    left_on="filename",
    right_on="filename",
)

# We no longer need info about the temporal and spectral span of each vocalisation
df = df.drop(columns=["start_time", "end_time", "low_freq", "high_freq", "duration"])

# Remove files that contain non-rumble vocalisations
df = df.groupby("filename").filter(
    lambda group: group["call_type"].isin(["Rumble"]).all()
)

In [5]:
def aggregate_rumble_features(metadata, features):
    # Join the feature table with the metadata table to associate vocalisation features
    # to the file in which they appear.
    merged = pd.merge(
        left=metadata[["filename", "vocalisation_id"]],
        right=features,
        left_on="vocalisation_id",
        right_index=True,
        how="inner",
    )

    # Compute the average features of all vocalisations from the same file.
    return merged.drop(columns=["vocalisation_id"]).groupby("filename").mean()

In [6]:
# Compute the aggregated features for all models
aggregated = {
    model: aggregate_rumble_features(df, feats) for model, feats in features.items()
}

### 3. Dimensionality reduction

Now that we have the feature embeddings for each of the 361 recordings we need to reduce this high-dimensional data into lower dimensional space to make it interpretable to the human brain and usable in the statistical tests. This involves 2 steps:
1. Normalise the embeddings so that their mean = 0 and variance = 1. This ensures equal weighting of the features
2. Carry out the dimensionality reduction with specified parameters, including the number of components (2) and distance metric we want to use (cosine).

**Normalisation step**

In [7]:
from sklearn.preprocessing import StandardScaler


# Function to normalise the DataFrame
def normalise_features(features):
    # Initialize the scaler
    scaler = StandardScaler()

    # Fit and transform the features
    normalised_features = scaler.fit_transform(features)

    # Create a new DataFrame for the normalised features
    normalised = pd.DataFrame(
        normalised_features,
        columns=features.columns,
        index=features.index,
    )

    # Return the normalised DataFrame
    return normalised

In [8]:
# Normalise aggregated features from all models
normalised = {model: normalise_features(agg) for model, agg in aggregated.items()}

**Dimensionality reduction**

In [9]:
import umap.umap_ as umap

# Specify the UMAP parameters
N_COMP = 2  # select 1, 2 or 3 dimensions
METRIC = "cosine"  # distance metric used
N_NEIGHBORS = 15
MIN_DIST = 0
RANDOM_STATE = 2204


# Function to fit UMAP and merge metadata
def process_umap(
    normalised_df,
    metadata_df,
    n_comp=N_COMP,
    metric=METRIC,
    min_dist=MIN_DIST,
    n_neighbors=N_NEIGHBORS,
    random_state=RANDOM_STATE,
):
    # Instantiate UMAP projector with provided parameters
    reducer = umap.UMAP(
        n_components=N_COMP,
        metric=metric,
        min_dist=min_dist,
        random_state=random_state,
    )

    # Fit UMAP and obtain embeddings
    embedding = reducer.fit_transform(normalised_df)

    # Create DataFrame with UMAP embeddings, preserving 'vocalisation_id' as index
    umap_results = pd.DataFrame(
        embedding,
        columns=[f"UMAP{i + 1}" for i in range(N_COMP)],
        index=normalised_df.index,
    )

    # Merge UMAP coordinates with metadata to obtain the
    # corresponding call type
    return umap_results.merge(metadata_df, on="filename", how="left")

In [None]:
# Project features using UMAP for all models
umaps = {model: process_umap(norm, df) for model, norm in normalised.items()}

### 4. Unsupervised Clustering: Rumble Sub-type identification

In [11]:
from sklearn.cluster import AffinityPropagation

In [None]:
random_state = 2204
damping = 0.95
preference = -90
max_iter = 1000
N_COMP = 2


def affinity_propagation_clustering(
    features_standardized,
    damping,
    preference,
    random_state,
    max_iter,
):
    # Perform affinity propagation clustering
    affinity_propagation = AffinityPropagation(
        damping=damping,
        preference=preference,
        random_state=random_state,
        max_iter=max_iter,
    )
    cluster_labels = affinity_propagation.fit_predict(features_standardized)
    return cluster_labels


# Perform affinity propagation clustering and save the new cluster column for each of the 4 UMAP files
# Loop through each UMAP DataFrame
for umap_name, umap_df in umaps.items():
    # Dynamically select the UMAP columns based on N_COMP
    umap_columns = [f"UMAP{i}" for i in range(1, N_COMP + 1)]
    embeddings = umap_df[umap_columns].values

    # Perform affinity propagation clustering
    cluster_labels = affinity_propagation_clustering(
        embeddings,
        damping=damping,
        preference=preference,
        random_state=random_state,
        max_iter=max_iter,
    )

    # Add the 'Cluster' column to the DataFrame
    umap_df["Cluster"] = cluster_labels

    # Print the updated DataFrame
    print(f"Cluster labels added to {umap_name}")

We can visually assess the level of separation achieved by the unsupervised methods but this needs to be accompanied by a statistical analysis of its performance. We will do this by calculating the overall silhouette score for the data and the silhouette score per call-type.

The silhouete score is a mathematical representation of the tightness and separation of each cluster (Rousseeuw, 1987). It is calculated as the mean Euclidean distance between data points within a cluster (a) and the mean Euclidean distance to points in the nearest cluster (Rousseeuw, 1987).

Typically, silhouette scores >=0.5 show evidence for clustering, while those >=0.7 show strong evidence for clustering.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import silhouette_samples, silhouette_score

# Define a darker version of the colour-blind friendly palette
darkened_palette = [
    "#b57600",  # Darker orange
    "#3d7faf",  # Darker sky blue
    "#007b5a",  # Darker bluish green
    "#b2a62d",  # Darker yellow
    "#005082",  # Darker blue
    "#a64700",  # Darker vermillion
    "#a14376",  # Darker reddish purple
    "#707070",  # Darker grey
]

# Define different marker shapes
marker_shapes = [
    "o",
    "s",
    "D",
    "^",
    "P",
    "X",
    "*",
    "v",
]  # circle, square, diamond, triangle, etc.

# Set the style of the visualisation
sns.set(style="white")


# Function to calculate silhouette scores and plot UMAPs or boxplot (depending on N_COMP)
def plot_umap_with_silhouette(umap_df, model_name, ax, n_comp):
    if n_comp == 1:
        # Calculate silhouette scores for the boxplot title
        labels = umap_df["Cluster"]
        silhouette_avg = silhouette_score(umap_df[["UMAP1"]], labels)

        # Create a boxplot if N_COMP is 1
        boxplot = sns.boxplot(
            x="Cluster", y="UMAP1", data=umap_df, palette=darkened_palette, ax=ax
        )
        ax.set_title(f"{model_name} (Silhouette: {silhouette_avg:.2f})", fontsize=14)
        ax.set_xlabel("Cluster", fontsize=12)
        ax.set_ylabel("UMAP1", fontsize=12)
        ax.tick_params(axis="x", labelsize=12)
        ax.tick_params(axis="y", labelsize=12)

        # Extend y-axis to make space for the legend
        ymin, ymax = ax.get_ylim()
        ax.set_ylim(ymin - 0.1 * (ymax - ymin), ymax + 0.1 * (ymax - ymin))

        # Create a custom legend to show clusters and their respective colours
        cluster_legend = []
        unique_labels = sorted(umap_df["Cluster"].unique())
        for i, label in enumerate(unique_labels):
            cluster_legend.append(
                mpatches.Patch(color=darkened_palette[i], label=f"Cluster {label}")
            )

        ax.legend(
            handles=cluster_legend,
            title="Clusters",
            title_fontsize=12,
            fontsize=10,
            loc="best",
        )

    else:
        # Calculate silhouette scores across all data points
        labels = umap_df["Cluster"]
        silhouette_avg = silhouette_score(umap_df[["UMAP1", "UMAP2"]], labels)
        silhouette_values = silhouette_samples(umap_df[["UMAP1", "UMAP2"]], labels)

        # Prepare a dictionary to store average silhouette scores for each label
        silhouette_dict = {}
        unique_labels = sorted(labels.unique())
        for label in unique_labels:
            label_indices = labels == label
            avg_silhouette_score = silhouette_values[label_indices].mean()
            silhouette_dict[label] = avg_silhouette_score

        # Scatter plot with different marker shapes for each cluster
        for i, label in enumerate(unique_labels):
            cluster_data = umap_df[umap_df["Cluster"] == label]
            ax.scatter(
                x=cluster_data["UMAP1"],
                y=cluster_data["UMAP2"],
                label=f"Cluster {label} (Silhouette: {silhouette_dict[label]:.2f})",
                color=darkened_palette[i],
                marker=marker_shapes[i % len(marker_shapes)],
                s=80,  # Increased marker size
                alpha=0.8,  # Slight transparency for better visual contrast
            )

        # Extend x and y axes to make space for the legend
        xmin, xmax = ax.get_xlim()
        ymin, ymax = ax.get_ylim()
        ax.set_xlim(xmin - 0.1 * (xmax - xmin), xmax + 0.1 * (xmax - xmin))
        ax.set_ylim(ymin - 0.1 * (ymax - ymin), ymax + 0.1 * (ymax - ymin))

        # Set the title, labels, parameters
        ax.set_title(f"{model_name} (Silhouette: {silhouette_avg:.2f})", fontsize=14)
        ax.set_xlabel("UMAP1", fontsize=12)
        ax.set_ylabel("UMAP2", fontsize=12)
        ax.tick_params(axis="x", labelsize=12)
        ax.tick_params(axis="y", labelsize=12)

        # Custom legend for clusters
        ax.legend(title="Cluster", title_fontsize=12, fontsize=10, loc="best")


# Example of plotting all 4 models' UMAPs in a 2x2 grid
fig, axs = plt.subplots(2, 2, figsize=(20, 20), dpi=300)

# Replace these dataframes with your actual dataframes
plot_umap_with_silhouette(umaps["VGGish"], "VGGish", axs[0, 0], n_comp=N_COMP)
plot_umap_with_silhouette(umaps["Perch"], "Perch", axs[0, 1], n_comp=N_COMP)
plot_umap_with_silhouette(umaps["YAMNet"], "YAMNet", axs[1, 0], n_comp=N_COMP)
plot_umap_with_silhouette(umaps["BirdNET"], "BirdNET", axs[1, 1], n_comp=N_COMP)

# Adjust layout for a clean 2x2 grid
plt.tight_layout(pad=4.0)

# Show the plot
plt.show()

**Visual inspection of rumble sub-types**

Plot column charts of the unsupervised clusters split by their Age, Sex, Distress and Behaviour labels to identify patterns in the cluster composition

In [None]:
# Set the seaborn style and color palette
sns.set_style("whitegrid")
sns.set_palette("colorblind")

# Define the variables to be plotted
variables = ["Age", "Sex", "Behaviour", "Distress"]

# Define custom titles for each variable
titles = ["Age", "Sex", "Behaviour", "Distress"]

# Specify the order of models to display on the 2x2 grid
model_order = ["VGGish", "Perch", "YAMNet", "BirdNET"]  #

# Loop through each variable and create the 2x2 grid of subplots
for i, (variable, title) in enumerate(zip(variables, titles)):
    # Set up the figure and axes for the 2x2 grid
    fig, axes = plt.subplots(2, 2, figsize=(14, 10), dpi=300)

    # To store unique handles and labels for a single legend
    unique_handles_labels = []

    # Loop through each model in the specified order
    for j, model_name in enumerate(model_order):
        umap_df = umaps[model_name]  # Access the UMAP DataFrame based on the model name
        df_filtered = umap_df.copy()  # Make a copy of the DataFrame

        # Modify 'Distress' to group 'Unknown' and 'Not Applicable' into 'Other'
        if variable == "Distress":
            df_filtered[variable] = df_filtered[variable].replace(
                ["Unknown", "Not Applicable"], "Other"
            )

        # Exclude 'Anti-Predator' and 'Sexual' levels for the 'Behaviour' variable
        if variable == "Behaviour":
            df_filtered = df_filtered[
                (df_filtered[variable] != "Anti-Predator")
                & (df_filtered[variable] != "Sexual")
                & (df_filtered[variable] != "Unknown")
            ]

        # Exclude 'Unknown' values for other variables
        if variable in ["Age", "Sex"]:
            df_filtered = df_filtered[df_filtered[variable] != "Unknown"]

        # Calculate counts for each category in the variable, grouped by Cluster
        counts = df_filtered.groupby(["Cluster", variable]).size().unstack().fillna(0)

        # Calculate proportions
        proportions = counts.div(counts.sum(axis=0), axis=1) * 100

        # Plot the bar chart in the corresponding subplot for each model
        ax = proportions.plot(
            kind="bar", stacked=False, ax=axes[j // 2, j % 2], legend=False
        )
        ax.set_title(f"{model_name} - {title}", fontsize=12)
        ax.set_xlabel("Cluster")
        ax.set_ylabel("Proportion of category per cluster (%)")

        # Collect unique legend handles and labels for the categories
        handles, labels = ax.get_legend_handles_labels()
        for handle, label in zip(handles, labels):
            if label not in [lbl for _, lbl in unique_handles_labels]:
                unique_handles_labels.append((handle, label))

        # Rotate cluster labels to be horizontal
        ax.set_xticklabels(ax.get_xticklabels(), rotation=0)

    # Add one shared legend for all subplots with unique categories only
    if unique_handles_labels:
        handles, labels = zip(*unique_handles_labels)
        fig.legend(
            handles,
            labels,
            bbox_to_anchor=(1.01, 0.5),  # Move legend closer to the plots
            loc="center left",
            title="Category",
            fontsize=10,
            title_fontsize=12,
            borderaxespad=0.5,  # Reduce padding between legend and axes
        )

    # Adjust layout for better spacing and make room for legend
    plt.tight_layout(
        rect=[0, 0, 0.99, 1], pad=1.0
    )  # Shrink white space between plots and legend
    plt.show()


In [None]:
# Set the seaborn style and color palette
sns.set_style("whitegrid")
sns.set_palette("colorblind")

# Define the UMAP DataFrame for the Perch model
perch_df = umaps["YAMNet"]

# Define the variables to be plotted and their titles
variables = ["Age", "Sex", "Behaviour", "Distress"]
titles = ["Age", "Sex", "Behaviour", "Distress"]

# Set up the figure and axes for the 2x2 grid
fig, axes = plt.subplots(2, 2, figsize=(12, 8), dpi=900)

# Loop through each variable and create subplots for each one
for i, (variable, title) in enumerate(zip(variables, titles)):
    ax = axes[i // 2, i % 2]  # Get the appropriate subplot

    df_filtered = perch_df.copy()  # Make a copy of the Perch DataFrame

    # Modify 'Distress' to group 'Unknown' and 'Not Applicable' into 'Other'
    if variable == "Distress":
        df_filtered[variable] = df_filtered[variable].replace(
            ["Unknown", "Not Applicable"], "Other"
        )

    # Exclude 'Anti-Predator' and 'Sexual' levels for the 'Behaviour' variable
    if variable == "Behaviour":
        df_filtered = df_filtered[
            (df_filtered[variable] != "Anti-Predator")
            & (df_filtered[variable] != "Sexual")
            & (df_filtered[variable] != "Unknown")
        ]

    # Exclude 'Unknown' values for other variables
    if variable in ["Age", "Sex"]:
        df_filtered = df_filtered[df_filtered[variable] != "Unknown"]

    # Calculate counts for each category in the variable, grouped by Cluster
    counts = df_filtered.groupby(["Cluster", variable]).size().unstack().fillna(0)

    # Calculate proportions
    proportions = counts.div(counts.sum(axis=0), axis=1) * 100

    # Plot the bar chart in the corresponding subplot
    proportions.plot(kind="bar", stacked=False, ax=ax)
    ax.set_title(f"YAMNet - {title}")
    ax.set_xlabel("Cluster")
    ax.set_ylabel("Proportion of category per cluster (%)")

    # Remove 'Unknown' and 'Anti-Predator' from the legend
    handles, labels = ax.get_legend_handles_labels()
    filtered_handles_labels = [
        (h, l)
        for h, l in zip(handles, labels)
        if l not in ["Unknown", "Anti-Predator", "Sexual"]
    ]
    if filtered_handles_labels:
        handles, labels = zip(*filtered_handles_labels)
        ax.legend(
            handles,
            labels,
            bbox_to_anchor=(1.05, 1),
            loc="upper left",
            title="Category",
        )
    else:
        ax.legend().remove()

    # Rotate cluster labels to be horizontal
    ax.set_xticklabels(ax.get_xticklabels(), rotation=0)

# Adjust layout for better spacing between subplots
plt.tight_layout()
plt.show()

## 5. Behavioural and Demographic Analysis

Here we specify and run a Generalised Linear Model to test the statistical significance of the different categories (Age, Sex, Behaviour and Distress) on the UMAP-ordinated acoustic embeddings from each model. 

**Calculate model performance metrics**

In [None]:
import pandas as pd
import statsmodels.api as sm
from scipy.stats import chi2
from sklearn.cross_decomposition import CCA
from sklearn.decomposition import PCA, FactorAnalysis
from sklearn.metrics import (
    mean_absolute_error,
    mean_squared_error,
    r2_score,
    root_mean_squared_error,
)


# Function to fit GLM and get category-level significance
def fit_glm_and_get_category_significance(df):
    full_formula = 'UMAP_factor ~ C(Behaviour, Treatment("Separation")) + C(Distress, Treatment("No distress")) + C(Age) + C(Sex)'
    full_model = sm.GLM.from_formula(
        full_formula,
        family=sm.families.Gaussian(sm.families.links.Identity()),
        data=df,
    ).fit()

    category_p_values = {}
    for category in ["Behaviour", "Distress", "Age", "Sex"]:
        reduced_formula = "UMAP_factor ~ " + " + ".join(
            [
                (
                    f'C({cat}, Treatment("Separation"))'
                    if cat == "Behaviour"
                    else (
                        f'C({cat}, Treatment("No distress"))'
                        if cat == "Distress"
                        else f"C({cat})"
                    )
                )
                for cat in ["Behaviour", "Distress", "Age", "Sex"]
                if cat != category
            ]
        )
        reduced_model = sm.GLM.from_formula(
            reduced_formula,
            family=sm.families.Gaussian(sm.families.links.Identity()),
            data=df,
        ).fit()

        lr_stat = 2 * (full_model.llf - reduced_model.llf)
        p_value = chi2.sf(lr_stat, df=full_model.df_model - reduced_model.df_model)
        category_p_values[category] = p_value

    return full_model, category_p_values


# Function to analyse model performance
def analyze_model_performance(df, glm_result):
    actual_values = df["UMAP_factor"]
    predicted_values = glm_result.fittedvalues

    # Align indices to avoid shape mismatches
    actual_values, predicted_values = actual_values.align(
        predicted_values, join="inner"
    )

    mae = mean_absolute_error(actual_values, predicted_values)
    rmse = root_mean_squared_error(actual_values, predicted_values)
    r2 = r2_score(actual_values, predicted_values)
    nrmse = rmse / (max(df["UMAP_factor"]) - min(df["UMAP_factor"]))

    return mae, rmse, r2, nrmse


# Format and export results
def format_and_export_results(final_results, filename):
    def format_p_value(p):
        if p < 0.001:
            return f"{p:.4f}***"
        elif p < 0.01:
            return f"{p:.4f}**"
        elif p < 0.05:
            return f"{p:.4f}*"
        else:
            return f"{p:.4f}"

    final_results[["MAE", "RMSE", "R2", "NRMSE"]] = final_results[
        ["MAE", "RMSE", "R2", "NRMSE"]
    ].map(lambda x: f"{x:.4f}")
    for col in ["Age", "Sex", "Behaviour", "Distress"]:
        if col in final_results.columns:
            final_results[col] = final_results[col].apply(format_p_value)
    final_results.to_csv(filename)
    print(f"Exported results to {filename}")


# Main Execution
cat_vars = ["Age", "Sex", "Behaviour", "Distress"]
dimensionality_reduction_method = "PCA"
results = {}
category_p_values = {}
filtered_dfs = {}

# Preprocess and store filtered data once for all analysis
for model_name, umap_df in umaps.items():
    filtered_df = umap_df.copy()
    filtered_df["Distress"] = filtered_df["Distress"].replace(
        ["Unknown", "Not Applicable"], "Other"
    )
    filtered_df = filtered_df[
        (filtered_df["Behaviour"] != "Unknown")
        & (filtered_df["Behaviour"] != "Sexual")
        & (filtered_df["Behaviour"] != "Anti-Predator")
        & (filtered_df["Age"] != "Unknown")
        & (filtered_df["Sex"] != "Unknown")
    ]
    filtered_df[cat_vars] = filtered_df[cat_vars].astype("category")

    umap_columns = [f"UMAP{i}" for i in range(1, N_COMP + 1)]

    if N_COMP == 1:
        filtered_df["UMAP_factor"] = filtered_df["UMAP1"]
    else:
        if dimensionality_reduction_method == "PCA":
            pca = PCA(n_components=N_COMP)
            filtered_df["UMAP_factor"] = pca.fit_transform(filtered_df[umap_columns])[
                :, 0
            ]
        elif dimensionality_reduction_method == "FA":
            fa = FactorAnalysis(n_components=N_COMP)
            filtered_df["UMAP_factor"] = fa.fit_transform(filtered_df[umap_columns])[
                :, 0
            ]
        elif dimensionality_reduction_method == "CCA":
            cca = CCA(n_components=N_COMP)
            cca.fit(filtered_df[umap_columns], filtered_df[umap_columns])
            filtered_df["UMAP_factor"] = cca.transform(filtered_df[umap_columns])[:, 0]

    filtered_dfs[model_name] = filtered_df

# GLM fitting and performance analysis using preprocessed data
for model_name, filtered_df in filtered_dfs.items():
    glm_result, cat_p_values = fit_glm_and_get_category_significance(filtered_df)
    mae, rmse, r2, nrmse = analyze_model_performance(filtered_df, glm_result)

    results[model_name] = {
        "MAE": mae,
        "RMSE": rmse,
        "R2": r2,
        "NRMSE": nrmse,
        "glm_result": glm_result,
    }
    category_p_values[model_name] = cat_p_values

# Combine performance metrics and category-level significance into a DataFrame
performance_results = pd.DataFrame(results).T[["MAE", "RMSE", "R2", "NRMSE"]]
category_significance = pd.DataFrame(category_p_values).T
final_results = pd.concat([performance_results, category_significance], axis=1)
final_results.index.name = "Model"

# Export results to CSV
format_and_export_results(
    final_results,
    OUTPUTS_DIR / "glm_category_performance_metrics.csv",
)

# Display final results
final_results

**Calculate GLM coefficients per model**

Next we extract the GLM coefficients for each level for each category

In [None]:
# Function to format coefficients, standard errors, and p-values
def format_coefficients_table(details):
    def format_p_value(p):
        if p < 0.001:
            return f"{p:.4f}***"
        elif p < 0.01:
            return f"{p:.4f}**"
        elif p < 0.05:
            return f"{p:.4f}*"
        else:
            return f"{p:.4f}"

    details["Coefficient"] = details["Coefficient"].apply(lambda x: f"{x:.4f}")
    details["Std Error"] = details["Std Error"].apply(lambda x: f"{x:.4f}")
    details["P-Value"] = details["P-Value"].apply(format_p_value)
    return details


# Function to extract coefficients, standard errors, and p-values
def extract_glm_details(glm_result):
    glm_summary = glm_result.summary2().tables[1]
    glm_summary = glm_summary[["Coef.", "Std.Err.", "P>|z|"]]
    glm_summary.rename(
        columns={"Coef.": "Coefficient", "Std.Err.": "Std Error", "P>|z|": "P-Value"},
        inplace=True,
    )
    return glm_summary


# call block
model_details = {}

for model_name, details in results.items():
    # Retrieve the stored GLM result
    glm_result = details["glm_result"]

    # Extract coefficients, standard errors, and p-values
    glm_details = extract_glm_details(glm_result)

    # Add model name for later identification and format the table
    glm_details["Model"] = model_name
    glm_details = format_coefficients_table(glm_details)

    model_details[model_name] = glm_details

# Combine all models into a single DataFrame and export
combined_details = pd.concat(
    model_details.values(), keys=model_details.keys(), names=["Model", "Index"]
)
combined_details.to_csv(OUTPUTS_DIR / "glm_model_coefficients_formatted.csv")
print("Exported formatted coefficients to model_coefficients_formatted.csv")

combined_details

**Inspect residuals for normality**

We validate that the residuals from the GLM do not contravene the assumptions of normality

In [None]:
import matplotlib.pyplot as plt
import statsmodels.api as sm


def plot_residuals_and_qq_for_glms(results, filtered_dfs):
    """
    Generate residual plots and QQ plots for all GLMs, one per model.
    Includes:
    - Residuals vs Fitted Values
    - Histogram of Residuals
    - QQ Plot of Residuals
    """
    models = list(results.keys())

    for model_name in models:
        # Retrieve the GLM result object and corresponding filtered DataFrame
        glm_result = results[model_name]["glm_result"]  # Extract the fitted GLM object
        if glm_result is None:
            print(f"GLM result for {model_name} is missing!")
            continue

        filtered_df = filtered_dfs[
            model_name
        ]  # Use the filtered DataFrame used for fitting

        # Extract fitted values and actual values, ensuring alignment to avoid mismatches
        fitted_values = glm_result.fittedvalues
        actual_values = filtered_df["UMAP_factor"]

        # Align indices to ensure no shape mismatch
        actual_values, fitted_values = actual_values.align(fitted_values, join="inner")
        residuals = actual_values - fitted_values

        # Handle case where no residuals can be calculated due to mismatched data
        if residuals.empty:
            print(f"No residuals to plot for {model_name} due to mismatched data!")
            continue

        # Set up the figure
        fig, axes = plt.subplots(1, 3, figsize=(18, 5))

        # 1. Residuals vs Fitted Values
        axes[0].scatter(fitted_values, residuals, alpha=0.7, edgecolor="k")
        axes[0].axhline(0, color="red", linestyle="--", linewidth=1)
        axes[0].set_title(f"Residuals vs Fitted - {model_name}", fontsize=14)
        axes[0].set_xlabel("Fitted Values", fontsize=12)
        axes[0].set_ylabel("Residuals", fontsize=12)

        # 2. Histogram of Residuals
        axes[1].hist(residuals, bins=20, color="blue", alpha=0.7, edgecolor="k")
        axes[1].set_title(f"Residual Histogram - {model_name}", fontsize=14)
        axes[1].set_xlabel("Residuals", fontsize=12)
        axes[1].set_ylabel("Frequency", fontsize=12)

        # 3. QQ Plot of Residuals
        sm.qqplot(residuals, line="s", ax=axes[2])
        axes[2].set_title(f"QQ Plot - {model_name}", fontsize=14)

        # Tidy up and show the plots
        plt.tight_layout()
        plt.show()


# Call the function with updated inputs
plot_residuals_and_qq_for_glms(results, filtered_dfs)

**Perform Tukey pairwise analysis of each level in each category**

We the run a Tukey Honestly Significantly Different (HSD) pairwise analysis. This enables us to assess which levels (e.g., Adult) in each category (e.g., Age) are signficantly different from other levels (e.g., Infant) within the same category

In [None]:
import itertools

import numpy as np
from statsmodels.stats.multicomp import pairwise_tukeyhsd
from tabulate import tabulate


# Function to calculate Cohen's d
def calculate_cohens_d(group1, group2):
    mean_diff = np.mean(group1) - np.mean(group2)
    pooled_std = np.sqrt(
        (
            (len(group1) - 1) * np.var(group1, ddof=1)
            + (len(group2) - 1) * np.var(group2, ddof=1)
        )
        / (len(group1) + len(group2) - 2)
    )
    return mean_diff / pooled_std


# Function to run pairwise Tukey analysis and calculate Cohen's d
def run_pairwise_tukey_with_cohens_d(df, var, model_name):
    levels = df[var].unique()
    combinations = list(itertools.combinations(levels, 2))
    tukey_results_all = []

    for combo in combinations:
        subset_df = df[df[var].isin(combo)]
        tukey_results_var = pairwise_tukeyhsd(subset_df["UMAP_factor"], subset_df[var])

        # Calculate Cohen's d
        group1 = subset_df[subset_df[var] == combo[0]]["UMAP_factor"]
        group2 = subset_df[subset_df[var] == combo[1]]["UMAP_factor"]
        cohens_d = calculate_cohens_d(group1, group2)

        # Extract the relevant data
        for i in range(len(tukey_results_var.reject)):
            tukey_results_all.append(
                {
                    "Model": model_name,
                    "Variable": var,
                    "Level_1": combo[0],
                    "Level_2": combo[1],
                    "P-Value": float(tukey_results_var.pvalues[i]),  # Keep p-value
                    "Cohen_D": round(cohens_d, 4),  # Add Cohen's d
                    "Reject_Null_Hypothesis": bool(
                        tukey_results_var.reject[i]
                    ),  # Track rejection for agreement count
                }
            )

    # Convert to DataFrame
    tukey_results_df = pd.DataFrame(tukey_results_all)
    return tukey_results_df


# Function to convert p-value to significance asterisks
def p_value_to_stars(p_value):
    """Convert p-value to asterisks and include the p-value with 4 decimal places."""
    p_value_rounded = round(p_value, 4)
    if p_value < 0.001:
        return f"{p_value_rounded} ***"
    elif p_value < 0.01:
        return f"{p_value_rounded} **"
    elif p_value < 0.05:
        return f"{p_value_rounded} *"
    else:
        return f"{p_value_rounded}"  # Return just the p-value for non-significant cases


# Initialize an empty list to store results across all models
all_tukey_results = []

# Run pairwise Tukey tests for each variable in each model's DataFrame
for model_name, filtered_df in filtered_dfs.items():
    # Filter the DataFrame
    cat_vars = ["Age", "Sex", "Behaviour", "Distress"]
    filtered_df[cat_vars] = filtered_df[cat_vars].astype("category")

    # Run Tukey analysis for each variable in the current model and append results
    for fixed_effect in ["Behaviour", "Distress", "Age", "Sex"]:
        tukey_results_df = run_pairwise_tukey_with_cohens_d(
            filtered_df, fixed_effect, model_name
        )
        all_tukey_results.append(tukey_results_df)

# Concatenate all Tukey results into one DataFrame
tukey_all_df = pd.concat(all_tukey_results)

# Pivot the results to create a table with significance and Cohen's d for each model
p_values_df = tukey_all_df.pivot_table(
    index=["Level_1", "Level_2", "Variable"],
    columns="Model",
    values="P-Value",
    aggfunc="first",
)

cohen_d_df = tukey_all_df.pivot_table(
    index=["Level_1", "Level_2", "Variable"],
    columns="Model",
    values="Cohen_D",
    aggfunc="first",
)

# Apply the p-value to asterisks conversion
asterisk_df = p_values_df.map(p_value_to_stars)

# Merge Cohen's d and p-values with asterisks into the final table
final_df = pd.concat(
    [asterisk_df.add_suffix("_P-Value"), cohen_d_df.add_suffix("_Cohen_D")], axis=1
)

# Calculate the Agreement_Count column
final_df["Agreement_Count"] = tukey_all_df.pivot_table(
    index=["Level_1", "Level_2", "Variable"],
    columns="Model",
    values="Reject_Null_Hypothesis",
    aggfunc="first",
).sum(axis=1)

# Reset the index to make the DataFrame more readable
final_df.reset_index(inplace=True)

# Sort by the Variable column
final_df.sort_values(by="Variable", inplace=True)

# Optionally, save to CSV for further analysis
final_df.to_csv(OUTPUTS_DIR / "tukey_with_pvalues_and_cohens_d.csv", index=False)

# Display the nicely formatted table
table_str = tabulate(final_df, headers="keys", tablefmt="pretty", showindex=False)
print(table_str)

**Present pairwise significance values in bubble chart**

In [None]:
import colorsys
import re

import plotly.graph_objects as go


def plot_bubbles_bigger_labels_spaced(pivot_df):
    """
    Final snippet:
      - 5 columns: vggish, perch, yamnet, birdnet, All
      - Numeric y-axis, no extra space
      - Colour-blind-friendly palette (Okabe & Ito)
      - Dashed vertical line at x=3.5
      - Bubble saturation by min p-value in the row (lower p => more saturation)
      - If bubble_size < 35 => text above bubble in black
      - If bubble_size >= 35 and saturation>1.7 => text in bubble, white + bold
      - Else text in bubble, black normal
    """

    df = final_df.copy()

    # 1) Model columns & names
    model_cols = [
        "VGGish_P-Value",
        "Perch_P-Value",
        "YAMNet_P-Value",
        "BirdNET_P-Value",
        "Agreement_Count",
    ]
    model_names = ["VGGish", "Perch", "YAMNet", "BirdNET", "All"]

    # 2) Build "Comparison" label
    df["Comparison"] = df.apply(lambda r: f"{r['Level_1']} vs {r['Level_2']}", axis=1)

    # 3) Sort each variable by descending Agreement_Count
    var_order = ["Age", "Sex", "Behaviour", "Distress"]
    df["Variable"] = pd.Categorical(df["Variable"], categories=var_order, ordered=True)
    df.sort_values(
        by=["Variable", "Agreement_Count", "Level_1", "Level_2"],
        ascending=[True, False, True, True],
        inplace=True,
    )

    # Gather final list of comparisons
    comp_list = df["Comparison"].unique().tolist()
    n_comps = len(comp_list)
    # numeric y => top => n_comps-1, bottom => 0
    comp_to_y = {comp: (n_comps - 1 - i) for i, comp in enumerate(comp_list)}

    # 4) Okabe & Ito colour-blind palette
    var_colours = {
        "Age": "#009E73",  # bluish green
        "Sex": "#E69F00",  # orange
        "Behaviour": "#0072B2",  # blue
        "Distress": "#D55E00",  # vermillion
    }

    def parse_pvalue_and_stars(sig_str):
        if not isinstance(sig_str, str) or not sig_str.strip():
            return None, ""
        match = re.search(r"([\d.]+)", sig_str)
        p_val = float(match.group(1)) if match else None
        if p_val is None:
            return None, ""

        # Assign stars based on the true p-value
        if p_val < 0.001:
            stars = "***"
        elif p_val < 0.01:
            stars = "**"
        elif p_val < 0.05:
            stars = "*"
        else:
            stars = ""

        # Return both the original float and a rounded string for display
        return p_val, stars

    # 6) Convert colour by scaling saturation
    def saturate_color(base_hex, sat_factor):
        base_hex = base_hex.lstrip("#")
        r = int(base_hex[0:2], 16) / 255.0
        g = int(base_hex[2:4], 16) / 255.0
        b = int(base_hex[4:6], 16) / 255.0
        h, l, s = colorsys.rgb_to_hls(r, g, b)
        new_s = max(0.0, min(1.0, s * sat_factor))
        r2, g2, b2 = colorsys.hls_to_rgb(h, l, new_s)
        return "#{:02x}{:02x}{:02x}".format(int(r2 * 255), int(g2 * 255), int(b2 * 255))

    # 7) Map p-value -> saturation factor
    def saturation_factor_from_pval(p_val):
        if p_val is None:
            return 0.2
        elif p_val < 0.001:
            return 5
        elif p_val < 0.01:
            return 1.5
        elif p_val < 0.05:
            return 1.2
        else:
            return 0.35

    # 8) Build traces
    traces = []
    for var in var_order:
        sub = df[df["Variable"] == var]
        if sub.empty:
            continue

        base_hex = var_colours[var]

        x_vals = []
        y_vals = []
        sizes = []
        texts = []
        textpos = []
        textcols = []
        textfamilies = []  # to handle bold vs normal
        colorvals = []

        for _, row in sub.iterrows():
            y_num = comp_to_y[row["Comparison"]]

            # min p-value across the 4 significance columns
            row_pvals = []
            for colm in [
                "VGGish_P-Value",
                "Perch_P-Value",
                "YAMNet_P-Value",
                "BirdNET_P-Value",
            ]:
                rp, _ = parse_pvalue_and_stars(
                    row[colm] if pd.notnull(row[colm]) else ""
                )
                if rp is not None:
                    row_pvals.append(rp)
            row_min_p = min(row_pvals) if row_pvals else None
            sat_fac = saturation_factor_from_pval(row_min_p)

            for mcol, mname in zip(model_cols, model_names):
                final_hex = saturate_color(base_hex, sat_fac)

                if mcol == "Agreement_Count":
                    # interpret star_num = min(3, Agreement_Count)
                    agr = row["Agreement_Count"]
                    star_num = min(3, agr)
                    bubble_sz = 22 + star_num * 15
                    label_str = str(agr)
                else:
                    sig_str = row[mcol] if pd.notnull(row[mcol]) else ""
                    p_val, star_str = parse_pvalue_and_stars(sig_str)
                    star_count = len(star_str)

                    bubble_sz = 22 + star_count * 15
                    if p_val is not None:
                        p_val_txt = f"{p_val:.3f}"
                        label_str = p_val_txt
                        if star_str:
                            label_str += f"{star_str}"
                    else:
                        label_str = "n.s."

                # If bubble_sz < 35 => text above => black normal
                if bubble_sz < 37:
                    pos = "top center"
                    txt_colour = "black"
                    txt_family = "Arial"
                else:
                    # bubble_sz >= 35 => text in bubble
                    # if sat_fac>1.7 => white bold
                    if sat_fac > 1.7:
                        txt_colour = "white"
                        txt_family = "Arial Black"
                    else:
                        txt_colour = "black"
                        txt_family = "Arial"
                    pos = "middle center"

                x_vals.append(mname)
                y_vals.append(y_num)
                sizes.append(bubble_sz)
                texts.append(label_str)
                textpos.append(pos)
                textcols.append(txt_colour)
                textfamilies.append(txt_family)
                colorvals.append(final_hex)

        # Build single trace for this variable
        trace = go.Scatter(
            name=var,
            x=x_vals,
            y=y_vals,
            mode="markers+text",
            text=texts,
            textposition=textpos,
            textfont=dict(
                size=14,
                color=textcols,
                family="Arial",  # single font family across all points in this trace
            ),
            hoverinfo="skip",
            marker=dict(
                size=sizes,
                color=colorvals,
                opacity=0.85,
                line=dict(width=1, color="white"),
            ),
        )
        traces.append(trace)

    fig = go.Figure(data=traces)

    # X-axis => [0..4]
    fig.update_xaxes(
        title="Model",
        categoryorder="array",
        categoryarray=model_names,
        tickfont=dict(size=16),  # Increase font size
        range=[-0.5, 4.5],
    )

    # Y-axis => numeric
    reversed_labels = comp_list[::-1]
    fig.update_yaxes(
        title="Comparison",
        range=[-0.5, n_comps - 0.5],
        tickvals=list(range(n_comps)),
        ticktext=reversed_labels,
    )

    # Dashed line at x=3.5
    fig.add_shape(
        type="line",
        xref="x",
        yref="paper",
        x0=3.5,
        x1=3.5,
        y0=0,
        y1=1,
        line=dict(color="gray", width=2, dash="dash"),
    )

    # Figure size
    fig_width = 900
    fig_height = 280 + 70 * n_comps
    fig.update_layout(
        title="Figure 4: Tukey Post-Hoc Pairwise Significance Tests Compared Across CNN Models",
        width=fig_width,
        height=fig_height,
        showlegend=True,
        margin=dict(l=80, r=80, t=90, b=80),
    )

    return fig


fig = plot_bubbles_bigger_labels_spaced(final_df)
fig.show()

In [None]:
import plotly.io as pio


def export_final_chart_to_png(pivot_df, out_file="final_chart.png"):

    fig = plot_bubbles_bigger_labels_spaced(pivot_df)
    pio.write_image(fig, out_file, width=1000, height=1750, scale=1)  # scale
    print(f"Exported chart to '{out_file}' at ~300 DPI.")


export_final_chart_to_png(final_df, out_file=OUTPUTS_DIR / "tukey_bubble_revised.png")