<a href="https://colab.research.google.com/github/bernardes7/Paredes/blob/main/notebooks/Between_Voices_Corpus_Analysis_from_JSONs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1) Environment Setup & Imports

In [None]:
!pip install dtaidistance
%pip install EMD-signal
!pip -q install gdown


import glob
from pathlib import Path
from PyEMD import EMD
from dtaidistance import dtw
from google.colab import drive
import os, json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
from scipy.spatial.distance import pdist, squareform
from scipy.ndimage import gaussian_filter1d
import seaborn as sns
import itertools


#drive.mount('/content/drive')

# 2) Load Data (JSON)

In [None]:

# Config: set your shared folder URL and local destination
DRIVE_FOLDER_URL = 'https://drive.google.com/drive/folders/1aKt0FP2U8ONcRo0cXrrYDdiqnuM9YkrZ?usp=sharing'
DEST_DIR = Path('/content/Results')
# Create the destination (and any missing parents); no error if it already exists.
DEST_DIR.mkdir(parents=True, exist_ok=True)

# Download the shared folder into /content/Results.
# This is an IPython shell escape: the {...} placeholders are filled in from
# the Python variables defined above before the command is run.
!gdown --fuzzy --folder "{DRIVE_FOLDER_URL}" -O "{DEST_DIR}"

# Use the destination directory itself as the search root; no detection of a
# Drive-named sub-folder is performed here.
downloaded_root = DEST_DIR

print(f"Downloaded into: {downloaded_root}")

# Helper: load all JSON files recursively
def load_json_files(root_dir: Path):
    """Load every ``*.json`` file found under *root_dir*, recursively.

    Files are visited in sorted path order so that the returned list (and
    every index-based pairing derived from it) is the same on every run,
    regardless of the order in which the filesystem lists entries.

    Files that cannot be read or parsed are reported on stdout and skipped;
    loading is best-effort and never raises for a single bad file.

    Args:
        root_dir: Directory to search.

    Returns:
        A list with one decoded JSON object per successfully parsed file.
        Empty when no JSON file is found.
    """
    json_paths = sorted(root_dir.rglob('*.json'))
    if not json_paths:
        print(f"No JSON files found under {root_dir}")
        return []

    all_data = []
    for jp in json_paths:
        try:
            with open(jp, 'r', encoding='utf-8') as f:
                all_data.append(json.load(f))
        except json.JSONDecodeError as e:
            print(f"Error decoding {jp}: {e}")
        except Exception as e:
            # Deliberately broad: one unreadable file must not abort the load.
            print(f"Error reading {jp}: {e}")
    return all_data

# Parse every JSON file below the download root; each list entry is the
# decoded content of one file.
data = load_json_files(downloaded_root)

print(f"Loaded {len(data)} JSON objects.")

# 3) Apply EMD

In [None]:
import numpy as np
from PyEMD import EMD
import warnings

# Feature configuration: (top-level JSON section, keys inside it, display label)
features_config_emd = [
    ('rhythmic', ['tempo_deviations'], 'Tempo Deviation'),
    ('rhythmic', ['voice_rhythmic_density'], 'Voice Rhythmic Density'),
    ('rhythmic', ['guitars_rhythmic_density'], 'Guitars Rhythmic Density'),
    ('dynamic', ['voice_loudness'], 'Voice Loudness'),
    ('dynamic', ['guitars_loudness'], 'Guitars Loudness'),
    ('harmonic', ['tonal_dissonance'], 'Tonal Dissonance'),
    ('harmonic', ['tonal_dispersion'], 'Tonal Dispersion'),
    ('melodic', ['voice_melodic_contour'], 'Melodic Voice Contour'),
    ('melodic', ['guitars_harmonic_contour'], 'Harmonic Guitars Contour')
]

# Number of leading EMD components discarded before the signal is rebuilt
# from the remaining ones.
# NOTE(review): this assumes PyEMD returns the fastest-oscillating components
# first, so dropping the leading ones acts as a smoother - confirm in the docs.
N_DROPPED_IMFS = 2

# One entry per song: {'metadata': ..., 'features': {label: processed list}}.
data_processed = []

for row_idx, row in enumerate(data):
    processed_row = {
        'metadata': row.get('metadata', {}),
        'features': {}
    }

    for domain, keys, label in features_config_emd:
        if domain not in row:
            warnings.warn(f"Row {row_idx}: Domain '{domain}' not found.")
            continue

        for key in keys:
            if key not in row[domain]:
                warnings.warn(f"Row {row_idx}: Feature '{key}' not found in domain '{domain}'.")
                continue

            values = row[domain].get(key, [])
            if values and isinstance(values, list) and all(isinstance(v, (int, float)) for v in values):
                signal = np.array(values)
                if np.all(signal == signal[0]):  # Skip constant signals
                    continue
                try:
                    emd = EMD()
                    imfs = emd(signal)
                    # Only drop components when something is left afterwards.
                    # The previous "> 1" test let a 2-component decomposition
                    # through, and summing the empty slice imfs[2:] turned the
                    # whole feature into zeros.
                    if imfs.shape[0] > N_DROPPED_IMFS:
                        processed_signal = np.sum(imfs[N_DROPPED_IMFS:], axis=0).tolist()
                    else:
                        processed_signal = signal.tolist()
                    processed_row['features'][label] = processed_signal
                except Exception as e:
                    # Best-effort: keep the unprocessed signal rather than lose the feature.
                    warnings.warn(f"Row {row_idx}: EMD failed for '{label}' ({e}). Using original signal.")
                    processed_row['features'][label] = signal.tolist()
            else:
                warnings.warn(f"Row {row_idx}: Invalid or empty values for feature '{key}' in domain '{domain}'.")

    data_processed.append(processed_row)

# 4) Apply Loudness Masks

In [None]:
# Define loudness masks for voice and guitars.
# They are stored in the original raw 'data' as row['dynamic']['v_mask'] and
# row['dynamic']['g_mask'] (True = frame is at or above the threshold).

loudness_threshold = -40

for row in data:
    # setdefault (instead of row['dynamic']) so that a song without a
    # 'dynamic' section gets empty masks rather than raising KeyError.
    dynamic = row.setdefault('dynamic', {})

    # Get original loudness arrays
    loudness_voice_orig = dynamic.get('voice_loudness', [])
    loudness_guitars_orig = dynamic.get('guitars_loudness', [])

    # Compute masks from the original (pre-EMD) signals; None counts as silent.
    voice_mask = [val is not None and val >= loudness_threshold for val in loudness_voice_orig]
    guitar_mask = [val is not None and val >= loudness_threshold for val in loudness_guitars_orig]

    # Store masks on the raw row for later use
    dynamic['v_mask'] = voice_mask
    dynamic['g_mask'] = guitar_mask

print("Loudness masks computed and stored from original signals.")


# data and data_processed are paired by position (one processed row per raw row).
for original_row, processed_row in zip(data, data_processed):
    # Get masks from original data
    voice_mask = original_row.get('dynamic', {}).get('v_mask', [])
    guitar_mask = original_row.get('dynamic', {}).get('g_mask', [])

    title = processed_row.get('metadata', {}).get('title', 'Unknown')
    artist = processed_row.get('metadata', {}).get('artist', 'Unknown')

    print(f"\n=== Applying Masks for Piece: {title} by {artist} ===")

    # Skip if both masks are empty
    if not voice_mask and not guitar_mask:
        print("⚠️ No masks available for this piece.")
        continue

    for feature_name, values in processed_row['features'].items():
        if isinstance(values, list) and values:  # Only process non-empty lists
            masked_values = None

            # The mask is chosen from the feature label; a mask is applied
            # only when it has exactly one entry per feature value.
            if ('Voice' in feature_name or 'Vocals' in feature_name) and voice_mask and len(values) == len(voice_mask):
                masked_values = [val if mask else np.nan for val, mask in zip(values, voice_mask)]

            elif ('Guitars' in feature_name or 'Harmonic' in feature_name) and guitar_mask and len(values) == len(guitar_mask):
                masked_values = [val if mask else np.nan for val, mask in zip(values, guitar_mask)]

            # Update and print if masking applied. Only existing keys are
            # reassigned, so the dict size does not change during iteration.
            if masked_values is not None:
                processed_row['features'][feature_name] = masked_values
                print(f"Feature: {feature_name} → masked values: {masked_values[:20]}{'...' if len(masked_values) > 20 else ''}")

In [None]:
def get_processed_song(data_processed, title):
    """Return the 'features' dict of the first song whose title contains *title*.

    The comparison is a case-insensitive substring match against
    ``metadata.title``; rows without a title are treated as having an empty one.
    Returns None when no song matches.
    """
    needle = title.lower()
    matching_features = (
        entry['features']
        for entry in data_processed
        if needle in entry.get('metadata', {}).get('title', '').lower()
    )
    return next(matching_features, None)

# Usage: show a short preview of every processed feature of one song.
features = get_processed_song(data_processed, "Não Choro por me Deixares")
if not features:
    print("Song not found or no processed features.")
else:
    print(f"\n✅ Processed features for 'Não Choro por me Deixares':")
    for name, vals in features.items():
        if not isinstance(vals, list):
            continue
        print(f"{name}: length={len(vals)}, values={vals[:10]}{'...' if len(vals) > 10 else ''}")


