# sEMG-HHT CNN Classifier for Movement Quality and Fatigue Classification

This notebook implements a Convolutional Neural Network (CNN) encoder paired with an SVM classifier to classify surface electromyography (sEMG) signals by movement quality and fatigue level.

## Architecture Overview
- **Input**: 256×256 matrix from HHT (Hilbert-Huang Transform) of sEMG signals
- **Encoder**: 3-layer CNN with Conv2D + InstanceNorm + LeakyReLU
- **Pooling**: Global Average Pooling
- **Classifier**: SVM for multi-class classification
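
As a quick illustration of the pooling step, global average pooling collapses each feature map to a single number by averaging over its spatial axes. The sketch below uses NumPy on a small made-up array rather than real encoder output; the array sizes are arbitrary and chosen only for readability.

```python
import numpy as np

# Stand-in for the encoder's final feature maps: (batch, channels, height, width)
feature_maps = np.arange(2 * 4 * 3 * 3, dtype=np.float32).reshape(2, 4, 3, 3)

# Global average pooling: average over the two spatial axes
pooled = feature_maps.mean(axis=(2, 3))
print(pooled.shape)  # (2, 4)
```

Each sample ends up with one value per channel, which is why the feature vector passed to the classifier has as many entries as the last convolutional layer has channels.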

## Requirements
- PyTorch
- scikit-learn
- NumPy
- Matplotlib

# Kaggle Integration | Kaggle 集成

This notebook is configured to work with the **HILBERTMATRIX_NPZ** dataset on Kaggle.

本笔记本配置为使用 Kaggle 上的 **HILBERTMATRIX_NPZ** 数据集。

Data path: `/kaggle/input/hilbertmatrix-npz/hht_matrices/`
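
Before training, it can help to check what the archives actually contain. The sketch below lists the arrays stored in one `.npz` file without assuming any particular key names. `describe_npz` is a hypothetical helper, and the demo file and its `hht` key are made up for illustration; the real file layout inside the dataset directory is not specified here.

```python
import os
import tempfile

import numpy as np


def describe_npz(path: str) -> dict:
    """Map each array name stored in an .npz archive to its (shape, dtype)."""
    with np.load(path) as archive:
        return {name: (archive[name].shape, str(archive[name].dtype))
                for name in archive.files}


# Demo on a throwaway file; the key name 'hht' is invented for this example.
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "demo.npz")
    np.savez(demo_path, hht=np.zeros((256, 256), dtype=np.float32))
    print(describe_npz(demo_path))
```

On Kaggle, the same helper could be pointed at any file under the data path above to confirm the array names and shapes before writing a loader.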

# sEMG-HHT CNN Classifier - Kaggle Training
# sEMG-HHT CNN 分类器 - Kaggle 训练

This notebook trains a CNN-SVM classifier on sEMG Hilbert-Huang Transform data for 6-class classification:
本笔记本训练一个 CNN-SVM 分类器，用于 sEMG 希尔伯特-黄变换数据的 6 类分类：

- **Gender (性别)**: Male (男性, M), Female (女性, F)
- **Movement Quality (动作质量)**: Full (完整), Half (半程), Invalid (无效)
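
The six classes are the cross product of the two gender labels and the three movement-quality labels. A minimal sketch of one possible integer encoding follows. The class order (M_full, M_half, M_invalid, F_full, F_half, F_invalid) is the one listed in the end-to-end classifier's docstring later in this notebook; the helper names and the lowercase quality strings are assumptions made for illustration.

```python
GENDERS = ("M", "F")
QUALITIES = ("full", "half", "invalid")

# Class index = gender_index * 3 + quality_index
CLASS_NAMES = [f"{g}_{q}" for g in GENDERS for q in QUALITIES]


def encode_label(gender: str, quality: str) -> int:
    """Convert a (gender, quality) pair to an integer class index in 0..5."""
    return GENDERS.index(gender) * len(QUALITIES) + QUALITIES.index(quality)


def decode_label(index: int) -> str:
    """Convert an integer class index back to its readable name."""
    return CLASS_NAMES[index]


print(CLASS_NAMES)
```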

In [None]:
# Configure data paths for Kaggle
# 配置 Kaggle 数据路径
import os

# Check if running on Kaggle
# 检查是否在 Kaggle 上运行
IS_KAGGLE = os.path.exists('/kaggle/input')

if IS_KAGGLE:
    DATA_DIR = '/kaggle/input/hilbertmatrix-npz/hht_matrices'
    CHECKPOINT_DIR = '/kaggle/working/checkpoints'
    print('Running on Kaggle | 在 Kaggle 上运行')
else:
    DATA_DIR = './data'
    CHECKPOINT_DIR = './checkpoints'
    print('Running locally | 本地运行')

print(f'Data directory | 数据目录: {DATA_DIR}')

os.makedirs(CHECKPOINT_DIR, exist_ok=True)
print(f'Checkpoint directory | 检查点目录: {CHECKPOINT_DIR}')

In [None]:
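# Install dependencies (uncomment if running on Kaggle or a fresh environment)
# Note: the empirical mode decomposition library imported as `PyEMD` is published on PyPI
# as `EMD-signal`; `pip install PyEMD` resolves to the unrelated `pyemd` package.
# !pip install torch torchvision scikit-learn numpy matplotlib scipy EMD-signal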

In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from typing import Tuple, Optional
import warnings

# Suppress specific warnings that are not critical for this demo
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning, module='sklearn')

# Set random seeds for reproducibility
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# Check for GPU availability
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## 🎛️ Hyperparameter Configuration | 超参数配置

**Configure all important hyperparameters here** | **在此处配置所有重要的超参数**

Modify these values to experiment with different model configurations and training settings.

In [None]:
# ============================================================================
# HYPERPARAMETER CONFIGURATION | 超参数配置
# ============================================================================
# Modify these values to experiment with different settings
# 修改这些值以尝试不同的设置

# ---------------------------------------------------------------------------
# Model Architecture | 模型架构
# ---------------------------------------------------------------------------
MODEL_IN_CHANNELS = 1              # Input channels | 输入通道数
MODEL_BASE_CHANNELS = 64           # Base number of channels in CNN | CNN基础通道数
MODEL_NUM_ENCODER_LAYERS = 3       # Number of encoder layers | 编码器层数
MODEL_DROPOUT_RATE = 0.5           # Dropout rate for regularization | Dropout率

# ---------------------------------------------------------------------------
# Training Configuration | 训练配置
# ---------------------------------------------------------------------------
TRAIN_EPOCHS = 50                  # Number of training epochs | 训练轮数
TRAIN_BATCH_SIZE = 16              # Batch size for training | 训练批次大小
TRAIN_LEARNING_RATE = 0.001        # Learning rate | 学习率
TRAIN_CHECKPOINT_INTERVAL = 5      # Save checkpoint every N epochs | 每N轮保存检查点

# ---------------------------------------------------------------------------
# Learning Rate Scheduler | 学习率调度器
# ---------------------------------------------------------------------------
LR_SCHEDULER_FACTOR = 0.5          # Factor to reduce LR | 学习率衰减因子
LR_SCHEDULER_PATIENCE = 5          # Epochs to wait before reducing LR | 等待轮数

# ---------------------------------------------------------------------------
# SVM Configuration (for CNN+SVM method) | SVM配置（用于CNN+SVM方法）
# ---------------------------------------------------------------------------
SVM_KERNEL = 'rbf'                 # SVM kernel type | SVM核函数类型
SVM_C = 10.0                       # SVM regularization parameter | SVM正则化参数
SVM_GAMMA = 'scale'                # SVM kernel coefficient | SVM核系数

# ---------------------------------------------------------------------------
# Display Configuration | 显示配置
# ---------------------------------------------------------------------------
print('='*70)
print('HYPERPARAMETER CONFIGURATION | 超参数配置')
print('='*70)
print(f'\n📐 Model Architecture | 模型架构:')
print(f'  - Input Channels: {MODEL_IN_CHANNELS}')
print(f'  - Base Channels: {MODEL_BASE_CHANNELS}')
print(f'  - Encoder Layers: {MODEL_NUM_ENCODER_LAYERS}')
print(f'  - Dropout Rate: {MODEL_DROPOUT_RATE}')
print(f'\n🎯 Training Configuration | 训练配置:')
print(f'  - Epochs: {TRAIN_EPOCHS}')
print(f'  - Batch Size: {TRAIN_BATCH_SIZE}')
print(f'  - Learning Rate: {TRAIN_LEARNING_RATE}')
print(f'  - Checkpoint Interval: {TRAIN_CHECKPOINT_INTERVAL}')
print(f'\n📉 LR Scheduler | 学习率调度器:')
print(f'  - Factor: {LR_SCHEDULER_FACTOR}')
print(f'  - Patience: {LR_SCHEDULER_PATIENCE}')
print(f'\n🔧 SVM Configuration | SVM配置:')
print(f'  - Kernel: {SVM_KERNEL}')
print(f'  - C: {SVM_C}')
print(f'  - Gamma: {SVM_GAMMA}')
print('='*70)

## 1. CNN Encoder Architecture

The encoder consists of 3 convolutional layers, each with:
- Conv2D (kernel=3, stride=2, padding=1)
- Instance Normalization
- LeakyReLU activation

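Each stride-2 block halves the spatial resolution (256 → 128 → 64 → 32) while the channel count grows (64 → 128 → 256 with the default `base_channels=64`), so the encoder trades spatial detail for progressively richer features.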
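
The downsampling can be checked by hand with the standard convolution output-size formula, `out = floor((in + 2*padding - kernel) / stride) + 1`. A minimal sketch, using hypothetical helpers `conv_output_size` and `encoder_shapes` and the default settings (256×256 input, `base_channels=64`, three layers):

```python
def conv_output_size(size: int, kernel: int = 3, stride: int = 2, padding: int = 1) -> int:
    """Spatial size after one Conv2D layer (dilation 1), per the standard formula."""
    return (size + 2 * padding - kernel) // stride + 1


def encoder_shapes(input_size: int = 256, base_channels: int = 64, num_layers: int = 3):
    """Return (channels, height, width) after each encoder block."""
    shapes = []
    size = input_size
    for i in range(num_layers):
        size = conv_output_size(size)
        shapes.append((base_channels * 2 ** i, size, size))
    return shapes


for layer, shape in enumerate(encoder_shapes(), start=1):
    print(f"block {layer}: {shape}")
```

The last block's channel count is what global average pooling turns into the feature vector length.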

In [None]:
class ConvBlock(nn.Module):
    """Convolutional block with Conv2D, InstanceNorm, and LeakyReLU."""
    
    def __init__(self, in_channels: int, out_channels: int, 
                 kernel_size: int = 3, stride: int = 2, padding: int = 1,
                 leaky_slope: float = 0.2):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, 
                              kernel_size=kernel_size, 
                              stride=stride, 
                              padding=padding)
        self.instance_norm = nn.InstanceNorm2d(out_channels)
        self.activation = nn.LeakyReLU(negative_slope=leaky_slope)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.conv(x)
        x = self.instance_norm(x)
        x = self.activation(x)
        return x


class sEMGHHTEncoder(nn.Module):
    """
    CNN Encoder for sEMG-HHT matrix classification.
    
    Architecture:
    - Input: 1×256×256 (single-channel HHT matrix)
    - 3 ConvBlocks with increasing channels
    - Global Average Pooling
    - Output: Feature vector for SVM classification
    """
    
    def __init__(self, in_channels: int = 1, 
                 base_channels: int = 64,
                 num_layers: int = 3,
                 leaky_slope: float = 0.2):
        super().__init__()
        
        self.in_channels = in_channels
        self.base_channels = base_channels
        self.num_layers = num_layers
        
        # Build convolutional layers
        layers = []
        current_channels = in_channels
        
        for i in range(num_layers):
            out_channels = base_channels * (2 ** i)
            layers.append(ConvBlock(
                in_channels=current_channels,
                out_channels=out_channels,
                kernel_size=3,
                stride=2,
                padding=1,
                leaky_slope=leaky_slope
            ))
            current_channels = out_channels
        
        self.encoder = nn.Sequential(*layers)
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        
        # Calculate output feature dimension
        self.feature_dim = base_channels * (2 ** (num_layers - 1))
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through the encoder.
        
        Args:
            x: Input tensor of shape (batch, channels, height, width)
        
        Returns:
            Feature tensor of shape (batch, feature_dim)
        """
        x = self.encoder(x)
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)  # Flatten to (batch, feature_dim)
        return x
    
    def get_feature_dim(self) -> int:
        """Return the output feature dimension."""
        return self.feature_dim

## 2. Complete Classification Pipeline

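This class chains the CNN encoder, a `StandardScaler`, and an SVM into a single fit/predict pipeline. The encoder acts only as a fixed feature extractor here: `fit` trains the scaler and the SVM, and never updates the encoder weights.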
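
The normalization step (step 2 in the class docstring below) matters mostly for one detail: the statistics are computed on the training features and then reused unchanged on new data. The sketch below re-implements that idea in plain NumPy for illustration; it is not the scikit-learn code, the helper names are hypothetical, and the random arrays merely stand in for encoder output.

```python
import numpy as np


def fit_standardizer(features: np.ndarray):
    """Per-feature mean and standard deviation, computed on training features only."""
    mean = features.mean(axis=0)
    std = features.std(axis=0)
    std[std == 0.0] = 1.0  # leave constant features unscaled instead of dividing by zero
    return mean, std


def apply_standardizer(features: np.ndarray, mean: np.ndarray, std: np.ndarray) -> np.ndarray:
    """Scale features with previously fitted statistics."""
    return (features - mean) / std


rng = np.random.default_rng(0)
train_feats = rng.normal(loc=3.0, scale=2.0, size=(100, 256))  # stand-in for encoder output
test_feats = rng.normal(loc=3.0, scale=2.0, size=(20, 256))

mean, std = fit_standardizer(train_feats)
train_scaled = apply_standardizer(train_feats, mean, std)
test_scaled = apply_standardizer(test_feats, mean, std)  # reuse training statistics
print(train_scaled.mean(axis=0)[:3], train_scaled.std(axis=0)[:3])
```

Fitting the statistics on the test split as well would leak information from evaluation data into the model, which is why `predict` in the class below calls `transform` rather than `fit_transform`.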

In [None]:
class sEMGHHTClassifier:
    """
    Complete classification pipeline combining CNN encoder and SVM classifier.
    
    The pipeline:
    1. Extracts features using CNN encoder
    2. Normalizes features using StandardScaler
    3. Classifies using SVM (supports multi-class)
    """
    
    def __init__(self, 
                 encoder: Optional[sEMGHHTEncoder] = None,
                 svm_kernel: str = 'rbf',
                 svm_C: float = 1.0,
                 svm_gamma: str = 'scale',
                 device: torch.device = torch.device('cpu')):
        """
        Initialize the classifier.
        
        Args:
            encoder: Pre-trained or new CNN encoder (creates default if None)
            svm_kernel: SVM kernel type ('rbf', 'linear', 'poly')
            svm_C: SVM regularization parameter
            svm_gamma: SVM gamma parameter
            device: Device to run the encoder on
        """
        self.device = device
        
        # Initialize encoder
        if encoder is None:
            self.encoder = sEMGHHTEncoder(
                in_channels=1, 
                base_channels=64, 
                num_layers=3
            )
        else:
            self.encoder = encoder
        
        self.encoder.to(self.device)
        
        # Initialize scaler and SVM
        self.scaler = StandardScaler()
        self.svm = SVC(
            kernel=svm_kernel,
            C=svm_C,
            gamma=svm_gamma,
            decision_function_shape='ovr',  # OvR-shaped decision values; SVC still trains one-vs-one internally
            probability=True  # Enable probability estimates
        )
        
        self._is_fitted = False
    
    def extract_features(self, X: np.ndarray, batch_size: int = 32) -> np.ndarray:
        """
        Extract features from HHT matrices using the CNN encoder.
        
        Args:
            X: Input array of shape (n_samples, height, width) or (n_samples, 1, height, width)
            batch_size: Batch size for processing
        
        Returns:
            Feature array of shape (n_samples, feature_dim)
        """
        self.encoder.eval()
        
        # Ensure correct shape
        if X.ndim == 3:
            X = X[:, np.newaxis, :, :]  # Add channel dimension
        
        features_list = []
        n_samples = X.shape[0]
        
        with torch.no_grad():
            for i in range(0, n_samples, batch_size):
                batch = torch.tensor(X[i:i+batch_size], dtype=torch.float32).to(self.device)
                batch_features = self.encoder(batch)
                features_list.append(batch_features.cpu().numpy())
        
        return np.vstack(features_list)
    
    def fit(self, X: np.ndarray, y: np.ndarray, batch_size: int = 32):
        """
        Fit the classifier (extract features and train SVM).
        
        Args:
            X: Training HHT matrices of shape (n_samples, height, width)
            y: Training labels of shape (n_samples,)
            batch_size: Batch size for feature extraction
        """
        print("Extracting features from training data...")
        features = self.extract_features(X, batch_size)
        
        print("Normalizing features...")
        features_scaled = self.scaler.fit_transform(features)
        
        print("Training SVM classifier...")
        self.svm.fit(features_scaled, y)
        
        self._is_fitted = True
        print("Training complete!")
    
    def predict(self, X: np.ndarray, batch_size: int = 32) -> np.ndarray:
        """
        Predict class labels for samples.
        
        Args:
            X: Input HHT matrices of shape (n_samples, height, width)
            batch_size: Batch size for feature extraction
        
        Returns:
            Predicted labels of shape (n_samples,)
        """
        if not self._is_fitted:
            raise RuntimeError("Classifier must be fitted before predicting")
        
        features = self.extract_features(X, batch_size)
        features_scaled = self.scaler.transform(features)
        return self.svm.predict(features_scaled)
    
    def predict_proba(self, X: np.ndarray, batch_size: int = 32) -> np.ndarray:
        """
        Predict class probabilities for samples.
        
        Args:
            X: Input HHT matrices of shape (n_samples, height, width)
            batch_size: Batch size for feature extraction
        
        Returns:
            Probability array of shape (n_samples, n_classes)
        """
        if not self._is_fitted:
            raise RuntimeError("Classifier must be fitted before predicting")
        
        features = self.extract_features(X, batch_size)
        features_scaled = self.scaler.transform(features)
        return self.svm.predict_proba(features_scaled)
    
    def evaluate(self, X: np.ndarray, y: np.ndarray, 
                 batch_size: int = 32) -> dict:
        """
        Evaluate the classifier on test data.
        
        Args:
            X: Test HHT matrices
            y: True labels
            batch_size: Batch size for feature extraction
        
        Returns:
            Dictionary containing accuracy, predictions, and classification report
        """
        y_pred = self.predict(X, batch_size)
        accuracy = accuracy_score(y, y_pred)
        
        return {
            'accuracy': accuracy,
            'predictions': y_pred,
            'classification_report': classification_report(y, y_pred),
            'confusion_matrix': confusion_matrix(y, y_pred)
        }

---

# Training Methods Comparison | 训练方法对比

This notebook supports **TWO** training methods. Choose based on your needs:

本笔记本支持**两种**训练方法。根据需求选择：

---

## Method 1: CNN+SVM (Traditional) | 方法一：CNN+SVM（传统）

**English**:
- CNN encoder extracts features (weights **frozen**)
- SVM classifier trains on extracted features
- **Fast training**, good for small datasets
- **No epochs** needed - one-shot training

**中文**：
- CNN 编码器提取特征（权重**冻结**）
- SVM 分类器在提取的特征上训练
- **训练快速**，适合小数据集
- **无需多轮训练** - 一次性训练

**Use When | 使用场景**:
- Limited data (< 1000 samples) | 数据有限（< 1000 样本）
- Quick prototyping | 快速原型
- Stable baseline needed | 需要稳定基线

---

## Method 2: End-to-End with Encoder Fine-tuning | 方法二：端到端编码器微调

**English**:
- **Entire network** trains together (encoder + classifier)
- Encoder **adapts** to your specific data
- **Multi-epoch** training with checkpointing
- Can resume from interruption
- Real-time progress monitoring (accuracy/loss)

**中文**：
- **整个网络**一起训练（编码器 + 分类器）
- 编码器**适应**你的特定数据
- **多轮**训练与检查点保存
- 可从中断处恢复
- 实时进度监控（准确率/损失）

**Use When | 使用场景**:
- Large dataset (> 1000 samples) | 大数据集（> 1000 样本）
- Maximum accuracy needed | 需要最大准确率
- Domain-specific data | 领域特定数据
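
To make the checkpoint and resume behaviour concrete, here is a small sketch of the bookkeeping only, with no model involved. It assumes the convention used by the training function in this notebook: a periodic checkpoint is written whenever the 1-based epoch number is a multiple of the interval, checkpoints store the 0-based epoch index, and a resumed run continues at the next index. The helper names are hypothetical.

```python
def checkpoint_epochs(total_epochs: int, interval: int) -> list:
    """1-based epoch numbers at which a periodic checkpoint is written."""
    return [e for e in range(1, total_epochs + 1) if e % interval == 0]


def resume_start_epoch(saved_epoch_index: int) -> int:
    """0-based index of the first epoch to run after loading a checkpoint."""
    return saved_epoch_index + 1


print(checkpoint_epochs(total_epochs=50, interval=5))
# A checkpoint written after the 10th epoch stores index 9, so training resumes at index 10
print(resume_start_epoch(9))
```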

---

**📍 Scroll down to find sections for each method:**

**📍 向下滚动找到每种方法的对应章节：**

- **Section 5**: CNN+SVM Training | 第 5 节：CNN+SVM 训练
- **Section 6**: End-to-End Training | 第 6 节：端到端训练

---

In [None]:
def train_end_to_end_with_checkpointing(
    model: nn.Module,
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_val: np.ndarray,
    y_val: np.ndarray,
    epochs: int = 50,
    batch_size: int = 16,
    learning_rate: float = 0.001,
    checkpoint_interval: int = 5,
    checkpoint_dir: Optional[str] = None,
    resume_from: Optional[str] = None,
    device: Optional[torch.device] = None
) -> dict:
    """
    Train end-to-end model with comprehensive checkpointing support.
    端到端训练模型，支持完整的检查点功能。
    
    Features | 功能:
    - Multi-epoch training | 多轮训练
    - Checkpoint saving every N epochs | 每 N 轮保存检查点
    - Resume from interruption | 中断后恢复
    - Best model auto-save | 自动保存最佳模型
    - Real-time progress display | 实时进度显示
    
    Args:
        model: Model to train | 要训练的模型
        X_train, y_train: Training data | 训练数据
        X_val, y_val: Validation data | 验证数据
        epochs: Number of epochs | 训练轮数
        batch_size: Batch size | 批次大小
        learning_rate: Learning rate | 学习率
        checkpoint_interval: Save checkpoint every N epochs | 每 N 轮保存检查点
        checkpoint_dir: Where to save checkpoints | 检查点保存目录
        resume_from: Path to checkpoint to resume from | 要恢复的检查点路径
        device: Device to use | 使用的设备
    
    Returns:
        Dictionary with training history | 包含训练历史的字典
    """
    # Input validation | 输入验证
    if epochs <= 0:
        raise ValueError(f"epochs must be > 0, got {epochs}")
    if batch_size <= 0:
        raise ValueError(f"batch_size must be > 0, got {batch_size}")
    if learning_rate <= 0:
        raise ValueError(f"learning_rate must be > 0, got {learning_rate}")
    if checkpoint_interval <= 0:
        raise ValueError(f"checkpoint_interval must be > 0, got {checkpoint_interval}")
    
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    if checkpoint_dir is None:
        checkpoint_dir = CHECKPOINT_DIR
    
    os.makedirs(checkpoint_dir, exist_ok=True)
    
    model = model.to(device)
    
    # Ensure correct shape: add a channel axis to any (N, H, W) input
    if X_train.ndim == 3:
        X_train = X_train[:, np.newaxis, :, :]
    if X_val.ndim == 3:
        X_val = X_val[:, np.newaxis, :, :]
    
    # Create data loaders
    train_dataset = torch.utils.data.TensorDataset(
        torch.tensor(X_train, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.long)
    )
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True
    )
    
    val_dataset = torch.utils.data.TensorDataset(
        torch.tensor(X_val, dtype=torch.float32),
        torch.tensor(y_val, dtype=torch.long)
    )
    val_loader = torch.utils.data.DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False
    )
    
    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='max', factor=LR_SCHEDULER_FACTOR, patience=LR_SCHEDULER_PATIENCE
    )
    
    # Initialize history
    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': [],
        'learning_rate': []
    }
    
    start_epoch = 0
    best_path = os.path.join(checkpoint_dir, 'best_model.pt')
    best_val_acc = 0.0
    
    # Resume from checkpoint if specified
    if resume_from and os.path.exists(resume_from):
        print(f'\n📂 Resuming from checkpoint: {resume_from}')
        checkpoint = torch.load(resume_from, map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        start_epoch = checkpoint['epoch'] + 1
        best_val_acc = checkpoint.get('best_val_acc', 0.0)
        history = checkpoint.get('history', history)
        print(f'✅ Resumed from epoch {start_epoch}, best val acc: {best_val_acc:.4f}')
    
    print(f'\n🚀 Training from epoch {start_epoch} to {epochs}...')
    print(f'Device: {device}')
    print(f'Train samples: {len(X_train)}, Val samples: {len(X_val)}')
    print(f'Checkpoint interval: every {checkpoint_interval} epochs')
    print('='*70)
    
    for epoch in range(start_epoch, epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item() * batch_X.size(0)
            _, predicted = outputs.max(1)
            train_total += batch_y.size(0)
            train_correct += predicted.eq(batch_y).sum().item()
        
        train_loss /= train_total
        train_acc = train_correct / train_total
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                
                val_loss += loss.item() * batch_X.size(0)
                _, predicted = outputs.max(1)
                val_total += batch_y.size(0)
                val_correct += predicted.eq(batch_y).sum().item()
        
        val_loss /= val_total
        val_acc = val_correct / val_total
        
        # Update learning rate
        scheduler.step(val_acc)
        current_lr = optimizer.param_groups[0]['lr']
        
        # Save history
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['learning_rate'].append(current_lr)
        
        # Print progress
        print(f"Epoch [{epoch+1:3d}/{epochs}] | "
              f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | "
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f} | "
              f"LR: {current_lr:.6f}")
        
        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_val_acc': best_val_acc,
                'history': history
            }, best_path)
            print(f"  ⭐ New best model! Val Acc: {val_acc:.4f} (saved to {best_path})")
        
        # Save checkpoint at interval
        if (epoch + 1) % checkpoint_interval == 0:
            checkpoint_path = os.path.join(checkpoint_dir, f'checkpoint_epoch_{epoch+1}.pt')
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_val_acc': best_val_acc,
                'history': history
            }, checkpoint_path)
            print(f"  💾 Checkpoint saved: {checkpoint_path}")
    
    # Save final checkpoint
    final_path = os.path.join(checkpoint_dir, 'final_model.pt')
    torch.save({
        'epoch': epochs - 1,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_val_acc': best_val_acc,
        'history': history
    }, final_path)
    
    print('='*70)
    print(f'\n✅ Training Complete!')
    print(f'Best Validation Accuracy: {best_val_acc:.4f}')
    print(f'Final model saved to: {final_path}')
    print(f'Best model saved to: {best_path}')
    
    return history

In [None]:
def plot_training_history(history: dict):
    """
    Plot training history (loss and accuracy).
    绘制训练历史（损失和准确率）。
    """
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    
    epochs = range(1, len(history['train_loss']) + 1)
    
    # Plot loss
    axes[0].plot(epochs, history['train_loss'], 'b-', label='Train Loss | 训练损失', linewidth=2)
    axes[0].plot(epochs, history['val_loss'], 'r-', label='Val Loss | 验证损失', linewidth=2)
    axes[0].set_xlabel('Epoch | 轮次', fontsize=12)
    axes[0].set_ylabel('Loss | 损失', fontsize=12)
    axes[0].set_title('Training and Validation Loss | 训练和验证损失', fontsize=14)
    axes[0].legend(fontsize=10)
    axes[0].grid(True, alpha=0.3)
    
    # Plot accuracy
    axes[1].plot(epochs, history['train_acc'], 'b-', label='Train Acc | 训练准确率', linewidth=2)
    axes[1].plot(epochs, history['val_acc'], 'r-', label='Val Acc | 验证准确率', linewidth=2)
    axes[1].set_xlabel('Epoch | 轮次', fontsize=12)
    axes[1].set_ylabel('Accuracy | 准确率', fontsize=12)
    axes[1].set_title('Training and Validation Accuracy | 训练和验证准确率', fontsize=14)
    axes[1].legend(fontsize=10)
    axes[1].grid(True, alpha=0.3)
    
    # Plot learning rate
    if 'learning_rate' in history:
        axes[2].plot(epochs, history['learning_rate'], 'g-', linewidth=2)
        axes[2].set_xlabel('Epoch | 轮次', fontsize=12)
        axes[2].set_ylabel('Learning Rate | 学习率', fontsize=12)
        axes[2].set_title('Learning Rate Schedule | 学习率调度', fontsize=14)
        axes[2].set_yscale('log')
        axes[2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Print final statistics
    print(f'\n📊 Training Statistics | 训练统计:')
    print(f'   Final Train Acc | 最终训练准确率: {history["train_acc"][-1]:.4f}')
    print(f'   Final Val Acc | 最终验证准确率: {history["val_acc"][-1]:.4f}')
    print(f'   Best Val Acc | 最佳验证准确率: {max(history["val_acc"]):.4f}')
    print(f'   Final Train Loss | 最终训练损失: {history["train_loss"][-1]:.4f}')
    print(f'   Final Val Loss | 最终验证损失: {history["val_loss"][-1]:.4f}')

In [None]:
class sEMGHHTEndToEndClassifier(nn.Module):
    """
    End-to-end trainable classifier: CNN encoder followed by a small two-layer fully connected classification head.
    
    This version allows the encoder to be fine-tuned during training, enabling
    the model to adapt to your specific data distribution.
    
    Architecture:
        Input (1×256×256) 
        → CNN Encoder (3 ConvBlocks) 
        → Features (256-dim)
        → Dropout → FC(256→128) → ReLU → Dropout → FC(128→n_classes)
        → Logits (n_classes)
    
    Args:
        n_classes (int): Number of output classes (default: 4)
            For 6-class problem (M_full, M_half, M_invalid, F_full, F_half, F_invalid), use n_classes=6
        in_channels (int): Number of input channels (default: 1 for grayscale HHT matrix)
        base_channels (int): Base number of channels in first conv layer (default: 64)
            Channel progression: 64 → 128 → 256
        num_encoder_layers (int): Number of convolutional blocks in encoder (default: 3)
            Must match the encoder architecture you want to use
        dropout_rate (float): Dropout probability for regularization (default: 0.5)
            Range: 0.0 (no dropout) to 0.9 (high dropout)
            Typical values: 0.3-0.6
    
    Example:
        >>> model = sEMGHHTEndToEndClassifier(
        ...     n_classes=6,
        ...     base_channels=64,
        ...     num_encoder_layers=3,
        ...     dropout_rate=0.5
        ... )
        >>> x = torch.randn(16, 1, 256, 256)  # Batch of 16 HHT matrices
        >>> logits = model(x)  # Output shape: (16, 6)
    """
    
    def __init__(self, 
                 n_classes: int = 4,
                 in_channels: int = 1,
                 base_channels: int = 64,
                 num_encoder_layers: int = 3,
                 dropout_rate: float = 0.5):
        super().__init__()
        
        self.encoder = sEMGHHTEncoder(
            in_channels=in_channels,
            base_channels=base_channels,
            num_layers=num_encoder_layers
        )
        
        feature_dim = self.encoder.get_feature_dim()
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(feature_dim, 128),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(128, n_classes)
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        features = self.encoder(x)
        logits = self.classifier(features)
        return logits
    
    def get_features(self, x: torch.Tensor) -> torch.Tensor:
        """Extract features without classification."""
        return self.encoder(x)

In [None]:
def train_end_to_end(model: nn.Module,
                     X_train: np.ndarray,
                     y_train: np.ndarray,
                     X_val: np.ndarray,
                     y_val: np.ndarray,
                     epochs: int = 50,
                     batch_size: int = 16,
                     learning_rate: float = 0.001,
                     device: torch.device = torch.device('cpu')) -> dict:
    """
    Train the end-to-end model.
    
    Args:
        model: The model to train
        X_train, y_train: Training data and labels
        X_val, y_val: Validation data and labels
        epochs: Number of training epochs
        batch_size: Batch size
        learning_rate: Learning rate
        device: Device to train on
    
    Returns:
        Dictionary containing training history
    """
    model = model.to(device)
    
    # Ensure correct shape: add a channel axis to any (N, H, W) input
    if X_train.ndim == 3:
        X_train = X_train[:, np.newaxis, :, :]
    if X_val.ndim == 3:
        X_val = X_val[:, np.newaxis, :, :]
    
    # Create data loaders
    train_dataset = torch.utils.data.TensorDataset(
        torch.tensor(X_train, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.long)
    )
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True
    )
    
    val_dataset = torch.utils.data.TensorDataset(
        torch.tensor(X_val, dtype=torch.float32),
        torch.tensor(y_val, dtype=torch.long)
    )
    val_loader = torch.utils.data.DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False
    )
    
    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='max', factor=LR_SCHEDULER_FACTOR, patience=LR_SCHEDULER_PATIENCE
    )
    
    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': []
    }
    
    best_val_acc = 0.0
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item() * batch_X.size(0)
            _, predicted = outputs.max(1)
            train_total += batch_y.size(0)
            train_correct += predicted.eq(batch_y).sum().item()
        
        train_loss /= train_total
        train_acc = train_correct / train_total
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                
                val_loss += loss.item() * batch_X.size(0)
                _, predicted = outputs.max(1)
                val_total += batch_y.size(0)
                val_correct += predicted.eq(batch_y).sum().item()
        
        val_loss /= val_total
        val_acc = val_correct / val_total
        
        # Update learning rate
        scheduler.step(val_acc)
        
        # Save history
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        
        if val_acc > best_val_acc:
            best_val_acc = val_acc
        
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}: "
                  f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
    
    print(f"\nBest Validation Accuracy: {best_val_acc:.4f}")
    return history
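
The training loop above calls `scheduler.step(val_acc)` with `mode='max'`, so the learning rate is reduced only when validation accuracy stops improving. The following is a simplified pure-Python sketch of that rule, not PyTorch's implementation: it ignores the `threshold` and `cooldown` options, and the function name `plateau_schedule` and its default values are assumptions made for illustration.

```python
def plateau_schedule(val_accs, lr=1e-3, factor=0.5, patience=2):
    """Simplified 'max'-mode plateau rule.

    The learning rate is multiplied by `factor` once more than `patience`
    consecutive epochs pass without a new best validation accuracy.
    """
    best = float("-inf")
    bad_epochs = 0
    lrs = []
    for acc in val_accs:
        if acc > best:
            best = acc
            bad_epochs = 0
        else:
            bad_epochs += 1
        if bad_epochs > patience:
            lr *= factor
            bad_epochs = 0
        lrs.append(lr)
    return lrs


# Accuracy stalls at 0.60 for three epochs, which triggers one reduction.
lrs = plateau_schedule([0.50, 0.60, 0.60, 0.59, 0.60, 0.61])
print(lrs)
```

Passing `val_loss` instead of `val_acc` would require `mode='min'`; mixing the two silently inverts the rule.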

## 4. Model Saving and Loading

In [None]:
import pickle

def save_svm_classifier(classifier: sEMGHHTClassifier, path: str):
    """Save the SVM-based classifier to disk."""
    # Save encoder
    torch.save(classifier.encoder.state_dict(), f"{path}_encoder.pt")
    
    # Save scaler and SVM
    with open(f"{path}_scaler.pkl", 'wb') as f:
        pickle.dump(classifier.scaler, f)
    
    with open(f"{path}_svm.pkl", 'wb') as f:
        pickle.dump(classifier.svm, f)
    
    print(f"Classifier saved to {path}_*.pt/pkl")

def load_svm_classifier(path: str, device: torch.device = torch.device('cpu')) -> sEMGHHTClassifier:
    """Load a saved SVM-based classifier."""
    classifier = sEMGHHTClassifier(device=device)
    
    # Load encoder
    classifier.encoder.load_state_dict(torch.load(f"{path}_encoder.pt", map_location=device))
    
    # Load scaler and SVM
    with open(f"{path}_scaler.pkl", 'rb') as f:
        classifier.scaler = pickle.load(f)
    
    with open(f"{path}_svm.pkl", 'rb') as f:
        classifier.svm = pickle.load(f)
    
    classifier._is_fitted = True
    print(f"Classifier loaded from {path}_*.pt/pkl")
    return classifier

def save_e2e_model(model: nn.Module, path: str):
    """Save the end-to-end model."""
    torch.save(model.state_dict(), path)
    print(f"Model saved to {path}")

def load_e2e_model(path: str, n_classes: int = 6,
                   device: torch.device = torch.device('cpu')) -> sEMGHHTEndToEndClassifier:
    """Load a saved end-to-end model. n_classes must match the saved weights (6 in this notebook)."""
    model = sEMGHHTEndToEndClassifier(n_classes=n_classes)
    model.load_state_dict(torch.load(path, map_location=device))
    model.to(device)
    model.eval()
    print(f"Model loaded from {path}")
    return model

## Real Data Training | 真实数据训练

Train the classifier on real sEMG HHT data from the Kaggle dataset.
在 Kaggle 数据集的真实 sEMG HHT 数据上训练分类器。
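
As a rough illustration of how labels are derived from filenames, the sketch below parses three hypothetical filenames and maps the combined label to an integer. The filenames are invented for the example, and the helper `combined_label` is a reduced version of the parsing logic in the next cell (it omits the `fatiguetest` and `wrong` keywords). One detail worth noting: scikit-learn's `LabelEncoder` stores `classes_` in sorted order, so the `F_*` classes receive indices 0 to 2 and the `M_*` classes 3 to 5, regardless of the order in which they are listed.

```python
import re

# Hypothetical filenames; the real naming scheme in the dataset may differ.
examples = ["S01_M_full_rep3.npz", "S07-F-half-rep1.npz", "Test_0042.npz"]

# Sorted order mirrors what LabelEncoder.classes_ would hold after fitting.
classes = sorted(["M_full", "M_half", "M_invalid", "F_full", "F_half", "F_invalid"])


def combined_label(name):
    """Return 'GENDER_movement' for a labeled file, or None if it cannot be parsed."""
    if name.lower().startswith("test"):
        return None
    gender = re.search(r"[_-]([MF])[_-]", name)
    movement = next((m for m in ("full", "half", "invalid") if m in name.lower()), None)
    if gender is None or movement is None:
        return None
    return f"{gender.group(1)}_{movement}"


encoded = {}
for name in examples:
    label = combined_label(name)
    encoded[name] = None if label is None else (label, classes.index(label))

print(encoded)
```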

In [None]:
# Import required modules
# 导入所需模块
import glob
import re
from sklearn.preprocessing import LabelEncoder

def parse_filename(filename):
    """
    Parse filename to extract labels.
    解析文件名以提取标签。
    
    Returns None if the filename starts with 'Test' (unlabeled test data) or if
    no gender or movement-quality tag can be found in it.
    如果文件名以 'Test' 开头（未标记的测试数据），或无法解析出性别或动作质量标签，则返回 None。
    """
    basename = os.path.basename(filename)
    
    # Skip files that start with 'Test'
    # 跳过以 'Test' 开头的文件
    if basename.lower().startswith('test'):
        return None
    
    # Extract gender (M or F)
    # 提取性别（M 或 F）
    gender_match = re.search(r'[_-]([MF])[_-]', basename)
    if not gender_match:
        return None
    gender = gender_match.group(1)
    
    # Extract movement quality
    # 提取动作质量
    basename_lower = basename.lower()
    if 'fatiguetest' in basename_lower or 'full' in basename_lower:
        movement = 'full'
    elif 'half' in basename_lower:
        movement = 'half'
    elif 'invalid' in basename_lower or 'wrong' in basename_lower:
        movement = 'invalid'
    else:
        return None
    
    return {'gender': gender, 'movement': movement}

def load_data_from_directory(data_dir):
    """
    Load HHT matrices from npz files.
    从 npz 文件加载 HHT 矩阵。
    """
    # Sort so that sample order (and therefore the train/val split) is reproducible
    npz_files = sorted(glob.glob(os.path.join(data_dir, '*.npz')))
    
    X_list = []
    y_list = []
    filenames = []
    test_files = []
    
    # Create label encoder
    # 创建标签编码器
    all_classes = ['M_full', 'M_half', 'M_invalid', 'F_full', 'F_half', 'F_invalid']
    label_encoder = LabelEncoder()
    label_encoder.fit(all_classes)
    
    for npz_file in npz_files:
        labels = parse_filename(npz_file)
        
        if labels is None:
            test_files.append(npz_file)
            continue
        
        try:
            data = np.load(npz_file)
            if 'hht' in data:
                hht_matrix = data['hht']
            else:
                hht_matrix = data[list(data.keys())[0]]
            
            if hht_matrix.shape != (256, 256):
                continue
            
            # Create combined label
            # 创建组合标签
            combined = f"{labels['gender']}_{labels['movement']}"
            label = label_encoder.transform([combined])[0]
            
            X_list.append(hht_matrix)
            y_list.append(label)
            filenames.append(npz_file)
            
        except Exception as e:
            print(f'Error loading {npz_file}: {e}')
            continue
    
    X = np.array(X_list, dtype=np.float32)
    y = np.array(y_list)
    
    print(f'\nLoaded {len(X)} training samples | 加载了 {len(X)} 个训练样本')
    print(f'Found {len(test_files)} test files | 找到 {len(test_files)} 个测试文件')
    print(f'\nClass distribution | 类别分布:')
    for i, class_name in enumerate(label_encoder.classes_):
        count = np.sum(y == i)
        print(f'  {class_name}: {count} samples')
    
    return X, y, filenames, test_files, label_encoder

# Load data from Kaggle dataset
# 从 Kaggle 数据集加载数据
if os.path.exists(DATA_DIR):
    X, y, filenames, test_files, label_encoder = load_data_from_directory(DATA_DIR)
else:
    print(f'Data directory not found | 数据目录未找到: {DATA_DIR}')
    print('Please ensure the HILBERTMATRIX_NPZ dataset is added to this notebook.')
    print('请确保 HILBERTMATRIX_NPZ 数据集已添加到此笔记本。')

In [None]:
# Train the classifier | 训练分类器
from sklearn.model_selection import train_test_split

if 'X' in locals() and len(X) > 0:
    # Split data | 分割数据
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    print(f'Training set | 训练集: {X_train.shape[0]} samples')
    print(f'Validation set | 验证集: {X_val.shape[0]} samples')
    
    # Initialize classifier | 初始化分类器
    classifier = sEMGHHTClassifier(
        encoder=None,
        svm_kernel=SVM_KERNEL,
        svm_C=SVM_C,
        svm_gamma=SVM_GAMMA,
        device=device
    )
    
    # Train | 训练
    print('\n' + '='*60)
    print('Training SVM Classifier | 训练 SVM 分类器')
    print('='*60)
    classifier.fit(X_train, y_train, batch_size=32)
    
    # Evaluate on training set | 在训练集上评估
    train_results = classifier.evaluate(X_train, y_train, batch_size=32)
    print(f'\nTraining Accuracy | 训练准确率: {train_results["accuracy"]:.4f}')
    
    # Evaluate on validation set | 在验证集上评估
    val_results = classifier.evaluate(X_val, y_val, batch_size=32)
    print(f'Validation Accuracy | 验证准确率: {val_results["accuracy"]:.4f}')
    print('\nClassification Report | 分类报告:')
    print(val_results['classification_report'])
else:
    print('No data loaded. Please check the data directory.')
    print('未加载数据。请检查数据目录。')

## 6. Real HHT Transformation (For Reference)

When you have real sEMG data, you can use the following functions to perform HHT transformation.

In [None]:
def compute_hht_matrix(signal: np.ndarray, 
                       fs: float, 
                       matrix_size: int = 256,
                       max_imf: int = 10) -> np.ndarray:
    """
    Compute HHT (Hilbert-Huang Transform) matrix from a signal.
    
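    This is a reference implementation. For real usage, you may need to install
    the EMD library that provides the `PyEMD` module (published on PyPI as
    EMD-signal; the unrelated `pyemd` package is an Earth Mover's Distance library):
    pip install EMD-signal scipy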
    
    Args:
        signal: 1D input signal
        fs: Sampling frequency
        matrix_size: Output matrix size (matrix_size × matrix_size)
        max_imf: Maximum number of IMFs to extract
    
    Returns:
        HHT matrix of shape (matrix_size, matrix_size)
    """
    try:
        from PyEMD import EMD
        from scipy.signal import hilbert
    except ImportError:
        raise ImportError("Please install EMD-signal (provides PyEMD) and scipy: pip install EMD-signal scipy")
    
    # Perform EMD
    emd = EMD()
    imfs = emd(signal, max_imf=max_imf)
    
    # Compute Hilbert transform for each IMF
    n_samples = len(signal)
    t = np.arange(n_samples) / fs
    
    # Initialize time-frequency matrix (rows: frequency bins, columns: time bins)
    hht_matrix = np.zeros((matrix_size, matrix_size))
    
    for imf in imfs:
        # Compute analytic signal
        analytic = hilbert(imf)
        amplitude = np.abs(analytic)
        phase = np.unwrap(np.angle(analytic))
        
        # Compute instantaneous frequency
        inst_freq = np.diff(phase) / (2 * np.pi) * fs
        inst_freq = np.concatenate([inst_freq, [inst_freq[-1]]])
        inst_freq = np.clip(inst_freq, 0, fs/2)
        
        # Map to time-frequency matrix (vectorized). np.add.at accumulates
        # repeated (f_idx, t_idx) pairs instead of overwriting them.
        t_idx = np.clip((t / t[-1] * (matrix_size - 1)).astype(int), 0, matrix_size - 1)
        f_idx = np.clip((inst_freq / (fs / 2) * (matrix_size - 1)).astype(int), 0, matrix_size - 1)
        np.add.at(hht_matrix, (f_idx, t_idx), amplitude)
    
    # Normalize
    if hht_matrix.max() > 0:
        hht_matrix = hht_matrix / hht_matrix.max()
    
    return hht_matrix.astype(np.float32)


# Example usage (uncomment when you have real data):
# signal = np.random.randn(1000)  # Replace with real sEMG signal
# fs = 1000  # Sampling frequency in Hz
# hht_matrix = compute_hht_matrix(signal, fs, matrix_size=256)
# plt.imshow(hht_matrix, aspect='auto', cmap='hot', origin='lower')
# plt.colorbar()
# plt.show()
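
To make the instantaneous-frequency step above concrete, here is a small sketch that applies it to a pure 50 Hz tone. It uses NumPy only: the helper `analytic_signal` is a hypothetical stand-in that builds the analytic signal with an FFT, which to my understanding is the same one-sided-spectrum construction that `scipy.signal.hilbert` performs. For a clean tone the recovered frequency should sit at the tone frequency; real sEMG IMFs will be far noisier.

```python
import numpy as np


def analytic_signal(x):
    """FFT-based analytic signal: keep DC, double positive frequencies, zero negative ones."""
    n = len(x)
    spectrum = np.fft.fft(x)
    weights = np.zeros(n)
    weights[0] = 1.0
    if n % 2 == 0:
        weights[n // 2] = 1.0
        weights[1:n // 2] = 2.0
    else:
        weights[1:(n + 1) // 2] = 2.0
    return np.fft.ifft(spectrum * weights)


fs = 1000.0                       # sampling frequency in Hz
t = np.arange(1000) / fs          # one second of samples
tone = np.sin(2 * np.pi * 50.0 * t)

# Same phase-derivative formula as in compute_hht_matrix.
phase = np.unwrap(np.angle(analytic_signal(tone)))
inst_freq = np.diff(phase) / (2 * np.pi) * fs
median_freq = float(np.median(inst_freq))
print(f"median instantaneous frequency: {median_freq:.2f} Hz")
```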

---

# 5. Method 1: CNN+SVM Training | 方法一：CNN+SVM 训练

## Usage Instructions | 使用说明

**English**: This section shows how to use the traditional CNN+SVM approach. The CNN encoder is used for feature extraction only (weights frozen), and an SVM is trained on those features.

**中文**：本节展示如何使用传统的 CNN+SVM 方法。CNN 编码器仅用于特征提取（权重冻结），SVM 在这些特征上训练。

### Steps | 步骤:
1. Load and split data | 加载和分割数据
2. Initialize CNN+SVM classifier | 初始化 CNN+SVM 分类器
3. Train (one-shot, no epochs) | 训练（一次性，无需多轮）
4. Evaluate and save | 评估和保存
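
The steps above can be sketched on synthetic data. This is a minimal illustration of the "frozen feature extractor plus SVM" pattern, not the notebook's `sEMGHHTClassifier`: the fixed random projection `W_frozen` stands in for the CNN encoder, and the 16×16 inputs, class means, and variable names are all assumptions chosen to keep the example small.

```python
import numpy as np
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

rng = np.random.default_rng(0)

# Synthetic stand-ins for HHT matrices: two classes that differ in mean intensity.
X_demo = np.concatenate([
    rng.normal(0.2, 0.05, size=(40, 16, 16)),
    rng.normal(0.6, 0.05, size=(40, 16, 16)),
]).astype(np.float32)
y_demo = np.array([0] * 40 + [1] * 40)

# Frozen "encoder": a fixed random projection that is never updated.
W_frozen = rng.normal(size=(16 * 16, 8)).astype(np.float32)


def extract_features(X):
    """Flatten each matrix and project it to an 8-dimensional feature vector."""
    return X.reshape(len(X), -1) @ W_frozen


# One-shot training: scale the features, then fit the SVM. No epochs involved.
svm_demo = make_pipeline(StandardScaler(), SVC(kernel="rbf", C=1.0, gamma="scale"))
svm_demo.fit(extract_features(X_demo), y_demo)
demo_accuracy = svm_demo.score(extract_features(X_demo), y_demo)
print(f"training accuracy on synthetic data: {demo_accuracy:.2f}")
```

Because the extractor is never trained, the quality of the features depends entirely on how the encoder was initialized or pretrained; only the scaler and SVM adapt to the data.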

---

In [None]:
# Train the CNN+SVM classifier | 训练 CNN+SVM 分类器
from sklearn.model_selection import train_test_split

if 'X' in locals() and len(X) > 0:
    print('='*70)
    print('METHOD 1: CNN+SVM Training | 方法一：CNN+SVM 训练')
    print('='*70)
    
    # Split data | 分割数据
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    print(f'\nTraining set | 训练集: {X_train.shape[0]} samples')
    print(f'Validation set | 验证集: {X_val.shape[0]} samples')
    
    # Initialize classifier | 初始化分类器
    print('\n📦 Initializing CNN+SVM classifier...')
    svm_classifier = sEMGHHTClassifier(
        encoder=None,  # Will create default encoder
        svm_kernel=SVM_KERNEL,
        svm_C=SVM_C,
        svm_gamma=SVM_GAMMA,
        device=device
    )
    
    # Train | 训练
    print('\n🚀 Training SVM classifier (this is a one-shot process)...')
    print('训练 SVM 分类器（这是一次性过程）...')
    svm_classifier.fit(X_train, y_train, batch_size=32)
    
    # Evaluate on training set | 在训练集上评估
    print('\n📊 Evaluating on training set | 在训练集上评估...')
    train_results = svm_classifier.evaluate(X_train, y_train, batch_size=32)
    print(f'Training Accuracy | 训练准确率: {train_results["accuracy"]:.4f}')
    
    # Evaluate on validation set | 在验证集上评估
    print('\n📊 Evaluating on validation set | 在验证集上评估...')
    val_results = svm_classifier.evaluate(X_val, y_val, batch_size=32)
    print(f'Validation Accuracy | 验证准确率: {val_results["accuracy"]:.4f}')
    
    print('\n📋 Classification Report | 分类报告:')
    print(val_results['classification_report'])
    
    # Save model | 保存模型
    svm_model_path = os.path.join(CHECKPOINT_DIR, 'cnn_svm_model')
    save_svm_classifier(svm_classifier, svm_model_path)
    
    print(f'\n✅ CNN+SVM training complete!')
    print(f'Model saved to: {svm_model_path}_*')
    
else:
    print('⚠️  No data loaded. Please run the data loading cell first.')
    print('未加载数据。请先运行数据加载单元格。')

---

# 6. Method 2: End-to-End Training with Encoder Fine-tuning | 方法二：端到端编码器微调训练

## Usage Instructions | 使用说明

**English**: This section shows how to train the entire network end-to-end. Both the CNN encoder and the classification head are trained together, allowing the encoder to adapt to your specific data.

**中文**：本节展示如何端到端训练整个网络。CNN 编码器和分类头一起训练，允许编码器适应你的特定数据。

### Key Features | 关键特性:
- ✅ **Multi-epoch training** | 多轮训练
- ✅ **Automatic checkpointing** | 自动检查点保存
- ✅ **Resume from interruption** | 中断后恢复
- ✅ **Best model auto-save** | 最佳模型自动保存
- ✅ **Real-time progress** | 实时进度显示
- ✅ **Learning rate scheduling** | 学习率调度

### Training Process | 训练流程:
1. Initialize model | 初始化模型
2. Start training (or resume) | 开始训练（或恢复）
3. Monitor progress | 监控进度
4. Visualize results | 可视化结果

### Checkpoints | 检查点说明:
- `best_model.pt`: Best performing model (highest val accuracy) | 表现最佳的模型（最高验证准确率）
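- `checkpoint_epoch_N.pt`: Periodic checkpoint, written once every `TRAIN_CHECKPOINT_INTERVAL` epochs (N is the epoch number) | 定期检查点，每 `TRAIN_CHECKPOINT_INTERVAL` 轮保存一次（N 为轮次编号）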
- `final_model.pt`: Final model after all epochs | 所有轮次后的最终模型
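
When resuming, the total epoch count passed to the training function has to include the epochs already completed. The sketch below spells out that arithmetic under the assumption that the checkpoint's `epoch` field is 0-indexed, which is how the later cells in this notebook treat it. The helper `resume_plan` and the example values are hypothetical; a plain dict stands in for a loaded checkpoint.

```python
def resume_plan(checkpoint, additional_epochs):
    """Compute how many epochs are done and the new total to request.

    Assumes checkpoint["epoch"] is the 0-indexed number of the last
    completed epoch.
    """
    completed = checkpoint["epoch"] + 1
    return {
        "completed_epochs": completed,
        "total_epochs": completed + additional_epochs,
    }


# Example: a checkpoint written after the 10th epoch (index 9).
plan = resume_plan({"epoch": 9, "best_val_acc": 0.80}, additional_epochs=20)
print(plan)
```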

---

In [None]:
# END-TO-END TRAINING - Initial Training | 端到端训练 - 初始训练
# Run this cell to start training from scratch
# 运行此单元格从头开始训练

if 'X' in locals() and len(X) > 0:
    print('='*70)
    print('METHOD 2: End-to-End Training | 方法二：端到端训练')
    print('='*70)
    
    # Determine number of classes
    n_classes = len(np.unique(y))
    print(f'\nNumber of classes | 类别数: {n_classes}')
    
    # Initialize end-to-end model | 初始化端到端模型
    print('\n📦 Initializing end-to-end model...')
    e2e_model = sEMGHHTEndToEndClassifier(
        n_classes=n_classes,
        in_channels=1,
        base_channels=MODEL_BASE_CHANNELS,
        num_encoder_layers=MODEL_NUM_ENCODER_LAYERS,
        dropout_rate=MODEL_DROPOUT_RATE
    )
    
    print(f'Model architecture | 模型架构:')
    print(f'  - Encoder: {MODEL_NUM_ENCODER_LAYERS}-layer CNN | 编码器：{MODEL_NUM_ENCODER_LAYERS} 层 CNN')
    print(f'  - Feature dim: {e2e_model.encoder.get_feature_dim()}')
    print(f'  - Classifier: 2-layer FC | 分类器：2 层全连接')
    print(f'  - Output classes: {n_classes}')
    
    # Training configuration | 训练配置
    EPOCHS = TRAIN_EPOCHS  # Configured above | 在上面配置
    BATCH_SIZE = TRAIN_BATCH_SIZE
    LEARNING_RATE = TRAIN_LEARNING_RATE
    CHECKPOINT_INTERVAL = TRAIN_CHECKPOINT_INTERVAL  # Configured above | 在上面配置
    
    print(f'\n⚙️  Training Configuration | 训练配置:')
    print(f'  - Epochs | 训练轮数: {EPOCHS}')
    print(f'  - Batch size | 批次大小: {BATCH_SIZE}')
    print(f'  - Learning rate | 学习率: {LEARNING_RATE}')
    print(f'  - Checkpoint interval | 检查点间隔: {CHECKPOINT_INTERVAL} epochs')
    print(f'  - Device | 设备: {device}')
    
    # Start training | 开始训练
    print('\n🚀 Starting training... | 开始训练...')
    print('💡 Tip: Training can be interrupted (Ctrl+C) and resumed later')
    print('提示：训练可以中断（Ctrl+C）并稍后恢复\n')
    
    e2e_history = train_end_to_end_with_checkpointing(
        model=e2e_model,
        X_train=X_train,
        y_train=y_train,
        X_val=X_val,
        y_val=y_val,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        learning_rate=LEARNING_RATE,
        checkpoint_interval=CHECKPOINT_INTERVAL,
        checkpoint_dir=CHECKPOINT_DIR,
        resume_from=None,  # Set to None for initial training | 初始训练设为 None
        device=device
    )
    
    print('\n✅ Training completed successfully! | 训练成功完成！')
    
else:
    print('⚠️  No data loaded. Please run the data loading cell first.')
    print('未加载数据。请先运行数据加载单元格。')

In [None]:
# END-TO-END TRAINING - Resume from Checkpoint | 端到端训练 - 从检查点恢复
# Run this cell ONLY if you want to continue training from a checkpoint
# 仅在想要从检查点继续训练时运行此单元格

# Specify which checkpoint to resume from | 指定要恢复的检查点
# Options | 选项:
#   - 'best_model.pt': Resume from best model | 从最佳模型恢复
#   - 'checkpoint_epoch_10.pt': Resume from epoch 10 | 从第 10 轮恢复
#   - 'final_model.pt': Resume from final | 从最终模型恢复

RESUME_CHECKPOINT = os.path.join(CHECKPOINT_DIR, 'best_model.pt')  # Change this | 更改此处
ADDITIONAL_EPOCHS = 20  # How many more epochs to train | 再训练多少轮

if os.path.exists(RESUME_CHECKPOINT):
    print('='*70)
    print('RESUMING End-to-End Training | 恢复端到端训练')
    print('='*70)
    
    # Load checkpoint to check current epoch
    checkpoint = torch.load(RESUME_CHECKPOINT, map_location=device)
    current_epoch = checkpoint['epoch']
    best_val_acc = checkpoint.get('best_val_acc', 0.0)
    
    print(f'\n📂 Resuming from: {RESUME_CHECKPOINT}')
    print(f'   Last completed epoch: {current_epoch + 1}')  # stored epoch is 0-indexed
    print(f'   Best val accuracy so far: {best_val_acc:.4f}')
    print(f'   Will train for {ADDITIONAL_EPOCHS} more epochs')
    
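    # Reinitialize model (architecture must match!), using the same
    # configuration constants as the initial training cell
    n_classes = len(np.unique(y))
    e2e_model_resumed = sEMGHHTEndToEndClassifier(
        n_classes=n_classes,
        in_channels=1,
        base_channels=MODEL_BASE_CHANNELS,
        num_encoder_layers=MODEL_NUM_ENCODER_LAYERS,
        dropout_rate=MODEL_DROPOUT_RATE
    )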
    
    # Resume training
    print('\n🚀 Resuming training... | 恢复训练...\n')
    
    e2e_history_resumed = train_end_to_end_with_checkpointing(
        model=e2e_model_resumed,
        X_train=X_train,
        y_train=y_train,
        X_val=X_val,
        y_val=y_val,
        epochs=current_epoch + 1 + ADDITIONAL_EPOCHS,  # Total epochs
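        # Use the config constants directly so this cell also works after a
        # kernel restart, when the initial training cell has not been run
        batch_size=TRAIN_BATCH_SIZE,
        learning_rate=TRAIN_LEARNING_RATE,
        checkpoint_interval=TRAIN_CHECKPOINT_INTERVAL,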
        checkpoint_dir=CHECKPOINT_DIR,
        resume_from=RESUME_CHECKPOINT,  # Resume from checkpoint
        device=device
    )
    
    print('\n✅ Resumed training completed! | 恢复训练完成！')
    e2e_history = e2e_history_resumed  # Update history variable
    
else:
    print(f'⚠️  Checkpoint not found: {RESUME_CHECKPOINT}')
    print('Please train the model first or check the checkpoint path.')
    print('请先训练模型或检查检查点路径。')

In [None]:
# Visualize Training History | 可视化训练历史

if 'e2e_history' in locals():
    print('📊 Plotting training history... | 绘制训练历史...\n')
    plot_training_history(e2e_history)
else:
    print('⚠️  No training history found. Train the model first.')
    print('未找到训练历史。请先训练模型。')

In [None]:
# Load and Evaluate Best Model | 加载和评估最佳模型

best_model_path = os.path.join(CHECKPOINT_DIR, 'best_model.pt')

if os.path.exists(best_model_path) and 'X_val' in locals() and 'y_val' in locals():
    print('='*70)
    print('Loading and Evaluating Best Model | 加载和评估最佳模型')
    print('='*70)
    
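    # Load best model (architecture must match the one used for training)
    n_classes = len(np.unique(y))
    best_model = sEMGHHTEndToEndClassifier(
        n_classes=n_classes,
        in_channels=1,
        base_channels=MODEL_BASE_CHANNELS,
        num_encoder_layers=MODEL_NUM_ENCODER_LAYERS,
        dropout_rate=MODEL_DROPOUT_RATE
    )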
    
    checkpoint = torch.load(best_model_path, map_location=device)
    best_model.load_state_dict(checkpoint['model_state_dict'])
    best_model.to(device)
    best_model.eval()
    
    print(f'\n✅ Loaded best model from epoch {checkpoint["epoch"] + 1}')
    print(f'   Best validation accuracy: {checkpoint["best_val_acc"]:.4f}')
    
    # Evaluate on validation set
    X_val_tensor = torch.tensor(X_val[:, np.newaxis, :, :], dtype=torch.float32).to(device)
    y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(device)
    
    with torch.no_grad():
        outputs = best_model(X_val_tensor)
        _, predictions = outputs.max(1)
        accuracy = (predictions == y_val_tensor).float().mean().item()
    
    print(f'\n📊 Validation Set Performance | 验证集性能:')
    print(f'   Accuracy | 准确率: {accuracy:.4f}')
    
    # Classification report
    from sklearn.metrics import classification_report
    y_pred = predictions.cpu().numpy()
    print(f'\n📋 Classification Report | 分类报告:')
    if 'label_encoder' in locals():
        target_names = label_encoder.classes_
        print(classification_report(y_val, y_pred, target_names=target_names))
    else:
        print(classification_report(y_val, y_pred))
    
else:
    if not os.path.exists(best_model_path):
        print(f'⚠️  Best model not found at: {best_model_path}')
        print('Please train the model first.')
    else:
        print('⚠️  Validation data not found. Please load data first.')

In [None]:
# Run inference on test files | 对测试文件进行推理
if 'classifier' in locals() and 'test_files' in locals() and len(test_files) > 0:
    print('\n' + '='*60)
    print('Running Inference on Test Files | 对测试文件进行推理')
    print('='*60)
    
    X_test_list = []
    valid_test_files = []
    
    for test_file in test_files[:10]:  # Limit to first 10 for demo
        try:
            data = np.load(test_file)
            if 'hht' in data:
                hht_matrix = data['hht']
            else:
                hht_matrix = data[list(data.keys())[0]]
            
            if hht_matrix.shape == (256, 256):
                X_test_list.append(hht_matrix)
                valid_test_files.append(test_file)
        except Exception as e:
            print(f'Error loading {test_file}: {e}')
    
    if len(X_test_list) > 0:
        X_test = np.array(X_test_list, dtype=np.float32)
        y_test_pred = classifier.predict(X_test, batch_size=32)
        y_test_proba = classifier.predict_proba(X_test, batch_size=32)
        
        svm_classes = classifier.svm.classes_
        
        print(f'\nPredictions for {len(valid_test_files)} test files:')
        print(f'前 {len(valid_test_files)} 个测试文件的预测结果：\n')
        for i, (filename, pred, proba) in enumerate(zip(valid_test_files, y_test_pred, y_test_proba)):
            pred_idx = np.where(svm_classes == pred)[0][0]
            class_name = label_encoder.classes_[pred]
            confidence = proba[pred_idx]
            print(f'{os.path.basename(filename)}: {class_name} (confidence: {confidence:.4f})')
else:
    print('No test files to process or classifier not trained.')
    print('没有测试文件或分类器未训练。')

## 7. Summary and Next Steps

This notebook provides:

1. **CNN Encoder Architecture**: 3-layer convolutional network with Instance Normalization and LeakyReLU
2. **SVM Classifier**: Multi-class classification using extracted CNN features
3. **End-to-End Model**: Optional fully trainable model with neural network classifier
4. **Real Data Training**: Load and train on actual HHT matrices from Kaggle dataset
5. **HHT Computation**: Reference implementation for real sEMG signals

### Hyperparameter Tuning:
- `base_channels`: Number of channels in first conv layer (default: 64)
- `svm_C`: SVM regularization parameter
- `svm_kernel`: SVM kernel type ('rbf', 'linear', 'poly')
- `learning_rate`: Learning rate for end-to-end training
- `dropout_rate`: Dropout rate in end-to-end model
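
For the SVM-side parameters, one common way to explore `svm_C` and `svm_kernel` is a cross-validated grid search over the extracted features. The sketch below assumes scikit-learn's `GridSearchCV` and uses random 8-dimensional vectors in place of real CNN features; the grid values are arbitrary examples, not recommendations from this notebook.

```python
import numpy as np
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

rng = np.random.default_rng(0)

# Synthetic stand-in for CNN features: two well-separated Gaussian clusters.
features = np.concatenate([
    rng.normal(-1.0, 1.0, size=(30, 8)),
    rng.normal(1.0, 1.0, size=(30, 8)),
])
labels = np.array([0] * 30 + [1] * 30)

# Candidate values are illustrative only.
param_grid = {"C": [0.1, 1.0, 10.0], "kernel": ["rbf", "linear"]}
search = GridSearchCV(SVC(gamma="scale"), param_grid, cv=3)
search.fit(features, labels)

best_params = search.best_params_
print(best_params, f"cv accuracy: {search.best_score_:.3f}")
```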

In [None]:
print("\n" + "="*60)
print("sEMG-HHT CNN Classifier - Ready for Use")
print("="*60)
print(f"\nDevice: {device}")
print(f"Encoder feature dimension: {encoder.get_feature_dim()}")
print(f"Number of classes: {len(class_names)}")
print(f"\nClass names:")
for i, name in enumerate(class_names):
    print(f"  {i}: {name}")