# NinaPro DB - 52-Class EMG Analysis (Movement Classes Only)

This notebook provides a comprehensive analysis of the NinaPro Database (DB) EMG dataset, combining all three exercises (E1, E2, E3) across 10 subjects to create a unified 52-class movement classification dataset.

**Note: The rest class has been completely removed from this analysis.**

## Dataset Overview
- **Database**: NinaPro DB
- **Subjects**: 10 subjects (S1-S10)
- **Exercises**: 3 exercises per subject
  - E1: Basic finger movements (12 classes → Classes 0-11)
  - E2: Hand configurations & wrist movements (17 classes → Classes 12-28)  
  - E3: Grasping & functional movements (23 classes → Classes 29-51)
- **Total Classes**: 52 movement classes only (0-51, rest class removed)
- **EMG Channels**: 16 channels per subject

## 1. Load EMG Data from NinaPro DB

Load and combine EMG data from all exercises and subjects with proper class relabeling.

In [None]:
import scipy.io as sio
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
import pandas as pd

# Load all three exercise files for all 10 subjects
base_path = r'C:\Users\mspan\Documents\Code\Bachelor\NinaPro5'
exercises = ['E1', 'E2', 'E3']
subjects = range(1, 11)  # Subjects 1 to 10

print("NinaPro DB - Combining All Exercises for All Subjects (52 Classes)")
print("Loading all exercise files for all subjects...")
print()

all_subjects_data = {}

# Load data from all subjects and exercises
for subject in subjects:
    subject_data = {}
    for exercise in exercises:
        file_path = f"{base_path}\\s{subject}\\S{subject}_{exercise}_A1.mat"
        try:
            data = sio.loadmat(file_path)
            subject_data[exercise] = data
            print(f"Loaded S{subject}_{exercise}")
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            continue
    
    if subject_data:  # Only add if at least some data is loaded
        all_subjects_data[f'S{subject}'] = subject_data

print(f"\nLoaded data for {len(all_subjects_data)} subjects")
print()

# Combine all exercises for all subjects with proper class relabeling
print("Combining All Exercises For All Subjects")

# Strategy: Relabel classes to be sequential across exercises
# E1: Classes 1-12 → Keep as 1-12
# E2: Classes 1-17 → Relabel as 13-29 (12 + 1-17)
# E3: Classes 1-23 → Relabel as 30-52 (29 + 1-23)

combined_emg_all_subjects = []
combined_labels_all_subjects = []

for subject_id, subject_data in all_subjects_data.items():
    print(f"\nProcessing {subject_id}:")
    
    combined_emg_subject = []
    combined_labels_subject = []
    class_offset = 0
    
    for exercise in exercises:
        if exercise not in subject_data:
            continue
            
        data = subject_data[exercise]
        emg = data['emg']
        restimulus = data['restimulus'].flatten()
        
        # Get unique movements for this exercise
        unique_movements = np.unique(restimulus)
        movement_classes = unique_movements[unique_movements != 0]
        
        print(f"  {exercise}: {len(movement_classes)} movement classes")
        
        # Create relabeled stimulus
        relabeled_stimulus = restimulus.copy()
        
        # Relabel movement classes (keep rest as 0)
        for i, original_class in enumerate(movement_classes):
            new_class = class_offset + i + 1
            relabeled_stimulus[restimulus == original_class] = new_class
        
        # Update offset for next exercise
        class_offset += len(movement_classes)
        
        # Add to subject's combined data
        combined_emg_subject.append(emg)
        combined_labels_subject.append(relabeled_stimulus)
    
    # Concatenate this subject's data
    if combined_emg_subject:
        subject_emg_combined = np.vstack(combined_emg_subject)
        subject_labels_combined = np.concatenate(combined_labels_subject)
        
        combined_emg_all_subjects.append(subject_emg_combined)
        combined_labels_all_subjects.append(subject_labels_combined)
        
        print(f"  Combined shape: EMG {subject_emg_combined.shape}, Labels {subject_labels_combined.shape}")

# Final concatenation across all subjects
print(f"\nFinal Combined Dataset")
combined_emg_array_with_rest = np.vstack(combined_emg_all_subjects)
combined_labels_array_with_rest = np.concatenate(combined_labels_all_subjects)

print(f"Combined EMG shape (with rest): {combined_emg_array_with_rest.shape}")
print(f"Combined labels shape (with rest): {combined_labels_array_with_rest.shape}")

# Remove rest class (class 0) from the dataset
print(f"\nRemoving rest class")
mask = combined_labels_array_with_rest != 0
combined_emg_array = combined_emg_array_with_rest[mask]
combined_labels_array_no_rest = combined_labels_array_with_rest[mask]

# Relabel classes to be consecutive (1-52 becomes 0-51)
unique_classes_no_rest = np.unique(combined_labels_array_no_rest)
label_mapping = {old_class: new_class for new_class, old_class in enumerate(unique_classes_no_rest)}

combined_labels_array = np.array([label_mapping[label] for label in combined_labels_array_no_rest])

print(f"After removing rest class:")
print(f"  EMG shape: {combined_emg_array.shape}")
print(f"  Labels shape: {combined_labels_array.shape}")
print(f"  Original rest samples removed: {np.sum(combined_labels_array_with_rest == 0):,}")

# Analyze final class distribution
unique_final_classes = np.unique(combined_labels_array)
movement_classes_final = unique_final_classes

print(f"\nFinal class analysis (movement classes only):")
print(f"Total movement classes: {len(movement_classes_final)} (classes 0-{max(movement_classes_final)})")
print(f"Class range: {min(unique_final_classes)} to {max(unique_final_classes)}")

# Count samples per class (summary)
print(f"\nClass distribution summary:")
movement_counts = []
for class_id in movement_classes_final:
    count = np.sum(combined_labels_array == class_id)
    movement_counts.append(count)

print(f"  Movement classes 0-51: {min(movement_counts):,} to {max(movement_counts):,} samples each")
print(f"  Total movement samples: {sum(movement_counts):,}")
print(f"  Total samples: {len(combined_labels_array):,}")

print(f"\nCombined dataset ready in variables 'combined_emg_array' and 'combined_labels_array'")
print(f"Note: Rest class has been completely removed from the dataset")

## 2. Explore Dataset Structure

Examine the shape, data types, and basic statistics of the combined EMG array and labels.

In [None]:
# Basic dataset information
print("Dataset Structure Analysis")
print(f"EMG Data Shape: {combined_emg_array.shape}")
print(f"  - Total samples: {combined_emg_array.shape[0]:,}")
print(f"  - EMG channels: {combined_emg_array.shape[1]}")
print(f"Data type: {combined_emg_array.dtype}")
print(f"Memory usage: {combined_emg_array.nbytes / (1024**3):.2f} GB")

print(f"\nLabels Shape: {combined_labels_array.shape}")
print(f"Labels data type: {combined_labels_array.dtype}")
print(f"Unique labels: {len(np.unique(combined_labels_array))}")

# EMG signal statistics
print("\nEmg Signal Statistics")
print(f"Min value: {combined_emg_array.min():.3f}")
print(f"Max value: {combined_emg_array.max():.3f}")
print(f"Mean value: {combined_emg_array.mean():.3f}")
print(f"Std deviation: {combined_emg_array.std():.3f}")

# Per-channel statistics
print("\nPer-Channel EMG Statistics")
for channel in range(combined_emg_array.shape[1]):
    channel_data = combined_emg_array[:, channel]
    print(f"Channel {channel+1:2d}: Mean={channel_data.mean():8.3f}, Std={channel_data.std():8.3f}, "
          f"Min={channel_data.min():8.3f}, Max={channel_data.max():8.3f}")

# Class distribution
print("\nClass Distribution")
class_counts = Counter(combined_labels_array)
for class_id in sorted(class_counts.keys()):
    count = class_counts[class_id]
    percentage = (count / len(combined_labels_array)) * 100
    print(f"Class {class_id:2d} (Movement {class_id+1:2d}): {count:7,} samples ({percentage:5.2f}%)")

## 3. Visualize Class Distribution

Create visualizations showing the distribution of samples across all 52 movement classes (rest class has been removed).

In [None]:
# Create exercise distribution visualization (MOVEMENT CLASSES ONLY)
fig, ax = plt.subplots(figsize=(12, 8))

# Get class counts and set up data
class_counts = Counter(combined_labels_array)
classes = sorted(class_counts.keys())
counts = [class_counts[c] for c in classes]

# Exercise-wise distribution (updated ranges for 0-based indexing) in shades of grey
exercise_colors = ['#2d2d2d', '#6d6d6d', '#a8a8a8']  # Dark grey, medium grey, light grey
exercise_labels = ['E1 (Classes 0-11)', 'E2 (Classes 12-28)', 'E3 (Classes 29-51)']
exercise_ranges = [(0, 11), (12, 28), (29, 51)]

for i, (start, end) in enumerate(exercise_ranges):
    ex_classes = [c for c in classes if start <= c <= end]
    ex_counts = [class_counts[c] for c in ex_classes]
    ax.bar(ex_classes, ex_counts, alpha=0.8, color=exercise_colors[i], 
           label=exercise_labels[i], edgecolor='black', linewidth=0.5)

ax.set_title('Sample Distribution by Class (Movement Classes Only)', fontsize=16, fontweight='bold')
ax.set_xlabel('Class ID (0-51)', fontsize=14)
ax.set_ylabel('Number of Samples', fontsize=14)
ax.legend(fontsize=12)
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Print detailed statistics
print("Class Distribution Statistics")
print(f"Total movement classes: {len(classes)}")
print(f"Movement samples: {sum(movement_counts):,} (100.0%)")
print(f"\nMovement class statistics:")
print(f"  Mean samples per class: {np.mean(movement_counts):.1f}")
print(f"  Std deviation: {np.std(movement_counts):.1f}")
print(f"  Min samples: {min(movement_counts):,}")
print(f"  Max samples: {max(movement_counts):,}")
print(f"  Range: {max(movement_counts) - min(movement_counts):,}")

# Check class balance
cv = np.std(movement_counts) / np.mean(movement_counts)
print(f"  Coefficient of variation: {cv:.3f}")
if cv < 0.1:
    print("  → Classes are well balanced")
elif cv < 0.3:
    print("  → Classes are moderately balanced")
else:
    print("  → Classes are imbalanced")

In [None]:
# Verify dataset balance after removing rest class

print("Dataset Balance Verification (Only Movement Classes)")
print("")

# Get class counts
class_counts = Counter(combined_labels_array)
movement_classes = list(class_counts.keys())
movement_samples = [class_counts[c] for c in movement_classes]

# Calculate statistics
avg_movement_samples = np.mean(movement_samples)
total_movement_samples = sum(movement_samples)

print(f"Total movement classes: {len(movement_classes)}")
print(f"Average samples per movement class: {avg_movement_samples:.1f}")
print(f"Total movement samples: {total_movement_samples:,}")
print(f"")
print(f"Class balance statistics:")
print(f"  Min samples: {min(movement_samples):,}")
print(f"  Max samples: {max(movement_samples):,}")
print(f"  Standard deviation: {np.std(movement_samples):.1f}")
print(f"  Coefficient of variation: {np.std(movement_samples)/avg_movement_samples:.3f}")
print(f"")
print(f"Rest class successfully removed")
print(f"No more class imbalance issues from dominant rest class")
print(f"All classes are now movement/gesture classes")

In [None]:
# Examine how classes are organized in the data
print("Data Organization Analysis")
print("")

# Check if all samples of each class are grouped together or mixed by subject
print("\n1. Checking class organization:")
print("   Are all samples of class 1 stored together, or mixed by subject?")

# Look at the first 10,000 samples to see the pattern
sample_size = 10000
sample_labels = combined_labels_array[:sample_size]
print(f"\nFirst {sample_size} labels preview:")
print(f"Labels: {sample_labels[:50]}...")

# Count transitions between classes
transitions = 0
for i in range(1, sample_size):
    if sample_labels[i] != sample_labels[i-1]:
        transitions += 1

print(f"\nClass transitions in first {sample_size} samples: {transitions}")
if transitions > sample_size * 0.1:  # More than 10% transitions
    print("→ MIXED: Classes are mixed/interleaved (not grouped by class)")
else:
    print("→ GROUPED: Classes appear to be grouped together")

# Analyze the full dataset structure
print(f"\n2. Full data analysis:")
total_transitions = 0
for i in range(1, len(combined_labels_array)):
    if combined_labels_array[i] != combined_labels_array[i-1]:
        total_transitions += 1

print(f"Total class transitions in full dataset: {total_transitions:,}")
print(f"Total samples: {len(combined_labels_array):,}")
print(f"Transition rate: {total_transitions/len(combined_labels_array)*100:.2f}%")

if total_transitions > len(combined_labels_array) * 0.05:  # More than 5% transitions
    print("→ MIXED: Data is organized by SUBJECT/TIME, not by class")
    print("   (This is the correct format for time-series EMG data)")
else:
    print("→ GROUPED: Data is organized by CLASS")
    print("   (This would be unusual for EMG time-series data)")

# Check the pattern for a specific class
target_class = 0
class_indices = np.where(combined_labels_array == target_class)[0]
consecutive_blocks = []
block_start = class_indices[0]
block_length = 1

for i in range(1, len(class_indices)):
    if class_indices[i] == class_indices[i-1] + 1:  # Consecutive
        block_length += 1
    else:  # Gap found
        consecutive_blocks.append(block_length)
        block_start = class_indices[i]
        block_length = 1
consecutive_blocks.append(block_length)  # Add the last block

print(f"\n3. CLASS {target_class} DISTRIBUTION ANALYSIS:")
print(f"Total occurrences of class {target_class}: {len(class_indices):,}")
print(f"Number of separate blocks: {len(consecutive_blocks)}")
print(f"Block sizes: min={min(consecutive_blocks)}, max={max(consecutive_blocks)}, avg={np.mean(consecutive_blocks):.1f}")

if len(consecutive_blocks) > 20:  # Many separate blocks
    print(f"→ Class {target_class} appears in {len(consecutive_blocks)} separate time segments")
    print("→ This indicates data is organized by SUBJECT/TIME (correct for EMG)")
else:
    print(f"→ Class {target_class} appears in few large blocks")
    print("→ This would indicate data is organized by CLASS (unusual for EMG)")

In [None]:
# Detailed analysis of data organization by subject and exercise
print("\nDetailed Data Organization Analysis")
print("")

# Recreate the subject-wise organization to understand the structure
print("\n4. How the data was originally combined:")
print("   Understanding the concatenation process...")

# Let's look at how data was combined from the loading process
print("\nOriginal data combination process:")
print("1. For each subject (S1-S10):")
print("   - Load E1, E2, E3 exercises")
print("   - Concatenate all exercises for that subject")
print("   - Add subject's combined data to final array")
print("2. Final step: Concatenate all subjects vertically")