✅ Processed features for 'Não Choro por me Deixares':
Tempo Deviation: length=114, values=[4.822848464881643, 4.774652474714926, 4.419294465274571, 3.8390889895376508, 3.1445364309609474, 2.4461371730012527, 1.8444827084240596, 1.4005289672297034, 1.165322988727219, 1.1899118122256485]...
Voice Rhythmic Density: length=114, values=[nan, nan, nan, nan, nan, nan, nan, nan, 1.176625632824447, 1.1853332210780558]...
Guitars Rhythmic Density: length=114, values=[4.453942197571792, 4.427195520200733, 4.406462377106016, 4.396288202831304, 4.401218431920264, 4.422655992044713, 4.449433783389079, 4.467242199265943, 4.461771632987887, 4.418712477867488]...
Voice Loudness: length=114, values=[nan, nan, nan, nan, nan, nan, nan, nan, -45.48807951990585, -41.574990739502084]...
Guitars Loudness: length=114, values=[-15.831148479395624, -15.818744212238206, -15.617814506194819, -15.409014456122318, -15.400950332688275, -15.692660674422536, -16.315562085382346, -17.140151163872225, -17.99669450675854

# 5) Descriptive Statistics

In [None]:
import numpy as np
import pandas as pd

# One record per (song, feature): mean and standard deviation of the raw,
# finite numeric values of that feature.
song_stats_raw = []

for row in data:
    meta = row.get('metadata', {})
    title = meta.get('title', 'Unknown')
    artist = meta.get('artist', 'Unknown Artist')

    # Year as an int when possible, NaN otherwise (missing or non-numeric).
    try:
        year = int(meta.get('project_year', None))
    except (ValueError, TypeError):
        year = np.nan

    for feature_category, feature_key_path, feature_name in features_config_emd:
        # Walk down the nested dicts, one key at a time.
        node = row.get(feature_category, {})
        for key in feature_key_path:
            node = node.get(key, {})

        # Anything that is not a list is treated as "no data".
        if not isinstance(node, list):
            continue
        finite_values = [
            v for v in node
            if isinstance(v, (int, float)) and np.isfinite(v)
        ]
        if not finite_values:
            continue

        song_stats_raw.append({
            'title': title,
            'artist': artist,
            'year': year,
            'feature': feature_name,
            'mean': np.mean(finite_values),
            'std': np.std(finite_values)
        })

if not song_stats_raw:
    print("No raw feature data available to calculate song statistics.")
else:
    df_song_stats_raw = pd.DataFrame(song_stats_raw)

    # "mean (std)" rendered as a single text cell per song/feature.
    df_song_stats_raw['mean_std'] = [
        f"{mean_value:.2f} ({std_value:.2f})"
        for mean_value, std_value in zip(df_song_stats_raw['mean'], df_song_stats_raw['std'])
    ]

    # Wide table: one row per song, one column per feature.
    # NOTE(review): pivot_table groups on the index keys; confirm that songs
    # with a NaN year survive this step, otherwise na_position below has
    # nothing to act on.
    df_final_raw = df_song_stats_raw.pivot_table(
        index=['title', 'artist', 'year'],
        columns='feature',
        values='mean_std',
        aggfunc='first'
    )

    # Sort by year (NaN goes last), then restore the three-level index.
    df_final_raw = df_final_raw.reset_index()
    df_final_raw = df_final_raw.sort_values(by='year', na_position='last')
    df_final_raw = df_final_raw.set_index(['title', 'artist', 'year'])

    display(df_final_raw)

# 6) Pearson Correlation

In [None]:
def _mean_abs_aligned(matrices):
    """Average the absolute values of square, label-indexed matrices.

    The matrices may cover different feature sets (a feature can be missing
    from a piece, e.g. when its signal was skipped upstream), so each one is
    first re-indexed onto the union of all labels in first-seen order. Cells
    that are NaN or absent in a piece are left out of that cell's average; a
    cell with no valid value in any piece stays NaN.

    Args:
        matrices: non-empty list of square DataFrames whose index and columns
            carry the same labels.

    Returns:
        DataFrame indexed (rows and columns) by the union of labels.
    """
    labels = list(dict.fromkeys(label for m in matrices for label in m.index))
    stacked = np.abs(np.stack([
        m.reindex(index=labels, columns=labels).to_numpy(dtype=float)
        for m in matrices
    ]))
    valid_counts = np.sum(~np.isnan(stacked), axis=0)
    totals = np.nansum(stacked, axis=0)
    averaged = np.full(totals.shape, np.nan)
    # Divide only where at least one piece contributed, so no warning is raised.
    np.divide(totals, valid_counts, out=averaged, where=valid_counts > 0)
    return pd.DataFrame(averaged, index=labels, columns=labels)


# Sort and group processed data by artist (groupby needs the input pre-sorted
# by the same key).
data_sorted_by_artist = sorted(data_processed, key=lambda x: x.get('metadata', {}).get('artist', 'Unknown Artist'))
artist_groups = itertools.groupby(data_sorted_by_artist, key=lambda x: x.get('metadata', {}).get('artist', 'Unknown Artist'))

global_matrices = []  # per-piece correlation matrices, all artists
artist_correlation_matrices = []  # (artist, averaged matrix) pairs

for artist, pieces in artist_groups:
    artist_pieces = list(pieces)
    piece_correlation_matrices_artist = {}

    for row in artist_pieces:
        title = row.get('metadata', {}).get('title', 'Unknown')
        features = row.get('features', {})

        # Filter out masks and keep only non-empty list features
        piece_data = {k: v for k, v in features.items()
                      if isinstance(v, list) and k not in ['Voice Mask', 'Guitar Mask'] and len(v) > 0}

        # A correlation needs at least two features, all of the same length
        if len(piece_data) > 1:
            lengths = {len(vals) for vals in piece_data.values()}
            if len(lengths) == 1:  # All equal length
                df_piece = pd.DataFrame(piece_data)

                # Pairwise Pearson correlation between the feature columns
                correlation_matrix = df_piece.corr(method='pearson')
                piece_correlation_matrices_artist[title] = correlation_matrix
                global_matrices.append(correlation_matrix)

                # Plot per-piece heatmap
                plt.figure(figsize=(8, 6))
                sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt=".2f", vmin=-1, vmax=1)
                plt.title(f"Correlation Matrix - {title} by {artist}")
                plt.tight_layout()
                plt.show()
            else:
                print(f"⚠️ Skipping {title}: features have different lengths.")

    # Artist-level mean of absolute correlations. Matrices are aligned by
    # label first: stacking them directly fails when two pieces do not have
    # the same set of features.
    if piece_correlation_matrices_artist:
        artist_matrix = _mean_abs_aligned(list(piece_correlation_matrices_artist.values()))
        artist_correlation_matrices.append((artist, artist_matrix))

        plt.figure(figsize=(10, 8))
        sns.heatmap(artist_matrix, annot=True, cmap='coolwarm', fmt=".2f", vmin=0, vmax=1)
        plt.title(f"Average Absolute Correlation Matrix - {artist}")
        plt.tight_layout()
        plt.show()

# Global mean of absolute correlations over every piece
if global_matrices:
    global_matrix = _mean_abs_aligned(global_matrices)

    plt.figure(figsize=(10, 8))
    sns.heatmap(global_matrix, annot=True, cmap='coolwarm', fmt=".2f", vmin=0, vmax=1)
    plt.title("Global Average Absolute Correlation Matrix (All Pieces)")
    plt.tight_layout()
    plt.show()


# 7) Mutual Information

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
from sklearn.feature_selection import mutual_info_regression

# Neighbour count for the k-NN mutual-information estimator. A pair of
# features is only scored when it has more complete rows than this.
MI_N_NEIGHBORS = 3
# Fixed seed so that repeated runs of this cell give the same values.
MI_RANDOM_STATE = 0


def _median_aligned(matrices):
    """Element-wise median of square, label-indexed matrices.

    The matrices may cover different feature sets, so each one is first
    re-indexed onto the union of all labels in first-seen order. Cells that
    are absent in a piece are ignored for that cell's median; a cell with no
    value in any piece stays NaN.

    Args:
        matrices: non-empty list of square DataFrames whose index and columns
            carry the same labels.

    Returns:
        DataFrame indexed (rows and columns) by the union of labels.
    """
    import warnings

    labels = list(dict.fromkeys(label for m in matrices for label in m.index))
    stacked = np.stack([
        m.reindex(index=labels, columns=labels).to_numpy(dtype=float)
        for m in matrices
    ])
    with warnings.catch_warnings():
        # nanmedian warns on all-NaN cells; NaN is the intended result there.
        warnings.simplefilter('ignore', category=RuntimeWarning)
        median = np.nanmedian(stacked, axis=0)
    return pd.DataFrame(median, index=labels, columns=labels)


# Sort and group processed data by artist (groupby needs the input pre-sorted
# by the same key).
data_sorted_by_artist = sorted(data_processed, key=lambda x: x.get('metadata', {}).get('artist', 'Unknown Artist'))
artist_groups = itertools.groupby(data_sorted_by_artist, key=lambda x: x.get('metadata', {}).get('artist', 'Unknown Artist'))

global_mi_matrices = []

for artist, pieces in artist_groups:
    artist_pieces = list(pieces)
    piece_mi_matrices_artist = {}

    for row in artist_pieces:
        title = row.get('metadata', {}).get('title', 'Unknown')
        features = row.get('features', {})

        # Filter out masks and keep only non-empty list features
        piece_data = {k: v for k, v in features.items()
                      if isinstance(v, list) and k not in ['Voice Mask', 'Guitar Mask'] and len(v) > 0}

        # MI needs at least two features, all of the same length
        if len(piece_data) > 1:
            lengths = {len(vals) for vals in piece_data.values()}
            if len(lengths) == 1:  # All equal length
                df_piece = pd.DataFrame(piece_data)

                # Initialize MI matrix (diagonal and unscored pairs stay 0)
                mi_matrix = pd.DataFrame(np.zeros((df_piece.shape[1], df_piece.shape[1])),
                                         index=df_piece.columns,
                                         columns=df_piece.columns)

                # Compute Mutual Information for each ordered pair
                for i, col_i in enumerate(df_piece.columns):
                    for j, col_j in enumerate(df_piece.columns):
                        if i != j:
                            # Use only rows where both features are present
                            valid_rows = df_piece[[col_i, col_j]].dropna()
                            # The estimator looks up MI_N_NEIGHBORS neighbours
                            # per sample, so it needs more rows than that.
                            if valid_rows.shape[0] > MI_N_NEIGHBORS:
                                mi = mutual_info_regression(
                                    valid_rows[[col_i]],
                                    valid_rows[col_j],
                                    discrete_features=False,
                                    n_neighbors=MI_N_NEIGHBORS,
                                    random_state=MI_RANDOM_STATE,
                                )[0]
                                mi_matrix.iloc[i, j] = mi

                piece_mi_matrices_artist[title] = mi_matrix
                global_mi_matrices.append(mi_matrix)

                # Plot per-piece MI heatmap
                plt.figure(figsize=(8, 6))
                sns.heatmap(mi_matrix, annot=True, cmap='viridis', fmt=".2f")
                plt.title(f"Mutual Information Matrix - {title} by {artist}")
                plt.tight_layout()
                plt.show()
            else:
                print(f"⚠️ Skipping {title}: features have different lengths.")

    # Artist-level median MI matrix. Matrices are aligned by label first:
    # stacking them directly fails when two pieces do not have the same set
    # of features.
    if piece_mi_matrices_artist:
        artist_mi_matrix = _median_aligned(list(piece_mi_matrices_artist.values()))

        plt.figure(figsize=(10, 8))
        sns.heatmap(artist_mi_matrix, annot=True, cmap='viridis', fmt=".2f")
        plt.title(f"Median Mutual Information Matrix - {artist}")
        plt.tight_layout()
        plt.show()

# Global median MI matrix over every piece
if global_mi_matrices:
    global_mi_matrix = _median_aligned(global_mi_matrices)

    plt.figure(figsize=(10, 8))
    sns.heatmap(global_mi_matrix, annot=True, cmap='viridis', fmt=".2f")
    plt.title("Global Median Mutual Information Matrix (All Pieces)")
    plt.tight_layout()
    plt.show()

# 8) Phrase Clustering

In [None]:
def get_nested_value(d, keys):
    """Follow *keys* down through nested dicts and return the list found there.

    Returns an empty list when a non-dict is reached before the keys run out,
    or when the final value is not a list (a missing key ends up here too).
    """
    node = d
    for key in keys:
        if not isinstance(node, dict):
            return []
        node = node.get(key, {})
    if isinstance(node, list):
        return node
    return []

