Libraries

In [None]:
import os
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px
from itertools import combinations
from scipy import stats
from scipy.io import loadmat, whosmat
from scipy.spatial.distance import pdist, squareform, cdist
from scipy.cluster.hierarchy import dendrogram, linkage
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans

import src
from src import config, loadmatNina

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.cluster.hierarchy as sch
from scipy.spatial.distance import pdist, squareform
from sklearn.preprocessing import StandardScaler



In [None]:
# Choose the database to analyze
database = 'DB4'

data_path = f'data/{database}'

# Pick the first subject folder named "s<number>", "S<number>" or "Subject<number>".
# os.listdir returns entries in arbitrary order, so the listing is sorted to make
# the pick reproducible, and non-directories (e.g. an archive called "s1.zip")
# are skipped because os.listdir(folder_path) below would fail on them.
folder = None
for item in sorted(os.listdir(data_path)):
    name_matches = re.match(r'[sS]\d+', item) or re.match(r'Subject\d+', item)
    if name_matches and os.path.isdir(os.path.join(data_path, item)):
        folder = item
        break

if folder:
    folder_path = os.path.join(data_path, folder)
    results = []

    # whosmat lists the variables stored in each .mat file without loading them
    for file_name in sorted(os.listdir(folder_path)):
        if file_name.endswith('.mat'):
            file_path = os.path.join(folder_path, file_name)
            info = whosmat(file_path)
            results.append((file_name, info))

    # Nested mapping: variable name -> {file name -> remaining whosmat fields}
    data = {}
    for file_name, info in results:
        for item in info:
            data.setdefault(item[0], {})[file_name] = item[1:]

    # One row per variable, one column per file
    df = pd.DataFrame(data)
    df = df.transpose()
    df.columns.name = 'File Name'

    print(df)
else:
    print("No folder found with the convention s + 'number'")

Functions

- For complete signal

In [None]:
import numpy as np
import pywt

def calculate_emg_metrics(signal, fs=1000):
    """
    Calculates various metrics for an EMG signal.

    Parameters:
    - signal: NumPy array containing the EMG signal.
    - fs: Sampling frequency in Hz (default: 1000 Hz).

    Returns:
    - A dictionary with the computed metrics, keyed (in this order) by
      "MAV", "IAV", "RMS", "WL", "ZC", "SSC", "VAR", "CoV", "MNF", "mDWT",
      "TD" and "MAVS".

    Notes:
    - "CoV" and "MNF" are set to 0 when their denominator (the signal mean,
      respectively the sum of the spectrum magnitudes) is 0, instead of
      producing NaN.
    - "TD" uses the same formula as "WL".
    - A later cell of this notebook defines another function with this same
      name; once that cell has run, it replaces this definition.
    """
    # Quantities shared by several metrics
    abs_signal = np.abs(signal)
    diff_signal = np.diff(signal)
    abs_diff = np.abs(diff_signal)

    # Mean Absolute Value (MAV)
    mav = np.mean(abs_signal)

    # Integrated Absolute Value (IAV)
    iav = np.sum(abs_signal)

    # Root Mean Square (RMS)
    rms = np.sqrt(np.mean(signal**2))

    # Waveform Length (WL): cumulative absolute sample-to-sample change
    wl = np.sum(abs_diff)

    # Zero Crossings (ZC): number of positions where the sign changes
    zc = np.sum(np.diff(np.sign(signal)) != 0)

    # Slope Sign Changes (SSC): consecutive differences with opposite sign
    ssc = np.sum((diff_signal[1:] * diff_signal[:-1]) < 0)

    # Variance (VAR)
    var = np.var(signal)

    # Coefficient of Variation (CoV), 0 for a zero-mean signal
    mean_signal = np.mean(signal)
    cov = (np.std(signal) / mean_signal) if mean_signal != 0 else 0

    # Mean Frequency (MNF): magnitude-weighted mean of the FFT frequencies.
    # Guarded so that an all-zero signal yields 0 rather than 0/0 = NaN.
    freqs = np.fft.rfftfreq(len(signal), d=1/fs)
    fft_magnitude = np.abs(np.fft.rfft(signal))
    total_magnitude = np.sum(fft_magnitude)
    mnf = np.sum(freqs * fft_magnitude) / total_magnitude if total_magnitude != 0 else 0

    # Marginal Discrete Wavelet Transform (mDWT): summed absolute coefficients
    # of a 4-level 'db4' decomposition
    coeffs = pywt.wavedec(signal, 'db4', level=4)
    mdwt = np.sum([np.sum(np.abs(c)) for c in coeffs])

    # Temporal Difference (TD)
    td = np.sum(abs_diff)

    # Mean Absolute Value Slope (MAVS)
    mavs = np.mean(abs_diff)

    # Return the metrics as a dictionary
    metrics = {
        "MAV": mav,
        "IAV": iav,
        "RMS": rms,
        "WL": wl,
        "ZC": zc,
        "SSC": ssc,
        "VAR": var,
        "CoV": cov,
        "MNF": mnf,
        "mDWT": mdwt,
        "TD": td,
        "MAVS": mavs
    }

    return metrics


- For signal with means and standard deviation

