In [3]:
import os
import numpy as np
from sklearn.model_selection import train_test_split

def load_data(root_path, test_size=0.2, random_state=42):
    """Loads gesture dataset and splits into train/test sets."""
    data_path = os.path.join(root_path, "processed_gesture_data.npy")
    label_path = os.path.join(root_path, "gesture_labels.npy")
    
    if not os.path.exists(data_path) or not os.path.exists(label_path):
        raise FileNotFoundError("Processed gesture data or labels not found. Please run process_all_gesture_files first.")
    
    print("Loading processed data...")
    X = np.load(data_path, allow_pickle=True)
    y = np.load(label_path, allow_pickle=True)
    
    print(f"Data shape: {X.shape}, Labels shape: {y.shape}")
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state, stratify=y)
    
    print(f"Training set: {X_train.shape}, Testing set: {X_test.shape}")
    
    return X_train, X_test, y_train, y_test

def extract_emg_features(signal, fs=1000):
    """Extracts features from an EMG signal (100 time steps)."""
    mav = np.mean(np.abs(signal))  # Mean Absolute Value
    rms = np.sqrt(np.mean(signal**2))  # Root Mean Square
    var = np.var(signal)  # Variance
    zc = np.sum(np.diff(np.sign(signal)) != 0)  # Zero Crossing Count
    wl = np.sum(np.abs(np.diff(signal)))  # Waveform Length

    # Frequency-domain features using FFT
    fft_vals = np.fft.rfft(signal)
    freqs = np.fft.rfftfreq(len(signal), d=1/fs)
    power = np.abs(fft_vals)**2
    total_power = np.sum(power)
    mean_freq = np.sum(freqs * power) / total_power if total_power else 0

    cumsum_power = np.cumsum(power)
    median_freq = freqs[np.where(cumsum_power >= total_power/2)[0][0]] if total_power else 0

    return np.array([mav, rms, var, zc, wl, mean_freq, median_freq])

def synthesize_time_series(features, num_timesteps=100):
    """
    Generates a synthetic time series of length `num_timesteps` from extracted features.
    Uses Gaussian noise centered at the mean feature values.
    """
    synthesized_signal = np.zeros((num_timesteps,))
    
    for i, feature in enumerate(features):
        synthesized_signal += feature * np.sin(2 * np.pi * (i + 1) * np.linspace(0, 1, num_timesteps))
    
    return synthesized_signal + np.random.normal(0, 0.05, num_timesteps)  # Add small noise

def replace_emg_with_synthetic_data(X, fs=1000):
    """
    Replaces the first 4 EMG channels in each window with synthesized time series
    generated from extracted features, while keeping the last 6 IMU channels unchanged.

    Parameters:
    - X: Shape (num_samples, num_windows, num_timesteps, num_channels)
    - fs: Sampling frequency

    Returns:
    - X_new: Same shape as X, but with EMG channels replaced by synthetic features
    """
    num_samples, num_windows, num_timesteps, num_channels = X.shape
    new_X = np.copy(X)  # Keep the original structure

    for i in range(num_samples):
        for j in range(num_windows):
            for ch in range(4):  # Replace only the first 4 EMG channels
                features = extract_emg_features(X[i, j, :, ch], fs)
                new_X[i, j, :, ch] = synthesize_time_series(features, num_timesteps)

    return new_X

# Load original data
data_folder = r"new_collect"  
X_train, X_test, y_train, y_test = load_data(data_folder)

# Replace EMG channels with synthesized time series
X_train_new = replace_emg_with_synthetic_data(X_train, fs=1000)
X_test_new = replace_emg_with_synthetic_data(X_test, fs=1000)

print(f"New training data shape: {X_train_new.shape}")
print(f"New testing data shape: {X_test_new.shape}")


Loading processed data...
Data shape: (1724, 19, 100, 10), Labels shape: (1724,)
Training set: (1379, 19, 100, 10), Testing set: (345, 19, 100, 10)
New training data shape: (1379, 19, 100, 10)
New testing data shape: (345, 19, 100, 10)


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, TimeDistributed, Conv1D, Flatten
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization, GlobalAveragePooling1D

