In [3]:
import os
import pandas as pd

# Cleaning Data

In [1]:
master_folder = "EMG_data_for_gestures-master"
cleaned_data_by_subject = {}

In [5]:
# Loop through each subject's folder
for subject_folder in sorted(os.listdir(master_folder)):
    subject_path = os.path.join(master_folder, subject_folder)

    if os.path.isdir(subject_path):  # Check if it's a directory
        subject_number = int(subject_folder)  # Convert subject folder name to an integer
        
        trials = []
        
        for file_name in sorted(os.listdir(subject_path)):
            if file_name.endswith('.txt'):
                file_path = os.path.join(subject_path, file_name)
                
                # Load the .txt file with proper handling for mixed types and whitespace delimiter
                df = pd.read_csv(file_path, sep='\s+', dtype=str)  # Load as strings to avoid dtype issues

                # Drop 'time' column
                df.drop('time', inplace=True, axis=1)
                
                # Drop rows where 'class' column has NaNs or non-numeric values
                df = df[df["class"].notna() & df["class"].str.isnumeric()]
                
                # Convert 'class' column to integer
                df["class"] = df["class"].astype(int)
                
                # Keep only relevant classes (1 to 6) and drop others
                df = df[df["class"].isin([1, 2, 3, 4, 5, 6])]
                
                # Append the cleaned trial data to the list
                trials.append(df)
        
        # Concatenate trials for the current subject
        subject_data = pd.concat(trials, axis=0).reset_index(drop=True)
        
        # Store the cleaned data for the subject
        cleaned_data_by_subject[subject_number] = subject_data

print(f"Loaded and cleaned data for {len(cleaned_data_by_subject)} subjects.")

Loaded and cleaned data for 36 subjects.


# Segment Data by Subject

In [6]:
segmented_data_by_subject = {}

# Loop through each subject's cleaned data
for subject_number, subject_data in cleaned_data_by_subject.items():
    segmented_trials = []  # Store segmented data for all trials of this subject

    # Sort trials by 'trial' column if applicable
    if 'trial' in subject_data.columns:
        trials = subject_data.groupby('trial')
    else:
        trials = [(None, subject_data)]  # Single trial case

    # Process each trial separately
    for trial_id, trial_data in trials:
        trial_segments = []  # Store all segments for this trial

        # Identify gesture segments by detecting label changes
        current_label = trial_data.iloc[0]['class']  # Start with the first label
        start_index = 0  # Index for the beginning of the segment

        for i in range(1, len(trial_data)):
            if trial_data.iloc[i]['class'] != current_label:
                # Extract the segment for the current label
                segment = trial_data.iloc[start_index:i]
                trial_segments.append(segment)

                # Update the current label and start index
                current_label = trial_data.iloc[i]['class']
                start_index = i

        # Capture the last segment
        segment = trial_data.iloc[start_index:]
        trial_segments.append(segment)

        # Add the segmented data for this trial
        segmented_trials.extend(trial_segments)

    # Store segmented data for the subject
    segmented_data_by_subject[subject_number] = segmented_trials

print(f"Segmented data for {len(segmented_data_by_subject)} subjects.")

Segmented data for 36 subjects.


# Sliding Window

In [8]:
import numpy as np

In [9]:
def extract_sliding_windows(segmented_data, window_size, stride):
    """
    Extracts sliding windows from segmented data.

    Args:
        segmented_data (dict): Segmented data by subject.
        window_size (int): Number of samples in each window.
        stride (int): Step size between consecutive windows.

    Returns:
        list: A list of tuples where each tuple is (window_data, window_label).
    """
    sliding_windows = []

    for subject, trials in segmented_data.items():
        # print(f"Processing Subject {subject}...")
        
        for segment in trials:
            segment_label = segment['class'].iloc[0]  # The class label for the entire segment
            segment_data = segment.drop(columns=['class']).values  # Drop the label column
            
            # Generate windows from this segment
            for start_idx in range(0, len(segment_data) - window_size + 1, stride):
                window = segment_data[start_idx:start_idx + window_size]
                sliding_windows.append((window, segment_label))
    
    print(f"Extracted {len(sliding_windows)} windows.")
    return sliding_windows

In [10]:
# Parameters for sliding windows
window_size = 500  # e.g., 100 samples per window
stride = 100        # e.g., 50-sample step size

# Extract sliding windows from segmented data
sliding_windows = extract_sliding_windows(segmented_data_by_subject, window_size, stride)

# Inspect the number of windows
print(f"Total sliding windows: {len(sliding_windows)}")

# Example: Shape of one window and its label
example_window, example_label = sliding_windows[0]
print(f"Window shape: {example_window.shape}, Label: {example_label}")