def analyze_feature(data, feature_category, feature_key_path, time_key, time_category):
    """Cut one feature into per-phrase sequences and bring them to a common form.

    For every song in *data*, the feature values are paired with their time
    stamps and split according to the song's ``structural`` phrase
    annotations. Each phrase sequence is then gap-filled, resampled to the
    length of the longest phrase, Gaussian-smoothed and z-score normalized.

    Args:
        data: list of song dicts.
        feature_category: top-level key holding the feature (e.g. 'rhythmic').
        feature_key_path: list of keys leading to the feature list inside
            that category.
        time_key: key of the time-stamp list.
        time_category: top-level key holding the time-stamp list.

    Returns:
        DataFrame with columns 'title', 'phrase_label' and 'values' (one list
        of equal length per phrase), or an empty DataFrame when no phrase
        produced any value.
    """
    phrase_data = []
    for row in data:
        title = row.get('metadata', {}).get('title', 'Unknown')
        feature_values = get_nested_value(row.get(feature_category, {}), feature_key_path)
        time_values = row.get(time_category, {}).get(time_key, [])
        structural = row.get('structural', {})
        phrase_labels = structural.get('phrase_labels', [])
        phrase_times = structural.get('phrase_times', [])
        phrase_durations = structural.get('phrase_durations', [])

        # Skip songs whose phrase annotations are inconsistent with each other.
        if not (len(phrase_labels) == len(phrase_times) == len(phrase_durations)):
            continue
        if not (isinstance(time_values, list) and isinstance(feature_values, list)):
            continue

        for label, start, duration in zip(phrase_labels, phrase_times, phrase_durations):
            end = start + duration
            # Extract all values within the phrase's time window, including NaNs.
            # Both ends are inclusive (with a small tolerance), so a time stamp
            # exactly on a boundary is counted in both neighbouring phrases.
            segment_values = []
            for t, v in zip(time_values, feature_values):
                if start - 1e-9 <= t <= end + 1e-9:
                    segment_values.append(v)

            # Convert to numpy array for consistent handling later (e.g., NaN checks)
            values_np = np.array(segment_values, dtype=float)
            phrase_data.append({'title': title, 'phrase_label': label, 'values': values_np})

    # Filter out entries where 'values' was empty after extraction (e.g., no time points matched)
    df_phrases = pd.DataFrame([p for p in phrase_data if len(p['values']) > 0])
    if df_phrases.empty:
        print(f"No valid phrase data found for {'/'.join(feature_key_path)}")
        return pd.DataFrame()

    # Determine max_len based on initial extracted sequence lengths
    max_len = df_phrases['values'].apply(len).max()
    if max_len == 0:
        print(f"Max length is 0 for {'/'.join(feature_key_path)}, returning empty DataFrame.")
        return pd.DataFrame()

    def interpolate_and_smooth_and_normalize(sequences_of_np_arrays, target_length, sigma=1):
        """Return one list of length *target_length* per input sequence.

        Sequences with fewer than 80% non-NaN values are replaced by all
        zeros. The others have their NaNs filled by linear interpolation, are
        linearly resampled to *target_length*, smoothed with a Gaussian filter
        of width *sigma* and standardized to zero mean and unit standard
        deviation (only centered when the standard deviation is 0).
        """
        interpolated_smoothed_normalized = []
        for seq_np in sequences_of_np_arrays: # seq_np is already a numpy array, potentially with NaNs
            valid_count = np.count_nonzero(~np.isnan(seq_np))
            total_count = len(seq_np)

            # Handle empty sequences or sequences that are mostly NaNs (less than 80% valid)
            if total_count == 0 or valid_count == 0 or (valid_count / total_count < 0.8 and total_count > 0):
                interpolated_smoothed_normalized.append(np.zeros(target_length).tolist())
                continue

            # Step 1: Interpolate NaNs using pandas Series interpolate (handles leading/trailing NaNs)
            interpolated_seq_pre_resample = seq_np.copy() # Make a copy to avoid modifying original array
            if valid_count >= 2: # Need at least 2 non-NaN points for linear interpolation to work effectively
                interpolated_seq_pre_resample = pd.Series(interpolated_seq_pre_resample).interpolate(method='linear', limit_direction='both').values
                # Safety net: if any NaN is left after interpolation, fill it
                # with the mean of the non-NaN values.
                if np.isnan(interpolated_seq_pre_resample).any():
                    if valid_count > 0: # Always true inside this branch (valid_count >= 2)
                        fill_value = np.nanmean(interpolated_seq_pre_resample)
                        interpolated_seq_pre_resample = np.nan_to_num(interpolated_seq_pre_resample, nan=fill_value)
                    else:
                        # Not reachable from here: valid_count >= 2 in this branch.
                        interpolated_seq_pre_resample = np.zeros_like(interpolated_seq_pre_resample)
            elif valid_count == 1: # If only one valid point, fill the entire sequence with that point
                interpolated_seq_pre_resample = np.full_like(seq_np, seq_np[~np.isnan(seq_np)][0])
            # If valid_count is 0, it's handled by the earlier 'if' condition.


            # Step 2: Resample, Smooth, and Normalize
            if len(interpolated_seq_pre_resample) >= 2:
                # Stretch the sequence onto target_length evenly spaced positions.
                x_old = np.arange(len(interpolated_seq_pre_resample))
                f_interp = interp1d(x_old, interpolated_seq_pre_resample, kind='linear', fill_value="extrapolate")
                x_new = np.linspace(0, len(interpolated_seq_pre_resample) - 1, target_length)
                resampled_seq = f_interp(x_new)
                smoothed = gaussian_filter1d(resampled_seq, sigma=sigma)

                mean = np.mean(smoothed)
                std = np.std(smoothed)
                if std != 0:
                    normalized = (smoothed - mean) / std
                else:
                    normalized = smoothed - mean # Center but don't scale if std is 0
                interpolated_smoothed_normalized.append(normalized.tolist())
            elif len(interpolated_seq_pre_resample) == 1:
                # A single-point sequence cannot be resampled; represent it as a flat line at 0
                interpolated_smoothed_normalized.append(np.zeros(target_length).tolist()) # Center to 0 for a flat line
            else: # Empty sequence after all steps (should be caught by previous conditions)
                interpolated_smoothed_normalized.append(np.zeros(target_length).tolist())

        return interpolated_smoothed_normalized


    # Replace the raw per-phrase arrays by their equal-length processed versions.
    df_phrases['values'] = interpolate_and_smooth_and_normalize(df_phrases['values'].tolist(), max_len, sigma=1)
    return df_phrases


def plot_feature_clusters_cosine(df_feature, feature_name):
    """Cluster phrase sequences by cosine distance and plot the result.

    Builds an average-linkage hierarchy over the sequences in
    ``df_feature['values']``, estimates a cluster count from the merge
    heights, draws the dendrogram, and for every cluster with at least 10
    members plots all sequences together with the cluster medoid and prints
    the member labels.

    All-zero sequences are dropped first because cosine distance is undefined
    for them. The caller's DataFrame is not modified.

    Args:
        df_feature: DataFrame with 'title', 'phrase_label' and 'values'
            columns, where 'values' holds equal-length numeric lists.
        feature_name: label used in plot titles and messages.

    Returns:
        None. Output is plots and printed text only.
    """
    if df_feature.empty or 'values' not in df_feature.columns:
        print(f"Skipping {feature_name}: missing data.")
        return

    print(f"Plotting dendrogram and clusters for {feature_name} (Cosine)...")
    sequences = df_feature['values'].tolist()
    array_sequences = np.array(sequences)

    if array_sequences.ndim != 2:
        print(f"Error: Expected 2D array for clustering, got shape {array_sequences.shape}")
        return

    valid_indices = [i for i, seq in enumerate(array_sequences) if np.linalg.norm(seq) > 0]
    if len(valid_indices) < 2:
        print(f"Skipping {feature_name}: not enough valid sequences for cosine distance.")
        return

    array_sequences = array_sequences[valid_indices]
    df_feature = df_feature.iloc[valid_indices].reset_index(drop=True)

    distance_matrix = pdist(array_sequences, metric='cosine')
    linked = linkage(distance_matrix, method='average')

    # Estimate the number of clusters from the first unusually large jump
    # (above the 98th percentile) between consecutive merge heights.
    # NOTE(review): a jump between merges i and i+1 leaves n - (i + 1)
    # clusters when cut inside the gap, while the expression below uses
    # n - i - confirm this is the intended cut.
    heights = linked[:, 2]
    height_diffs = np.diff(heights)
    if height_diffs.size > 0:
        threshold = np.percentile(height_diffs, 98)
        split_indices = np.where(height_diffs > threshold)[0]
    else:
        # Exactly two sequences: a single merge, so there is no height
        # difference to take a percentile of. Use the default below.
        split_indices = np.array([], dtype=int)
    estimated_clusters = len(array_sequences) - split_indices[0] if len(split_indices) > 0 else 5

    clusters = fcluster(linked, t=estimated_clusters, criterion='maxclust')
    df_feature['cluster'] = clusters

    # Adjust color_threshold so the dendrogram colouring matches the number of clusters
    if estimated_clusters < len(linked) + 1:
        color_threshold = linked[-estimated_clusters + 1, 2]
    else:
        color_threshold = linked[-1, 2] + 1

    combined_labels = [
        f"{title}_{phrase_label}"
        for title, phrase_label in zip(df_feature['title'], df_feature['phrase_label'])
    ]

    plt.figure(figsize=(12, 6))
    plt.title(f"Dendrogram - {feature_name} (Clusters: {estimated_clusters})")
    dendrogram(linked,
               labels=combined_labels,
               leaf_rotation=90,
               leaf_font_size=7,
               color_threshold=color_threshold)
    plt.xlabel("Phrase Label")
    plt.ylabel("Cosine Distance")
    plt.tight_layout()
    plt.show()

    # Plot only clusters with 10 or more sequences
    for cluster_id in sorted(df_feature['cluster'].unique()):
        cluster_df = df_feature[df_feature['cluster'] == cluster_id]
        if len(cluster_df) < 10:
            continue
        cluster_sequences = cluster_df['values'].tolist()
        # Medoid = member with the smallest summed distance to all other members
        cluster_cosine_distances = squareform(pdist(np.array(cluster_sequences), metric='cosine'))
        total_distances = np.sum(cluster_cosine_distances, axis=1)
        medoid_index = np.argmin(total_distances)
        medoid_sequence = np.array(cluster_sequences[medoid_index])

        plt.figure(figsize=(10, 5))
        for seq in cluster_sequences:
            plt.plot(seq, color='lightgray', linewidth=0.5)
        plt.plot(medoid_sequence, color='red', linewidth=2, label='Medoid')
        plt.title(f"{feature_name} - Cluster {cluster_id} (n={len(cluster_df)})")
        plt.xlabel("Interpolated Index")
        plt.ylabel("Feature Value (Normalized)")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

        print(f"\nSequences in {feature_name} - Cluster {cluster_id}:")
        for title, phrase_label in zip(cluster_df['title'], cluster_df['phrase_label']):
            print(f"- {title}_{phrase_label}")