X_train_new = np.nan_to_num(X_train_new, nan=0.0)
X_test_new = np.nan_to_num(X_test_new, nan=0.0)
y_train = np.nan_to_num(y_train, nan = 0.0)
y_test = np.nan_to_num(y_test, nan = 0.0)
# **获取数据形状**
num_batches = X_train_new.shape[0]  # batch 维度
num_windows = X_train_new.shape[1]  # 时间步（窗口数 59）
num_features = X_train_new.shape[2]  # 特征数（15）
num_channels = X_train_new.shape[3]  # 通道数（1）

# **检测类别数量**
unique_classes = np.unique(y_train)
num_classes = len(unique_classes)  # 确保类别数正确

print(f"Corrected Classes: {num_classes}, Batches: {num_batches}, Windows: {num_windows}, Features: {num_features}, Channels: {num_channels}")

# **保持 X 形状**
X_train_new = X_train_new.reshape(num_batches, num_windows, num_features, num_channels)
X_test_new = X_test_new.reshape(X_test_new.shape[0], num_windows, num_features, num_channels)
X_train_new = X_train_new[:,:,:,:]
X_test_new = X_test_new[:,:,:,:]

# print(X_train_new.shape)

num_features = X_train_new.shape[2] 
num_channels = X_train_new.shape[3]
# **标签编码**
encoder = LabelEncoder()
y_train = encoder.fit_transform(y_train)  # 转换成 0,1,2
y_test = encoder.transform(y_test)

# **转换为 One-Hot**
y_train = to_categorical(y_train, num_classes=num_classes)
y_test = to_categorical(y_test, num_classes=num_classes)

print(f"X_train_new shape: {X_train_new.shape}, y_train shape: {y_train.shape}")  # (batch, 59, 15, 1) (batch, 3)
print(f"X_test_new shape: {X_test_new.shape}, y_test shape: {y_test.shape}")  # (batch, 59, 15, 1) (batch, 3)


X_train_new = X_train_new.reshape(X_train_new.shape[0], X_train_new.shape[1], -1)  # (batch_size, 29, 40)
X_test_new = X_test_new.reshape(X_test_new.shape[0], X_test_new.shape[1], -1)  

# # print(num_features)
# # **构建 LSTM 处理通道的模型**

model = Sequential([
    Conv1D(filters=64, kernel_size=3, activation='relu', padding='same', input_shape=(num_windows, num_features * num_channels)),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Conv1D(filters=128, kernel_size=3, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Conv1D(filters=256, kernel_size=3, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Flatten(),
    Dense(256, activation='relu'),
    Dropout(0.5),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(64, activation='relu'),
    Dropout(0.3),
    Dense(num_classes, activation='softmax')  # 多分类输出
])

from tensorflow.keras.callbacks import LearningRateScheduler
def lr_schedule(epoch, lr):
    if epoch % 10 == 0 and epoch != 0:
        return lr * 0.9  # 每 10 个 epoch，学习率减半
    return lr

# **编译模型**
model.compile(
    optimizer=Adam(learning_rate=0.001),  # 初始学习率
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# **使用回调函数**
lr_callback = LearningRateScheduler(lr_schedule)

# **训练时加上 callback**
history = model.fit(X_train_new, y_train, epochs=100, callbacks=[lr_callback], validation_data=(X_test_new, y_test))


# **测试模型**
y_pred = model.predict(X_test_new)
y_pred_classes = np.argmax(y_pred, axis=1)
y_test_classes = np.argmax(y_test, axis=1)

# **计算准确率**
accuracy = accuracy_score(y_test_classes, y_pred_classes)
print(f"Test Accuracy: {accuracy:.4f}")

# **保存模型**
model.save("cnn_emg_model.h5")
print("Model saved as cnn_emg_model.h5")
# Test Accuracy: 0.9826
# Gt: Test Accuracy: 0.9391

# 2 datasets:
# Test Accuracy: 0.9524

# 3 datasets:
# Test Accuracy: 0.9304

Corrected Classes: 23, Batches: 1379, Windows: 19, Features: 100, Channels: 10
X_train_new shape: (1379, 19, 100, 10), y_train shape: (1379, 23)
X_test_new shape: (345, 19, 100, 10), y_test shape: (345, 23)
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/