# This means the data structure is:
# [S1_E1+E2+E3] + [S2_E1+E2+E3] + [S3_E1+E2+E3] + ... + [S10_E1+E2+E3]

print(f"\n5. Actual Data Organization:")
print(f"The data is organized as:")
print(f"[Subject 1: E1+E2+E3] → [Subject 2: E1+E2+E3] → ... → [Subject 10: E1+E2+E3]")

# Let's verify this by checking the 10 blocks for class 0
print(f"\n6. Verification - Class 0 Blocks:")
target_class = 0
class_indices = np.where(combined_labels_array == target_class)[0]

# Find block boundaries
block_starts = [class_indices[0]]
block_ends = []
current_block_start = class_indices[0]

for i in range(1, len(class_indices)):
    if class_indices[i] != class_indices[i-1] + 1:  # Gap found
        block_ends.append(class_indices[i-1])
        block_starts.append(class_indices[i])
        current_block_start = class_indices[i]
block_ends.append(class_indices[-1])

print(f"Class {target_class} appears in {len(block_starts)} blocks:")
for i, (start, end) in enumerate(zip(block_starts, block_ends)):
    block_size = end - start + 1
    print(f"  Block {i+1}: positions {start:7,} to {end:7,} (size: {block_size:5,})")

print(f"\n7. Conclusion:")
print(f"Each class appears in exactly 10 blocks = 10 subjects")
print(f"Within each subject, classes are grouped by exercise (E1→E2→E3)")
print(f"Data organization: BY SUBJECT, then BY EXERCISE, then BY TIME")
print(f"This is the CORRECT format for EMG time-series data")

print(f"\nData flow:")
print(f"Subject 1: [E1 classes 0-11] → [E2 classes 12-28] → [E3 classes 29-51]")
print(f"Subject 2: [E1 classes 0-11] → [E2 classes 12-28] → [E3 classes 29-51]") 
print(f"...")
print(f"Subject 10: [E1 classes 0-11] → [E2 classes 12-28] → [E3 classes 29-51]")

print(f"\n8. Implications for machine learning:")
print(f"Temporal structure is preserved within subjects")
print(f"Each class appears across all 10 subjects")
print(f"Split-first approach will properly separate subjects")
print(f"No risk of data leakage between train/test splits")

## 5. Stratified K-Fold Cross-Validation Setup

Implement stratified k-fold cross-validation to ensure robust model evaluation across all 52 movement classes while maintaining proper class distribution in each fold.

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight
import warnings
warnings.filterwarnings('ignore')

print("Stratified K-Fold Cross-Validation Setup")
print("")

# Configuration
N_FOLDS = 5
RANDOM_STATE = 42
WINDOW_SIZE = 200
OVERLAP_RATIO = 0.5

print(f"Configuration:")
print(f"  Number of folds: {N_FOLDS}")
print(f"  Window size: {WINDOW_SIZE}")
print(f"  Overlap ratio: {OVERLAP_RATIO}")
print(f"  Random state: {RANDOM_STATE}")

# Initialize stratified k-fold
skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=RANDOM_STATE)

print(f"\nDataset overview:")
print(f"  Total samples: {len(combined_emg_array):,}")
print(f"  EMG channels: {combined_emg_array.shape[1]}")
print(f"  Number of classes: {len(np.unique(combined_labels_array))}")
print(f"  Classes range: {min(combined_labels_array)} to {max(combined_labels_array)}")

# Check class distribution before splitting
class_counts = Counter(combined_labels_array)
print(f"\nClass distribution before splitting:")
print(f"  Min samples per class: {min(class_counts.values()):,}")
print(f"  Max samples per class: {max(class_counts.values()):,}")
print(f"  Average samples per class: {np.mean(list(class_counts.values())):.1f}")

# Verify that stratification will work
min_class_count = min(class_counts.values())
if min_class_count < N_FOLDS:
    print(f"\nWARNING: Smallest class has only {min_class_count} samples")
    print(f"   This is less than {N_FOLDS} folds - stratification may fail!")
else:
    print(f"\nAll classes have sufficient samples for {N_FOLDS}-fold stratification")

print(f"\nExpected samples per fold: ~{len(combined_labels_array) // N_FOLDS:,}")
print(f"Expected samples per class per fold: ~{min_class_count // N_FOLDS}")

In [None]:
def create_overlapping_windows(X, y, window_size, overlap_ratio, label_consistency_threshold=0.8):
    """
    Create overlapping windows from time series data with label consistency checking.
    
    Parameters:
    -----------
    X : numpy.ndarray
        Input data with shape (n_samples, n_features)
    y : numpy.ndarray
        Labels with shape (n_samples,)
    window_size : int
        Size of each window
    overlap_ratio : float
        Overlap between windows (0.0 to 1.0)
    label_consistency_threshold : float
        Minimum ratio of consistent labels required in a window
        
    Returns:
    --------
    windows : numpy.ndarray
        Windowed data with shape (n_windows, window_size, n_features)
    window_labels : numpy.ndarray
        Labels for each window
    """
    
    # Calculate step size based on overlap
    step_size = int(window_size * (1 - overlap_ratio))
    
    windows = []
    window_labels = []
    
    # Create windows
    for start in range(0, len(X) - window_size + 1, step_size):
        end = start + window_size
        
        # Extract window
        window_data = X[start:end]
        window_label_sequence = y[start:end]
        
        # Determine the window label using majority vote
        unique_labels, counts = np.unique(window_label_sequence, return_counts=True)
        majority_label = unique_labels[np.argmax(counts)]
        
        # Check label consistency
        consistency = np.sum(window_label_sequence == majority_label) / len(window_label_sequence)
        
        # Only keep windows with sufficient label consistency
        if consistency >= label_consistency_threshold:
            windows.append(window_data)
            window_labels.append(majority_label)
    
    return np.array(windows), np.array(window_labels)

print("Windowing function defined")
print("  - Creates overlapping windows from time series")
print("  - Uses majority vote for window labels")
print("  - Filters out inconsistent windows (>80% consistency required)")

In [None]:
N_AUGMENT = 2  # Number of augmentations per original set

def augment_emg_windows(X, noise_pct=0.05, scale_range=(0.9, 1.1), jitter_max=0.05):
    # Calculate standard deviation per channel in the window
    signal_std = np.std(X, axis=(0, 1), keepdims=True)
    noise_std = signal_std * noise_pct
    # Add Gaussian noise relative to signal std
    X_aug = X + np.random.normal(0, noise_std, X.shape)
    # Random scaling
    scale = np.random.uniform(scale_range[0], scale_range[1], (X.shape[0], 1, 1))
    X_aug = X_aug * scale
    # Jitter
    jitter = np.random.uniform(-jitter_max, jitter_max, X.shape)
    X_aug = X_aug + jitter
    return X_aug

# Store all fold data
kfold_data = []
fold_statistics = []

for fold_idx, (train_indices, test_indices) in enumerate(skf.split(combined_emg_array, combined_labels_array)):
    print(f"\nProcessing Fold {fold_idx + 1}/{N_FOLDS}")
    
    # Split data
    X_train_raw = combined_emg_array[train_indices]
    y_train_raw = combined_labels_array[train_indices]
    X_test_raw = combined_emg_array[test_indices]
    y_test_raw = combined_labels_array[test_indices]
    
    print(f"  Raw split - Train: {len(X_train_raw):,}, Test: {len(X_test_raw):,}")
    
    # Create windowed data
    print("  Creating windowed data")
    X_train_windowed, y_train_windowed = create_overlapping_windows(
        X_train_raw, y_train_raw, WINDOW_SIZE, OVERLAP_RATIO
    )
    X_test_windowed, y_test_windowed = create_overlapping_windows(
        X_test_raw, y_test_raw, WINDOW_SIZE, OVERLAP_RATIO
    )
    
    # Augmentiere Trainingsdaten mehrfach und hänge sie an die Originaldaten an
    X_augmented_list = [X_train_windowed]
    y_augmented_list = [y_train_windowed]
    for _ in range(N_AUGMENT):
        X_aug = augment_emg_windows(X_train_windowed)
        X_augmented_list.append(X_aug)
        y_augmented_list.append(y_train_windowed)  # Labels bleiben gleich

    X_train_windowed = np.concatenate(X_augmented_list, axis=0)
    y_train_windowed = np.concatenate(y_augmented_list, axis=0)

    print(f"  Windowed data - Train: {len(X_train_windowed):,}, Test: {len(X_test_windowed):,}")
    
    # Check class distribution in windowed data
    train_classes = np.unique(y_train_windowed)
    test_classes = np.unique(y_test_windowed)
    
    if len(train_classes) != 52 or len(test_classes) != 52:
        print(f"  Warning: Missing classes - Train: {len(train_classes)}/52, Test: {len(test_classes)}/52")
    else:
        print(f"  All 52 classes present in both train and test sets")
    
    # Normalize data (fit on train, transform both)
    scaler = StandardScaler()
    
    # Reshape for normalization (flatten windows)
    X_train_flat = X_train_windowed.reshape(-1, X_train_windowed.shape[-1])
    X_test_flat = X_test_windowed.reshape(-1, X_test_windowed.shape[-1])
    
    # Fit scaler on training data only
    scaler.fit(X_train_flat)
    
    # Transform both sets
    X_train_normalized = scaler.transform(X_train_flat).reshape(X_train_windowed.shape)
    X_test_normalized = scaler.transform(X_test_flat).reshape(X_test_windowed.shape)
    
    # Compute class weights for this fold
    class_weights = compute_class_weight(
        'balanced', 
        classes=np.unique(y_train_windowed), 
        y=y_train_windowed
    )
    class_weight_dict = dict(zip(np.unique(y_train_windowed), class_weights))
    
    # Store fold data
    fold_data = {
        'fold_idx': fold_idx,
        'X_train': X_train_normalized,
        'y_train': y_train_windowed,
        'X_test': X_test_normalized,
        'y_test': y_test_windowed,
        'scaler': scaler,
        'class_weights': class_weight_dict,
        'train_indices': train_indices,
        'test_indices': test_indices
    }
    kfold_data.append(fold_data)
    
    # Calculate statistics
    train_class_counts = Counter(y_train_windowed)
    test_class_counts = Counter(y_test_windowed)
    
    fold_stats = {
        'fold': fold_idx + 1,
        'train_samples': len(X_train_normalized),
        'test_samples': len(X_test_normalized),
        'train_classes': len(train_class_counts),
        'test_classes': len(test_class_counts),
        'train_min_class': min(train_class_counts.values()) if train_class_counts else 0,
        'train_max_class': max(train_class_counts.values()) if train_class_counts else 0,
        'test_min_class': min(test_class_counts.values()) if test_class_counts else 0,
        'test_max_class': max(test_class_counts.values()) if test_class_counts else 0
    }
    fold_statistics.append(fold_stats)
    
    print(f"  Fold {fold_idx + 1} processed successfully")

print(f"\nAll {N_FOLDS} Folds Created Successfully")
print(f"Total folds with complete data: {len(kfold_data)}")

In [None]:
# Display fold statistics
print("\nK-Fold Statistics Summary")
print("")

# Create DataFrame for better visualization
import pandas as pd
df_stats = pd.DataFrame(fold_statistics)

print("Per-fold statistics:")
print(df_stats.to_string(index=False))

# Overall statistics
print(f"\nOverall K-Fold Statistics:")
print(f"  Average train samples per fold: {df_stats['train_samples'].mean():.0f}")
print(f"  Average test samples per fold: {df_stats['test_samples'].mean():.0f}")
print(f"  Train/test ratio: {df_stats['train_samples'].mean() / df_stats['test_samples'].mean():.2f}")

print(f"\nClass distribution consistency:")
print(f"  All folds have all 52 classes: {all(df_stats['train_classes'] == 52) and all(df_stats['test_classes'] == 52)}")
print(f"  Train set class balance (min-max per fold):")
for i, stats in enumerate(fold_statistics):
    ratio = stats['train_max_class'] / stats['train_min_class'] if stats['train_min_class'] > 0 else float('inf')
    print(f"    Fold {i+1}: {stats['train_min_class']}-{stats['train_max_class']} samples (ratio: {ratio:.2f})")

#Sample Distribution Across K-Folds
fig1, ax1 = plt.subplots(figsize=(8, 6))
folds = df_stats['fold']
ax1.bar(folds, df_stats['train_samples'], alpha=0.7, label='Train', color='#2d2d2d')
ax1.bar(folds, df_stats['test_samples'], alpha=0.7, label='Test', color="#a8a8a8")
ax1.set_xlabel('Fold')
ax1.set_ylabel('Number of Samples')
ax1.set_title('Sample Distribution Across K-Folds')
ax1.legend()
ax1.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

#Class Balance Within Training Folds
fig2, ax2 = plt.subplots(figsize=(8, 6))
ax2.bar(folds, df_stats['train_min_class'], alpha=0.7, label='Min class (train)', color='#2d2d2d')
ax2.bar(folds, df_stats['train_max_class'], alpha=0.7, label='Max class (train)', color='#a8a8a8')
ax2.set_xlabel('Fold')
ax2.set_ylabel('Samples per Class')
ax2.set_title('Class Balance Within Training Folds')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print(f"\nSTRATIFIED K-FOLD SETUP COMPLETE")
print(f"Ready for cross-validation training with {N_FOLDS} folds")
print(f"Data stored in 'kfold_data' list - each fold contains:")
print(f"  - Preprocessed and normalized training/test data")
print(f"  - Windowed EMG signals ({WINDOW_SIZE} timesteps)")
print(f"  - Balanced class weights")
print(f"  - Individual scalers fitted on training data only")

In [None]:
# K-FOLD DATA VALIDATION - Are the folds actually different?
print("\nK-Fold Data Validation")
print("")

print("Detailed Fold Analysis:")
print("")

# Check if the issue is visual or actual data duplication
print("1. Checking sample distribution differences:")

for i in range(N_FOLDS):
    fold_data = kfold_data[i]
    train_classes = np.unique(fold_data['y_train'], return_counts=True)
    test_classes = np.unique(fold_data['y_test'], return_counts=True)
    
    print(f"\nFold {i+1}:")
    print(f"  Train samples: {len(fold_data['X_train']):,}")
    print(f"  Test samples: {len(fold_data['X_test']):,}")
    print(f"  Train classes present: {len(train_classes[0])}")
    print(f"  Test classes present: {len(test_classes[0])}")
    
    # Show first few class counts to check for differences
    train_counts_dict = dict(zip(train_classes[0], train_classes[1]))
    test_counts_dict = dict(zip(test_classes[0], test_classes[1]))
    
    # Sample a few classes to show variation
    sample_classes = [0, 1, 2, 3, 4]
    train_sample_counts = [train_counts_dict.get(c, 0) for c in sample_classes]
    test_sample_counts = [test_counts_dict.get(c, 0) for c in sample_classes]
    
    print(f"  Sample class counts (classes 0-4):")
    print(f"    Train: {train_sample_counts}")
    print(f"    Test:  {test_sample_counts}")