In [None]:
import numpy as np
import pywt

def calculate_emg_metrics_std(signal, fs=1000):
    """
    Calculates various metrics for an EMG signal, each paired with a spread value.

    Parameters:
    - signal: NumPy array containing the EMG signal. A 2-D array is split by
      column (signal[:, ch]); every column is evaluated on its own and the
      per-column results are averaged key by key.
    - fs: Sampling frequency in Hz (default: 1000 Hz).

    Returns:
    - A dictionary with the computed metrics. Every metric "X" has a
      companion "X_STD" entry, for 1-D and 2-D input alike.

    Notes:
    - "MNF" and "MNF_STD" are set to 0 when the sum of the spectrum
      magnitudes is 0, instead of producing NaN.
    - "CoV_STD" is the standard deviation of a single scalar and is therefore
      always 0.
    """
    if signal.ndim == 2:
        # Recurse into this function (not calculate_emg_metrics) so that the
        # multi-channel result carries the same "*_STD" keys as the 1-D result.
        metrics_per_channel = [calculate_emg_metrics_std(signal[:, ch], fs) for ch in range(signal.shape[1])]
        averaged_metrics = {key: np.mean([m[key] for m in metrics_per_channel]) for key in metrics_per_channel[0]}
        return averaged_metrics

    # Quantities shared by several metrics
    abs_signal = np.abs(signal)
    diff_signal = np.diff(signal)
    abs_diff = np.abs(diff_signal)
    sign_changes = np.diff(np.sign(signal)) != 0
    slope_changes = (diff_signal[1:] * diff_signal[:-1]) < 0

    # Mean Absolute Value (MAV); spread of the rectified samples
    mav = np.mean(abs_signal)
    mav_std = np.std(abs_signal)

    # Integrated Absolute Value (IAV); same spread as MAV
    iav = np.sum(abs_signal)
    iav_std = np.std(abs_signal)

    # Root Mean Square (RMS); spread of the raw samples
    rms = np.sqrt(np.mean(signal**2))
    rms_std = np.std(signal)

    # Waveform Length (WL); spread of the absolute sample-to-sample changes
    wl = np.sum(abs_diff)
    wl_std = np.std(abs_diff)

    # Zero Crossings (ZC); spread of the per-position crossing indicator
    zc = np.sum(sign_changes)
    zc_std = np.std(sign_changes)

    # Slope Sign Changes (SSC); spread of the per-position indicator
    ssc = np.sum(slope_changes)
    ssc_std = np.std(slope_changes)

    # Variance (VAR); spread of the raw samples
    var = np.var(signal)
    var_std = np.std(signal)

    # Coefficient of Variation (CoV), 0 for a zero-mean signal
    mean_signal = np.mean(signal)
    cov = (np.std(signal) / mean_signal) if mean_signal != 0 else 0
    cov_std = np.std(cov)  # std of one scalar: always 0

    # Mean Frequency (MNF): magnitude-weighted mean of the FFT frequencies.
    # Guarded so that an all-zero signal yields 0 rather than 0/0 = NaN.
    freqs = np.fft.rfftfreq(len(signal), d=1/fs)
    fft_magnitude = np.abs(np.fft.rfft(signal))
    total_magnitude = np.sum(fft_magnitude)
    if total_magnitude != 0:
        mnf = np.sum(freqs * fft_magnitude) / total_magnitude
        mnf_std = np.std(freqs * fft_magnitude) / total_magnitude
    else:
        mnf = 0
        mnf_std = 0

    # Marginal Discrete Wavelet Transform (mDWT): per-level sums of absolute
    # coefficients of a 4-level 'db4' decomposition; spread across levels
    coeffs = pywt.wavedec(signal, 'db4', level=4)
    level_sums = [np.sum(np.abs(c)) for c in coeffs]
    mdwt = np.sum(level_sums)
    mdwt_std = np.std(level_sums)

    # Temporal Difference (TD): same formula as WL
    td = np.sum(abs_diff)
    td_std = np.std(abs_diff)

    # Mean Absolute Value Slope (MAVS)
    mavs = np.mean(abs_diff)
    mavs_std = np.std(abs_diff)

    # Return the metrics as a dictionary
    metrics = {
        "MAV": mav, "MAV_STD": mav_std,
        "IAV": iav, "IAV_STD": iav_std,
        "RMS": rms, "RMS_STD": rms_std,
        "WL": wl, "WL_STD": wl_std,
        "ZC": zc, "ZC_STD": zc_std,
        "SSC": ssc, "SSC_STD": ssc_std,
        "VAR": var, "VAR_STD": var_std,
        "CoV": cov, "CoV_STD": cov_std,
        "MNF": mnf, "MNF_STD": mnf_std,
        "mDWT": mdwt, "mDWT_STD": mdwt_std,
        "TD": td, "TD_STD": td_std,
        "MAVS": mavs, "MAVS_STD": mavs_std
    }

    return metrics


- This function calculates the metrics for each channel and averages the values into a single result