def plot_feature_clusters_dtw(df_feature, feature_name):
    """Cluster phrase sequences by DTW distance and plot the result.

    Builds an average-linkage hierarchy over the pairwise DTW distances of
    the sequences in ``df_feature['values']``, estimates a cluster count from
    the merge heights, draws the dendrogram, and for every cluster with at
    least 10 members plots all sequences together with the cluster medoid and
    prints the member labels.

    The work is done on a copy, so the caller's DataFrame is not modified
    (same as the cosine variant).

    Args:
        df_feature: DataFrame with 'title', 'phrase_label' and 'values'
            columns, where 'values' holds equal-length numeric lists.
        feature_name: label used in plot titles and messages.

    Returns:
        None. Output is plots and printed text only.
    """
    if df_feature.empty or 'values' not in df_feature.columns:
        print(f"Skipping {feature_name}: missing data.")
        return

    print(f"Plotting dendrogram and clusters for {feature_name} (DTW)...")
    sequences = df_feature['values'].tolist()
    array_sequences = np.array(sequences)

    if array_sequences.ndim != 2:
        print(f"Error: Expected 2D array for clustering, got shape {array_sequences.shape}")
        return

    # Hierarchical clustering needs at least one pair of sequences; without
    # this guard linkage() fails on the empty condensed distance matrix.
    if len(sequences) < 2:
        print(f"Skipping {feature_name}: not enough valid sequences for DTW clustering.")
        return

    # Avoid writing the 'cluster' column into the caller's DataFrame.
    df_feature = df_feature.copy()

    # Symmetric pairwise DTW distance matrix (diagonal stays 0)
    dtw_distances = np.zeros((len(sequences), len(sequences)))
    for i in range(len(sequences)):
        for j in range(i + 1, len(sequences)):
            dist = dtw.distance(sequences[i], sequences[j])
            dtw_distances[i, j] = dist
            dtw_distances[j, i] = dist

    linked = linkage(squareform(dtw_distances), method='average')

    # Estimate the number of clusters from the first unusually large jump
    # (above the 98th percentile) between consecutive merge heights.
    # NOTE(review): a jump between merges i and i+1 leaves n - (i + 1)
    # clusters when cut inside the gap, while the expression below uses
    # n - i - confirm this is the intended cut.
    heights = linked[:, 2]
    height_diffs = np.diff(heights)
    if height_diffs.size > 0:
        threshold = np.percentile(height_diffs, 98)
        split_indices = np.where(height_diffs > threshold)[0]
    else:
        # Exactly two sequences: a single merge, so there is no height
        # difference to take a percentile of. Use the default below.
        split_indices = np.array([], dtype=int)
    estimated_clusters = len(sequences) - split_indices[0] if len(split_indices) > 0 else 5

    clusters = fcluster(linked, t=estimated_clusters, criterion='maxclust')
    df_feature['cluster'] = clusters

    # Adjust color_threshold so the dendrogram colouring matches the number of clusters
    if estimated_clusters < len(linked) + 1:
        color_threshold = linked[-estimated_clusters + 1, 2]
    else:
        color_threshold = linked[-1, 2] + 1

    combined_labels = [
        f"{title}_{phrase_label}"
        for title, phrase_label in zip(df_feature['title'], df_feature['phrase_label'])
    ]

    plt.figure(figsize=(12, 6))
    plt.title(f"DTW Dendrogram - {feature_name} (Clusters: {estimated_clusters})")
    dendrogram(linked,
               labels=combined_labels,
               leaf_rotation=90,
               leaf_font_size=7,
               color_threshold=color_threshold)
    plt.xlabel("Phrase Label")
    plt.ylabel("DTW Distance")
    plt.tight_layout()
    plt.show()

    # Plot only clusters with 10 or more sequences
    for cluster_id in sorted(df_feature['cluster'].unique()):
        cluster_df = df_feature[df_feature['cluster'] == cluster_id]
        if len(cluster_df) < 10:
            continue
        cluster_sequences = cluster_df['values'].tolist()
        cluster_dtw_distances = np.zeros((len(cluster_sequences), len(cluster_sequences)))
        for i in range(len(cluster_sequences)):
            for j in range(i + 1, len(cluster_sequences)):
                dist = dtw.distance(cluster_sequences[i], cluster_sequences[j])
                cluster_dtw_distances[i, j] = dist
                cluster_dtw_distances[j, i] = dist

        # Medoid = member with the smallest summed distance to all other members
        total_distances = np.sum(cluster_dtw_distances, axis=1)
        medoid_index = np.argmin(total_distances)
        medoid_sequence = np.array(cluster_sequences[medoid_index])

        plt.figure(figsize=(10, 5))
        for seq in cluster_sequences:
            plt.plot(seq, color='lightgray', linewidth=0.5)
        plt.plot(medoid_sequence, color='red', linewidth=2, label='Medoid')
        plt.title(f"{feature_name} - DTW Cluster {cluster_id} (n={len(cluster_df)})")
        plt.xlabel("Interpolated Index")
        plt.ylabel("Feature Value (Normalized)")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

        print(f"\nSequences in {feature_name} - DTW Cluster {cluster_id}:")
        for title, phrase_label in zip(cluster_df['title'], cluster_df['phrase_label']):
            print(f"- {title}_{phrase_label}")

# Distance used for phrase clustering: 'cosine' or 'dtw'.
method_selector = 'cosine'

# (feature section, key path, time key, time section, display name)
features_config = [
    ('rhythmic', ['tempo_deviations'], 'beat_times', 'rhythmic', 'Tempo Deviations'),
    ('rhythmic', ['guitars_rhythmic_density'], 'beat_times', 'rhythmic', 'Guitars Rhythmic Density'),
    ('rhythmic', ['voice_rhythmic_density'], 'beat_times', 'rhythmic', 'Voice Rhythmic Density'),
    ('melodic', ['voice_melodic_contour'], 'beat_times', 'rhythmic', 'Melodic Voice Contour'),
    ('melodic', ['guitars_harmonic_contour'], 'beat_times', 'rhythmic', 'Harmonic Guitars Contour'),
    ('dynamic', ['voice_loudness'], 'beat_times', 'rhythmic', 'Voice Loudness'),
    ('dynamic', ['guitars_loudness'], 'beat_times', 'rhythmic', 'Guitars Loudness'),
    ('harmonic', ['tonal_dissonance'], 'beat_times', 'rhythmic', 'Tonal Dissonance'),
    ('harmonic', ['tonal_dispersion'], 'beat_times', 'rhythmic', 'Tonal Dispersion'),
]

# Joined key path -> display name, used for plot titles.
feature_names = dict(
    ('/'.join(key_path), display_name)
    for _, key_path, _, _, display_name in features_config
)

# First pass: build the per-phrase table of every feature.
results = {}
for feature_category, feature_key_path, time_key, time_category, feature_name in features_config:
    print(f"Analyzing {feature_name}")
    results['/'.join(feature_key_path)] = analyze_feature(
        data, feature_category, feature_key_path, time_key, time_category
    )

# Second pass: cluster and plot with the selected distance. Any other
# selector value plots nothing.
for feature_key, df_feature in results.items():
    friendly_name = feature_names.get(feature_key, feature_key)
    if method_selector == 'dtw':
        plot_feature_clusters_dtw(df_feature, friendly_name)
    elif method_selector == 'cosine':
        plot_feature_clusters_cosine(df_feature, friendly_name)

# EMD plots


In [None]:
import numpy as np
import pandas as pd
import ipywidgets as widgets
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
from PyEMD import EMD

# --- BEGIN MODIFICATION: Moved from cell 79_Yrn2ma15u ---
# Fail fast when the inputs of this cell are missing: both `data` (raw song
# objects) and `data_processed` (EMD-processed songs) must exist as non-empty
# lists in the notebook's global namespace.
# NOTE: a NameError is raised for a missing, non-list, or empty variable alike.
if 'data' not in globals() or not isinstance(data, list) or not data:
    raise NameError("Global variable 'data' (raw song data) not found or is empty.")
if 'data_processed' not in globals() or not isinstance(data_processed, list) or not data_processed:
    raise NameError("Global variable 'data_processed' (EMD data) not found or is empty.")

# Aliases used by the rest of this cell.
ALL_RAW_DATA = data
ALL_EMD_DATA = data_processed

# This list defines all features, their raw data paths, and EMD processed labels.
#   display_name: label shown in the feature dropdown.
#   raw_path:     dot-separated path into a raw song object (resolved by get_path).
#   emd_label:    presumably the key of the EMD-processed feature — confirm
#                 against the code that builds `data_processed`.
PARAM_INFO = [
    {"display_name": "Tempo Deviation", "raw_path": "rhythmic.tempo_deviations", "emd_label": "Tempo Deviation"},
    {"display_name": "Voice Rhythmic Density", "raw_path": "rhythmic.voice_rhythmic_density", "emd_label": "Voice Rhythmic Density"},
    {"display_name": "Guitars Rhythmic Density", "raw_path": "rhythmic.guitars_rhythmic_density", "emd_label": "Guitars Rhythmic Density"},
    {"display_name": "Voice Loudness", "raw_path": "dynamic.voice_loudness", "emd_label": "Voice Loudness"},
    {"display_name": "Guitars Loudness", "raw_path": "dynamic.guitars_loudness", "emd_label": "Guitars Loudness"},
    {"display_name": "Tonal Dissonance", "raw_path": "harmonic.tonal_dissonance", "emd_label": "Tonal Dissonance"},
    {"display_name": "Tonal Dispersion", "raw_path": "harmonic.tonal_dispersion", "emd_label": "Tonal Dispersion"},
    {"display_name": "Melodic Voice Contour", "raw_path": "melodic.voice_melodic_contour", "emd_label": "Melodic Voice Contour"},
    {"display_name": "Harmonic Guitars Contour", "raw_path": "melodic.guitars_harmonic_contour", "emd_label": "Harmonic Guitars Contour"},
]
# --- END MODIFICATION ---

# Ensure ALL_RAW_DATA and PARAM_INFO are available from previous cells
# This check is now redundant as they are defined above
# if 'ALL_RAW_DATA' not in globals() or not isinstance(ALL_RAW_DATA, list) or not ALL_RAW_DATA:
#     raise NameError("Global variable 'ALL_RAW_DATA' not found or is empty. Please run previous cells.")

# if 'PARAM_INFO' not in globals() or not isinstance(PARAM_INFO, list) or not PARAM_INFO:
#     raise NameError("Global variable 'PARAM_INFO' not found or is empty. Please run previous cells.")

# Helper functions shared with the plotting widget; they are defined again
# below so that this cell has them in scope.
def to_float_array(v):
    """Convert an array-like into a 1-D float NumPy array, preserving NaNs.

    Args:
        v: A scalar, list/tuple, NumPy array, or any object exposing
            ``to_numpy`` (e.g. a pandas Series). ``None`` is accepted.

    Returns:
        A 1-D ``float`` array, or ``None`` when ``v`` is ``None`` or cannot
        be converted to floats.
    """
    if v is None:
        return None
    try:
        if hasattr(v, "to_numpy"):
            arr = v.to_numpy(dtype=float)
        else:
            arr = np.asarray(v, dtype=float)
    except Exception:
        # Deliberate best-effort: anything that is not float-convertible is
        # reported to the caller as "no data".
        return None
    # Flatten everything that is not already 1-D. This also covers inputs
    # whose dimensions are all of size one (e.g. [[5.0]]), which squeeze()
    # would collapse to a 0-d array on which len() fails.
    if arr.ndim != 1:
        arr = arr.ravel()
    return arr

