In [6]:
import json
import os
from pathlib import Path
import pandas as pd

# Single-User, Single-Repetition Signal Preprocessing

In [None]:
# Locations of the dataset splits, expressed relative to the directory
# the notebook is run from.
BASE_PATH = Path("..", "EMG-EPN612 Dataset")
TRAINING_PATH = BASE_PATH.joinpath("trainingJSON")
TESTING_PATH = BASE_PATH.joinpath("testingJSON")

# Sanity check: report whether both split folders are present and show
# the absolute location the relative base path resolves to.
print(f"Training path exists: {TRAINING_PATH.exists()}")
print(f"Testing path exists: {TESTING_PATH.exists()}")
print(f"Base path: {BASE_PATH.resolve()}")

# Load user1 JSON file
def load_user_json(split_path, user_folder):
    """Load one user's recording file from a dataset split.

    The file is looked up at ``<split_path>/<user_folder>/<user_folder>.json``.

    Parameters
    ----------
    split_path : pathlib.Path
        Directory of the split; it is combined with ``/``, so it must
        support that operator.
    user_folder : str
        Name of the user's folder, which is also the stem of the JSON file.

    Returns
    -------
    The Python object decoded from the JSON file.

    Raises
    ------
    FileNotFoundError
        If the expected JSON file does not exist.
    json.JSONDecodeError
        If the file content is not valid JSON.
    """
    json_file = split_path / user_folder / f"{user_folder}.json"
    # Decode explicitly as UTF-8: without an encoding argument open() falls
    # back to the platform locale (e.g. cp1252 on Windows), which can
    # mis-decode or reject non-ASCII characters in the file.
    with open(json_file, 'r', encoding='utf-8') as f:
        return json.load(f)

# Read the complete recording file of user1 from the training split
user1_training_data = load_user_json(TRAINING_PATH, "user1")

# Keep only the repetition stored under the 'idx_1' key
training_samples = user1_training_data['trainingSamples']
idx_1_sample = training_samples['idx_1']

# .get() leaves gesture_name as None when the sample has no 'gestureName' key
gesture_name = idx_1_sample.get('gestureName')
print("\n✓ Loaded idx_1 sample")
print(f"  Gesture: {gesture_name}")
print(f"  Sample contains: {list(idx_1_sample.keys())}")

Training path exists: True
Testing path exists: True
Base path: C:\Users\antol\Documents\Documenti\UNI_sant anna\data mining\EMG-EPN612-PROJECT\EMG-EPN612 Dataset


## Extract EMG and IMU Raw Data

In [None]:
import numpy as np
from scipy import signal
from scipy.fft import fft, fftfreq
from scipy.stats import entropy

# Pull the three raw sensor streams out of the selected sample
emg_channels = idx_1_sample['emg']
gyroscope_data = idx_1_sample['gyroscope']
accelerometer_data = idx_1_sample['accelerometer']

# The EMG rate is read from the file's 'generalInfo' section and falls back
# to 200 Hz when that section or key is missing.
general_info = user1_training_data.get('generalInfo', {})
sampling_rate_emg = general_info.get('samplingFrequencyInHertz', 200)
# The IMU rate is a fixed value set here; it is not read from the file.
sampling_rate_imu = 50

print("✓ Raw Data Extracted:")
print(f"\n  EMG Channels: {list(emg_channels.keys())}")
for ch_name, ch_data in emg_channels.items():
    # Quick per-channel summary of the raw samples
    ch_array = np.array(ch_data)
    ch_min, ch_max, ch_mean = ch_array.min(), ch_array.max(), ch_array.mean()
    print(f"    {ch_name}: {len(ch_data)} samples | min={ch_min:.2f}, max={ch_max:.2f}, mean={ch_mean:.2f}")

print("\n  IMU Data:")
print(f"    Gyroscope (50 Hz): {list(gyroscope_data.keys())}")
print(f"    Accelerometer (50 Hz): {list(accelerometer_data.keys())}")
print("\n  Sampling Rates:")
print(f"    EMG: {sampling_rate_emg} Hz")
print(f"    IMU: {sampling_rate_imu} Hz")


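## Signal Preprocessing: Filtering and Normalization

> **Note:** the cell that defines `preprocess_emg` and builds `normalized_emg` currently sits *after* the two visualization cells below. Run it before them (or move it up to this section); otherwise a fresh **Restart & Run All** stops with a `NameError` on `normalized_emg`.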

## Visualize Preprocessed EMG Signals in Time Domain

In [None]:
import matplotlib.pyplot as plt

# NOTE: `normalized_emg` is assigned by the preprocessing cell that appears
# further down in this notebook; that cell must have been executed before
# this one.

# Time stamp (in seconds) of every sample, derived from the EMG sampling rate
n_samples = len(normalized_emg['ch1'])
time_axis = np.arange(n_samples) / sampling_rate_emg
channel_names = list(emg_channels.keys())

# Plot 1: 4x2 grid with one panel per channel
fig, axes = plt.subplots(4, 2, figsize=(14, 10))
fig.suptitle(f'EMG Signals - All 8 Channels (Normalized)\nGesture: {gesture_name}', fontsize=14, fontweight='bold')

for idx, ch_name in enumerate(channel_names):
    # Channels fill the grid row by row, two per row
    row, col = divmod(idx, 2)
    ax = axes[row, col]

    ax.plot(time_axis, normalized_emg[ch_name], linewidth=1, color='steelblue')
    ax.set_title(f'{ch_name}', fontsize=11, fontweight='bold')
    ax.set_xlabel('Time (seconds)', fontsize=10)
    ax.set_ylabel('Normalized Amplitude', fontsize=10)
    ax.set_xlim([0, time_axis[-1]])
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("✓ Signal visualization complete")
print(f"  Time duration: {time_axis[-1]:.2f} seconds")
print(f"  Sampling rate: {sampling_rate_emg} Hz")
print(f"  Total samples per channel: {len(time_axis)}")

# Plot 2: every channel drawn on one shared axis for direct comparison
fig, ax = plt.subplots(figsize=(14, 6))

# Eight evenly spaced positions on the tab10 colormap, one per channel
colors = plt.cm.tab10(np.linspace(0, 1, 8))
for idx, ch_name in enumerate(channel_names):
    ax.plot(time_axis, normalized_emg[ch_name], label=ch_name, linewidth=1.5, alpha=0.8, color=colors[idx])

ax.set_title(f'All EMG Channels Overlay - {gesture_name}', fontsize=13, fontweight='bold')
ax.set_xlabel('Time (seconds)', fontsize=11)
ax.set_ylabel('Normalized Amplitude', fontsize=11)
ax.set_xlim([0, time_axis[-1]])
ax.grid(True, alpha=0.3)
ax.legend(loc='upper right', ncol=2, fontsize=10)

plt.tight_layout()
plt.show()

print("\n✓ All channels overlay visualization complete")

In [None]:
# Per-channel summary table of the normalized signals.
# NOTE: relies on `normalized_emg`, `channel_names` and `time_axis`, which are
# assigned in other cells of this notebook (`normalized_emg` by the
# preprocessing cell that appears further down).
banner = '=' * 100
print(f"\n{banner}")
print("DETAILED SIGNAL INFORMATION FOR EACH CHANNEL")
print(f"{banner}")

signal_info = []
for ch_name in channel_names:
    ch_data = normalized_emg[ch_name]
    lowest, highest = np.min(ch_data), np.max(ch_data)
    # One table row per channel
    signal_info.append({
        'Channel': ch_name,
        'Min': lowest,
        'Max': highest,
        'Mean': np.mean(ch_data),
        'Std': np.std(ch_data),
        'RMS': np.sqrt(np.mean(ch_data**2)),
        'Peak-to-Peak': highest - lowest
    })

signal_df = pd.DataFrame(signal_info)
print(signal_df.to_string(index=False))

# Recording-level characteristics shared by all channels
print(f"\n{banner}")
print("SIGNAL CHARACTERISTICS:")
print(f"{banner}")
print(f"Gesture: {gesture_name}")
print(f"Duration: {time_axis[-1]:.3f} seconds")
print(f"Sampling Rate: {sampling_rate_emg} Hz")
print(f"Total Samples per Channel: {len(time_axis)}")
print(f"Time Resolution: {1/sampling_rate_emg*1000:.2f} ms")

In [None]:
def preprocess_emg(emg_signal, fs=200, lowcut=20, highcut=500):
    """
    Band-pass filter one EMG channel, then z-score normalize it.

    Parameters
    ----------
    emg_signal : array-like
        Samples of a single channel (1-D).
    fs : float
        Sampling frequency in Hz.
    lowcut, highcut : float
        Requested pass-band edges in Hz. Both edges are limited to lie
        strictly below the Nyquist frequency ``fs / 2``; with the defaults
        (``fs=200``, ``highcut=500``) the upper edge therefore ends up at
        0.999 * 100 Hz, not at 500 Hz.

    Returns
    -------
    filtered : np.ndarray
        Zero-phase band-pass filtered signal.
    normalized : np.ndarray
        ``filtered`` shifted to zero mean and scaled to unit standard
        deviation (a small epsilon guards against division by zero).

    Raises
    ------
    ValueError
        Propagated from SciPy, e.g. when the signal is too short for the
        edge padding used by the forward-backward filter.
    """
    # Express the band edges as fractions of the Nyquist frequency
    nyquist = fs / 2
    low = lowcut / nyquist
    high = highcut / nyquist

    # Keep both edges inside the open interval (0, 1) required by the filter
    # design, and keep the upper edge above the lower one.
    low = max(0.001, min(low, 0.999))
    high = max(low + 0.001, min(high, 0.999))

    # Design and apply the filter as second-order sections: the (b, a)
    # polynomial form is numerically sensitive for band-pass designs,
    # especially with an edge this close to Nyquist.
    sos = signal.butter(4, [low, high], btype='band', output='sos')
    filtered = signal.sosfiltfilt(sos, np.asarray(emg_signal, dtype=float))

    # Z-score normalization
    normalized = (filtered - np.mean(filtered)) / (np.std(filtered) + 1e-8)

    return filtered, normalized

# Preprocess every EMG channel, filtering at the sampling rate that was read
# from the recording rather than at preprocess_emg's default.
preprocessed_emg = {}
normalized_emg = {}

for ch_name, ch_data in emg_channels.items():
    filtered, normalized = preprocess_emg(np.array(ch_data), fs=sampling_rate_emg)
    preprocessed_emg[ch_name] = filtered
    normalized_emg[ch_name] = normalized

# preprocess_emg limits the upper band edge to 0.999 * Nyquist, so report the
# band that is actually applied instead of the requested 500 Hz.
nyquist_hz = sampling_rate_emg / 2
effective_high_hz = min(500, 0.999 * nyquist_hz)

print(f"✓ EMG Preprocessing Complete:")
print(f"  - Bandpass filter: 20-{effective_high_hz:g} Hz effective (20-500 Hz requested, Nyquist = {nyquist_hz:g} Hz)")
print(f"  - Z-score normalization applied")
print(f"  - All 8 channels processed")

# Show comparison of raw vs. normalized statistics for one channel
ch1_original = np.array(emg_channels['ch1'])
ch1_normalized = normalized_emg['ch1']
print(f"\n  Channel 1 (ch1) - Before vs After:")
print(f"    Original:   Mean={np.mean(ch1_original):.4f}, Std={np.std(ch1_original):.4f}")
print(f"    Normalized: Mean={np.mean(ch1_normalized):.6f}, Std={np.std(ch1_normalized):.4f}")

## Feature Extraction from EMG Signals

In [None]:
def extract_emg_features(signal_data, fs=200):
    """
    Compute time-domain, spectral and entropy descriptors of one EMG channel.

    Parameters
    ----------
    signal_data : array-like
        Samples of a single channel.
    fs : float
        Sampling frequency in Hz; used to assign a frequency to each FFT bin.

    Returns
    -------
    dict
        Feature name -> scalar, inserted in this order: mean, std, var, rms,
        min, max, peak_to_peak, waveform_length, arc_length, mean_freq,
        median_freq, spectral_entropy, shannon_entropy.
    """
    sig = np.array(signal_data)
    steps = np.diff(sig)  # sample-to-sample differences
    lowest, highest = np.min(sig), np.max(sig)

    # Time-domain descriptors
    features = {
        'mean': np.mean(sig),
        'std': np.std(sig),
        'var': np.var(sig),
        'rms': np.sqrt(np.mean(sig**2)),
        'min': lowest,
        'max': highest,
        'peak_to_peak': highest - lowest,
        'waveform_length': np.sum(np.abs(steps)),
        'arc_length': np.sum(np.sqrt(1 + steps**2)),
    }

    # Magnitude spectrum restricted to the first len(sig)//2 FFT bins
    half = len(sig) // 2
    freqs = fftfreq(len(sig), 1/fs)[:half]
    amplitudes = np.abs(fft(sig))[:half]
    total_amplitude = np.sum(amplitudes)

    if total_amplitude > 0:
        # Magnitude-weighted average frequency
        features['mean_freq'] = np.sum(freqs * amplitudes) / total_amplitude
        # Frequency whose cumulative magnitude is closest to half the total
        running = np.cumsum(amplitudes)
        halfway_idx = np.argmin(np.abs(running - running[-1] / 2))
        features['median_freq'] = freqs[halfway_idx]
        # Entropy of the spectrum treated as a probability distribution
        features['spectral_entropy'] = entropy(amplitudes / total_amplitude + 1e-10)
    else:
        # No spectral content at all: fall back to zeros
        features['mean_freq'] = 0
        features['median_freq'] = 0
        features['spectral_entropy'] = 0

    # Entropy of a 20-bin histogram of the min-max scaled samples
    scaled = (sig - lowest) / (highest - lowest + 1e-10)
    counts, _ = np.histogram(scaled, bins=20)
    features['shannon_entropy'] = entropy(counts / np.sum(counts))

    return features

# Extract features for all 8 channels, labelling the FFT bins with the
# sampling rate read from the recording instead of the function's default.
emg_features = {}
for ch_name, ch_data in normalized_emg.items():
    emg_features[ch_name] = extract_emg_features(ch_data, fs=sampling_rate_emg)

print(f"✓ EMG Features Extracted for All 8 Channels")
print(f"\n  Example - Channel 1 (ch1) Features:")
# Preview only the first five features of ch1 to keep the output short
for feature_name, value in list(emg_features['ch1'].items())[:5]:
    print(f"    {feature_name}: {value:.6f}")
print(f"    ... (and {len(emg_features['ch1']) - 5} more features)")

print(f"\n  Total features per channel: {len(emg_features['ch1'])}")

## Feature Extraction from IMU Sensors (Gyroscope & Accelerometer)

In [None]:
def extract_imu_features(x_data, y_data, z_data):
    """
    Summarize one three-axis IMU stream (gyroscope or accelerometer).

    Parameters
    ----------
    x_data, y_data, z_data : array-like
        Samples of the x, y and z axes; they are combined element-wise, so
        they need compatible lengths.

    Returns
    -------
    dict
        ``<axis>_mean``, ``<axis>_std``, ``<axis>_var``, ``<axis>_min`` and
        ``<axis>_max`` for x, y and z (in that order), followed by
        ``magnitude_mean``, ``magnitude_std`` and ``magnitude_max`` of the
        per-sample Euclidean norm.
    """
    components = {
        'x': np.array(x_data),
        'y': np.array(y_data),
        'z': np.array(z_data),
    }

    # Statistics computed independently on every axis, in output order
    per_axis_stats = (
        ('mean', np.mean),
        ('std', np.std),
        ('var', np.var),
        ('min', np.min),
        ('max', np.max),
    )
    features = {
        f'{axis_name}_{stat_name}': stat_fn(samples)
        for axis_name, samples in components.items()
        for stat_name, stat_fn in per_axis_stats
    }

    # Length of the 3-D vector at every sample
    magnitude = np.sqrt(components['x']**2 + components['y']**2 + components['z']**2)
    features['magnitude_mean'] = np.mean(magnitude)
    features['magnitude_std'] = np.std(magnitude)
    features['magnitude_max'] = np.max(magnitude)

    return features

# Summarize both IMU streams; each one is passed as its x, y and z series
gyro_features = extract_imu_features(
    *(gyroscope_data[axis_key] for axis_key in ('x', 'y', 'z'))
)
accel_features = extract_imu_features(
    *(accelerometer_data[axis_key] for axis_key in ('x', 'y', 'z'))
)

print("✓ IMU Features Extracted")

# Preview the first six feature names of each sensor
print(f"\n  Gyroscope Features ({len(gyro_features)} total):")
for feature_name in list(gyro_features)[:6]:
    print(f"    - {feature_name}")
print(f"    ... (and {len(gyro_features) - 6} more)")

print(f"\n  Accelerometer Features ({len(accel_features)} total):")
for feature_name in list(accel_features)[:6]:
    print(f"    - {feature_name}")
print(f"    ... (and {len(accel_features) - 6} more)")

## Combine All Features into Feature Vector

In [None]:
# Flatten every per-sensor feature dict into one name -> value mapping.
# Each key is prefixed with its source (emg_<channel>_, gyro_, accel_) so
# feature names stay unique across sensors.
all_features = {}

# Add EMG features from all 8 channels
for ch_name, ch_features in emg_features.items():
    for feature_name, value in ch_features.items():
        all_features[f'emg_{ch_name}_{feature_name}'] = value

# Add Gyroscope features
for feature_name, value in gyro_features.items():
    all_features[f'gyro_{feature_name}'] = value

# Add Accelerometer features
for feature_name, value in accel_features.items():
    all_features[f'accel_{feature_name}'] = value

# Parallel lists of names and values (same order as the dict)
feature_names = list(all_features.keys())
feature_values = list(all_features.values())

# Single-row DataFrame: one column per feature plus identifying metadata
feature_df = pd.DataFrame([all_features])
feature_df['gesture'] = gesture_name
feature_df['user'] = 'user1'
feature_df['sample_id'] = 'idx_1'

# The reporting lines below previously carried stray escaped quotes, which
# made the whole cell a SyntaxError; they are restored to plain f-strings.
print(f"✓ All Features Combined")
print(f"\n  Feature Summary:")
print(f"    EMG Features: {sum(1 for f in feature_names if f.startswith('emg_'))}")
print(f"    Gyroscope Features: {sum(1 for f in feature_names if f.startswith('gyro_'))}")
print(f"    Accelerometer Features: {sum(1 for f in feature_names if f.startswith('accel_'))}")
print(f"    Total Features: {len(feature_names)}")

print(f"\n  Feature Vector Shape: {feature_df.shape}")
print(f"  Metadata:")
print(f"    Gesture: {feature_df['gesture'].values[0]}")
print(f"    User: {feature_df['user'].values[0]}")
print(f"    Sample ID: {feature_df['sample_id'].values[0]}")

print(f"\n  First 10 Features:")
print(feature_df.iloc[:, :10])

## Save Preprocessed Data

In [None]:
# Destination folder for every artifact of this sample; created if missing
# (its parent directory must already exist).
output_dir = Path("../preprocessed_data")
output_dir.mkdir(exist_ok=True)

# Feature vector plus metadata columns, as CSV
csv_path = output_dir / "idx_1_features.csv"
feature_df.to_csv(csv_path, index=False)

# Feature values and names as separate NumPy files for efficient loading
feature_values_array = np.array(feature_values)
np.save(output_dir / "idx_1_feature_values.npy", feature_values_array)
np.save(output_dir / "idx_1_feature_names.npy", feature_names, allow_pickle=True)

# Normalized EMG signals stacked with one row per channel
emg_array = np.array([normalized_emg[ch] for ch in emg_channels.keys()])
np.save(output_dir / "idx_1_emg_normalized.npy", emg_array)

print("✓ Preprocessed Data Saved")
print(f"\n  Output Directory: {output_dir.resolve()}")
print("  Files saved:")
print("    - idx_1_features.csv (Feature vector with metadata)")
print("    - idx_1_feature_values.npy (NumPy feature values)")
print("    - idx_1_feature_names.npy (Feature names)")
print("    - idx_1_emg_normalized.npy (8-channel EMG data)")

# Summary statistics taken over the whole feature vector
print("\n  Feature Statistics:")
print(f"    Mean: {np.mean(feature_values_array):.6f}")
print(f"    Std: {np.std(feature_values_array):.6f}")
print(f"    Min: {np.min(feature_values_array):.6f}")
print(f"    Max: {np.max(feature_values_array):.6f}")

print("\n✓ PREPROCESSING COMPLETE - Ready for ML models!")