In [None]:
def calculate_emg_metrics_means(signal, fs=1000):
    """
    Calculates the metrics of an EMG signal. If there are multiple channels, it computes
    the metrics for each channel and then averages the results.

    Parameters:
    - signal: NumPy array containing the EMG signal. A 2-D array is split by
      column (signal[:, ch]).
    - fs: Sampling frequency in Hz used for the frequency axis of the mean
      frequency (default: 1000 Hz). The body previously read an `fs` name
      that was never defined, so every 1-D call raised NameError.

    Returns:
    - A dictionary keyed by "MAV", "IAV", "RMS", "WL", "ZC", "SSC", "VAR",
      "CoV", "MNF", "mDWT", "TD" and "MAVS".

    Notes:
    - "CoV" and "MNF" are set to 0 when their denominator is 0.
    - "TD" uses the same formula as "WL".
    """
    if signal.ndim == 2:  # If the signal has multiple channels
        metrics_per_channel = [calculate_emg_metrics_means(signal[:, ch], fs) for ch in range(signal.shape[1])]
        averaged_metrics = {key: np.mean([m[key] for m in metrics_per_channel]) for key in metrics_per_channel[0]}
        return averaged_metrics

    # Quantities shared by several metrics
    abs_signal = np.abs(signal)
    diff_signal = np.diff(signal)
    abs_diff = np.abs(diff_signal)

    # Mean Absolute Value (MAV)
    mav = np.mean(abs_signal)

    # Integrated Absolute Value (IAV)
    iav = np.sum(abs_signal)

    # Root Mean Square (RMS)
    rms = np.sqrt(np.mean(signal**2))

    # Waveform Length (WL)
    wl = np.sum(abs_diff)

    # Zero Crossings (ZC)
    zc = np.sum(np.diff(np.sign(signal)) != 0)

    # Slope Sign Changes (SSC)
    ssc = np.sum((diff_signal[1:] * diff_signal[:-1]) < 0)

    # Variance (VAR)
    var = np.var(signal)

    # Coefficient of Variation (CoV), 0 for a zero-mean signal
    mean_signal = np.mean(signal)
    cov = (np.std(signal) / mean_signal) if mean_signal != 0 else 0

    # Mean Frequency (MNF): magnitude-weighted mean of the FFT frequencies.
    # Guarded so that an all-zero signal yields 0 rather than 0/0 = NaN.
    freqs = np.fft.rfftfreq(len(signal), d=1/fs)
    fft_magnitude = np.abs(np.fft.rfft(signal))
    total_magnitude = np.sum(fft_magnitude)
    mnf = np.sum(freqs * fft_magnitude) / total_magnitude if total_magnitude != 0 else 0

    # Marginal Discrete Wavelet Transform (mDWT)
    coeffs = pywt.wavedec(signal, 'db4', level=4)
    mdwt = np.sum([np.sum(np.abs(c)) for c in coeffs])

    # Temporal Difference (TD)
    td = np.sum(abs_diff)

    # Mean Absolute Value Slope (MAVS)
    mavs = np.mean(abs_diff)

    # Return the metrics as a dictionary
    metrics = {
        "MAV": mav,
        "IAV": iav,
        "RMS": rms,
        "WL": wl,
        "ZC": zc,
        "SSC": ssc,
        "VAR": var,
        "CoV": cov,
        "MNF": mnf,
        "mDWT": mdwt,
        "TD": td,
        "MAVS": mavs
    }

    return metrics

Plots and metrics for complete grasp

In [None]:
# Database to process
database = 'DB4'

# Absolute location of that database under ./data
data_path = os.path.abspath(os.path.join('data', database))

# Subject folder names: 's1' through 's10'
subjects = [f's{i}' for i in range(1, 11)]

for subject in subjects:
    subject_dir = os.path.join(data_path, subject)

    # One file per exercise is looked up for every subject
    for exercise in ("E1", "E2", "E3"):
        filename = f"{subject.upper()}_{exercise}_A1.mat"
        file_path = os.path.join(subject_dir, filename)

        # Missing files are reported and skipped
        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")
            continue

        print(f"\nProcessing: {filename}")

        # Project loader for the .mat file
        mat_data = src.loadmatNina(database, filename, subject=subject)

        # Show which variables the loader returned
        print(f"Keys in mat_data: {mat_data.keys()}")

        # Second return value is the collection of grasp labels iterated below
        test_df, grasps_etiquetados = src.build_dataframe(
            mat_file=mat_data, database=database, filename=filename,
            rectify=False, normalize=True,
        )

        for grasp in grasps_etiquetados:
            try:
                if 'emg' in mat_data:
                    # Index the EMG entry by grasp label
                    # (adjust if the actual structure differs)
                    emg_signal = mat_data['emg'][grasp]
                else:
                    raise KeyError(f"The key 'emg' is not in mat_data. Available keys: {mat_data.keys()}")

                metrics = calculate_emg_metrics(emg_signal)

                # One line per metric, four decimals
                print(f"\nMetrics for Grasp {grasp}:")
                for key, value in metrics.items():
                    print(f"{key}: {value:.4f}")

                # Static plot of this grasp via the project plotting helper
                src.plot_emg_data(
                    database=database, mat_file=mat_data, grasp_number=grasp,
                    interactive=False, include_rest=True, use_stimulus=False,
                    addFourier=False, padding=100,
                    title=f"{filename} - Grasp {grasp}",
                )
            except KeyError as e:
                print(f"    Error: {str(e)}")
            except Exception as e:
                # Any other failure is reported and the next grasp is tried
                print(f"    Error processing grasp {grasp}: {str(e)}")