Extracted 11105 windows.
Total sliding windows: 11105
Window shape: (500, 8), Label: 1


In [None]:
# Extract data and labels from sliding windows
X = np.array([window for window, _ in sliding_windows])  # Shape: (num_windows, window_size, num_channels)
y = np.array([label for _, label in sliding_windows])   # Shape: (num_windows,)

# Feature Extraction with TSFEL

In [None]:
pip install tsfel

In [None]:
import pandas as pd
import numpy as np
import tsfel

# Initialize an empty list to store the extracted features
X_features = []

cfg = tsfel.get_features_by_domain()

for domain in cfg.keys():
    for feature in list(cfg[domain].keys()):
        if feature not in [
            "Mean",
            "Standard deviation",
            "Variance",
            "Waveform Length",
            "Root Mean Square",
            "Zero Crossing",
            "Slope Sign Changes",
            "Integrated EMG",
            "Skewness",
            "Kurtosis",
            "Spectral Power",
            "Mean Frequency",
            "Median Frequency",
        ]:
            del cfg[domain][feature]

# Iterate through each sliding window
for window in X:
    # For each channel in the sliding window, extract features
    features_per_channel = []
    for channel in range(window.shape[1]):  # Assuming (window_size, num_channels)
        channel_data = window[:, channel]  # Extract one channel's data
        # Convert to DataFrame (required by TSFEL)
        channel_df = pd.DataFrame(channel_data, columns=[f'channel_{channel}'])
        # Extract features for the channel
        channel_features = tsfel.time_series_features_extractor(cfg, channel_df, verbose=0)
        features_per_channel.append(channel_features.values.flatten())  # Flatten the extracted features

    # Concatenate features from all channels for this window
    window_features = np.concatenate(features_per_channel)
    X_features.append(window_features)

# Convert the list of features to a NumPy array
X_features = np.array(X_features)  # Shape: (num_windows, num_features_per_window)


# Prepare data for SVM

In [21]:
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.svm import SVC
from sklearn.metrics import classification_report
from sklearn.preprocessing import StandardScaler

# Assuming `X_features` is already created by TSFEL feature extraction
# Split into training (70%), validation (15%), and test (15%)
X_train, X_temp, y_train, y_temp = train_test_split(X_features, y, test_size=0.3, stratify=y, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42)

print(f"Training set: {X_train.shape}, Validation set: {X_val.shape}, Test set: {X_test.shape}")

# Convert training and testing data to float
X_train = X_train.astype(float)
X_val = X_val.astype(float)
X_test = X_test.astype(float)

# Zero-index the class labels by subtracting 1
y_train -= 1
y_val -= 1
y_test -= 1

print("Class labels have been zero-indexed.")

# Standardize features across the entire training set (per feature/channel)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

print("Data standardized.")

Training set: (7773, 40), Validation set: (1666, 40), Test set: (1666, 40)
Class labels have been zero-indexed.
Data standardized.


# Train SVM

In [22]:
# Train an SVM model
svm = SVC(kernel='rbf')  # You can adjust the kernel as needed
svm.fit(X_train, y_train)

# Make predictions and evaluate
y_pred = svm.predict(X_test)
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

           0       0.98      1.00      0.99       278
           1       0.96      0.97      0.97       268
           2       0.90      0.93      0.91       277
           3       0.91      0.95      0.93       281
           4       0.95      0.93      0.94       280
           5       0.95      0.88      0.91       282

    accuracy                           0.94      1666
   macro avg       0.94      0.94      0.94      1666
weighted avg       0.94      0.94      0.94      1666



In [27]:
# Evaluate the model and get the accuracy
test_accuracy = svm.score(X_test, y_test)
print(f"Test Accuracy: {test_accuracy}")

Test Accuracy: 0.9417767106842737


In [28]:
from sklearn.metrics import hinge_loss

# Predict decision function values for test data
y_test_pred = svm.decision_function(X_test)

# Calculate hinge loss on the test set
test_loss = hinge_loss(y_test, y_test_pred)
print(f"Test Loss (Hinge Loss): {test_loss}")

Test Loss (Hinge Loss): 0.1252099935865576


In [29]:
# Evaluate the model and get the accuracy
val_accuracy = svm.score(X_val, y_val)
print(f"Validation Accuracy: {val_accuracy}")

Validation Accuracy: 0.9339735894357744


In [30]:
# Predict decision function values for test data
y_val_pred = svm.decision_function(X_val)

# Calculate hinge loss on the validation set
val_loss = hinge_loss(y_val, y_val_pred)
print(f"Validation Loss (Hinge Loss): {val_loss}")

Validation Loss (Hinge Loss): 0.14549018187655222