print(f"\n2. Checking if train/test indices overlap between folds:")

# Check if any fold has overlapping indices
index_overlaps = []
for i in range(N_FOLDS):
    for j in range(i+1, N_FOLDS):
        fold_i_train = set(kfold_data[i]['train_indices'])
        fold_j_train = set(kfold_data[j]['train_indices'])
        
        overlap = len(fold_i_train.intersection(fold_j_train))
        total_unique = len(fold_i_train.union(fold_j_train))
        overlap_percent = (overlap / total_unique) * 100 if total_unique > 0 else 0
        
        index_overlaps.append((i+1, j+1, overlap, overlap_percent))

# Show overlap analysis
print("\nTraining index overlaps between folds:")
for fold_i, fold_j, overlap, overlap_percent in index_overlaps[:5]:  # Show first 5
    print(f"  Fold {fold_i} vs Fold {fold_j}: {overlap:,} overlapping indices ({overlap_percent:.1f}%)")

if any(overlap > 0 for _, _, overlap, _ in index_overlaps):
    print("  WARNING: Some training indices overlap between folds!")
else:
    print("  Good: No training indices overlap between folds")

print(f"\n3. Statistical differences in class distributions:")

# Create a more detailed statistical comparison
class_distribution_matrix = np.zeros((N_FOLDS, 52))

for i in range(N_FOLDS):
    fold_data = kfold_data[i]
    train_classes, train_counts = np.unique(fold_data['y_train'], return_counts=True)
    
    # Fill in the counts for this fold
    for class_id, count in zip(train_classes, train_counts):
        class_distribution_matrix[i, class_id] = count

# Calculate statistics
class_means = np.mean(class_distribution_matrix, axis=0)
class_stds = np.std(class_distribution_matrix, axis=0)
class_cvs = class_stds / (class_means + 1e-8)  # Coefficient of variation

print(f"Class distribution statistics across folds:")
print(f"  Classes with highest variation (CV > 0.1):")
high_var_classes = np.where(class_cvs > 0.1)[0]
for class_id in high_var_classes[:10]:  # Show first 10
    print(f"    Class {class_id:2d}: mean={class_means[class_id]:6.1f}, std={class_stds[class_id]:6.1f}, CV={class_cvs[class_id]:.3f}")

if len(high_var_classes) == 0:
    print("    All classes have very similar distributions across folds (CV < 0.1)")

print(f"\n4. Are the bar plots misleading?")

# Check if the visual similarity is due to scale issues
train_sample_differences = []
test_sample_differences = []

for i in range(N_FOLDS):
    train_samples = len(kfold_data[i]['X_train'])
    test_samples = len(kfold_data[i]['X_test'])
    train_sample_differences.append(train_samples)
    test_sample_differences.append(test_samples)

train_range = max(train_sample_differences) - min(train_sample_differences)
test_range = max(test_sample_differences) - min(test_sample_differences)
train_cv = np.std(train_sample_differences) / np.mean(train_sample_differences)
test_cv = np.std(test_sample_differences) / np.mean(test_sample_differences)

print(f"\nSample count analysis:")
print(f"  Training samples per fold: {train_sample_differences}")
print(f"  Test samples per fold: {test_sample_differences}")
print(f"  Train sample range: {train_range:,} samples")
print(f"  Test sample range: {test_range:,} samples")
print(f"  Train CV: {train_cv:.4f}")
print(f"  Test CV: {test_cv:.4f}")

if train_cv < 0.01 and test_cv < 0.01:
    print(f"\nConclusion:")
    print(f"="*15)
    print("The k-folds ARE actually very similar in size!")
    print("   This is EXPECTED with stratified k-fold cross-validation.")
    print("   - Stratified k-fold ensures each fold has similar class proportions")
    print("   - This leads to very similar fold sizes")
    print("   - The plot is NOT misleading - the folds are genuinely similar in size")
    print("\nWhat makes folds different:")
    print("   - Different samples (indices) are in each fold")
    print("   - Same proportion of each class, but different actual samples")
    print("   - This provides different training/validation sets for robust evaluation")
else:
    print(f"\nConclusion:")
    print(f"="*15)
    print("The folds show more variation than expected for stratified k-fold")

print(f"\n5. Sample identity verification:")

# Check if the actual data samples are different
fold_0_train = kfold_data[0]['X_train'][:5]  # First 5 samples from fold 0
fold_1_train = kfold_data[1]['X_train'][:5]  # First 5 samples from fold 1

# Check if any samples are identical
identical_samples = 0
for i in range(5):
    for j in range(5):
        if np.allclose(fold_0_train[i], fold_1_train[j], rtol=1e-10):
            identical_samples += 1

print(f"Sample comparison between Fold 1 and Fold 2:")
print(f"  Identical samples found in first 5: {identical_samples}/25 comparisons")

if identical_samples == 0:
    print("  Different samples in different folds")
else:
    print("  Some identical samples found across folds")

In [None]:
# IMPROVED K-FOLD VISUALIZATION - Better way to show fold differences
print("\nImproved K-Fold Visualization")
print("")

# Sample distribution with better scale
fig1, ax1 = plt.subplots(figsize=(8, 6))
folds = df_stats['fold']
train_samples = df_stats['train_samples']
test_samples = df_stats['test_samples']

bar_width = 0.35
x = np.arange(len(folds))

bars1 = ax1.bar(x - bar_width/2, train_samples, width=bar_width, alpha=0.7, label='Train', color='#2d2d2d', edgecolor='#2d2d2d')
bars2 = ax1.bar(x + bar_width/2, test_samples, width=bar_width, alpha=0.7, label='Test', color='#a8a8a8', edgecolor='#a8a8a8')

for i, (train, test) in enumerate(zip(train_samples, test_samples)):
    ax1.text(x[i] - bar_width/2, train + max(train_samples)*0.01, f'{train:,}', ha='center', va='bottom', fontsize=9, fontweight='bold')
    ax1.text(x[i] + bar_width/2, test + max(test_samples)*0.01, f'{test:,}', ha='center', va='bottom', fontsize=9, fontweight='bold')

ax1.set_xlabel('Fold')
ax1.set_ylabel('Number of Samples')
ax1.set_title('Sample Distribution')
ax1.set_xticks(x)
ax1.set_xticklabels(folds)
ax1.legend()
ax1.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# Relative differences from mean
fig2, ax2 = plt.subplots(figsize=(8, 6))
train_mean = np.mean(train_samples)
test_mean = np.mean(test_samples)

train_relative = [(x - train_mean) / train_mean * 100 for x in train_samples]
test_relative = [(x - test_mean) / test_mean * 100 for x in test_samples]

ax2.bar(folds, train_relative, alpha=0.7, label='Train difference from mean', color='#2d2d2d')
ax2.bar(folds, test_relative, alpha=0.7, label='Test difference from mean', color='#a8a8a8')