Dataframe with metrics for a complete signal without discriminating by channel

In [None]:
# One record (metadata + metrics) per subject / exercise / grasp
metrics_data = []

# Iterate over each subject in the database
for subject in subjects:
    subject_dir = os.path.join(data_path, subject)

    # Iterate over exercise files E1, E2, and E3
    for exercise in ["E1", "E2", "E3"]:
        filename = f"{subject.upper()}_{exercise}_A1.mat"
        file_path = os.path.join(subject_dir, filename)

        # Skip silently when the file is not available
        if not os.path.exists(file_path):
            continue

        # Load data from the .mat file
        mat_data = src.loadmatNina(database, filename, subject=subject)

        # Build DataFrame with re-labeled data
        test_df, grasps_etiquetados = src.build_dataframe(
            mat_file=mat_data,
            database=database,
            filename=filename,
            rectify=False,
            normalize=True
        )

        # Iterate over labeled grasps
        for grasp in grasps_etiquetados:
            try:
                # Retrieve the corresponding EMG signal
                emg_signal = mat_data['emg'][grasp]

                # Compute EMG signal metrics
                metrics = calculate_emg_metrics(emg_signal)

                # Append metrics with metadata to the list
                metrics_data.append({
                    "subject": subject,
                    "exercise": exercise,
                    "filename": filename,
                    "grasp": grasp,
                    **metrics  # Unpack metrics into the dictionary
                })

            except Exception as e:
                print(f"Error in {filename} - Grasp {grasp}: {str(e)}")
                continue

# Create a DataFrame with organized metrics
metrics_df = pd.DataFrame(metrics_data)

# Metadata columns first, then every metric column that was actually produced.
# The order is derived from the DataFrame itself instead of the loop variable
# `metrics`, which is undefined (or left over from another cell) when no grasp
# was processed here; reindex also copes with an empty result.
metadata_columns = ["subject", "exercise", "filename", "grasp"]
column_order = metadata_columns + [col for col in metrics_df.columns if col not in metadata_columns]
metrics_df = metrics_df.reindex(columns=column_order)

# Show the final DataFrame with extracted metrics
print("\nMetrics DataFrame:")
metrics_df


Dataframe with average of metrics for channels in each grasp

Dataframe with mean and standard deviation

In [None]:
import os
import pandas as pd
import src

# One record (metadata + metrics with their *_STD values) per subject / exercise / grasp
metrics_data = []

# Iterate over each subject in the database
for subject in subjects:
    subject_dir = os.path.join(data_path, subject)

    # Iterate over exercise files E1, E2, and E3
    for exercise in ["E1", "E2", "E3"]:
        filename = f"{subject.upper()}_{exercise}_A1.mat"
        file_path = os.path.join(subject_dir, filename)

        # Skip silently when the file is not available
        if not os.path.exists(file_path):
            continue

        # Load data from the .mat file
        mat_data = src.loadmatNina(database, filename, subject=subject)

        # Build DataFrame with re-labeled data
        test_df, grasps_etiquetados = src.build_dataframe(
            mat_file=mat_data,
            database=database,
            filename=filename,
            rectify=False,
            normalize=True
        )

        # Iterate over labeled grasps
        for grasp in grasps_etiquetados:
            try:
                # Retrieve the corresponding EMG signal
                emg_signal = mat_data['emg'][grasp]

                # Compute EMG signal metrics using standard deviation
                metrics = calculate_emg_metrics_std(emg_signal)

                # Append metrics with metadata to the list
                metrics_data.append({
                    "subject": subject,
                    "exercise": exercise,
                    "filename": filename,
                    "grasp": grasp,
                    **metrics  # Unpack metrics into the dictionary
                })

            except Exception as e:
                print(f"Error in {filename} - Grasp {grasp}: {str(e)}")
                continue

# Create a DataFrame with organized metrics
metrics_df_std = pd.DataFrame(metrics_data)

# Metadata columns first, then every metric column that was actually produced.
# The order is derived from the DataFrame itself instead of the loop variable
# `metrics`, which is undefined (or left over from another cell) when no grasp
# was processed here; reindex also copes with an empty result.
metadata_columns = ["subject", "exercise", "filename", "grasp"]
column_order = metadata_columns + [col for col in metrics_df_std.columns if col not in metadata_columns]
metrics_df_std = metrics_df_std.reindex(columns=column_order)

# Show the final DataFrame with extracted metrics
print("\nMetrics DataFrame:")
display(metrics_df_std)


DataFrame with every channel of the database

In [None]:
# List to store all generated DataFrames
all_dataframes = []