def get_path(dct, path):
    """Follow a dot-separated key path through nested dicts.

    Returns the value found at the end of ``path``, or ``None`` when ``dct``
    is ``None``, ``path`` is not a non-empty string, or any step of the path
    is missing or leads through a non-dict value.
    """
    if dct is None:
        return None
    if not (isinstance(path, str) and path):
        return None
    node = dct
    for key in path.split('.'):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node

def _get_param_info_by_display_name(display_name):
    """Return the first PARAM_INFO entry with this 'display_name', else None."""
    matches = (entry for entry in PARAM_INFO if entry['display_name'] == display_name)
    return next(matches, None)

def _get_feature_values_raw(song_obj, display_name):
    """Return the raw series of a feature as a 1-D float array.

    Looks up the feature's 'raw_path' in PARAM_INFO and reads it from
    ``song_obj``. Returns ``None`` for an unknown display name or when the
    value is missing or not convertible.
    """
    info = _get_param_info_by_display_name(display_name)
    if info:
        raw_values = get_path(song_obj, info['raw_path'])
        return to_float_array(raw_values)
    return None

def _song_label(song_obj):
    md = song_obj.get("metadata", {})
    title = md.get("title", "Untitled")
    artist = md.get("artist", "Unknown")
    year = md.get("project_year", "")
    return f"{title} — {artist}" + (f" ({year})" if str(year).strip() else "")

def _is_1d_numeric_series(x):
    if x is None:
        return False
    try:
        arr = np.asarray(x)
    except Exception:
        return False
    return arr.ndim == 1 and np.issubdtype(arr.dtype, np.number)

def _make_structural_dfs(song_obj):
    """Build the section and phrase marker tables of a raw song object.

    Returns ``(df_sections, df_phrases)``. Each is a DataFrame with the
    columns "Initial_Time" and "Label", or ``None`` when the matching
    times/labels pair under "structural" is missing, malformed, or of
    unequal length.
    """
    structural = song_obj.get("structural", {})

    def _marker_frame(times_key, labels_key):
        times = structural.get(times_key)
        labels = structural.get(labels_key)
        if not _is_1d_numeric_series(times):
            return None
        if not isinstance(labels, (list, tuple)):
            return None
        if len(times) != len(labels):
            return None
        return pd.DataFrame({"Initial_Time": times, "Label": labels})

    sections = _marker_frame("section_times", "section_labels")
    phrases = _marker_frame("phrase_times", "phrase_labels")
    return sections, phrases

def format_axes_emd(ax, x_axis, df_sections, df_phrases, title=None, mode="time", beat_times=None):
    """Add time/beat tick labels and section/phrase markers to an axis.

    Ten evenly spaced ticks are placed between the first and last value of
    ``x_axis``. By default the bottom axis is labelled in time (mm:ss) and a
    secondary top axis in beat numbers; ``mode="beats"`` swaps the two.

    Args:
        ax: Matplotlib axes to format.
        x_axis: X values of the plotted data; nothing is done when empty.
        df_sections: DataFrame with "Initial_Time" and "Label" columns, drawn
            as black vertical lines labelled below the axis, or ``None``.
        df_phrases: Same layout, drawn as light grey lines labelled above the
            axis, or ``None``.
        title: Optional axis title (set with extra padding so it clears the
            phrase labels).
        mode: "beats" puts beat numbers on the bottom axis; any other value
            puts time on the bottom axis.
        beat_times: Beat positions used to translate tick positions into beat
            numbers; defaults to ``x_axis``.
    """
    def sec_to_minsec(t):
        # Round the total before splitting it, so that 59.6 s is shown as
        # "1:00" instead of the invalid "0:60".
        m, s = divmod(int(round(t)), 60)
        return f"{m}:{s:02d}"

    x_axis = np.asarray(x_axis)
    if x_axis.size == 0:
        return

    n_ticks = 10
    tick_positions = np.linspace(x_axis[0], x_axis[-1], n_ticks)

    bt = np.asarray(beat_times) if beat_times is not None else x_axis

    time_labels = [sec_to_minsec(t) for t in tick_positions]
    beat_labels = [f"{b+1}" for b in np.searchsorted(bt, tick_positions)]

    if mode == "beats":
        bottom_labels, bottom_name = beat_labels, "Beat"
        top_labels, top_name = time_labels, "Time (mm:ss)"
    else:
        bottom_labels, bottom_name = time_labels, "Time (mm:ss)"
        top_labels, top_name = beat_labels, "Beat"

    ax.set_xticks(tick_positions)
    ax.set_xticklabels(bottom_labels, fontsize=9)
    ax.set_xlabel(bottom_name)

    ax_top = ax.secondary_xaxis('top')
    ax_top.set_xticks(tick_positions)
    ax_top.set_xticklabels(top_labels, fontsize=9)
    ax_top.set_xlabel(top_name)

    # Sections
    if df_sections is not None and all(c in df_sections.columns for c in ["Initial_Time", "Label"]):
        for time, label in zip(df_sections["Initial_Time"], df_sections["Label"]):
            ax.axvline(x=time, color="black", lw=1.2, alpha=0.9)
            ax.text(time, -0.18, label, rotation=90, ha="center", va="top",
                    fontsize=8, color="black", transform=ax.get_xaxis_transform(), clip_on=False)

    # Phrases
    if df_phrases is not None and all(c in df_phrases.columns for c in ["Initial_Time", "Label"]):
        for t, lab in zip(df_phrases["Initial_Time"], df_phrases["Label"]):
            ax.axvline(x=t, color="lightgrey", lw=1.0, alpha=0.7)
            ax.text(t, 1.15, lab, rotation=90, ha="center", va="bottom",
                    fontsize=8, color="grey", transform=ax.get_xaxis_transform(), clip_on=False)

    if title:
        ax.set_title(title, pad=70)

    ax.margins(x=0.05)
    ax.grid(False)


# Build song list for dropdown.
# `song_options_emd` keeps the labels in dataset order; `title_to_index_emd`
# maps each label back to the song's index in ALL_RAW_DATA.
song_options_emd = []
title_to_index_emd = {}
for idx, song in enumerate(ALL_RAW_DATA): # Use ALL_RAW_DATA for consistent song indexing and metadata
    lbl = _song_label(song)
    # Disambiguate duplicate labels by appending the song index until unique.
    while lbl in title_to_index_emd:
        lbl = f"{lbl}  #{idx}"
    title_to_index_emd[lbl] = idx
    song_options_emd.append(lbl)

# Feature choices, sorted alphabetically by display name.
feature_options_emd = sorted([p['display_name'] for p in PARAM_INFO])

# Widgets
song_dd_emd = widgets.Dropdown(options=song_options_emd, description='Song:', layout=widgets.Layout(width='60%'))
feature_dd_emd = widgets.Dropdown(options=feature_options_emd, description='Feature:', layout=widgets.Layout(width='50%'))

# Output area that plot_emd_components clears and draws into.
out_emd = widgets.Output()

def plot_emd_components(song_obj, display_name):
    """Decompose one raw feature of a song with EMD and plot the components.

    Draws four stacked panels into the ``out_emd`` output widget: the raw
    signal, IMF1, IMF2, and the sum of all remaining IMFs. A message is
    printed instead when the feature is missing, empty, all NaN, constant,
    or when EMD raises.

    Args:
        song_obj: Raw song object (nested dict) holding the feature, the
            beat times and the structural markers.
        display_name: 'display_name' of the feature as listed in PARAM_INFO.
    """
    with out_emd:
        clear_output(wait=True)

        raw_signal = _get_feature_values_raw(song_obj, display_name)

        if raw_signal is None or len(raw_signal) == 0 or np.all(np.isnan(raw_signal)):
            print(f"No raw data found for '{display_name}' in selected song or signal is constant/all NaN.")
            return

        # EMD needs a fully finite input. Non-finite samples (NaN and +/-inf)
        # are replaced by linear interpolation; inf has to be turned into NaN
        # first because interpolate() only fills NaN.
        signal_for_emd = np.copy(raw_signal)
        finite_mask = np.isfinite(signal_for_emd)
        if not np.any(finite_mask):
            signal_for_emd = np.zeros_like(signal_for_emd)
        elif not np.all(finite_mask):
            signal_for_emd[~finite_mask] = np.nan
            # limit_direction='both' also fills leading/trailing gaps, so no
            # NaN is left as long as one finite sample exists.
            signal_for_emd = pd.Series(signal_for_emd).interpolate(
                method='linear', limit_direction='both').values

        if np.all(signal_for_emd == signal_for_emd[0]):
             print(f"EMD cannot be applied to a constant signal for '{display_name}'.")
             return

        emd = EMD()
        try:
            imfs = emd(signal_for_emd)
        except Exception as e:
            print(f"EMD failed for '{display_name}': {e}")
            return

        # Prepare components for plotting; missing IMFs are shown as zeros.
        imf1 = imfs[0] if imfs.shape[0] > 0 else np.zeros_like(raw_signal)
        imf2 = imfs[1] if imfs.shape[0] > 1 else np.zeros_like(raw_signal)
        imf3_plus = np.sum(imfs[2:], axis=0) if imfs.shape[0] > 2 else np.zeros_like(raw_signal)

        # X-axis: beat times from the raw song object, else the sample index.
        beat_times = get_path(song_obj, "rhythmic.beat_times")
        if _is_1d_numeric_series(beat_times):
            x_axis = np.asarray(beat_times, dtype=float)
            mode = "time"
        else:
            x_axis = np.arange(len(raw_signal))
            mode = "index"
            # Do not hand malformed beat times to the axis formatter; with
            # None it derives beat numbers from x_axis instead.
            beat_times = None

        # Ensure all signals have the same length as x_axis
        min_len = min(len(x_axis), len(raw_signal))
        x_axis = x_axis[:min_len]
        raw_signal = raw_signal[:min_len]
        imf1 = imf1[:min_len]
        imf2 = imf2[:min_len]
        imf3_plus = imf3_plus[:min_len]

        # Structural info from raw song object
        df_sections, df_phrases = _make_structural_dfs(song_obj)

        md = song_obj.get("metadata", {})
        plot_title = f"EMD Components for {display_name}\n{md.get('title', 'Untitled')} by {md.get('artist', 'Unknown')}"

        fig, axes = plt.subplots(4, 1, figsize=(18, 12), sharex=True, constrained_layout=True)

        # (series, legend label, colour, panel title, title passed to the formatter)
        # Only the first panel carries the figure-level title.
        panels = [
            (raw_signal, 'Raw Signal', 'blue', 'Raw Signal', plot_title),
            (imf1, 'IMF1', 'green', 'IMF1', None),
            (imf2, 'IMF2', 'red', 'IMF2', None),
            (imf3_plus, 'IMF3+', 'purple', 'IMF3+ (Sum of IMF3 and higher)', None),
        ]
        for ax, (series, legend_label, color, panel_title, figure_title) in zip(axes, panels):
            ax.plot(x_axis, series, label=legend_label, color=color)
            ax.set_title(panel_title)
            ax.legend()
            format_axes_emd(ax, x_axis, df_sections, df_phrases,
                            title=figure_title, mode=mode, beat_times=beat_times)

        plt.xlabel(f"Time ({mode})")
        plt.show()