ax2.set_xlabel('Fold')
ax2.set_ylabel('Percentage Difference from Mean')
ax2.set_title('Relative Differences Between Folds')
ax2.legend()
ax2.grid(True, alpha=0.3)
ax2.axhline(y=0, color='#888888', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()

#Class distribution heatmap
fig3, ax3 = plt.subplots(figsize=(8, 6))
sample_classes = list(range(0, 52, 3))[:20]  # Every 3rd class
class_data_sample = class_distribution_matrix[:, sample_classes]

# Normalize each class row to show relative distribution across folds (percentages)
class_data_sample_pct = class_data_sample / class_data_sample.sum(axis=0, keepdims=True) * 100

im = ax3.imshow(class_data_sample_pct.T, aspect='auto', cmap='Greys', interpolation='nearest')
ax3.set_xlabel('Fold')
ax3.set_ylabel('Class ID')
ax3.set_title('Class Distribution Heatmap')
ax3.set_xticks(range(N_FOLDS))
ax3.set_xticklabels([f'Fold {i+1}' for i in range(N_FOLDS)])
ax3.set_yticks(range(len(sample_classes)))
ax3.set_yticklabels(sample_classes)

cbar = plt.colorbar(im, ax=ax3, shrink=0.8)
cbar.set_label('Percentage of Samples')
plt.tight_layout()
plt.show()

#Subject distribution per fold
fig4, ax4 = plt.subplots(figsize=(8, 6))
subject_distribution = np.zeros((N_FOLDS, 10))  # Assuming 10 subjects

for fold_idx in range(N_FOLDS):
    train_indices = kfold_data[fold_idx]['train_indices']
    total_samples = len(combined_labels_array)
    samples_per_subject = total_samples // 10
    for idx in train_indices:
        subject = min(9, idx // samples_per_subject)
        subject_distribution[fold_idx, subject] += 1

subject_distribution_pct = subject_distribution / np.sum(subject_distribution, axis=1, keepdims=True) * 100

im2 = ax4.imshow(subject_distribution_pct, aspect='auto', cmap='Greys', interpolation='nearest')
ax4.set_xlabel('Subject')
ax4.set_ylabel('Fold')
ax4.set_title('Subject Data Distribution per Fold')
ax4.set_xticks(range(10))
ax4.set_xticklabels([f'S{i+1}' for i in range(10)])
ax4.set_yticks(range(N_FOLDS))
ax4.set_yticklabels([f'Fold {i+1}' for i in range(N_FOLDS)])

cbar2 = plt.colorbar(im2, ax=ax4, shrink=0.8)
cbar2.set_label('Percentage')
plt.tight_layout()
plt.show()

# Print key statistics summary
print("\nFold Differences Summary:")
print("")
train_cv = np.std(train_samples) / np.mean(train_samples) * 100
test_cv = np.std(test_samples) / np.mean(test_samples) * 100
max_class_cv = np.max(class_cvs) * 100

print(f"Sample Count Variation:")
print(f"• Train CV: {train_cv:.3f}%")
print(f"• Test CV: {test_cv:.3f}%")
print(f"\nClass Distribution:")
print(f"• Max class CV: {max_class_cv:.1f}%")
print(f"• Classes with CV > 10%: {len(high_var_classes)}")
print(f"\nConclusion: The folds ARE different - they contain different samples")
print(f"but maintain similar proportions due to stratified sampling.")

In [None]:
# Demonstrate how to access k-fold data
print("Accessing K-Fold Data")
print("")

# Show structure of the first fold
fold_0 = kfold_data[0]
print(f"Fold 0 structure:")
for key, value in fold_0.items():
    if hasattr(value, 'shape'):
        print(f"  {key}: {type(value).__name__} {value.shape}")
    elif isinstance(value, dict):
        print(f"  {key}: dict with {len(value)} items")
    else:
        print(f"  {key}: {type(value).__name__}")

print(f"\nExample usage:")
print(f"# Access fold 0 training data:")
print(f"X_train = kfold_data[0]['X_train']  # Shape: {fold_0['X_train'].shape}")
print(f"y_train = kfold_data[0]['y_train']  # Shape: {fold_0['y_train'].shape}")
print(f"class_weights = kfold_data[0]['class_weights']  # {len(fold_0['class_weights'])} classes")

print(f"\nData characteristics:")
print(f"  Input shape per sample: (window_size={WINDOW_SIZE}, channels={fold_0['X_train'].shape[2]})")
print(f"  Output classes: {len(np.unique(fold_0['y_train']))} classes (0-51)")
print(f"  Ready for CNN-LSTM model input!")

## Understanding WINDOW_SIZE = 200: How EMG Time Series is Split

This section explains exactly how the windowing process works and how one class from one subject gets split into multiple training samples.

In [None]:
# DEMONSTRATION: How WINDOW_SIZE = 200 Works
print("Windowing Demonstration")
print("")

# Let's analyze one specific class from one subject to show exactly how windowing works
print("1. Original Data Structure")
print("")

# Find a specific example: Class 0 from Subject 1
target_class = 0
class_indices = np.where(combined_labels_array == target_class)[0]

# The first block should be from Subject 1
first_block_start = class_indices[0]
first_block_end = None

# Find where this block ends (where labels change)
for i in range(1, len(class_indices)):
    if class_indices[i] != class_indices[i-1] + 1:  # Gap found
        first_block_end = class_indices[i-1]
        break

if first_block_end is None:  # Last block
    first_block_end = class_indices[-1]

block_size = first_block_end - first_block_start + 1

print(f"Example: Class {target_class} in Subject 1")
print(f"  Original continuous sequence:")
print(f"  - Start position: {first_block_start:,}")
print(f"  - End position: {first_block_end:,}")
print(f"  - Total samples: {block_size:,}")
print(f"  - EMG shape for this class: ({block_size:,}, {combined_emg_array.shape[1]})")

# Extract this specific sequence
class_emg_sequence = combined_emg_array[first_block_start:first_block_end+1]
class_labels_sequence = combined_labels_array[first_block_start:first_block_end+1]

print(f"\n2. Windowing Process")
print("")
print(f"Settings:")
print(f"  - WINDOW_SIZE = {WINDOW_SIZE} samples")
print(f"  - OVERLAP_RATIO = {OVERLAP_RATIO} (50%)")
print(f"  - Step size = {int(WINDOW_SIZE * (1 - OVERLAP_RATIO))} samples")

# Simulate the windowing process on this sequence
step_size = int(WINDOW_SIZE * (1 - OVERLAP_RATIO))
windows_from_this_class = 0
valid_windows = 0

print(f"\nWindowing this class sequence:")
for start in range(0, len(class_emg_sequence) - WINDOW_SIZE + 1, step_size):
    end = start + WINDOW_SIZE
    
    # Extract window
    window_labels = class_labels_sequence[start:end]
    
    # Check label consistency
    window_label = np.bincount(window_labels).argmax()
    label_consistency = np.mean(window_labels == window_label)
    
    windows_from_this_class += 1
    if label_consistency >= 0.8:
        valid_windows += 1
    
    if windows_from_this_class <= 5:  # Show first 5 windows as example
        print(f"  Window {windows_from_this_class}: samples {start:4d}-{end-1:4d}, "
              f"consistency: {label_consistency:.3f}, "
              f"valid: {'yes' if label_consistency >= 0.8 else 'no'}")

print(f"  ...")
print(f"  Total windows created: {windows_from_this_class}")
print(f"  Valid windows (>80% consistency): {valid_windows}")

print(f"\n3. Result")
print("")
print(f"One class sequence of {block_size:,} samples becomes:")
print(f"  → {valid_windows} separate training examples")
print(f"  → Each example has shape: ({WINDOW_SIZE}, {combined_emg_array.shape[1]})")
print(f"  → All examples have the same label: {target_class}")

efficiency = (valid_windows * WINDOW_SIZE) / block_size
print(f"\nData efficiency:")
print(f"  - Original samples used: {block_size:,}")
print(f"  - Window samples generated: {valid_windows * WINDOW_SIZE:,}")
print(f"  - Efficiency ratio: {efficiency:.2f} (due to overlap)")

print(f"\n4. Across All Subject And Classes")
print("")
print(f"This process happens for:")
print(f"  - ALL {len(np.unique(combined_labels_array))} classes")
print(f"  - ALL 10 subjects") 
print(f"  - Total original samples: {len(combined_labels_array):,}")
print(f"  - Total windowed samples in k-fold: ~{sum(len(fold['X_train']) + len(fold['X_test']) for fold in kfold_data):,}")

print(f"\nKey Insights:")
print(f"Yes, ONE class from ONE subject gets split into MULTIPLE training examples!")
print(f"This allows the model to learn from different temporal segments of the same movement.")

In [None]:
print("\nVisual Demonstration of Windowing")
print("")

import matplotlib.pyplot as plt
import numpy as np

sequence_length = 1000  # Show first 1000 samples for visualization
WINDOW_SIZE = 200  # Size of each window
OVERLAP_RATIO = 0.5
step_size = int(WINDOW_SIZE * (1 - OVERLAP_RATIO))

# Create a dummy EMG sequence for visualization
emg_sequence = np.zeros(sequence_length)
window_starts = list(range(0, sequence_length - WINDOW_SIZE + 1, step_size))
window_count = len(window_starts)

fig, ax = plt.subplots(figsize=(15, 2))
ax.plot(np.arange(sequence_length), emg_sequence, color='#bbbbbb', linewidth=2, label='EMG Sequence')

# Plot the first 8 windows
for i, start in enumerate(window_starts[:8]):
    end = start + WINDOW_SIZE
    ax.axvspan(start, end, color='#444444', alpha=0.5, label='Window' if i == 0 else None)
    ax.text((start + end) // 2, 0.05, f'W{i+1}', ha='center', va='bottom', fontsize=9, color='#222222', fontweight='bold')

# Add ellipsis for skipped windows
if window_count > 8:
    ax.text(sequence_length // 2, 0.15, '...', ha='center', va='bottom', fontsize=16, color='#888888')

ax.set_xlim(0, sequence_length)
ax.set_ylim(-0.1, 0.2)
ax.set_xlabel('Sample Index')
ax.set_yticks([])
ax.set_title('Visual Demonstration of Windowing (First 8 Windows)')
ax.legend(loc='upper right')
plt.tight_layout()
plt.show()

In [None]:
# CONCRETE EXAMPLE: Show actual EMG signal windowing
print("Concrete Example of EMG Windowing")
print("")

# Get the actual EMG sequence for Class 0, Subject 1
target_class = 0
class_indices = np.where(combined_labels_array == target_class)[0]
first_block_start = class_indices[0]

# Find the end of the first block
first_block_end = None
for i in range(1, len(class_indices)):
    if class_indices[i] != class_indices[i-1] + 1:
        first_block_end = class_indices[i-1]
        break
if first_block_end is None:
    first_block_end = class_indices[-1]

# Extract actual EMG data
actual_emg = combined_emg_array[first_block_start:first_block_start + 1000]  # First 1000 samples
actual_labels = combined_labels_array[first_block_start:first_block_start + 1000]

print(f"Real EMG data from Class {target_class}, Subject 1:")
print(f"Shape: {actual_emg.shape}")
print(f"Labels: all = {target_class} yes")
print()

# Show first few samples of one EMG channel
channel_0_data = actual_emg[:50, 0]  # First 50 samples, channel 0
print("First 50 EMG samples from Channel 0:")
print("Sample:  " + " ".join([f"{i:4d}" for i in range(0, 50, 5)]))
print("Value:   " + " ".join([f"{val:4.0f}" for val in channel_0_data[::5]]))
print()

# Apply windowing to this data
print("Applying windowing function:")
windowed_emg, windowed_labels = create_overlapping_windows(
    actual_emg, actual_labels, WINDOW_SIZE, OVERLAP_RATIO
)

print(f"Result:")
print(f"  Original shape: {actual_emg.shape}")
print(f"  Windowed shape: {windowed_emg.shape}")
print(f"  Number of windows created: {len(windowed_emg)}")
print(f"  Each window shape: ({WINDOW_SIZE}, {actual_emg.shape[1]})")
print(f"  All window labels: {np.unique(windowed_labels)} (should all be {target_class})")

print("\nExample windows created:")
for i in range(min(3, len(windowed_emg))):
    window_start = i * int(WINDOW_SIZE * (1 - OVERLAP_RATIO))
    window_end = window_start + WINDOW_SIZE
    print(f"  Window {i+1}: samples {window_start:3d}-{window_end-1:3d}, "
          f"shape: {windowed_emg[i].shape}, label: {windowed_labels[i]}")

print(f"\nSummary:")
print(f"One continuous EMG recording gets split into multiple training examples")
print(f"Each window is a complete 200-timestep sequence") 
print(f"Windows overlap by 50% for better coverage")
print(f"All windows from same class have same label")
print(f"Model learns from different temporal segments of same movement")

print(f"\nThis is exactly what happens in the k-fold data!")
print(f"   Each fold contains thousands of these 200-timestep windows")
print(f"   from all 52 classes across all 10 subjects.")

## 6. CNN-LSTM Model Implementation and K-Fold Training

Implement a robust CNN-LSTM architecture for EMG classification and train it using stratified k-fold cross-validation.

In [None]:
# Import necessary libraries for deep learning
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.metrics import SparseCategoricalAccuracy
from tensorflow.keras import callbacks
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.utils import plot_model


print("TensorFlow/Keras imports loaded successfully")
print(f"TensorFlow version: {tf.__version__}")
print(f"GPU available: {tf.config.list_physical_devices('GPU')}")

In [None]:
#inpuut_shape = (600, 12)  # for NinaPro 4
#input_shape = (600, 16)  # for NinaPro 5

def create_cnn_lstm_model(input_shape=(600, 16), num_classes=52, dropout_rate=0.3, model_name="Model_Name"):

    model = models.Sequential([
        layers.Conv1D(128, 15, activation='relu', input_shape=input_shape, kernel_regularizer=l2(0.001)),
        layers.BatchNormalization(),
        layers.MaxPooling1D(2),
        layers.Dropout(dropout_rate),

        layers.Conv1D(256, 9, activation='relu', kernel_regularizer=l2(0.001)),
        layers.BatchNormalization(),
        layers.MaxPooling1D(2),
        layers.Dropout(dropout_rate),

        #layers.Conv1D(128, 3, activation='relu', kernel_regularizer=l2(0.001)),
        #layers.BatchNormalization(),
        #layers.MaxPooling1D(2),
        #layers.Dropout(dropout_rate),

        layers.Bidirectional(layers.LSTM(96, return_sequences=True, kernel_regularizer=l2(0.001), recurrent_regularizer=l2(0.001))),
        layers.Bidirectional(layers.LSTM(64, return_sequences=False, kernel_regularizer=l2(0.001), recurrent_regularizer=l2(0.001))),
        layers.Dropout(dropout_rate),

        layers.Dense(128, activation='relu', kernel_regularizer=l2(0.001)),
        layers.BatchNormalization(),
        layers.Dropout(dropout_rate),

        layers.Dense(num_classes, activation='softmax')
    ], name=model_name)

    model.compile(
        optimizer=RMSprop(learning_rate=0.001),
        loss=SparseCategoricalCrossentropy(),
        metrics=[SparseCategoricalAccuracy()]
    )

    return model

# Create a test model to show architecture
test_model = create_cnn_lstm_model(model_name="EMG_CNN_LSTM")
test_model.summary()
plot_model(test_model, to_file='cnn_lstm_model.png', show_shapes=True, show_layer_names=True, dpi=120)
print(f"\nModel created successfully")
print(f"Total parameters: {test_model.count_params():,}")

In [None]:
import io

# Save Keras summary as LaTeX verbatim block
with io.StringIO() as buf:
    test_model.summary(print_fn=lambda x: buf.write(x + '\n'))
    summary_str = buf.getvalue()

with open('cnn_lstm_model_summary.tex', 'w', encoding='utf-8') as f:
    f.write('\\begin{verbatim}\n')
    f.write(summary_str)
    f.write('\\end{verbatim}\n')

In [None]:
# Custom callback for enhanced monitoring and clean output

# Global list to track best validation accuracy progression
global_best_progression = []

class CleanTrainingCallback(callbacks.Callback):
    """Custom callback for clean, organized training output"""
    
    def __init__(self, fold_num):
        super().__init__()
        self.fold_num = fold_num
        self.epoch_start_time = 0
        
    def on_train_begin(self, logs=None):
        print(f"\nFold {self.fold_num} Training Started")
        print("=" * 60)
        print(f"{'Epoch':<6} {'Train Loss':<12} {'Train Acc':<12} {'Val Loss':<12} {'Val Acc':<12} {'Time':<8} {'Status'}")
        print("-" * 72)
        
    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = time.time()
        
    def on_epoch_end(self, epoch, logs=None):
        epoch_time = time.time() - self.epoch_start_time
        
        # Get metrics
        train_loss = logs.get('loss', 0)
        train_acc = logs.get('sparse_categorical_accuracy', 0)
        val_loss = logs.get('val_loss', 0)
        val_acc = logs.get('val_sparse_categorical_accuracy', 0)
       
        # Access global best accuracy variable
        global best_global_val_acc, global_best_progression
        
        status = ""
        # Check if this epoch improved the global best accuracy
        if val_acc > best_global_val_acc:
            best_global_val_acc = val_acc  # Update the global best!
            status = "New best accuracy"
            self.model.save('best_CNN_Model.keras')
            
        # Track global best progression (fold, epoch, current_val_acc, global_best_acc)
        global_best_progression.append({
            'fold': self.fold_num,
            'epoch': epoch + 1,
            'val_acc': val_acc,
            'global_best': best_global_val_acc
        })
        
        print(f"{epoch+1:<6} {train_loss:<12.4f} {train_acc:<12.4f} {val_loss:<12.4f} {val_acc:<12.4f} {epoch_time:<8.1f}s {status}")

print("Callbacks and monitoring system loaded")

In [None]:
# Training configuration
EPOCHS = 100
BATCH_SIZE = 96
PATIENCE = 10
LR_PATIENCE = 5  # Patience for learning rate reduction
VALIDATION_SPLIT = 0.15  # Use 15% of training data for validation

print("Training configuration")
print("")
print(f"Epochs: {EPOCHS}")
print(f"Batch size: {BATCH_SIZE}")
print(f"Early stopping patience: {PATIENCE}")
print(f"LR reduction patience: {LR_PATIENCE}")
print(f"Validation split: {VALIDATION_SPLIT}")
print(f"K-folds: {N_FOLDS}")

def create_callbacks(fold_num):
    """Create callbacks for training with enhanced early stopping and model saving"""
    
    print(f"\nCallback configuration for fold {fold_num}")
    print(f"   Early Stopping: Patience = {PATIENCE} epochs")
    print(f"   Learning Rate Reduction: Factor=0.5, patience={LR_PATIENCE} epochs")
    print(f"   Global Best Model: Will be saved automatically when global best is found")
    print()
    
    # Create fresh callback instances for each fold to avoid state carryover
    early_stopping = callbacks.EarlyStopping(
        monitor='val_sparse_categorical_accuracy',
        patience=PATIENCE,  # Use global PATIENCE variable
        restore_best_weights=True,
        verbose=1,  # Show early stopping messages
        mode='max',  # 'max' for accuracy (we want the highest accuracy)
        min_delta=0.0001  # Minimum improvement to count as progress
    )
    
    lr_reducer = callbacks.ReduceLROnPlateau(
        monitor='val_sparse_categorical_accuracy',
        factor=0.5,
        patience=LR_PATIENCE,  # Use global LR_PATIENCE variable
        min_lr=1e-7,
        verbose=1,  # Show LR reduction messages
        mode='max',  # 'max' for accuracy
        min_delta=0.0001
    )
    
    training_callback = CleanTrainingCallback(fold_num=fold_num)
    
    callbacks_list = [
        training_callback,
        early_stopping,
        lr_reducer
    ]
    
    return callbacks_list

In [None]:
import time
# Storage for results
kfold_results = {
    'fold_num': [],
    'train_acc': [],
    'val_acc': [],
    'test_acc': [],
    'train_loss': [],
    'val_loss': [],
    'test_loss': [],
    'training_time': [],
    'epochs_trained': []
}

# Storage for models and histories
trained_models = []
training_histories = []

# Initialize global best progression tracking
global_best_progression = []

# Global best model tracking
best_global_val_acc = 0.0
best_global_model = None
best_global_fold = 0

# Start k-fold training
start_time_total = time.time()

for fold_num in range(N_FOLDS):
    print(f"\n{'='*60}")
    print(f"Training fold {fold_num + 1}/{N_FOLDS}")
    print(f"{'='*60}")

    # Reset patience values for each fold BEFORE creating callbacks
    PATIENCE = 10      # Early stopping patience
    LR_PATIENCE = 5    # Learning rate reduction patience
    
    # Ensure completely fresh state for each fold
    tf.keras.backend.clear_session()
    
    # Get pre-prepared data for this fold from kfold_data
    fold_data = kfold_data[fold_num]
    X_train_fold = fold_data['X_train']
    y_train_fold = fold_data['y_train']
    X_test_fold = fold_data['X_test']
    y_test_fold = fold_data['y_test']
    
    print(f"Fold {fold_num + 1} data:")
    print(f"  Training samples: {len(X_train_fold):,}")
    print(f"  Test samples: {len(X_test_fold):,}")
    print(f"  Input shape: {X_train_fold.shape}")
    print(f"  Output classes: {len(np.unique(y_train_fold))}")
    
    # Create fresh model for this fold
    model = create_cnn_lstm_model()
    
    # Create callbacks for this fold (AFTER resetting patience values)
    fold_callbacks = create_callbacks(fold_num + 1)
    
    # Train model
    print(f"\nStarting training for fold {fold_num + 1}...")
    start_time_fold = time.time()
    
    history = model.fit(
        X_train_fold, y_train_fold,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        validation_split=VALIDATION_SPLIT,
        callbacks=fold_callbacks,
        verbose=0  # Reduced verbosity - our custom callback handles output
    )
    
    training_time_fold = time.time() - start_time_fold
    epochs_trained = len(history.history['loss'])
    
    # Get best validation accuracy for this fold
    best_fold_val_acc = max(history.history['val_sparse_categorical_accuracy'])
    
    # Find the epoch with the best validation accuracy
    best_epoch_idx = history.history['val_sparse_categorical_accuracy'].index(best_fold_val_acc)
    
    # Note: Global best model tracking and saving is handled automatically by CleanTrainingCallback
    
    # Evaluate on test set (model is already restored to best weights by EarlyStopping)
    test_loss, test_acc = model.evaluate(X_test_fold, y_test_fold, verbose=0)
    
    # Get metrics from the BEST epoch (not the last epoch)
    best_train_loss = history.history['loss'][best_epoch_idx]
    best_train_acc = history.history['sparse_categorical_accuracy'][best_epoch_idx]
    best_val_loss = history.history['val_loss'][best_epoch_idx]
    best_val_acc = best_fold_val_acc  # It's already saved as the best validation accuracy
    
    # Store results (using BEST epoch metrics, not last epoch)
    kfold_results['fold_num'].append(fold_num + 1)
    kfold_results['train_acc'].append(best_train_acc)
    kfold_results['val_acc'].append(best_val_acc)
    kfold_results['test_acc'].append(test_acc)
    kfold_results['train_loss'].append(best_train_loss)
    kfold_results['val_loss'].append(best_val_loss)
    kfold_results['test_loss'].append(test_loss)
    kfold_results['training_time'].append(training_time_fold)
    kfold_results['epochs_trained'].append(epochs_trained)
    
    # Store model and history
    trained_models.append(model)
    training_histories.append(history)
    
    # Explicit cleanup to ensure fresh state for next fold
    del model, history, fold_callbacks  # Delete references
    tf.keras.backend.clear_session()    # Clear TensorFlow session
    
    # Force garbage collection
    import gc
    gc.collect()

In [None]:
print(kfold_results)

In [None]:
# Analyze K-Fold Results
print("K-Fold Results Analysis")
print("")

# Convert results to DataFrame for analysis
results_df = pd.DataFrame(kfold_results)
print("Individual fold results:")
print(results_df.round(4))

# Calculate statistics
print(f"\nSummary Statistics:")
print("")

metrics = ['train_acc', 'val_acc', 'test_acc']
for metric in metrics:
    values = results_df[metric]
    mean_val = values.mean()
    std_val = values.std()
    print(f"{metric.replace('_', ' ').title()}:")
    print(f"  Mean ± Std: {mean_val:.4f} ± {std_val:.4f}")
    print(f"  Range: {values.min():.4f} - {values.max():.4f}")
    print()

# Performance analysis
test_accuracies = results_df['test_acc']
print(f"Final Model Performance:")
print(f"")
print(f"Test Accuracy: {test_accuracies.mean():.4f} ± {test_accuracies.std():.4f}")
print(f"Best fold: {test_accuracies.max():.4f} (Fold {test_accuracies.idxmax() + 1})")
print(f"Worst fold: {test_accuracies.min():.4f} (Fold {test_accuracies.idxmin() + 1})")
print(f"95% Confidence Interval: [{test_accuracies.mean() - 1.96*test_accuracies.std():.4f}, {test_accuracies.mean() + 1.96*test_accuracies.std():.4f}]")

# Compare with random baseline
random_accuracy = 1/52  # Random chance for 52 classes
improvement = (test_accuracies.mean() - random_accuracy) / random_accuracy * 100
print(f"\nComparison with random baseline:")
print(f"Random accuracy: {random_accuracy:.4f} ({random_accuracy*100:.2f}%)")
print(f"Improvement: {improvement:.1f}x better than random")

# Training efficiency
print(f"\nTraining Efficiency:")
print("")
avg_training_time = results_df['training_time'].mean()
avg_epochs = results_df['epochs_trained'].mean()
print(f"Average training time per fold: {avg_training_time/60:.1f} minutes")
print(f"Average epochs per fold: {avg_epochs:.1f}")
print(f"Total training time: {results_df['training_time'].sum()/60:.1f} minutes")

In [None]:
# Load Best Saved Model for Analysis
print("Loading Best Saved Model for Analysis")
print("")

import glob
import os
import time

# Find all saved model files
model_files = glob.glob("best_CNN_Model.keras")

if model_files:
    # Sort by modification time to get the most recent
    model_files.sort(key=os.path.getmtime, reverse=True)
    
    print(f"Found {len(model_files)} saved model files:")
    for i, file in enumerate(model_files[:5]):  # Show first 5
        mod_time = time.ctime(os.path.getmtime(file))
        file_size = os.path.getsize(file) / 1024 / 1024  # Size in MB
        print(f"  {i+1}. {file} ({file_size:.1f} MB, {mod_time})")
    
    # Load the most recent model (or choose a specific one)
    best_model_path = model_files[0]
    print(f"\nLoading best model: {best_model_path}")
    
    try:
        # Load the model for analysis (independent of training variables)
        analysis_model = tf.keras.models.load_model(best_model_path)
        print(f"Model loaded successfully!")
        print(f"   Model input shape: {analysis_model.input_shape}")
        print(f"   Model output shape: {analysis_model.output_shape}")
        print(f"   Total parameters: {analysis_model.count_params():,}")
        
    except Exception as e:
        print(f"Error loading model: {e}")
        print("   Will use the last trained model from memory instead")
        if 'trained_models' in locals() and trained_models:
            analysis_model = trained_models[-1]  # Use last trained model
        else:
            print("   No trained models available. Please run training first.")
            analysis_model = None
            
else:
    print("No saved model files found!")
    print("   Looking for files matching pattern: best_cnn_lstm_fold_*.keras")
    print("   Please run the training first or check the file path.")
    
    # Try to use models from memory if available
    if 'trained_models' in locals() and trained_models:
        analysis_model = trained_models[-1]
        print("   Using last trained model from memory instead")
    else:
        analysis_model = None
        print("   No models available for analysis")

print("")
if analysis_model is not None:
    print("Model loaded and available as 'analysis_model'")
else:
    print("Please run training first")

In [None]:
print("\nVisualising K-Fold Results")
print("")

from matplotlib.patches import Patch

x_pos = np.arange(len(results_df))
width = 0.25

# 1. Accuracy comparison across folds
fig1, ax1 = plt.subplots(figsize=(8, 6))
ax1.bar(x_pos - width, results_df['train_acc'], width, label='Train', alpha=0.8, color='#2d2d2d')
ax1.bar(x_pos, results_df['val_acc'], width, label='Validation', alpha=0.8, color='#6d6d6d')
ax1.bar(x_pos + width, results_df['test_acc'], width, label='Test', alpha=0.8, color='#a8a8a8')
ax1.set_xlabel('Fold')
ax1.set_ylabel('Accuracy')
ax1.set_title('Accuracy Across Folds')
ax1.set_xticks(x_pos)
ax1.set_xticklabels([f'Fold {i+1}' for i in range(N_FOLDS)])
legend_elements = [
    Patch(facecolor='#2d2d2d', edgecolor='black', label='Train'),
    Patch(facecolor='#6d6d6d', edgecolor='black', label='Validation'),
    Patch(facecolor='#a8a8a8', edgecolor='black', label='Test')
]
ax1.legend(handles=legend_elements, loc='best', title='Grey Tone Legend')
ax1.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 2. Loss comparison across folds
fig2, ax2 = plt.subplots(figsize=(8, 6))
ax2.bar(x_pos - width, results_df['train_loss'], width, label='Train', alpha=0.8, color='#2d2d2d')
ax2.bar(x_pos, results_df['val_loss'], width, label='Validation', alpha=0.8, color='#6d6d6d')
ax2.bar(x_pos + width, results_df['test_loss'], width, label='Test', alpha=0.8, color='#a8a8a8')
ax2.set_xlabel('Fold')
ax2.set_ylabel('Loss')
ax2.set_title('Loss Across Folds')
ax2.set_xticks(x_pos)
ax2.set_xticklabels([f'Fold {i+1}' for i in range(N_FOLDS)])
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 3. Training time and epochs
fig3, ax3 = plt.subplots(figsize=(8, 6))
ax3_twin = ax3.twinx()
bars1 = ax3.bar(x_pos, results_df['training_time']/60, alpha=0.8, color='#6d6d6d', label='Training Time')
ax3.set_xlabel('Fold')
ax3.set_ylabel('Training Time / minutes', color='#6d6d6d')
ax3.tick_params(axis='y', labelcolor='#6d6d6d')
bars2 = ax3_twin.bar(x_pos, results_df['epochs_trained'], alpha=0.6, color='#a8a8a8', width=0.6, label='Epochs')
ax3_twin.set_ylabel('Epochs Trained', color='#a8a8a8')
ax3_twin.tick_params(axis='y', labelcolor='#a8a8a8')
ax3.set_title('Training Efficiency')
ax3.set_xticks(x_pos)
ax3.set_xticklabels([f'Fold {i+1}' for i in range(N_FOLDS)])
plt.tight_layout()
plt.show()

# 4. Test accuracy distribution
fig4, ax4 = plt.subplots(figsize=(8, 6))
ax4.hist(results_df['test_acc'], bins=10, alpha=0.7, color='#6d6d6d', edgecolor='black')
ax4.axvline(results_df['test_acc'].mean(), color='red', linestyle='--', linewidth=2, label=f'Mean: {results_df["test_acc"].mean():.4f}')
ax4.set_xlabel('Test Accuracy')
ax4.set_ylabel('Frequency')
ax4.set_title('Test Accuracy Distribution')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 5. Accuracy vs Training Time
fig5, ax5 = plt.subplots(figsize=(8, 6))
ax5.scatter(results_df['training_time']/60, results_df['test_acc'], s=100, alpha=0.7, color='#6d6d6d')
for i, txt in enumerate(results_df['fold_num']):
    ax5.annotate(f'F{txt}', (results_df['training_time'].iloc[i]/60, results_df['test_acc'].iloc[i]), 
                xytext=(5, 5), textcoords='offset points')
ax5.set_xlabel('Training Time (minutes)')
ax5.set_ylabel('Test Accuracy')
ax5.set_title('Accuracy vs Training Time')
ax5.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 6. Performance summary table
fig6, ax6 = plt.subplots(figsize=(8, 4))
ax6.axis('tight')
ax6.axis('off')
summary_data = [
    ['Metric', 'Mean ± Std', 'Min', 'Max'],
    ['Test Accuracy', f'{test_accuracies.mean():.4f} ± {test_accuracies.std():.4f}', 
     f'{test_accuracies.min():.4f}', f'{test_accuracies.max():.4f}'],
    ['Val Accuracy', f'{results_df["val_acc"].mean():.4f} ± {results_df["val_acc"].std():.4f}', 
     f'{results_df["val_acc"].min():.4f}', f'{results_df["val_acc"].max():.4f}'],
    ['Training Time (min)', f'{results_df["training_time"].mean()/60:.1f} ± {results_df["training_time"].std()/60:.1f}', 
     f'{results_df["training_time"].min()/60:.1f}', f'{results_df["training_time"].max()/60:.1f}'],
    ['Epochs Trained', f'{results_df["epochs_trained"].mean():.1f} ± {results_df["epochs_trained"].std():.1f}', 
     f'{results_df["epochs_trained"].min()}', f'{results_df["epochs_trained"].max()}']
]
table = ax6.table(cellText=summary_data, cellLoc='center', loc='center', 
                 colWidths=[0.3, 0.3, 0.2, 0.2])
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 1.5)
for i in range(len(summary_data[0])):
    table[(0, i)].set_facecolor('#2d2d2d')
    table[(0, i)].set_text_props(weight='bold', color='white')
ax6.set_title('Performance Summary', fontweight='bold')
plt.tight_layout()
plt.show()

In [None]:
# Visualize Training Histories (Learning Curves)
print("\nTraining Histories Visualization")
print("")

# Plot learning curves for all folds
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
fig.suptitle('CNN-LSTM Training History - Learning Curves', fontsize=16, fontweight='bold')

# Colors for different folds
colors = ['#2d2d2d', '#2d2d2d', '#2d2d2d', '#2d2d2d', '#2d2d2d'] # rudimary solution

# Plot individual fold histories
for i in range(min(N_FOLDS, 5)):  # Show up to 5 folds
    row = i // 3
    col = i % 3
    
    if i < len(training_histories):
        history = training_histories[i]
        
        # Accuracy plot
        ax = axes[row, col]
        epochs = range(1, len(history.history['sparse_categorical_accuracy']) + 1)
        
        ax.plot(epochs, history.history['sparse_categorical_accuracy'], 
               'o-', color=colors[i], alpha=0.8, label='Train Accuracy')
        ax.plot(epochs, history.history['val_sparse_categorical_accuracy'], 
               's-', color=colors[i], alpha=0.6, label='Val Accuracy')
        
        ax.set_title(f'Fold {i+1} - Learning Curves')
        ax.set_xlabel('Epoch')
        ax.set_ylabel('Accuracy')
        ax.legend(loc='lower right')
        ax.grid(True, alpha=0.3)
        
        # Add final accuracies as text
        final_train_acc = history.history['sparse_categorical_accuracy'][-1]
        final_val_acc = history.history['val_sparse_categorical_accuracy'][-1]
        ax.text(0.05, 0.95, f'Final Train: {final_train_acc:.3f}\nFinal Val: {final_val_acc:.3f}', 
               transform=ax.transAxes, verticalalignment='top', 
               bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))

# If there is fewer than 6 folds, hide extra subplots
for i in range(N_FOLDS, 6):
    row = i // 3
    col = i % 3
    if row < 2 and col < 3:
        axes[row, col].set_visible(False)

plt.tight_layout()
plt.show()

# Aggregate learning curves
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

# Collect all training histories
all_train_acc = []
all_val_acc = []
all_train_loss = []
all_val_loss = []

max_epochs = max(len(hist.history['sparse_categorical_accuracy']) for hist in training_histories)

for history in training_histories:
    all_train_acc.append(history.history['sparse_categorical_accuracy'])
    all_val_acc.append(history.history['val_sparse_categorical_accuracy'])
    all_train_loss.append(history.history['loss'])
    all_val_loss.append(history.history['val_loss'])

# Pad sequences to same length and compute mean/std
def pad_and_compute_stats(sequences):
    # Pad sequences to max length
    padded = []
    for seq in sequences:
        if len(seq) < max_epochs:
            # Pad with last value
            padded_seq = seq + [seq[-1]] * (max_epochs - len(seq))
        else:
            padded_seq = seq
        padded.append(padded_seq)
    
    padded = np.array(padded)
    mean_seq = np.mean(padded, axis=0)
    std_seq = np.std(padded, axis=0)
    return mean_seq, std_seq

# Compute statistics
train_acc_mean, train_acc_std = pad_and_compute_stats(all_train_acc)
val_acc_mean, val_acc_std = pad_and_compute_stats(all_val_acc)
train_loss_mean, train_loss_std = pad_and_compute_stats(all_train_loss)
val_loss_mean, val_loss_std = pad_and_compute_stats(all_val_loss)

epochs = range(1, max_epochs + 1)

# Plot mean accuracy with confidence intervals
fig_acc, ax_acc = plt.subplots(figsize=(8, 6))
ax_acc.plot(epochs, train_acc_mean, 'o-', color='#2d2d2d', alpha=0.8, label='Train Accuracy')
ax_acc.fill_between(epochs, train_acc_mean - train_acc_std, train_acc_mean + train_acc_std, alpha=0.2, color='#2d2d2d')
ax_acc.plot(epochs, val_acc_mean, 's-', color='#a8a8a8', alpha=0.8, label='Validation Accuracy')
ax_acc.fill_between(epochs, val_acc_mean - val_acc_std, val_acc_mean + val_acc_std, alpha=0.2, color='#a8a8a8')
ax_acc.set_title('Average Learning Curves - Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')
ax_acc.legend(loc='lower right')
ax_acc.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# Plot mean loss with confidence intervals
fig_loss, ax_loss = plt.subplots(figsize=(8, 6))
ax_loss.plot(epochs, train_loss_mean, 'o-', color='#2d2d2d', alpha=0.8, label='Train Loss')
ax_loss.fill_between(epochs, train_loss_mean - train_loss_std, train_loss_mean + train_loss_std, alpha=0.2, color='#2d2d2d')
ax_loss.plot(epochs, val_loss_mean, 's-', color='#a8a8a8', alpha=0.8, label='Validation Loss')
ax_loss.fill_between(epochs, val_loss_mean - val_loss_std, val_loss_mean + val_loss_std, alpha=0.2, color='#a8a8a8')
ax_loss.set_title('Average Learning Curves - Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.legend(loc='upper right')
ax_loss.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print("Training history visualization complete!")
print(f"   - Average convergence at epoch {np.argmax(val_acc_mean)+1}")
print(f"   - Best validation accuracy: {np.max(val_acc_mean):.4f}")
print(f"   - Final training stability: {'Good' if train_acc_std[-1] < 0.05 else 'Variable'}")

In [None]:
# Global Best Validation Accuracy Progression
print("\nGlobal Best Validation Accuracy Progression")
print("")

if global_best_progression:
    # Convert progression data to arrays for plotting
    progression_df = pd.DataFrame(global_best_progression)
    progression_df['global_epoch'] = range(1, len(progression_df) + 1)

    # Current validation accuracy vs global best progression
    fig1, ax1 = plt.subplots(figsize=(15, 6))
    ax1.plot(progression_df['global_epoch'], progression_df['val_acc'],
             'o-', color='lightblue', alpha=0.6, markersize=3, label='Current Val Accuracy')
    ax1.plot(progression_df['global_epoch'], progression_df['global_best'],
             'o-', color='darkred', linewidth=2, markersize=4, label='Global Best (Cumulative)')

    # Highlight improvements
    improvements = progression_df[progression_df['val_acc'] == progression_df['global_best']]
    if not improvements.empty:
        ax1.scatter(improvements['global_epoch'], improvements['global_best'],
                    color='red', s=50, alpha=0.8, zorder=5, label='New Global Best')

    # Add fold boundaries
    fold_boundaries = []
    current_fold = progression_df['fold'].iloc[0]
    for i, fold in enumerate(progression_df['fold']):
        if fold != current_fold:
            fold_boundaries.append(i)
            current_fold = fold
    for boundary in fold_boundaries:
        ax1.axvline(x=boundary, color='gray', linestyle='--', alpha=0.5)

    ax1.set_xlabel('Global Epoch (Across All Folds)')
    ax1.set_ylabel('Validation Accuracy')
    ax1.set_title('Validation Accuracy Progression')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

    # Global best accuracy by fold
    fig2, ax2 = plt.subplots(figsize=(10, 6))
    fold_maxes = progression_df.groupby('fold')['global_best'].max()
    fold_numbers = fold_maxes.index

    bars = ax2.bar(fold_numbers, fold_maxes, color='steelblue', alpha=0.7, edgecolor='navy')
    for bar, val in zip(bars, fold_maxes):
        height = bar.get_height()
        ax2.text(bar.get_x() + bar.get_width()/2., height + 0.001,
                 f'{val:.4f}', ha='center', va='bottom', fontweight='bold')

    final_best = progression_df['global_best'].iloc[-1]
    ax2.axhline(y=final_best, color='red', linestyle='-', linewidth=2,
                label=f'Final Global Best: {final_best:.4f}')

    ax2.set_xlabel('Fold Number')
    ax2.set_ylabel('Global Best Validation Accuracy')
    ax2.set_title('Global Best Accuracy Reached by Each Fold')
    ax2.legend()
    ax2.grid(True, alpha=0.3, axis='y')
    ax2.set_xticks(fold_numbers)
    plt.tight_layout()
    plt.show()

    # Summary statistics
    total_improvements = len(improvements)
    total_epochs = len(progression_df)
    improvement_rate = (total_improvements / total_epochs) * 100

    print(f"\nGlobal Best Summary:")
    print("")
    print(f"Total epochs trained: {total_epochs}")
    print(f"Global best improvements: {total_improvements}")
    print(f"Improvement rate: {improvement_rate:.1f}%")
    print(f"Final global best accuracy: {final_best:.4f}")

    # Which fold contributed most improvements
    improvement_by_fold = improvements['fold'].value_counts().sort_index()
    print(f"\nImprovements by fold:")
    for fold, count in improvement_by_fold.items():
        print(f"   Fold {fold}: {count} improvements")

    if not improvement_by_fold.empty:
        best_contributing_fold = improvement_by_fold.idxmax()
        print(f"\nFold {best_contributing_fold} contributed the most improvements ({improvement_by_fold.max()} times)")

else:
    print("No global best progression data available. Please run the training first.")

In [None]:
# Comprehensive Confusion Matrix for All 52 Classes
print("\nConfusion Matrix Analysis for All 52 Classes")
print("")

from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

# Use the best model to make predictions on all test data
print("Generating predictions using the best model...")

# Combine all test data from all folds to get comprehensive results
all_y_true = []
all_y_pred = []

for i in range(N_FOLDS):
    fold_data = kfold_data[i]
    X_test_fold = fold_data['X_test']
    y_test_fold = fold_data['y_test']
    
    # Make predictions using the best saved model
    y_pred_fold = analysis_model.predict(X_test_fold, verbose=0)
    y_pred_fold_classes = np.argmax(y_pred_fold, axis=1)
    
    all_y_true.extend(y_test_fold)
    all_y_pred.extend(y_pred_fold_classes)

all_y_true = np.array(all_y_true)
all_y_pred = np.array(all_y_pred)

# Generate confusion matrix
cm = confusion_matrix(all_y_true, all_y_pred)
print(f"Confusion matrix shape: {cm.shape}")
print(f"Total test samples: {len(all_y_true):,}")

tick_marks = np.arange(0, 52, 5)

# 1. Full confusion matrix (heatmap)
plt.figure(figsize=(12, 10))
plt.title('Full Confusion Matrix (52x52)', fontweight='bold')
im1 = plt.imshow(cm, interpolation='nearest', cmap='Blues')
plt.xlabel('Predicted Class')
plt.ylabel('True Class')
plt.colorbar(im1, shrink=0.8, label='Number of Samples')
plt.xticks(tick_marks)
plt.yticks(tick_marks)
plt.tight_layout()
plt.show()

# 2. Normalized confusion matrix (percentages)
cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
plt.figure(figsize=(12, 10))
plt.title('Normalized Confusion Matrix (Recall per Class)', fontweight='bold')
im2 = plt.imshow(cm_normalized, interpolation='nearest', cmap='Reds', vmin=0, vmax=1)
plt.xlabel('Predicted Class')
plt.ylabel('True Class')
plt.colorbar(im2, shrink=0.8, label='Recall (True Positive Rate)')
plt.xticks(tick_marks)
plt.yticks(tick_marks)
plt.tight_layout()
plt.show()

# 3. Per-class accuracy (diagonal elements of normalized confusion matrix)
class_accuracies = np.diag(cm_normalized)
classes = np.arange(52)
plt.figure(figsize=(14, 6))
bars = plt.bar(classes, class_accuracies, color='steelblue', alpha=0.7, edgecolor='navy', linewidth=0.5)
plt.title('Per-Class Recall (Accuracy)', fontweight='bold')
plt.xlabel('Class')
plt.ylabel('Recall')
plt.grid(True, alpha=0.3, axis='y')
best_class = np.argmax(class_accuracies)
worst_class = np.argmin(class_accuracies)
bars[best_class].set_color('green')
bars[worst_class].set_color('red')
avg_accuracy = np.mean(class_accuracies)
plt.axhline(y=avg_accuracy, color='red', linestyle='--', linewidth=2, label=f'Average: {avg_accuracy:.3f}')
plt.legend(loc='lower right')
plt.xticks(tick_marks)
plt.tight_layout()
plt.show()

# 4. Per-class F1-score
class_stats = []
for i in range(52):
    tp = cm[i, i]
    fp = cm[:, i].sum() - tp
    fn = cm[i, :].sum() - tp
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    class_stats.append({
        'class': i,
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'support': tp + fn
    })
class_stats_df = pd.DataFrame(class_stats)

plt.figure(figsize=(14, 6))
bars_f1 = plt.bar(classes, class_stats_df['f1_score'], color='orange', alpha=0.7, edgecolor='darkorange', linewidth=0.5)
plt.title('Per-Class F1-Score', fontweight='bold')
plt.xlabel('Class')
plt.ylabel('F1-Score')
plt.grid(True, alpha=0.3, axis='y')
best_f1_class = class_stats_df['f1_score'].idxmax()
worst_f1_class = class_stats_df['f1_score'].idxmin()
bars_f1[best_f1_class].set_color('green')
bars_f1[worst_f1_class].set_color('red')
avg_f1 = class_stats_df['f1_score'].mean()
plt.axhline(y=avg_f1, color='red', linestyle='--', linewidth=2, label=f'Average: {avg_f1:.3f}')
plt.legend(loc='lower right')
plt.xticks(tick_marks)
plt.tight_layout()
plt.show()

# Print detailed statistics
print(f"\nConfusion Matrix Statistics:")
print("")
print(f"Overall Accuracy: {np.trace(cm) / np.sum(cm):.4f}")
print(f"Average Per-Class Recall: {avg_accuracy:.4f}")
print(f"Average F1-Score: {avg_f1:.4f}")

print(f"\nBest Performing Classes:")
print("")
top_5_recall = class_stats_df.nlargest(5, 'recall')
for _, row in top_5_recall.iterrows():
    print(f"Class {int(row['class']):2d}: Recall={row['recall']:.3f}, F1={row['f1_score']:.3f}, Support={int(row['support'])}")

print(f"\nWorst Performing Classes:")
print("")
bottom_5_recall = class_stats_df.nsmallest(5, 'recall')
for _, row in bottom_5_recall.iterrows():
    print(f"Class {int(row['class']):2d}: Recall={row['recall']:.3f}, F1={row['f1_score']:.3f}, Support={int(row['support'])}")

# Most confused class pairs
print(f"\nMost Confused Class pairs:")
print("")
confused_pairs = []
for i in range(52):
    for j in range(52):
        if i != j and cm[i, j] > 0:
            confused_pairs.append((i, j, cm[i, j]))

confused_pairs.sort(key=lambda x: x[2], reverse=True)
for i, (true_class, pred_class, count) in enumerate(confused_pairs[:10]):
    print(f"{i+1:2d}. Class {true_class:2d} → Class {pred_class:2d}: {count:4d} samples")

print(f"\nClass Distribution In Test Set:")
print(f"="*32)
unique_classes, class_counts = np.unique(all_y_true, return_counts=True)
print(f"Number of unique classes in test set: {len(unique_classes)}")
print(f"Class distribution statistics:")
print(f"   Min samples per class: {np.min(class_counts)}")
print(f"   Max samples per class: {np.max(class_counts)}")
print(f"   Mean samples per class: {np.mean(class_counts):.1f}")
print(f"   Std samples per class: {np.std(class_counts):.1f}")
print(f"   Class imbalance ratio: {np.max(class_counts) / np.min(class_counts):.1f}")

In [None]:
# AUROC Curve for Multi-Class Classification (One-vs-Rest)

from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt

print("\nAUROC Curve Analysis")

n_classes = 52

# Collect all test data and predicted probabilities from all folds
all_y_true = []
all_y_score = []

for i in range(N_FOLDS):
    fold_data = kfold_data[i]
    X_test_fold = fold_data['X_test']
    y_test_fold = fold_data['y_test']
    y_pred_prob = analysis_model.predict(X_test_fold, verbose=0)
    all_y_true.extend(y_test_fold)
    all_y_score.append(y_pred_prob)

all_y_true = np.array(all_y_true)
all_y_score = np.vstack(all_y_score)

# Binarize the labels for multi-class ROC
y_true_bin = label_binarize(all_y_true, classes=np.arange(n_classes))

# Compute ROC curve and ROC area for each class
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], all_y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Plot ROC curves for the five worst classes
worst_classes = [40, 44, 38, 13, 36]    # Worst performing classes from last cell
best_class = 20                         # Best performing class from last cell in comparison

plt.figure(figsize=(12, 8))
for i in worst_classes:
    plt.plot(fpr[i], tpr[i], lw=2, color='grey', label=f'Class {i}')
# Plot best class in green
plt.plot(fpr[best_class], tpr[best_class], lw=2, color='black', label=f'Class {best_class}')
plt.plot([0, 1], [0, 1], 'k--', lw=2, label='Chance')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('AUROC Curves: Five Worst Classes (Grey) and Best Class (Black)')
plt.legend(loc='lower right')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# Print mean and best AUROC
mean_auc = np.mean(list(roc_auc.values()))
best_auc = np.max(list(roc_auc.values()))
print(f"\nMean AUROC across all classes: {mean_auc:.3f}")
print(f"Best AUROC: {best_auc:.3f}")

In [None]:
print("AUROC for Best and Five Worst Classes")
print("")

# Worst classes AUROC
for i in worst_classes:
    print(f"Worst class (Class {i}) AUROC: {roc_auc[i]:.3f}")

In [None]:
# Clean Feature Importance Analysis
print("\nFeature Importance Analysis")
print("")

# Clear any SHAP interference and restart fresh
import gc
if 'explainer' in globals():
    del explainer
gc.collect()

# Restart TensorFlow session to clear any SHAP interference
import tensorflow as tf
tf.keras.backend.clear_session()

# Reload the model fresh to avoid any SHAP contamination
print("Loading clean model for analysis...")
try:
    clean_model = tf.keras.models.load_model('best_CNN_Model.keras')
    print("Model loaded successfully")
except:
    print("Using analysis_model from memory")
    clean_model = analysis_model

# Select data for analysis
n_samples_analysis = min(30, len(all_y_true))
fold_data = kfold_data[0]
X_analysis = fold_data['X_test'][:n_samples_analysis]
y_analysis = fold_data['y_test'][:n_samples_analysis]

print(f"Analyzing {n_samples_analysis} samples")
print(f"Input shape: {X_analysis.shape}")

# Method 1: Permutation Feature Importance
print("\n1. Computing permutation feature importance...")

def permutation_importance_analysis(model, X, y, n_repeats=3):
    """Compute permutation importance for each feature"""
    # Get baseline predictions
    baseline_predictions = model.predict(X, verbose=0)
    baseline_accuracy = np.mean(np.argmax(baseline_predictions, axis=1) == y)
    
    importance_scores = np.zeros((X.shape[1], X.shape[2]))  # time_steps x channels
    
    for time_step in range(X.shape[1]):
        for channel in range(X.shape[2]):
            scores = []
            for _ in range(n_repeats):
                # Create a copy and permute this feature
                X_permuted = X.copy()
                np.random.shuffle(X_permuted[:, time_step, channel])
                
                # Get predictions with permuted feature
                permuted_predictions = model.predict(X_permuted, verbose=0)
                permuted_accuracy = np.mean(np.argmax(permuted_predictions, axis=1) == y)
                
                # Importance is the decrease in accuracy
                importance = baseline_accuracy - permuted_accuracy
                scores.append(importance)
            
            importance_scores[time_step, channel] = np.mean(scores)
    
    return importance_scores, baseline_accuracy

perm_importance, baseline_acc = permutation_importance_analysis(clean_model, X_analysis, y_analysis)
print(f"Baseline accuracy: {baseline_acc:.4f}")

# Method 2: Occlusion Analysis
print("2. Computing occlusion-based importance...")

def occlusion_analysis(model, X, y, occlusion_size=3):
    """Analyze feature importance by occluding regions"""
    baseline_predictions = model.predict(X, verbose=0)
    baseline_accuracy = np.mean(np.argmax(baseline_predictions, axis=1) == y)
    
    occlusion_scores = np.zeros((X.shape[1], X.shape[2]))
    
    for time_step in range(0, X.shape[1], occlusion_size):
        for channel in range(X.shape[2]):
            # Occlude this region
            X_occluded = X.copy()
            end_time = min(time_step + occlusion_size, X.shape[1])
            X_occluded[:, time_step:end_time, channel] = 0
            
            # Get predictions with occluded feature
            occluded_predictions = model.predict(X_occluded, verbose=0)
            occluded_accuracy = np.mean(np.argmax(occluded_predictions, axis=1) == y)
            
            # Importance is the decrease in accuracy
            importance = baseline_accuracy - occluded_accuracy
            for t in range(time_step, end_time):
                occlusion_scores[t, channel] = importance
    
    return occlusion_scores

occlusion_importance = occlusion_analysis(clean_model, X_analysis, y_analysis)

# Method 3: Variance-based Analysis
print("3. Computing variance-based feature importance...")

def variance_analysis(X):
    """Analyze feature importance based on variance"""
    # Normalize the data first
    X_norm = (X - np.mean(X, axis=0)) / (np.std(X, axis=0) + 1e-8)
    
    # Compute variance across samples for each feature
    feature_variance = np.var(X_norm, axis=0)
    
    return feature_variance

variance_importance = variance_analysis(X_analysis)

# Create comprehensive visualizations
fig, axes = plt.subplots(2, 3, figsize=(20, 12))
fig.suptitle('Feature Importance Analysis - Multiple Methods', fontsize=16, fontweight='bold')

# 1. Permutation importance by channel
ax1 = axes[0, 0]
channel_perm_importance = np.mean(perm_importance, axis=0)
channels = np.arange(16) # change to 16 for NinaPro DB5 or 12 for NinaPro DB4

bars1 = ax1.bar(channels, channel_perm_importance, color='steelblue', alpha=0.7, edgecolor='navy')
ax1.set_title('Channel Importance (Permutation)', fontweight='bold')
ax1.set_xlabel('EMG Channel')
ax1.set_ylabel('Importance Score')
ax1.grid(True, alpha=0.3, axis='y')

# Highlight top channels
top_channels_perm = np.argsort(channel_perm_importance)[-3:]
for ch in top_channels_perm:
    bars1[ch].set_color('orange')

# 2. Temporal importance (permutation)
ax2 = axes[0, 1]
temporal_perm_importance = np.mean(perm_importance, axis=1)
time_steps = np.arange(WINDOW_SIZE)

ax2.plot(time_steps, temporal_perm_importance, 'o-', color='darkgreen', linewidth=2, markersize=4)
ax2.set_title('Temporal Importance (Permutation)', fontweight='bold')
ax2.set_xlabel('Time Step')
ax2.set_ylabel('Importance Score')
ax2.grid(True, alpha=0.3)

# Highlight top time steps
top_timesteps_perm = np.argsort(temporal_perm_importance)[-5:]
ax2.scatter(top_timesteps_perm, temporal_perm_importance[top_timesteps_perm], 
           color='red', s=60, zorder=5, label='Top 5 time steps')
ax2.legend()

# 3. Permutation importance heatmap
ax3 = axes[0, 2]
im1 = ax3.imshow(perm_importance.T, aspect='auto', cmap='RdYlBu_r', interpolation='nearest')
ax3.set_title('Permutation Importance Heatmap', fontweight='bold')
ax3.set_xlabel('Time Step')
ax3.set_ylabel('EMG Channel')
cbar1 = plt.colorbar(im1, ax=ax3, shrink=0.8)
cbar1.set_label('Importance Score')

# 4. Occlusion importance by channel
ax4 = axes[1, 0]
channel_occ_importance = np.mean(occlusion_importance, axis=0)

bars2 = ax4.bar(channels, channel_occ_importance, color='purple', alpha=0.7, edgecolor='darkviolet')
ax4.set_title('Channel Importance (Occlusion)', fontweight='bold')
ax4.set_xlabel('EMG Channel')
ax4.set_ylabel('Importance Score')
ax4.grid(True, alpha=0.3, axis='y')

# Highlight top channels
top_channels_occ = np.argsort(channel_occ_importance)[-3:]
for ch in top_channels_occ:
    bars2[ch].set_color('orange')

# 5. Variance-based importance
ax5 = axes[1, 1]
channel_var_importance = np.mean(variance_importance, axis=0)

bars3 = ax5.bar(channels, channel_var_importance, color='green', alpha=0.7, edgecolor='darkgreen')
ax5.set_title('Channel Importance (Variance)', fontweight='bold')
ax5.set_xlabel('EMG Channel')
ax5.set_ylabel('Variance Score')
ax5.grid(True, alpha=0.3, axis='y')

# Highlight top channels
top_channels_var = np.argsort(channel_var_importance)[-3:]
for ch in top_channels_var:
    bars3[ch].set_color('orange')

# 6. Method comparison
ax6 = axes[1, 2]

# Normalize for comparison
perm_norm = channel_perm_importance / np.max(np.abs(channel_perm_importance))
occ_norm = channel_occ_importance / np.max(np.abs(channel_occ_importance))
var_norm = channel_var_importance / np.max(channel_var_importance)

width = 0.25
x_pos = np.arange(len(channels))

bars_comp1 = ax6.bar(x_pos - width, perm_norm, width, 
                    label='Permutation', color='steelblue', alpha=0.7)
bars_comp2 = ax6.bar(x_pos, occ_norm, width,
                    label='Occlusion', color='purple', alpha=0.7)
bars_comp3 = ax6.bar(x_pos + width, var_norm, width,
                    label='Variance', color='green', alpha=0.7)

ax6.set_title('Method Comparison (Normalized)', fontweight='bold')
ax6.set_xlabel('EMG Channel')
ax6.set_ylabel('Normalized Importance')
ax6.set_xticks(x_pos)
ax6.set_xticklabels(channels)
ax6.legend()
ax6.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

# Print detailed analysis
print(f"\nFeature Importance Analysis Summary")
print("")

print(f"\nMost Important Channels")
print("")
print("Permutation Method:")
for i, ch in enumerate(reversed(top_channels_perm)):
    print(f"  {i+1}. Channel {ch:2d}: Score = {channel_perm_importance[ch]:.6f}")

print("\nOcclusion Method:")
for i, ch in enumerate(reversed(top_channels_occ)):
    print(f"  {i+1}. Channel {ch:2d}: Score = {channel_occ_importance[ch]:.6f}")

print("\nVariance Method:")
for i, ch in enumerate(reversed(top_channels_var)):
    print(f"  {i+1}. Channel {ch:2d}: Score = {channel_var_importance[ch]:.6f}")

print(f"\nMost Important Time Steps:")
print("")
for i, ts in enumerate(reversed(top_timesteps_perm)):
    print(f"  {i+1}. Time step {ts:2d}: Score = {temporal_perm_importance[ts]:.6f}")

print(f"\nTemporal Pattern Analysis:")
print("")
early_importance = np.mean(temporal_perm_importance[:WINDOW_SIZE//3])
middle_importance = np.mean(temporal_perm_importance[WINDOW_SIZE//3:2*WINDOW_SIZE//3])
late_importance = np.mean(temporal_perm_importance[2*WINDOW_SIZE//3:])

print(f"First Exercise (first 1/3):   {early_importance:.6f}")
print(f"Second Exercise (middle 1/3): {middle_importance:.6f}")
print(f"Third Exercise (last 1/3):     {late_importance:.6f}")

if early_importance > middle_importance and early_importance > late_importance:
    print("→ Model focuses most on First muscle activation patterns")
elif late_importance > early_importance and late_importance > middle_importance:
    print("→ Model focuses most on Third muscle activation patterns")
else:
    print("→ Model focuses most on Second muscle activation patterns")

print(f"\nChannel Group Analysis:")
print("")
proximal_channels = [0, 1, 2, 3]
middle_channels = [4, 5, 6, 7, 8, 9, 10, 11]
distal_channels = [12, 13, 14, 15]

proximal_importance = np.mean(channel_perm_importance[proximal_channels])
middle_group_importance = np.mean(channel_perm_importance[middle_channels])
distal_importance = np.mean(channel_perm_importance[distal_channels])

print(f"Proximal muscles (channels 0-3):   {proximal_importance:.6f}")
print(f"Middle muscles (channels 4-11):    {middle_group_importance:.6f}")
print(f"Distal muscles (channels 12-15):   {distal_importance:.6f}")

max_group = max(proximal_importance, middle_group_importance, distal_importance)
if max_group == proximal_importance:
    print("→ Model relies most on PROXIMAL muscle activity")
elif max_group == distal_importance:
    print("→ Model relies most on DISTAL muscle activity")
else:
    print("→ Model relies most on MIDDLE muscle groups")

print(f"\nMethod Consistency:")
print("")
# Check consistency between methods
corr_perm_occ = np.corrcoef(perm_norm, occ_norm)[0, 1]
corr_perm_var = np.corrcoef(perm_norm, var_norm)[0, 1]
corr_occ_var = np.corrcoef(occ_norm, var_norm)[0, 1]

print(f"Permutation vs Occlusion:  {corr_perm_occ:.3f}")
print(f"Permutation vs Variance:   {corr_perm_var:.3f}")
print(f"Occlusion vs Variance:     {corr_occ_var:.3f}")

avg_correlation = np.mean([corr_perm_occ, corr_perm_var, corr_occ_var])
print(f"Average correlation:       {avg_correlation:.3f}")

if avg_correlation > 0.7:
    print("→ High consistency between methods")
elif avg_correlation > 0.4:
    print("→ Moderate consistency between methods")
else:
    print("→ Low consistency between methods - results may be method-dependent")

print("\nFeature importance analysis complete!")
print("- Permutation importance shows which features are most critical for predictions")
print("- Occlusion analysis reveals spatial dependencies in the model")
print("- Variance analysis identifies the most discriminative features")
print("- Multiple methods provide robust and reliable importance estimates")
print("- These insights can guide electrode placement and feature engineering")

In [None]:
# Class-wise Training Contribution Analysis
print("\nClass-Wise Training Contribution Analysis")
print("")

# Analyze how each class contributed to model training across folds
print("Analyzing class contributions across all folds...")

# 1. Collect class-wise data from all folds
class_contributions = {}
class_sample_counts = {}
class_fold_performance = {}

for fold_idx, fold_data in enumerate(kfold_data):
    X_train, X_test = fold_data['X_train'], fold_data['X_test']
    y_train, y_test = fold_data['y_train'], fold_data['y_test']
    
    # Count samples per class in training set
    train_class_counts = np.bincount(y_train, minlength=52)
    test_class_counts = np.bincount(y_test, minlength=52)
    
    for class_id in range(52):
        if class_id not in class_contributions:
            class_contributions[class_id] = {
                'train_samples': [],
                'test_samples': [],
                'fold_performance': [],
                'total_train_samples': 0,
                'total_test_samples': 0
            }
        
        class_contributions[class_id]['train_samples'].append(train_class_counts[class_id])
        class_contributions[class_id]['test_samples'].append(test_class_counts[class_id])
        class_contributions[class_id]['total_train_samples'] += train_class_counts[class_id]
        class_contributions[class_id]['total_test_samples'] += test_class_counts[class_id]

# 2. Calculate class-wise performance metrics
class_performance_metrics = {}
for class_id in range(52):
    # Get predictions for this class
    class_mask = (all_y_true == class_id)
    if np.sum(class_mask) > 0:
        class_predictions = all_y_pred[class_mask]
        class_accuracy = np.mean(class_predictions == class_id)
        
        # Calculate precision and recall from confusion matrix
        tp = np.sum((all_y_true == class_id) & (all_y_pred == class_id))
        fp = np.sum((all_y_true != class_id) & (all_y_pred == class_id))
        fn = np.sum((all_y_true == class_id) & (all_y_pred != class_id))
        
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        
        class_performance_metrics[class_id] = {
            'accuracy': class_accuracy,
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'total_samples': np.sum(class_mask),
            'correct_predictions': tp
        }
    else:
        class_performance_metrics[class_id] = {
            'accuracy': 0, 'precision': 0, 'recall': 0, 'f1_score': 0,
            'total_samples': 0, 'correct_predictions': 0
        }

# 3. Create comprehensive visualizations
fig, axes = plt.subplots(3, 2, figsize=(20, 18))
fig.suptitle('Class-wise Training Contribution Analysis', fontsize=16, fontweight='bold')

# 3.1 Training sample distribution across classes
ax1 = axes[0, 0]
class_ids = list(range(52))
total_train_samples = [class_contributions[i]['total_train_samples'] for i in class_ids]

bars1 = ax1.bar(class_ids, total_train_samples, color='steelblue', alpha=0.7, edgecolor='navy')
ax1.set_title('Training Samples per Class', fontweight='bold')
ax1.set_xlabel('Movement Class')
ax1.set_ylabel('Total Training Samples')
ax1.grid(True, alpha=0.3, axis='y')

# Highlight classes with most and least samples
max_samples_class = np.argmax(total_train_samples)
min_samples_class = np.argmax([s for s in total_train_samples if s > 0])  # Non-zero minimum
bars1[max_samples_class].set_color('green')
if total_train_samples[min_samples_class] > 0:
    bars1[min_samples_class].set_color('red')

# 3.2 Class performance vs sample count
ax2 = axes[0, 1]
class_f1_scores = [class_performance_metrics[i]['f1_score'] for i in class_ids]
sample_sizes = [class_performance_metrics[i]['total_samples'] for i in class_ids]

# Only plot classes with samples > 0
valid_classes = [(i, f1, samples) for i, f1, samples in zip(class_ids, class_f1_scores, sample_sizes) if samples > 0]
if valid_classes:
    valid_class_ids, valid_f1s, valid_samples = zip(*valid_classes)
    
    scatter = ax2.scatter(valid_samples, valid_f1s, c=valid_class_ids, cmap='viridis', 
                         s=60, alpha=0.7, edgecolors='black')
    ax2.set_title('Class Performance vs Sample Count', fontweight='bold')
    ax2.set_xlabel('Total Test Samples')
    ax2.set_ylabel('F1 Score')
    ax2.grid(True, alpha=0.3)
    
    # Add colorbar
    cbar = plt.colorbar(scatter, ax=ax2)
    cbar.set_label('Class ID')
    
    # Add trend line
    if len(valid_samples) > 1:
        z = np.polyfit(valid_samples, valid_f1s, 1)
        p = np.poly1d(z)
        ax2.plot(valid_samples, p(valid_samples), "r--", alpha=0.8, linewidth=2)

# 3.3 Class-wise recall distribution
ax3 = axes[1, 0]
class_recalls = [class_performance_metrics[i]['recall'] for i in class_ids]

bars3 = ax3.bar(class_ids, class_recalls, color='orange', alpha=0.7, edgecolor='darkorange')
ax3.set_title('Recall per Class', fontweight='bold')
ax3.set_xlabel('Movement Class')
ax3.set_ylabel('Recall')
ax3.grid(True, alpha=0.3, axis='y')
ax3.axhline(y=np.mean(class_recalls), color='red', linestyle='--', linewidth=2, label=f'Mean Recall: {np.mean(class_recalls):.3f}')
ax3.legend()

# Highlight best and worst performing classes
best_recall_class = np.argmax(class_recalls)
worst_recall_class = np.argmin([r for r in class_recalls if r >= 0])
bars3[best_recall_class].set_color('green')
bars3[worst_recall_class].set_color('red')

# 3.4 Training sample variance across folds
ax4 = axes[1, 1]
sample_variances = []
sample_means = []

for class_id in class_ids:
    train_samples = class_contributions[class_id]['train_samples']
    if any(s > 0 for s in train_samples):
        sample_variances.append(np.var(train_samples))
        sample_means.append(np.mean(train_samples))
    else:
        sample_variances.append(0)
        sample_means.append(0)

bars4 = ax4.bar(class_ids, sample_variances, color='purple', alpha=0.7, edgecolor='darkviolet')
ax4.set_title('Training Sample Variance Across Folds', fontweight='bold')
ax4.set_xlabel('Movement Class')
ax4.set_ylabel('Sample Count Variance')
ax4.grid(True, alpha=0.3, axis='y')

# Highlight classes with highest variance (inconsistent across folds)
high_variance_classes = np.argsort(sample_variances)[-3:]
for cls in high_variance_classes:
    if sample_variances[cls] > 0:
        bars4[cls].set_color('red')

# 3.5 Class contribution efficiency (Performance per training sample)
ax5 = axes[2, 0]
efficiency_scores = []
for class_id in class_ids:
    total_train = class_contributions[class_id]['total_train_samples']
    f1_score = class_performance_metrics[class_id]['f1_score']
    
    if total_train > 0:
        efficiency = f1_score / (total_train / 1000)  # F1 per 1000 training samples
    else:
        efficiency = 0
    efficiency_scores.append(efficiency)

bars5 = ax5.bar(class_ids, efficiency_scores, color='green', alpha=0.7, edgecolor='darkgreen')
ax5.set_title('Training Efficiency (F1 Score per 1000 Training Samples)', fontweight='bold')
ax5.set_xlabel('Movement Class')
ax5.set_ylabel('Efficiency Score')
ax5.grid(True, alpha=0.3, axis='y')

# Highlight most and least efficient classes
if any(e > 0 for e in efficiency_scores):
    most_efficient_class = np.argmax(efficiency_scores)
    bars5[most_efficient_class].set_color('gold')

# 3.6 Class balance analysis (Training vs Test sample ratio)
ax6 = axes[2, 1]
balance_ratios = []
for class_id in class_ids:
    total_train = class_contributions[class_id]['total_train_samples']
    total_test = class_contributions[class_id]['total_test_samples']
    
    if total_test > 0:
        ratio = total_train / total_test
    else:
        ratio = 0
    balance_ratios.append(ratio)

bars6 = ax6.bar(class_ids, balance_ratios, color='teal', alpha=0.7, edgecolor='darkcyan')
ax6.set_title('Train/Test Sample Ratio per Class', fontweight='bold')
ax6.set_xlabel('Movement Class')
ax6.set_ylabel('Train/Test Ratio')
ax6.grid(True, alpha=0.3, axis='y')
ax6.axhline(y=np.mean([r for r in balance_ratios if r > 0]), color='red', linestyle='--', 
           linewidth=2, label=f'Mean Ratio: {np.mean([r for r in balance_ratios if r > 0]):.2f}')
ax6.legend()

plt.tight_layout()
plt.show()

# 4. Detailed analysis and insights
print(f"\nClass Contribution Analysis Summary")
print("")

# Top and bottom performers
valid_f1_scores = [(i, score) for i, score in enumerate(class_f1_scores) if score > 0]
if valid_f1_scores:
    sorted_by_f1 = sorted(valid_f1_scores, key=lambda x: x[1], reverse=True)
    
    print(f"\nTop 5 Performing Classes (by F1 Score):")
    print("")
    for i, (class_id, f1_score) in enumerate(sorted_by_f1[:5]):
        train_samples = class_contributions[class_id]['total_train_samples']
        test_samples = class_contributions[class_id]['total_test_samples']
        print(f"  {i+1}. Class {class_id:2d}: F1={f1_score:.3f}, Train={train_samples:,}, Test={test_samples:,}")

    print(f"\nBottom 5 Performing Classes (by F1 Score):")
    print(f"="*42)
    for i, (class_id, f1_score) in enumerate(sorted_by_f1[-5:]):
        train_samples = class_contributions[class_id]['total_train_samples']
        test_samples = class_contributions[class_id]['total_test_samples']
        print(f"  {i+1}. Class {class_id:2d}: F1={f1_score:.3f}, Train={train_samples:,}, Test={test_samples:,}")

# Sample distribution insights
total_samples_all_classes = sum(total_train_samples)
print(f"\nSample Distribution Insights:")
print("")
print(f"• Total training samples: {total_samples_all_classes:,}")
print(f"• Average samples per class: {total_samples_all_classes/52:,.0f}")
print(f"• Most represented class: {max_samples_class} ({max(total_train_samples):,} samples)")

non_zero_samples = [s for s in total_train_samples if s > 0]
if non_zero_samples:
    print(f"• Least represented class (non-zero): {total_train_samples.index(min(non_zero_samples))} ({min(non_zero_samples):,} samples)")
    print(f"• Sample distribution std: {np.std(non_zero_samples):,.0f}")

# Training efficiency insights
valid_efficiency = [(i, eff) for i, eff in enumerate(efficiency_scores) if eff > 0]
if valid_efficiency:
    most_efficient = max(valid_efficiency, key=lambda x: x[1])
    print(f"\nTraining efficiency insight:")
    print("")
    print(f"• Most efficient class: {most_efficient[0]} (F1={class_performance_metrics[most_efficient[0]]['f1_score']:.3f} per 1000 samples)")
    print(f"• Average efficiency: {np.mean([eff for _, eff in valid_efficiency]):.3f}")

# Balance analysis
valid_ratios = [r for r in balance_ratios if r > 0]
if valid_ratios:
    print(f"\nTrain/Test Balance Insight:")
    print("")
    print(f"• Average train/test ratio: {np.mean(valid_ratios):.2f}")
    print(f"• Most balanced class: {balance_ratios.index(min(valid_ratios, key=lambda x: abs(x - np.mean(valid_ratios))))} (ratio: {min(valid_ratios, key=lambda x: abs(x - np.mean(valid_ratios))):.2f})")
    print(f"• Ratio standard deviation: {np.std(valid_ratios):.2f}")

In [None]:
# Advanced Class Grouping and Training Progression Analysis
print("\nAdvanced Class Grouping Analysis")
print("")

# Define meaningful class groups based on movement types (adjust based on the dataset)
class_groups = {
    'Exercise A': list(range(0, 12)),
    'Exercise B': list(range(12, 39)),
    'Exercise C': list(range(39, 52)),
}

# Calculate group-wise statistics
group_stats = {}
for group_name, class_list in class_groups.items():
    group_train_samples = sum(class_contributions[cls]['total_train_samples'] for cls in class_list)
    group_test_samples = sum(class_contributions[cls]['total_test_samples'] for cls in class_list)
    
    # Calculate average performance for the group
    group_f1_scores = [class_performance_metrics[cls]['f1_score'] for cls in class_list 
                      if class_performance_metrics[cls]['total_samples'] > 0]
    group_recalls = [class_performance_metrics[cls]['recall'] for cls in class_list 
                    if class_performance_metrics[cls]['total_samples'] > 0]
    
    group_stats[group_name] = {
        'train_samples': group_train_samples,
        'test_samples': group_test_samples,
        'avg_f1': np.mean(group_f1_scores) if group_f1_scores else 0,
        'avg_recall': np.mean(group_recalls) if group_recalls else 0,
        'num_classes': len(class_list),
        'active_classes': len(group_f1_scores)  # Classes with samples > 0
    }

# Sample distribution by Exercise
group_names = list(group_stats.keys())
group_train_samples = [group_stats[name]['train_samples'] for name in group_names]
group_test_samples = [group_stats[name]['test_samples'] for name in group_names]

x_pos = np.arange(len(group_names))
width = 0.35

fig1, ax1 = plt.subplots(figsize=(8, 6))
bars1 = ax1.bar(x_pos - width/2, group_train_samples, width, label='Training', color='#2d2d2d', alpha=0.8)
bars2 = ax1.bar(x_pos + width/2, group_test_samples, width, label='Testing', color='#a8a8a8', alpha=0.8)

ax1.set_title('Sample Distribution by Exercise', fontweight='bold')
ax1.set_xlabel('Exercise')
ax1.set_ylabel('Number of Samples')
ax1.set_xticks(x_pos)
ax1.set_xticklabels(group_names, rotation=0, ha='right')
ax1.legend()
ax1.grid(True, alpha=0.3, axis='y')

# Add value labels on bars
for i, (train, test) in enumerate(zip(group_train_samples, group_test_samples)):
    ax1.text(i - width/2, train + 50, f'{train:,}', ha='center', va='bottom', fontsize=9)
    ax1.text(i + width/2, test + 50, f'{test:,}', ha='center', va='bottom', fontsize=9)

plt.tight_layout()
plt.show()

# Add value labels on bars
for i, (train, test) in enumerate(zip(group_train_samples, group_test_samples)):
    ax1.text(i - width/2, train + 50, f'{train:,}', ha='center', va='bottom', fontsize=9)
    ax1.text(i + width/2, test + 50, f'{test:,}', ha='center', va='bottom', fontsize=9)

plt.tight_layout()
plt.show()

# Performance by Exercise
group_f1_scores = [group_stats[name]['avg_f1'] for name in group_names]
group_recall_scores = [group_stats[name]['avg_recall'] for name in group_names]

fig2, ax2 = plt.subplots(figsize=(8, 6))
bars3 = ax2.bar(x_pos - width/2, group_f1_scores, width, label='F1 Score', color='#2d2d2d', alpha=0.7)
bars4 = ax2.bar(x_pos + width/2, group_recall_scores, width, label='Recall', color='#a8a8a8', alpha=0.7)

ax2.set_title('Average Performance by Exercise', fontweight='bold')
ax2.set_xlabel('Exercise')
ax2.set_ylabel('Score')
ax2.set_xticks(x_pos)
ax2.set_xticklabels(group_names, rotation=0, ha='right')
ax2.legend()
ax2.grid(True, alpha=0.3, axis='y')
ax2.set_ylim(0, max(max(group_f1_scores), max(group_recall_scores)) * 1.1)

plt.tight_layout()
plt.show()

# Detailed group analysis results
print(f"\nExercise Analysis Results")
print("")

for group_name in group_names:
    stats = group_stats[group_name]
    efficiency = stats['avg_f1'] / (stats['train_samples'] / 1000) if stats['train_samples'] > 0 else 0
    coverage = stats['active_classes'] / stats['num_classes']
    
    print(f"\n{group_name.upper()}:")
    print(f"  • Training samples: {stats['train_samples']:,}")
    print(f"  • Test samples: {stats['test_samples']:,}")
    print(f"  • Average F1 score: {stats['avg_f1']:.3f}")
    print(f"  • Average recall: {stats['avg_recall']:.3f}")
    print(f"  • Class coverage: {coverage:.1%} ({stats['active_classes']}/{stats['num_classes']} classes)")
    print(f"  • Training efficiency: {efficiency:.4f}")

# Overall insights
print(f"\nKey Exercise Insights")
print("")

# Best performing group
best_f1_group = group_names[np.argmax(group_f1_scores)]
best_efficiency_group = group_names[np.argmax([group_stats[name]['avg_f1'] / (group_stats[name]['train_samples'] / 1000) if group_stats[name]['train_samples'] > 0 else 0 for name in group_names])]
most_data_group = group_names[np.argmax(group_train_samples)]

print(f"• Best performing group (F1): {best_f1_group} ({max(group_f1_scores):.3f})")
print(f"• Most efficient group: {best_efficiency_group} ({max([group_stats[name]['avg_f1'] / (group_stats[name]['train_samples'] / 1000) if group_stats[name]['train_samples'] > 0 else 0 for name in group_names]):.4f})")
print(f"• Most represented group: {most_data_group} ({max(group_train_samples):,} samples)")

# Coverage analysis
avg_coverage = np.mean([stats['active_classes'] / stats['num_classes'] for stats in group_stats.values()])
print(f"• Average class coverage: {avg_coverage:.1%}")

if avg_coverage < 0.8:
    print("  → Low coverage suggests many movement classes are underrepresented")
else:
    print("  → Good coverage across movement types")

# Data balance analysis
sample_counts = [group_stats[name]['train_samples'] for name in group_names]
sample_std = np.std(sample_counts)
sample_mean = np.mean(sample_counts)
cv = sample_std / sample_mean if sample_mean > 0 else 0

print(f"• Data balance (CV): {cv:.2f}")
if cv > 0.5:
    print("  → High variability in group representation")
elif cv > 0.3:
    print("  → Moderate variability in group representation")
else:
    print("  → Well-balanced representation across groups")