# Visit folders named "s<number>", "S<number>" or "Subject<number>".
# Listings are sorted because os.listdir order is arbitrary and the order of
# concatenation below determines the row order of combined_df; entries that
# are not directories are skipped so os.listdir(folder_path) cannot fail on them.
for folder in sorted(os.listdir(data_path)):
    if re.match(r'[sS]\d+', folder) or re.match(r'Subject\d+', folder):
        folder_path = os.path.join(data_path, folder)
        if not os.path.isdir(folder_path):
            continue

        # Iterate over all .mat files in the folder
        for file_name in sorted(os.listdir(folder_path)):
            if file_name.endswith('.mat'):
                file_path = os.path.join(folder_path, file_name)

                # Attempt to load the .mat file
                try:
                    mat_data = src.loadmatNina(database, file_name, subject=folder)
                except Exception as e:
                    print(f"⚠️ Error loading {file_name}: {str(e)}")
                    continue

                # Attempt to process the file with src.build_dataframe
                try:
                    test_df, grasps = src.build_dataframe(
                        mat_file=mat_data,
                        database=database,
                        filename=file_name,
                        rectify=False,
                        normalize=True
                    )

                    # Tag every row with the subject folder it came from
                    test_df['subject'] = folder

                    # Append the processed DataFrame to the list
                    all_dataframes.append(test_df)

                except Exception as e:
                    print(f"Error processing {file_name}: {str(e)}")
                    continue

# Concatenate all DataFrames into a single one if data is available.
# combined_df is only defined in that case; later cells depend on it.
if all_dataframes:
    combined_df = pd.concat(all_dataframes, ignore_index=True)

    # Display the combined DataFrame
    print("\n Combined DataFrame:")
    display(combined_df)

else:
    print("Warning: No DataFrames were generated. Check the input data.")


DataFrame with metrics per channel

In [None]:
import numpy as np
import pywt
import pandas as pd

def calculate_emg_metrics(signal, fs=1000):
    """
    Computes time-domain, spectral, wavelet and kurtosis features of an EMG
    signal, most of them paired with a "<NAME>_STD" spread value.

    This definition reuses the name of a function defined in an earlier cell
    of the notebook and replaces it once this cell has run.

    Parameters:
    - signal: NumPy array containing the EMG signal. A 2-D array is split by
      column (signal[:, ch]); each column is evaluated on its own and the
      per-column results are averaged key by key.
    - fs: Sampling frequency in Hz (default: 1000 Hz).

    Returns:
    - A dictionary with the computed metrics, or an empty dictionary when any
      step raises (the error is printed, not propagated).
    """
    try:
        # Multi-channel input: evaluate each column, then average per key
        if signal.ndim == 2:
            per_channel = []
            for ch in range(signal.shape[1]):
                per_channel.append(calculate_emg_metrics(signal[:, ch], fs))
            return {name: np.mean([m[name] for m in per_channel]) for name in per_channel[0]}

        # Building blocks reused by several features
        magnitude = np.abs(signal)
        slope = np.diff(signal)
        slope_magnitude = np.abs(slope)
        sign_flips = np.diff(np.sign(signal)) != 0
        slope_flips = (slope[1:] * slope[:-1]) < 0
        power = signal**2
        mean_value = np.mean(signal)
        std_value = np.std(signal)

        # Time-domain features (insertion order defines the column order downstream)
        metrics = {}
        metrics["MAV"] = np.mean(magnitude)
        metrics["MAV_STD"] = np.std(magnitude)
        metrics["IAV"] = np.sum(magnitude)
        metrics["IAV_STD"] = np.std(magnitude)
        metrics["RMS"] = np.sqrt(np.mean(power))
        metrics["RMS_STD"] = std_value
        metrics["WL"] = np.sum(slope_magnitude)
        metrics["WL_STD"] = np.std(slope_magnitude)
        metrics["ZC"] = np.sum(sign_flips)
        metrics["ZC_STD"] = np.std(sign_flips)
        metrics["SSC"] = np.sum(slope_flips)
        metrics["SSC_STD"] = np.std(slope_flips)
        metrics["VAR"] = np.var(signal)
        metrics["VAR_STD"] = std_value
        # Coefficient of variation falls back to 0 for a zero-mean signal
        if mean_value != 0:
            metrics["CoV"] = std_value / mean_value
        else:
            metrics["CoV"] = 0
        metrics["TD"] = np.sum(slope_magnitude)
        metrics["TD_STD"] = np.std(slope_magnitude)
        metrics["MAVS"] = np.mean(slope_magnitude)
        metrics["MAVS_STD"] = np.std(slope_magnitude)
        metrics["MNP"] = np.mean(power)
        metrics["MNP_STD"] = np.std(power)

        # Spectral features: magnitude-weighted mean frequency, 0 for an empty spectrum
        freqs = np.fft.rfftfreq(len(signal), d=1/fs)
        spectrum = np.abs(np.fft.rfft(signal))
        spectrum_total = np.sum(spectrum)
        if spectrum_total != 0:
            metrics["MNF"] = np.sum(freqs * spectrum) / spectrum_total
            metrics["MNF_STD"] = np.std(freqs * spectrum) / spectrum_total
        else:
            metrics["MNF"] = 0
            metrics["MNF_STD"] = 0

        # Wavelet features: per-level sums of absolute 'db4' coefficients (4 levels)
        level_sums = np.array([np.sum(np.abs(c)) for c in pywt.wavedec(signal, 'db4', level=4)])
        metrics["mDWT"] = np.sum(level_sums)
        metrics["mDWT_STD"] = np.std(level_sums)

        # Kurtosis: fourth central moment over std**4, 0 for a constant signal
        if std_value != 0:
            metrics["Kurt"] = np.mean((signal - mean_value) ** 4) / (std_value ** 4)
        else:
            metrics["Kurt"] = 0
        # Standard deviation of a single scalar, hence always 0
        metrics["Kurt_STD"] = np.std(metrics["Kurt"])

        return metrics

    except Exception as e:
        print(f"Error in calculate_emg_metrics: {e}")
        return {}