def on_change_emd(*args):
    """Redraw the EMD plot for the current dropdown selection.

    Used as a widget observer, so any arguments are ignored. Does nothing
    until both a song and a feature are selected.
    """
    song_label = song_dd_emd.value
    feature_name = feature_dd_emd.value
    if not (song_label and feature_name):
        return

    # EMD always takes the raw song as input.
    raw_song = ALL_RAW_DATA[title_to_index_emd[song_label]]
    plot_emd_components(raw_song, feature_name)

# Wire up the widgets: redraw whenever either dropdown's value changes.
song_dd_emd.observe(on_change_emd, names='value')
feature_dd_emd.observe(on_change_emd, names='value')

# Layout and display: the two dropdowns stacked vertically, followed by the
# output area the plot is drawn into.
display(widgets.VBox([
    song_dd_emd,
    feature_dd_emd,
]), out_emd)

# Initial plot for the default selection.
on_change_emd()

# Final Plots (Raw and EMD)

In [None]:

# --- Imports ---
import numpy as np
import pandas as pd
import ipywidgets as widgets
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
from scipy.ndimage import gaussian_filter1d

# -------------------------------------------------------------
# 0) DATASET DISCOVERY (Now directly uses global data and data_processed)
# -------------------------------------------------------------

# Fail fast when the inputs of this cell are missing: both `data` (raw song
# objects) and `data_processed` (EMD-processed songs) must exist as non-empty
# lists in the notebook's global namespace.
# NOTE: a NameError is raised for a missing, non-list, or empty variable alike.
if 'data' not in globals() or not isinstance(data, list) or not data:
    raise NameError("Global variable 'data' (raw song data) not found or is empty.")
if 'data_processed' not in globals() or not isinstance(data_processed, list) or not data_processed:
    raise NameError("Global variable 'data_processed' (EMD data) not found or is empty.")

# Aliases used by the rest of this cell. Songs are looked up in both lists
# by the same index, so the two lists are assumed to be aligned.
ALL_RAW_DATA = data
ALL_EMD_DATA = data_processed

# -------------------------------------------------------------
# 1) CORE HELPERS
# -------------------------------------------------------------

def to_float_array(v):
    """Convert array-like to 1D float numpy array; preserve NaNs.

    Args:
        v: A scalar, list/tuple, NumPy array, or any object exposing
            ``to_numpy`` (e.g. a pandas Series). ``None`` is accepted.

    Returns:
        A 1-D ``float`` array, or ``None`` when ``v`` is ``None`` or cannot
        be converted to floats.
    """
    if v is None:
        return None
    try:
        if hasattr(v, "to_numpy"):
            arr = v.to_numpy(dtype=float)
        else:
            arr = np.asarray(v, dtype=float)
    except Exception:
        # Deliberate best-effort: anything that is not float-convertible is
        # reported to the caller as "no data".
        return None
    # Flatten everything that is not already 1-D. This also covers inputs
    # whose dimensions are all of size one (e.g. [[5.0]]), which squeeze()
    # would collapse to a 0-d array on which len() fails.
    if arr.ndim != 1:
        arr = arr.ravel()
    return arr

def get_path(dct, path):
    """Follow a dot-separated key path (e.g. 'rhythmic.bpms_raw') through nested dicts.

    Returns the value found at the end of ``path``, or ``None`` when ``dct``
    is ``None``, ``path`` is not a non-empty string, or any step of the path
    is missing or leads through a non-dict value.
    """
    if dct is None:
        return None
    if not (isinstance(path, str) and path):
        return None
    node = dct
    for key in path.split('.'):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node

def format_title_from_metadata(template, song):
    """Fill a ``str.format`` template from a song's "metadata" dict.

    Supported placeholders are {title}, {artist}, {project_year} and
    {sample_rate}. Missing values fall back to "Untitled", "Unknown", ""
    and "" respectively; a ``song`` that is not a dict is treated as having
    no metadata.
    """
    metadata = song.get("metadata", {}) if isinstance(song, dict) else {}
    fields = {
        "title": metadata.get("title", "Untitled"),
        "artist": metadata.get("artist", "Unknown"),
        "project_year": metadata.get("project_year", ""),
        "sample_rate": metadata.get("sample_rate", ""),
    }
    return template.format(**fields)

# -------------------------------------------------------------
# Helper functions for feature access based on data source type
# -------------------------------------------------------------

# This list defines all features, their raw data paths, and EMD processed labels.
#   display_name: label shown in the variable dropdowns.
#   raw_path:     dot-separated path into a raw song object (resolved by get_path).
#   emd_label:    key of the feature inside the 'features' dict of an
#                 EMD-processed song object.
PARAM_INFO = [
    {"display_name": "Tempo Deviation", "raw_path": "rhythmic.tempo_deviations", "emd_label": "Tempo Deviation"},
    {"display_name": "Voice Rhythmic Density", "raw_path": "rhythmic.voice_rhythmic_density", "emd_label": "Voice Rhythmic Density"},
    {"display_name": "Guitars Rhythmic Density", "raw_path": "rhythmic.guitars_rhythmic_density", "emd_label": "Guitars Rhythmic Density"},
    {"display_name": "Voice Loudness", "raw_path": "dynamic.voice_loudness", "emd_label": "Voice Loudness"},
    {"display_name": "Guitars Loudness", "raw_path": "dynamic.guitars_loudness", "emd_label": "Guitars Loudness"},
    {"display_name": "Tonal Dissonance", "raw_path": "harmonic.tonal_dissonance", "emd_label": "Tonal Dissonance"},
    {"display_name": "Tonal Dispersion", "raw_path": "harmonic.tonal_dispersion", "emd_label": "Tonal Dispersion"},
    {"display_name": "Melodic Voice Contour", "raw_path": "melodic.voice_melodic_contour", "emd_label": "Melodic Voice Contour"},
    {"display_name": "Harmonic Guitars Contour", "raw_path": "melodic.guitars_harmonic_contour", "emd_label": "Harmonic Guitars Contour"},
]

# PARAM_MAP will now just map display_name to itself for dropdown purposes
PARAM_MAP = {p['display_name']: p['display_name'] for p in PARAM_INFO}

def _get_param_info_by_display_name(display_name):
    """Return the first PARAM_INFO entry with this 'display_name', else None."""
    matches = (entry for entry in PARAM_INFO if entry['display_name'] == display_name)
    return next(matches, None)

def _get_feature_values(song_obj, display_name, data_source_type):
    """Return one feature of a song as a 1-D float array.

    For 'Raw Data' the value is read from the feature's dotted 'raw_path';
    for 'EMD Data' it is read from ``song_obj['features']`` under the
    feature's 'emd_label'. Returns ``None`` for an unknown display name, an
    unknown data source type, or a missing/unconvertible value.
    """
    info = _get_param_info_by_display_name(display_name)
    if not info:
        return None

    if data_source_type == 'Raw Data':
        source = get_path(song_obj, info['raw_path'])
    elif data_source_type == 'EMD Data':
        source = song_obj.get('features', {}).get(info['emd_label'], None)
    else:
        return None
    return to_float_array(source)

def _is_feature_available(song_obj, display_name, data_source_type):
    """Tell whether the feature resolves to a non-empty array for this song."""
    values = _get_feature_values(song_obj, display_name, data_source_type)
    if values is None:
        return False
    # An empty array counts as "not available" as well.
    return values.size > 0

def _available_params_for_song(song_obj, data_source_type):
    """Return the sorted display names of all PARAM_INFO features this song provides."""
    names = [
        entry['display_name']
        for entry in PARAM_INFO
        if _is_feature_available(song_obj, entry['display_name'], data_source_type)
    ]
    names.sort()
    return names


# -------------------------------------------------------------
# 2) PLOTTING UTILITIES
# -------------------------------------------------------------

def minmax_normalize(x):
    """Rescale the finite values of ``x`` to the range [0, 1].

    Non-finite entries come back as NaN. A series whose finite values span
    less than 1e-9 maps to zeros, and an input without any finite value
    returns an all-zero array.
    """
    values = np.asarray(x, dtype=float)
    valid = np.isfinite(values)
    if not np.any(valid):
        return np.zeros_like(values)

    lowest = np.nanmin(values[valid])
    highest = np.nanmax(values[valid])
    if highest - lowest < 1e-9:
        scaled = np.zeros_like(values)
    else:
        scaled = (values - lowest) / (highest - lowest)
    scaled[~valid] = np.nan
    return scaled

def format_axes(ax, x_axis, df_sections, df_phrases, title=None, mode="time", beat_times=None):
    """Add time/beat tick labels and section/phrase markers to an axis.

    Ten evenly spaced ticks are placed between the first and last value of
    ``x_axis``. By default the bottom axis is labelled in time (mm:ss) and a
    secondary top axis in beat numbers; ``mode="beats"`` swaps the two.

    Args:
        ax: Matplotlib axes to format.
        x_axis: X values of the plotted data; nothing is done when empty.
        df_sections: DataFrame with "Initial_Time" and "Label" columns, drawn
            as black vertical lines labelled below the axis, or ``None``.
        df_phrases: Same layout, drawn as light grey lines labelled above the
            axis, or ``None``.
        title: Optional axis title (set with extra padding so it clears the
            phrase labels).
        mode: "beats" puts beat numbers on the bottom axis; any other value
            puts time on the bottom axis.
        beat_times: Beat positions used to translate tick positions into beat
            numbers; defaults to ``x_axis``.
    """
    def sec_to_minsec(t):
        # Round the total before splitting it, so that 59.6 s is shown as
        # "1:00" instead of the invalid "0:60".
        m, s = divmod(int(round(t)), 60)
        return f"{m}:{s:02d}"

    x_axis = np.asarray(x_axis)
    if x_axis.size == 0:
        return

    n_ticks = 10
    tick_positions = np.linspace(x_axis[0], x_axis[-1], n_ticks)

    bt = np.asarray(beat_times) if beat_times is not None else x_axis

    time_labels = [sec_to_minsec(t) for t in tick_positions]
    beat_labels = [f"{b+1}" for b in np.searchsorted(bt, tick_positions)]

    if mode == "beats":
        bottom_labels, bottom_name = beat_labels, "Beat"
        top_labels, top_name = time_labels, "Time (mm:ss)"
    else:
        bottom_labels, bottom_name = time_labels, "Time (mm:ss)"
        top_labels, top_name = beat_labels, "Beat"

    ax.set_xticks(tick_positions)
    ax.set_xticklabels(bottom_labels, fontsize=9)
    ax.set_xlabel(bottom_name)

    ax_top = ax.secondary_xaxis('top')
    ax_top.set_xticks(tick_positions)
    ax_top.set_xticklabels(top_labels, fontsize=9)
    ax_top.set_xlabel(top_name)

    # Sections
    if df_sections is not None and all(c in df_sections.columns for c in ["Initial_Time", "Label"]):
        for time, label in zip(df_sections["Initial_Time"], df_sections["Label"]):
            ax.axvline(x=time, color="black", lw=1.2, alpha=0.9)
            ax.text(time, -0.18, label, rotation=90, ha="center", va="top",
                    fontsize=8, color="black", transform=ax.get_xaxis_transform(), clip_on=False)

    # Phrases
    if df_phrases is not None and all(c in df_phrases.columns for c in ["Initial_Time", "Label"]):
        for t, lab in zip(df_phrases["Initial_Time"], df_phrases["Label"]):
            ax.axvline(x=t, color="lightgrey", lw=1.0, alpha=0.7)
            ax.text(t, 1.15, lab, rotation=90, ha="center", va="bottom",
                    fontsize=8, color="grey", transform=ax.get_xaxis_transform(), clip_on=False)

    if title:
        ax.set_title(title, pad=70)

    ax.margins(x=0.05)
    ax.grid(False)