# List to store the calculated metrics for each channel
metrics_data = []

# EMG signal columns are the ones whose name starts with 'Channel';
# the column set is the same for every group, so it is resolved once.
channel_columns = [col for col in combined_df.columns if col.startswith('Channel')]

# Iterate over each subject and each identified movement (relabeled or stimulus)
for (subject, relabeled), group in combined_df.groupby(['subject', 'relabeled']):  # Change 'relabeled' to 'stimulus' if needed
    for channel in channel_columns:
        # Get the signal values for the current channel
        channel_signal = group[channel].values

        # Compute EMG signal metrics for the current channel
        metrics = calculate_emg_metrics(channel_signal)

        # Append metadata and computed metrics to the list
        metrics_data.append({
            "subject": subject,  # Subject identification
            "relabeled": relabeled,  # Movement identification (relabeled or stimulus)
            "channel": channel,  # EMG channel
            **metrics  # Unpack all computed metrics
        })

# Create a DataFrame containing all the obtained metrics
metrics_df = pd.DataFrame(metrics_data)

# Metadata columns first, then every metric column that was actually produced.
# Taking the order from the DataFrame (rather than from the last `metrics`
# dictionary of the loop) keeps all metric columns even when the final call
# failed and returned an empty dictionary, and copes with an empty result.
metadata_columns = ["subject", "relabeled", "channel"]
column_order = metadata_columns + [col for col in metrics_df.columns if col not in metadata_columns]
metrics_df = metrics_df.reindex(columns=column_order)

# Display the DataFrame with the computed metrics
print("\nMetrics DataFrame by Channel, Subject, and Relabeled:")
display(metrics_df)


In [None]:
# Channels are collapsed in this step, so the 'channel' label is dropped first
grouped_df = metrics_df.drop(columns=['channel'])

# One group per (subject, movement) pair
group_keys = ['subject', 'relabeled']
per_grasp = grouped_df.groupby(group_keys)

# Across-channel mean of every metric, columns tagged with " mean"
df_mean = per_grasp.mean()
df_mean.columns = [f"{col} mean" for col in df_mean.columns]

# Across-channel standard deviation of every metric, columns tagged with " std"
df_std = per_grasp.std()
df_std.columns = [f"{col} std" for col in df_std.columns]

# Join both tables on the group keys and turn the keys back into columns
df_result = df_mean.merge(df_std, on=group_keys).reset_index()

# Show the aggregated table
display(df_result)


Dendrogram for grasps

In [None]:
# Feature columns start at position 2; positions 0 and 1 hold 'subject' and 'relabeled'
features = df_result.iloc[:, 2:]

# Standardise every feature so that no metric dominates through its scale
df_scaled = StandardScaler().fit_transform(features)

# Ward linkage on the standardised rows (one leaf per subject/grasp row)
linked = sch.linkage(df_scaled, method='ward')

# Leaves are labelled with the grasp label of each row
dendrogram_options = {
    "labels": df_result['relabeled'].values,
    "leaf_rotation": 90,   # vertical labels
    "leaf_font_size": 8,
}

plt.figure(figsize=(20, 10))
sch.dendrogram(linked, **dendrogram_options)
plt.title("Dendrogram based on the 'relabeled' variable")
plt.xlabel("Clusters")
plt.ylabel("Euclidean Distance")
plt.show()


In [None]:
# Numeric feature columns only. 'relabeled' is the grouping key, not a feature:
# when it is numeric, select_dtypes keeps it and (depending on the pandas
# version) groupby with an external Series aggregates it like any other
# column, which would feed the grasp label itself into the clustering.
numeric_features = (
    df_result
    .select_dtypes(include=['number'])
    .drop(columns=['relabeled'], errors='ignore')
)

# Mean and standard deviation of every feature per grasp label
grouped = numeric_features.groupby(df_result['relabeled']).agg(['mean', 'std'])
display(grouped)

# Flatten the (feature, statistic) column pairs into "feature_statistic" names
grouped.columns = ['_'.join(col).strip() for col in grouped.columns.values]

# Normalize the data to prevent magnitude differences from affecting the clustering distance
# (StandardScaler is imported in the first cell of the notebook)
scaler = StandardScaler()
scaled_features = scaler.fit_transform(grouped)

# Apply hierarchical clustering using the Ward method (minimizes variance within clusters)
linked = sch.linkage(scaled_features, method='ward')