def plot(variables, norm=False, smoothing=0, x_axis=None, df_sections=None, df_phrases=None,
         title=None, mode="time", plot_index=None, peaks=None, y_label=None,
         song=None, title_template=None, beat_times=None, data_source_type='Raw Data'): # Added data_source_type
    """Plot one or more features of a song on a single shared axis.

    Args:
        variables: List of specs; each is either a display name such as
            "Guitars Loudness", or a tuple
            ``(display_name, normalize, smooth_sigma[, label])``.
            Nothing is drawn unless this is a list.
        norm: Default normalize flag for bare display names. When true the
            y-limits are fixed to [-0.05, 1.05].
        smoothing: Default Gaussian sigma for bare display names; 0 disables
            smoothing.
        x_axis: X values shared by all series; series are truncated to its
            length. The sample index is used when ``None``.
        df_sections: Section markers passed on to ``format_axes``.
        df_phrases: Phrase markers passed on to ``format_axes``.
        title: Explicit title; metadata placeholders are filled when it
            contains "{".
        mode: Axis mode passed on to ``format_axes``.
        plot_index: When given, the figure is also saved as
            ``./saved_plots/plot_<plot_index>.pdf``.
        peaks: Optional mapping of display name to peak indices, drawn as
            red markers.
        y_label: Optional y-axis label.
        song: Song object (raw or processed) the features are read from.
        title_template: Template used when ``title`` is ``None``.
        beat_times: Beat positions passed on to ``format_axes``.
        data_source_type: 'Raw Data' or 'EMD Data'.
    """
    import os

    # For raw data, a series whose display name contains the keyword is set
    # to NaN wherever the corresponding mask is false.
    mask_paths = (("guitar", "dynamic.g_mask"), ("voice", "dynamic.v_mask"))

    def resolve_series(display_name):
        """Return a float array and apply masks (for raw data) based on display_name."""
        y = _get_feature_values(song, display_name, data_source_type)

        # For 'EMD Data', masks (NaNs) are already part of the processed features.
        if y is None or data_source_type != 'Raw Data':
            return y

        low = display_name.lower()
        for keyword, mask_path in mask_paths:
            if keyword not in low:
                continue
            m = to_float_array(get_path(song, mask_path))
            if m is None or m.size == 0:
                continue
            L = min(len(y), len(m))
            y = y[:L].copy()
            keep = m[:L].astype(bool)
            y[~keep] = np.nan
        return y

    # Final title (template uses metadata)
    final_title = title
    if final_title is None and title_template:
        final_title = format_title_from_metadata(title_template, song)
    elif final_title and "{" in final_title:
        final_title = format_title_from_metadata(final_title, song)

    if not isinstance(variables, list):
        return

    fig, ax = plt.subplots(figsize=(18, 6), constrained_layout=True)

    global_min, global_max = np.inf, -np.inf
    plotted_any = False

    for var in variables:
        if isinstance(var, tuple):
            # Expecting (display_name, nrm, smooth, label)
            if len(var) == 4:
                display_name, nrm, smooth, label = var
            elif len(var) == 3:
                display_name, nrm, smooth = var
                label = str(display_name)
            else:
                display_name, nrm, smooth, label = var, norm, smoothing, str(var)
        else:
            display_name, nrm, smooth, label = var, norm, smoothing, str(var)

        y = resolve_series(display_name)
        if y is None or len(y) == 0:
            print(f"Variable '{display_name}' not found or empty for {data_source_type}.")
            continue

        y_plot = np.copy(y)
        if nrm:
            y_plot = minmax_normalize(y_plot)
        if smooth > 0:
            # Only the finite samples are smoothed; NaN gaps stay NaN.
            finite_mask = np.isfinite(y_plot)
            if np.any(finite_mask):
                y_plot[finite_mask] = gaussian_filter1d(y_plot[finite_mask], sigma=smooth)

        # x-values
        if x_axis is not None:
            x_vals = np.asarray(x_axis)
            min_len = min(len(x_vals), len(y_plot))
            x_vals = x_vals[:min_len]
            y_plot = y_plot[:min_len]
        else:
            x_vals = np.arange(len(y_plot))

        y_masked = np.ma.masked_invalid(y_plot)
        ax.plot(x_vals, y_masked, label=label)
        plotted_any = True

        # Track the range of every plotted series, including those that were
        # normalised per variable. Otherwise a raw series with a small range
        # would set y-limits that clip a normalised series drawn beside it.
        finite_y_plot = y_plot[np.isfinite(y_plot)]
        if finite_y_plot.size > 0:
            global_min = min(global_min, np.min(finite_y_plot))
            global_max = max(global_max, np.max(finite_y_plot))

        if peaks and display_name in peaks: # peaks should now map to display names
            peak_indices = np.asarray(peaks[display_name])
            valid_peak_indices = peak_indices[peak_indices < len(x_vals)]
            ax.scatter(x_vals[valid_peak_indices], y_plot[valid_peak_indices],
                       color='red', label=f"{label} Peaks")

    # Y-limits
    if not norm and np.isfinite(global_min) and np.isfinite(global_max):
        y_range = global_max - global_min
        if y_range < 1e-9:
            ax.set_ylim(global_min - 0.1, global_max + 0.1)
        else:
            ax.set_ylim(global_min - y_range * 0.05, global_max + y_range * 0.05)
    elif norm:
        ax.set_ylim(-0.05, 1.05)

    # Axes formatting
    if x_axis is not None and isinstance(x_axis, (list, np.ndarray)) and len(x_axis) > 0:
        format_axes(ax, x_axis, df_sections, df_phrases, title=final_title, mode=mode, beat_times=beat_times)
    else:
        if final_title:
            ax.set_title(final_title, pad=70)

    if y_label is not None:
        ax.set_ylabel(y_label)

    if plotted_any:
        ax.legend()

    if plot_index is not None:
        os.makedirs("./saved_plots", exist_ok=True)
        plot_path = os.path.join("./saved_plots", f"plot_{plot_index}.pdf")
        plt.savefig(plot_path, bbox_inches="tight")
        print(f"Saved plot to {plot_path}")

    plt.show()

# -------------------------------------------------------------
# 3) UI (Song, Var1, Var2 + Normalize/Smooth) LIMITED TO REQUESTED PARAMETERS
# -------------------------------------------------------------

def _is_1d_numeric_series(x):
    if x is None:
        return False
    try:
        arr = np.asarray(x)
    except Exception:
        return False
    return arr.ndim == 1 and np.issubdtype(arr.dtype, np.number)

def _make_structural_dfs(song_obj):
    """Build the section and phrase marker tables of a raw song object.

    Returns ``(df_sections, df_phrases)``. Each is a DataFrame with the
    columns "Initial_Time" and "Label", or ``None`` when the matching
    times/labels pair under "structural" is missing, malformed, or of
    unequal length.
    """
    structural = song_obj.get("structural", {})

    def _marker_frame(times_key, labels_key):
        times = structural.get(times_key)
        labels = structural.get(labels_key)
        if not _is_1d_numeric_series(times):
            return None
        if not isinstance(labels, (list, tuple)):
            return None
        if len(times) != len(labels):
            return None
        return pd.DataFrame({"Initial_Time": times, "Label": labels})

    sections = _marker_frame("section_times", "section_labels")
    phrases = _marker_frame("phrase_times", "phrase_labels")
    return sections, phrases

def _song_label(song_obj):
    md = song_obj.get("metadata", {})
    title = md.get("title", "Untitled")
    artist = md.get("artist", "Unknown")
    year = md.get("project_year", "")
    return f"{title} — {artist}" + (f" ({year})" if str(year).strip() else "")

# Build song list for dropdown (always based on raw data for consistent indexing/labeling).
# `song_options` keeps the labels in dataset order; `title_to_index` maps
# each label back to the song's index in ALL_RAW_DATA.
song_options = []
title_to_index = {}
for idx, song in enumerate(ALL_RAW_DATA):
    lbl = _song_label(song)
    # Disambiguate duplicate labels by appending the song index until unique.
    while lbl in title_to_index:
        lbl = f"{lbl}  #{idx}"
    title_to_index[lbl] = idx
    song_options.append(lbl)

# Widgets
data_source_dd = widgets.Dropdown(options=['Raw Data', 'EMD Data'], description='Data Source:')
song_dd = widgets.Dropdown(options=song_options, description='Song:', layout=widgets.Layout(width='60%'))

# First plotted variable: feature choice, normalize toggle, Gaussian sigma.
# The feature options start empty and are filled by _refresh_var_dropdowns.
var1_dd  = widgets.Dropdown(options=[], description='Var 1:', layout=widgets.Layout(width='50%'))
norm1_cb = widgets.Checkbox(value=False, description='Normalize 1')
smooth1_sl = widgets.IntSlider(value=0, min=0, max=6, step=1, description='Smooth 1σ', layout=widgets.Layout(width='35%'))

# Second plotted variable, same controls.
var2_dd  = widgets.Dropdown(options=[], description='Var 2:', layout=widgets.Layout(width='50%'))
norm2_cb = widgets.Checkbox(value=False, description='Normalize 2')
smooth2_sl = widgets.IntSlider(value=0, min=0, max=6, step=1, description='Smooth 2σ', layout=widgets.Layout(width='35%'))

# Output area the plot is drawn into.
out = widgets.Output()

# State variables (module-level; rewritten by _rebuild_song_state)
current_song = None             # The song object currently selected for plotting (raw or processed)
current_raw_song = None         # The corresponding raw song object (for structural info)
beat_times_for_axes = None      # Beat times of the current song as a float array, or None
AVAILABLE_PARAMS = []           # list of display names available in current song/data source

def _rebuild_song_state(selected_label, data_source_type):
    """Select a song and derive everything needed to plot it.

    Updates the module-level state (``current_song``, ``current_raw_song``,
    ``beat_times_for_axes``, ``AVAILABLE_PARAMS``) and returns
    ``(available_params, x_axis, df_sections, df_phrases, title_template, mode)``.
    Features come from the raw or the EMD list depending on
    ``data_source_type``; beat times and structural markers always come
    from the raw song.
    """
    global current_song, current_raw_song, beat_times_for_axes, AVAILABLE_PARAMS

    # Labels index into ALL_RAW_DATA; the EMD list is addressed by the same index.
    song_index = title_to_index[selected_label]
    feature_source = ALL_RAW_DATA if data_source_type == 'Raw Data' else ALL_EMD_DATA
    current_song = feature_source[song_index]
    current_raw_song = ALL_RAW_DATA[song_index]

    AVAILABLE_PARAMS = _available_params_for_song(current_song, data_source_type)

    # X-axis preference: beat times, then phrase times, then a plain index
    # as long as the longest available feature series.
    mode = "time"
    beats = get_path(current_raw_song, "rhythmic.beat_times")
    if _is_1d_numeric_series(beats):
        beat_times_for_axes = np.asarray(beats, dtype=float)
        x_axis = beat_times_for_axes
    else:
        phrase_starts = get_path(current_raw_song, "structural.phrase_times")
        if _is_1d_numeric_series(phrase_starts):
            x_axis = np.asarray(phrase_starts, dtype=float)
            beat_times_for_axes = None
        else:
            series_lengths = []
            for entry in PARAM_INFO:
                series = _get_feature_values(current_song, entry['display_name'], data_source_type)
                if series is not None:
                    series_lengths.append(len(series))
            longest = max(series_lengths, default=0)
            x_axis = np.arange(longest) if longest > 0 else np.array([])
            beat_times_for_axes = None

    df_sections, df_phrases = _make_structural_dfs(current_raw_song)
    plot_title_template = "{title} — {artist} ({project_year})\n" + f"Data Source: {data_source_type}"

    return AVAILABLE_PARAMS, x_axis, df_sections, df_phrases, plot_title_template, mode

def _refresh_var_dropdowns():
    """Reload both variable dropdowns from AVAILABLE_PARAMS and pick defaults.

    Var 1 gets the first preferred feature that is available; Var 2 the
    first preferred feature different from Var 1. When no preferred feature
    matches, the first (respectively second) available name is used, or
    ``None`` when there is none.
    """
    available = AVAILABLE_PARAMS
    var1_dd.options = available
    var2_dd.options = available

    # Preference order for the default selections.
    preferred_order = [
        "Tempo Deviation",
        "Voice Loudness",
        "Guitars Loudness",
        "Voice Rhythmic Density",
        "Guitars Rhythmic Density",
        "Tonal Dissonance",
        "Tonal Dispersion",
        "Melodic Voice Contour",
        "Harmonic Guitars Contour",
    ]

    first_choice = available[0] if available else None
    for name in preferred_order:
        if name in available:
            first_choice = name
            break
    var1_dd.value = first_choice

    second_choice = available[1] if len(available) > 1 else None
    for name in preferred_order:
        if name in available and name != var1_dd.value:
            second_choice = name
            break
    var2_dd.value = second_choice

def _draw_plot(*_):
    """Clear the output area and redraw the plot from the current widget values.

    Any positional arguments are accepted and ignored, so the function can be
    called bare or handed a change payload. When no song state is loaded, a
    prompt is printed into ``out`` instead of a plot.
    """
    out.clear_output(wait=True)

    no_song_loaded = current_song is None or current_raw_song is None
    if no_song_loaded:
        with out:
            print("Please select a song.")
        return

    # The data source comes from its dropdown; the song state is rebuilt for
    # the currently selected song before anything is plotted.
    data_source_type = data_source_dd.value
    (params_local, x_axis, df_sections, df_phrases,
     plot_title_template, mode) = _rebuild_song_state(song_dd.value, data_source_type)

    # One spec per variable row: (name, normalize flag, smoothing value, label).
    # A row is skipped when nothing is selected or the selection is not
    # available for this song / data source.
    control_rows = (
        (var1_dd, norm1_cb, smooth1_sl),
        (var2_dd, norm2_cb, smooth2_sl),
    )
    variables = [
        (name_dd.value, norm_cb.value, smooth_sl.value, name_dd.value)
        for name_dd, norm_cb, smooth_sl in control_rows
        if name_dd.value is not None and name_dd.value in params_local
    ]

    with out:
        plot(
            variables=variables,
            x_axis=x_axis,
            df_sections=df_sections,
            df_phrases=df_phrases,
            title=None,  # the title is supplied through title_template instead
            title_template=plot_title_template,
            mode=mode,
            y_label=None,
            plot_index=None,
            peaks=None,
            song=current_song,
            beat_times=beat_times_for_axes,
            data_source_type=data_source_type,
        )

def _on_song_change(change):
    if change['name'] == 'value' and change['new'] is not None:
        _rebuild_song_state(change['new'], data_source_dd.value)
        _refresh_var_dropdowns()
        _draw_plot()

def _on_data_source_change(change):
    if change['name'] == 'value' and change['new'] is not None:
        _rebuild_song_state(song_dd.value, change['new'])
        _refresh_var_dropdowns()
        _draw_plot()

# --- Initial state ---
# Build state for the first song (if any) and seed the variable dropdowns
# before any observer is attached.
if song_options:
    _rebuild_song_state(song_options[0], data_source_dd.value)
    _refresh_var_dropdowns()

# --- Observers ---
# Song and data-source changes go through their dedicated handlers.
song_dd.observe(_on_song_change, names='value')
data_source_dd.observe(_on_data_source_change, names='value')

# Every per-variable control only needs a redraw when its value changes.
for _redraw_ctrl in (var1_dd, var2_dd, norm1_cb, norm2_cb, smooth1_sl, smooth2_sl):
    _redraw_ctrl.observe(lambda ch: _draw_plot(), names='value')

# --- Layout & display ---
# Data source and song selectors on top, then one row of controls per variable.
controls = widgets.VBox([
    data_source_dd,
    song_dd,
    widgets.HBox([var1_dd, norm1_cb, smooth1_sl]),
    widgets.HBox([var2_dd, norm2_cb, smooth2_sl]),
])
display(controls, out)

# --- Initial render ---
_draw_plot()

# Visualization

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.manifold import MDS
from sklearn.metrics import pairwise_distances

# --- 1. Prepare all correlation matrices for MDS ---
# Re-generate piece-level correlation matrices with associated metadata

# One {'title', 'artist', 'matrix'} record is appended here per usable piece
piece_correlation_matrices_with_metadata = []

def flatten_upper_triangle(matrix):
    """Return the entries strictly above the main diagonal as a 1-D array.

    ``matrix`` must expose ``.shape`` and ``.values`` (e.g. a pandas
    DataFrame). Entries are returned in row-major order; the diagonal itself
    is excluded, so for a symmetric matrix each off-diagonal pair appears once.
    """
    n_rows, n_cols = matrix.shape
    row_idx, col_idx = np.triu_indices(n_rows, k=1, m=n_cols)
    return matrix.values[row_idx, col_idx]

# Build one Pearson correlation matrix per processed piece, skipping pieces
# whose feature series cannot be correlated.
for processed_row in data_processed:
    _piece_meta = processed_row.get('metadata', {})
    title = _piece_meta.get('title', 'Unknown Title')
    artist = _piece_meta.get('artist', 'Unknown Artist')
    features = processed_row.get('features', {})

    # Keep non-empty list-valued features; the two mask series are excluded
    piece_data = {
        k: v
        for k, v in features.items()
        if k not in ['Voice Mask', 'Guitar Mask'] and isinstance(v, list) and len(v) > 0
    }

    # A correlation needs at least two feature series...
    if len(piece_data) <= 1:
        continue

    # ...all of the same length, with at least two samples each
    lengths = {len(vals) for vals in piece_data.values()}
    if len(lengths) != 1 or next(iter(lengths)) <= 1:
        continue

    df_piece = pd.DataFrame(piece_data)
    correlation_matrix = df_piece.corr(method='pearson')

    piece_correlation_matrices_with_metadata.append(
        dict(title=title, artist=artist, matrix=correlation_matrix)
    )

# Flatten every piece-level correlation matrix into one vector per piece.
if not piece_correlation_matrices_with_metadata:
    raise ValueError(
        "No piece-level correlation matrices were produced; cannot run MDS."
    )

# Flattened vectors are only comparable when a given position refers to the
# same feature pair in every piece. Restrict to the features present in all
# matrices and impose a single ordering (taken from the first piece).
shared_features = [
    name
    for name in piece_correlation_matrices_with_metadata[0]['matrix'].columns
    if all(name in entry['matrix'].columns
           for entry in piece_correlation_matrices_with_metadata)
]
if len(shared_features) < 2:
    raise ValueError(
        "Fewer than two features are shared by all pieces; "
        "there are no common correlation pairs to compare."
    )

flattened_vectors = []
labels = []            # track titles, used to annotate points
artists_for_plot = []  # artists, used to colour points

for entry in piece_correlation_matrices_with_metadata:
    aligned_matrix = entry['matrix'].loc[shared_features, shared_features]
    flat_vec = flatten_upper_triangle(aligned_matrix)
    flattened_vectors.append(flat_vec)
    labels.append(entry['title'])
    artists_for_plot.append(entry['artist'])

# Rows = pieces, columns = feature pairs
X = np.array(flattened_vectors, dtype=float)

# Pearson correlation is NaN when a series is constant within a piece, and the
# distance computation below rejects NaN input. Keep only feature pairs that
# are defined for every piece instead of inventing replacement values.
_defined_pairs = ~np.isnan(X).any(axis=0)
if not _defined_pairs.all():
    print(
        f"Dropping {int((~_defined_pairs).sum())} feature pair(s) with an "
        "undefined correlation in at least one piece."
    )
    X = X[:, _defined_pairs]
if X.shape[1] == 0:
    raise ValueError(
        "No feature pair has a defined correlation in every piece; cannot run MDS."
    )

# --- 2. Compute pairwise distances ---
# Euclidean distance between the flattened correlation vectors
distance_matrix = pairwise_distances(X, metric='euclidean')

# --- 3. Apply MDS ---
# Two components for a 2-D plot; fixed random_state for a repeatable layout
mds = MDS(n_components=2, dissimilarity='precomputed', random_state=42)
X_transformed = mds.fit_transform(distance_matrix)

# Collect coordinates and metadata in one DataFrame for plotting
df_mds = pd.DataFrame(X_transformed, columns=['MDS1', 'MDS2'])
df_mds['Label'] = labels             # track title
df_mds['Artist'] = artists_for_plot  # artist, used for colouring

# --- 4. Visualize the results ---
plt.figure(figsize=(12, 10))
sns.scatterplot(data=df_mds, x='MDS1', y='MDS2', hue='Artist', s=150, alpha=0.8)

# Annotate each point with its track title. The label is offset in points
# (screen units) rather than data units, so its distance from the marker does
# not depend on the numeric range of the MDS coordinates.
for i, row in df_mds.iterrows():
    plt.annotate(
        row['Label'],
        xy=(row['MDS1'], row['MDS2']),
        xytext=(8, 8),
        textcoords='offset points',
        fontsize=9,
    )

plt.title('MDS of Pearson Correlation Matrices (Individual Pieces)')
plt.xlabel('MDS Dimension 1')
plt.ylabel('MDS Dimension 2')
plt.grid(True)
# Dashed reference lines through the origin of the embedding
plt.axhline(0, color='grey', linestyle='--', linewidth=0.8)
plt.axvline(0, color='grey', linestyle='--', linewidth=0.8)
plt.tight_layout()
plt.show()