# Create and visualize the dendrogram, one leaf per grasp label
plt.figure(figsize=(12, 6))
sch.dendrogram(
    linked,
    labels=grouped.index.tolist(),  # Labels on the x-axis based on the 'relabeled' variable
    leaf_rotation=90,  # Rotate labels for better readability
    leaf_font_size=8  # Adjust font size
)
plt.title("Dendrogram based on mean and standard deviation per grasp type")
plt.xlabel("Grasps")
plt.ylabel("Euclidean Distance")
plt.show()


In [None]:
# 1. Average every metric per channel
# Everything except the identifier columns counts as a metric
id_columns = ("subject", "relabeled", "channel")
metrics_columns = [col for col in metrics_df.columns if col not in id_columns]

per_channel = metrics_df.groupby('channel')[metrics_columns]
average_metrics_df = per_channel.mean().reset_index()
display(average_metrics_df)

# 2. Feature matrix: one row per channel, one column per metric (not rescaled)
X = average_metrics_df[metrics_columns].values

# 3. Ward linkage on the channel rows
Z = linkage(X, method='ward')

# 4. Dendrogram; branches below 70 % of the largest merge distance get their own colour
colour_cut = 0.7 * max(Z[:, 2])

plt.figure(figsize=(15, 8))
plt.title('Dendrogram of EMG Channels (Average Metrics)', fontsize=16, pad=20)
plt.xlabel('Channels', fontsize=14)
plt.ylabel('Distance', fontsize=14)

dendrogram(
    Z,
    labels=average_metrics_df['channel'].values,  # one leaf per channel
    leaf_rotation=90,                             # vertical labels to avoid overlap
    leaf_font_size=12,
    color_threshold=colour_cut,
)

plt.tight_layout()
plt.show()


In [None]:
# 1. Mean and standard deviation of every metric per channel
# Everything except the identifier columns counts as a metric
id_columns = ("subject", "relabeled", "channel")
metrics_columns = [col for col in metrics_df.columns if col not in id_columns]

per_channel = metrics_df.groupby('channel')[metrics_columns]
agg_metrics_df = per_channel.agg(['mean', 'std']).reset_index()

# Collapse the two-level column labels into "metric_statistic";
# the strip removes the trailing "_" left on the 'channel' column
agg_metrics_df.columns = ['_'.join(col).strip('_') for col in agg_metrics_df.columns]

display(agg_metrics_df)

# 2. Only the "_mean" columns are used as clustering features
mean_columns = [col for col in agg_metrics_df.columns if col.endswith('_mean')]
X = agg_metrics_df[mean_columns].values

# 3. Ward linkage on the channel rows
Z = linkage(X, method='ward')

# 4. Dendrogram; branches below 70 % of the largest merge distance get their own colour
colour_cut = 0.7 * max(Z[:, 2])

plt.figure(figsize=(15, 8))
plt.title('Dendrogram of EMG Channels (Average Metrics)', fontsize=16, pad=20)
plt.xlabel('Channels', fontsize=14)
plt.ylabel('Distance', fontsize=14)

dendrogram(
    Z,
    labels=agg_metrics_df['channel'].values,  # one leaf per channel
    leaf_rotation=90,                         # vertical labels to avoid overlap
    leaf_font_size=12,
    color_threshold=colour_cut,
)

plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.cluster.hierarchy import dendrogram, linkage

# 1. Feature matrix from every row of metrics_df (no aggregation)
# Identifier columns are left out; the rest are metrics
id_columns = ("subject", "relabeled", "channel")
metrics_columns = [col for col in metrics_df.columns if col not in id_columns]
X = metrics_df[metrics_columns].values

# 2. Ward linkage on the rows
Z = linkage(X, method='ward')

# 3. Dendrogram; branches below 70 % of the largest merge distance get their own colour
colour_cut = 0.7 * max(Z[:, 2])

plt.figure(figsize=(20, 10))
plt.title('Dendrogram of EMG Channels', fontsize=16, pad=20)
plt.xlabel('Channels', fontsize=14)
plt.ylabel('Distance', fontsize=14)

dendrogram(
    Z,
    labels=metrics_df['channel'].values,  # each leaf carries the channel name of its row
    leaf_rotation=90,                     # vertical labels
    leaf_font_size=10,
    color_threshold=colour_cut,
)

plt.tight_layout()
plt.show()


Using the Mahalanobis distance

In [None]:
# 1. Group by 'relabeled' and compute the mean and standard deviation of each metric.
# 'relabeled' is the grouping key, not a feature: when it is numeric,
# select_dtypes keeps it and (depending on the pandas version) it would be
# aggregated like any other column, so it is dropped explicitly.
numeric_features = (
    df_result
    .select_dtypes(include=['number'])
    .drop(columns=['relabeled'], errors='ignore')
)
grouped = numeric_features.groupby(df_result['relabeled']).agg(['mean', 'std'])

# 2. Flatten column names for easier access
grouped.columns = ['_'.join(col).strip() for col in grouped.columns.values]

# 3. Normalize the data to avoid magnitude differences affecting clustering
scaler = StandardScaler()
scaled_features = scaler.fit_transform(grouped)

# 4. Compute the covariance matrix and its pseudoinverse (for Mahalanobis distance)
cov_matrix = np.cov(scaled_features, rowvar=False)  # Covariance matrix
inv_cov_matrix = np.linalg.pinv(cov_matrix)  # Pseudoinverse instead of inverse

# 5. Compute Mahalanobis distances between each pair of groups (condensed vector)
mahalanobis_distances = pdist(scaled_features, metric='mahalanobis', VI=inv_cov_matrix)

# 6. Square form of the same distances, kept for inspection only
distance_matrix = squareform(mahalanobis_distances)

# 7. Apply hierarchical clustering using Mahalanobis distance.
# linkage must receive the condensed vector: a 2-D array is interpreted as a
# table of observations, so passing distance_matrix would cluster on Euclidean
# distances between the rows of the distance matrix instead.
linked = sch.linkage(mahalanobis_distances, method='average')  # 'average' method for more stability

# 8. Generate the dendrogram
plt.figure(figsize=(12, 6))
sch.dendrogram(linked, labels=grouped.index.tolist(), leaf_rotation=90, leaf_font_size=8)
plt.title("Dendrogram based on Mahalanobis Distance", fontsize=14, pad=15)
plt.xlabel("Grasps", fontsize=12)
plt.ylabel("Mahalanobis Distance", fontsize=12)
plt.show()


In [None]:
# Select only the relevant columns for clustering analysis
# NOTE(review): another cell of this notebook takes the features from
# position 2 onwards (iloc[:, 2:]); confirm that starting at 3 and stopping
# before 35 is intended and not an index range carried over from R.
X = df_result.iloc[:, 3:35].values  # Assuming df_result is equivalent to ypolfqrt in R

# Normalize the data to prevent bias due to differences in variable scales
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Compute the covariance matrix and its inverse
cov_matrix = np.cov(X_scaled, rowvar=False)
inv_cov_matrix = np.linalg.pinv(cov_matrix)  # Use pseudoinverse to avoid singularity issues

# Compute the Mahalanobis distance between samples (condensed vector)
mahalanobis_distances = pdist(X_scaled, metric='mahalanobis', VI=inv_cov_matrix)

# Square form of the same distances, kept for inspection only
distance_matrix = squareform(mahalanobis_distances)

# Apply hierarchical clustering using the Ward method
# This method minimizes the variance within the formed clusters.
# linkage must receive the condensed vector: a 2-D array is interpreted as a
# table of observations, so passing distance_matrix would cluster on Euclidean
# distances between the rows of the distance matrix instead.
linked = sch.linkage(mahalanobis_distances, method='ward')

# Create and visualize the dendrogram with label and font size adjustments
plt.figure(figsize=(12, 6))  # Adjust figure size
sch.dendrogram(
    linked,
    labels=df_result['relabeled'].values,  # Use grasp movement labels
    leaf_rotation=90,  # Rotate labels for better readability
    leaf_font_size=8  # Adjust font size for labels
)
plt.title("Dendrogram based on Mahalanobis Distance")  # Set plot title
plt.xlabel("Grasps")  # X-axis label
plt.ylabel("Mahalanobis Distance")  # Y-axis label
plt.show()  # Display the plot


In [None]:
# Group the data by the 'relabeled' column and compute the mean and standard deviation.
# 'relabeled' is the grouping key, not a feature: when it is numeric,
# select_dtypes keeps it and (depending on the pandas version) it would be
# aggregated like any other column, so it is dropped explicitly.
numeric_features = (
    df_result
    .select_dtypes(include=['number'])
    .drop(columns=['relabeled'], errors='ignore')
)
grouped = numeric_features.groupby(df_result['relabeled']).agg(['mean', 'std'])

# Flatten column names for easier data access
# Add '_mean' and '_std' suffixes to identify each statistic
grouped.columns = ['_'.join(col).strip() for col in grouped.columns.values]

# Normalize the data so that all features are on the same scale
scaler = StandardScaler()
scaled_features = scaler.fit_transform(grouped)

# Compute the covariance matrix and invert it
cov_matrix = np.cov(scaled_features, rowvar=False)
inv_cov_matrix = np.linalg.pinv(cov_matrix)  # Use the pseudo-inverse to avoid issues with singular matrices

# Compute the Mahalanobis distance between samples (condensed vector)
mahalanobis_distances = pdist(scaled_features, metric='mahalanobis', VI=inv_cov_matrix)

# Square form of the same distances, kept for inspection only
distance_matrix = squareform(mahalanobis_distances)

# Apply hierarchical clustering using the Ward method.
# linkage must receive the condensed vector: a 2-D array is interpreted as a
# table of observations, so passing distance_matrix would cluster on Euclidean
# distances between the rows of the distance matrix instead.
linked = sch.linkage(mahalanobis_distances, method='ward')

# Generate the dendrogram to visualize the clusters
plt.figure(figsize=(12, 6))
sch.dendrogram(linked, labels=grouped.index.tolist(), leaf_rotation=90, leaf_font_size=8)
plt.title("Dendrogram based on mean and standard deviation with Mahalanobis distance")
plt.xlabel("Grasps")
plt.ylabel("Mahalanobis Distance")
plt.show()
