### Tahap 1: Import Libraries

In [None]:
%pip install tensorflow

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.signal import filtfilt, butter, iirnotch
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import confusion_matrix, accuracy_score, roc_curve, auc, classification_report
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv3D, MaxPooling3D, Flatten, Dense, concatenate, Dropout
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split, StratifiedKFold
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import confusion_matrix, accuracy_score, roc_curve, auc
from sklearn.preprocessing import label_binarize, LabelEncoder, OneHotEncoder

import warnings
warnings.filterwarnings("ignore")

### Tahap 2: Memuat Dataset dan Informasi Awal

In [None]:
# Load the EMG dataset from a CSV file (path is relative to the notebook's working directory)
data = pd.read_csv("../Datasets/EMG-data.csv")

In [None]:
# Preview the first rows of the dataset
data.head()

In [None]:
# Show a summary of the DataFrame (columns, dtypes, non-null counts)
data.info()

In [None]:
# Show descriptive statistics of the data
data.describe()

In [None]:
# Split the dataset into one DataFrame per hand-gesture class (class ids 0-7)
gesture_datasets = {
    gesture_class: data[data['class'] == gesture_class]
    for gesture_class in range(8)
}

# Report the number of rows available for each class
for gesture_class in gesture_datasets:
    print(f"Kelas {gesture_class} memiliki {gesture_datasets[gesture_class].shape[0]} data")

### Tahap 3: Filtering

#### Exponential Filtering

In [None]:
# Exponential (first-order recursive) smoothing of a 1D array
def exponential_filter(data, alpha=0.5):
    """Smooth a 1D signal with an exponential moving average.

    The first output sample equals the first input sample; every later
    sample is ``alpha * data[t] + (1 - alpha) * output[t - 1]``.

    Args:
        data: 1D NumPy array of samples.
        alpha: weight given to the current input sample.

    Returns:
        Float array with the same shape as ``data``.
    """
    smoothed = np.zeros(data.shape)
    n_samples = len(data)
    if n_samples == 0:
        # Nothing to filter: hand back the (empty) zero array.
        return smoothed

    smoothed[0] = data[0]
    for idx in range(1, n_samples):
        previous = smoothed[idx - 1]
        smoothed[idx] = alpha * data[idx] + (1 - alpha) * previous
    return smoothed

# Run the exponential filter over every EMG channel column of a DataFrame
def apply_exponential_filter_to_df(df, emg_channels, alpha=0.5):
    """Return a copy of ``df`` whose EMG channel columns are exponentially smoothed.

    Args:
        df: input DataFrame; it is not modified.
        emg_channels: iterable of column names to filter.
        alpha: smoothing factor forwarded to ``exponential_filter``.

    Returns:
        A new DataFrame; columns not listed in ``emg_channels`` are unchanged.
    """
    result = df.copy()
    for column_name in emg_channels:
        raw_samples = df[column_name].values
        result[column_name] = exponential_filter(raw_samples, alpha)
    return result

# EMG channel columns: DataFrame column positions 1-8 (channels 1-8)
emg_channels = data.columns[1:9]

# Smooth every per-gesture subset with the exponential filter
filtered_gesture_datasets = {
    gesture_class: apply_exponential_filter_to_df(subset, emg_channels, alpha=0.5)
    for gesture_class, subset in gesture_datasets.items()
}

# Stitch the filtered subsets back together into a single DataFrame
filtered_data = pd.concat(filtered_gesture_datasets.values(), ignore_index=True)

# Show the first few rows of the filtered data
print(filtered_data.head())

In [None]:
# Plot raw vs. exponentially filtered signals
def plot_filtered_data(subject_data, filtered_data, channel_names):
    """Draw one figure per (gesture class, channel) comparing raw and filtered data.

    Args:
        subject_data: DataFrame with the original signals and a 'class' column.
        filtered_data: DataFrame with the exponentially filtered signals and a
            'class' column.
        channel_names: names of the channel columns to plot.
    """
    for gesture_class in range(8):
        raw_rows = subject_data[subject_data['class'] == gesture_class]
        smoothed_rows = filtered_data[filtered_data['class'] == gesture_class]

        for channel_name in channel_names:
            plt.figure(figsize=(15, 5))

            raw_trace = raw_rows[channel_name].values
            smoothed_trace = smoothed_rows[channel_name].values

            # Both traces are plotted against their sample index.
            plt.plot(raw_trace, label='Original {}'.format(channel_name))
            plt.plot(smoothed_trace, label='Exponential Filtered {}'.format(channel_name))

            plt.xlabel('Waktu (ms)')
            plt.ylabel('Amplitudo')
            plt.title('Hand Gesture Class {} - {} Filter'.format(gesture_class, channel_name))
            plt.legend()

            plt.tight_layout()
            plt.show()

# Select the rows of subject 11 ('label' == 11) from the raw and the exponentially filtered data
subject_data = data[data['label'] == 11]
filtered_subject_data = filtered_data[filtered_data['label'] == 11]
channel_names = ['channel1', 'channel2', 'channel3', 'channel4', 'channel5', 'channel6', 'channel7', 'channel8']

# Plot raw vs. filtered signals for subject 11, one figure per gesture class and channel
plot_filtered_data(subject_data, filtered_subject_data, channel_names)

#### Notch Filtering

In [None]:
# Calculate the sampling frequency (fs) for each subset
def calculate_fs(data):
    """Estimate the sampling frequency in Hz from the 'time' column.

    The estimate is based on the median of the strictly positive
    differences between consecutive timestamps. The mean of all differences
    is not used because it collapses to (last - first) / (n - 1), which is
    meaningless (possibly zero or negative) when the rows come from several
    recordings whose timestamps restart.

    Args:
        data: DataFrame with a 'time' column, assumed to be in milliseconds.

    Returns:
        Sampling frequency in Hz, or NaN when no positive time step exists
        (for example fewer than two rows).
    """
    intervals = data['time'].diff().dropna()
    # Discard resets (negative jumps) and repeated timestamps (zero steps).
    intervals = intervals[intervals > 0]
    if intervals.empty:
        return float('nan')
    fs = 1000 / intervals.median()  # Assuming time is in milliseconds
    return fs

# Estimate the sampling frequency of every per-gesture subset (keyed by gesture class)
fs_gesture_datasets = {gesture_class: calculate_fs(gesture_datasets[gesture_class]) for gesture_class in gesture_datasets}

# Print the estimated sampling frequency of each class, rounded to two decimals
for gesture_class, fs in fs_gesture_datasets.items():
    print(f"Frekuensi sampling untuk kelas {gesture_class}: {fs:.2f} Hz")

In [None]:
# Zero-phase notch filter for a single 1D signal
def notch_filter(data, fs, freq=50.0, Q=30.0):
    """Remove a narrow frequency band from ``data``.

    The notch is designed with ``scipy.signal.iirnotch`` and applied forward
    and backward with ``scipy.signal.filtfilt``.

    Args:
        data: 1D array of samples.
        fs: sampling frequency of ``data`` in Hz.
        freq: centre frequency of the notch in Hz.
        Q: quality factor of the notch.

    Returns:
        The filtered signal, same length as ``data``.
    """
    numerator, denominator = iirnotch(freq, Q, fs=fs)
    return filtfilt(numerator, denominator, data)

# Run the notch filter over every EMG channel column of a DataFrame
def apply_notch_filter_to_df(df, emg_channels, fs, freq=50.0, Q=30.0):
    """Return a copy of ``df`` whose EMG channel columns are notch filtered.

    Args:
        df: input DataFrame; it is not modified.
        emg_channels: iterable of column names to filter.
        fs: sampling frequency in Hz, forwarded to ``notch_filter``.
        freq: notch centre frequency in Hz.
        Q: notch quality factor.

    Returns:
        A new DataFrame; columns not listed in ``emg_channels`` are unchanged.
    """
    result = df.copy()
    for column_name in emg_channels:
        samples = df[column_name].values
        result[column_name] = notch_filter(samples, fs, freq, Q)
    return result

# Apply the notch filter to each exponentially filtered subset, using the sampling
# frequency estimated for that same gesture class
notch_filtered_gesture_datasets = {gesture_class: apply_notch_filter_to_df(filtered_gesture_datasets[gesture_class], emg_channels, fs_gesture_datasets[gesture_class]) for gesture_class in filtered_gesture_datasets}

# Combine the notch filtered subsets back into one dataset (row index is renumbered)
notch_filtered_data = pd.concat(notch_filtered_gesture_datasets.values(), ignore_index=True)

# Display the first few rows of the notch filtered data
print(notch_filtered_data.head())

In [None]:
# Plot raw, exponentially filtered and notch filtered signals together
def plot_notch_filtered_data(subject_data, filtered_data_exp, filtered_data_notch, channel_names):
    """Draw one figure per (gesture class, channel) with all three signal versions.

    Args:
        subject_data: DataFrame with the original signals and a 'class' column.
        filtered_data_exp: DataFrame with the exponentially filtered signals.
        filtered_data_notch: DataFrame with the notch filtered signals.
        channel_names: names of the channel columns to plot.
    """
    for gesture_class in range(8):
        raw_rows = subject_data[subject_data['class'] == gesture_class]
        exp_rows = filtered_data_exp[filtered_data_exp['class'] == gesture_class]
        notch_rows = filtered_data_notch[filtered_data_notch['class'] == gesture_class]

        for channel_name in channel_names:
            plt.figure(figsize=(15, 5))

            raw_trace = raw_rows[channel_name].values
            exp_trace = exp_rows[channel_name].values
            notch_trace = notch_rows[channel_name].values

            # All traces are plotted against their sample index.
            plt.plot(raw_trace, label='Original {}'.format(channel_name))
            plt.plot(exp_trace, label='Exponential Filtered {}'.format(channel_name))
            plt.plot(notch_trace, label='Notch Filtered {}'.format(channel_name))

            plt.xlabel('Waktu (ms)')
            plt.ylabel('Amplitudo')
            plt.title('Hand Gesture Class {} - {} Filter'.format(gesture_class, channel_name))
            plt.legend()

            plt.tight_layout()
            plt.show()

# Select the rows of subject 11 ('label' == 11) from the raw, exponentially
# filtered and notch filtered data
subject_data = data[data['label'] == 11]
exp_filtered_subject_data = filtered_data[filtered_data['label'] == 11]
notch_filtered_subject_data = notch_filtered_data[notch_filtered_data['label'] == 11]
channel_names = ['channel1', 'channel2', 'channel3', 'channel4', 'channel5', 'channel6', 'channel7', 'channel8']

# Plot all three signal versions for subject 11
plot_notch_filtered_data(subject_data, exp_filtered_subject_data, notch_filtered_subject_data, channel_names)

### Tahap 4: Segmentation

#### Overlapping Windowing (50% Overlap)

In [None]:
# Window duration in seconds and the fraction of each window shared with the next one
window_duration = 3  # 3 seconds
overlap_fraction = 0.5  # 50% overlap

# Function to create segments with a consistent window size
def create_segments(data, window_size, overlap):
    """Cut a DataFrame into fixed-length, possibly overlapping windows.

    Args:
        data: DataFrame whose columns at positions 1-8 hold the EMG channels
            (channel1 to channel8) and which has a 'label' column.
        window_size: number of consecutive rows in every window.
        overlap: number of rows shared by two consecutive windows; must be
            smaller than ``window_size`` so that the window advances.

    Returns:
        Tuple ``(segments, labels)`` of NumPy arrays. ``segments`` holds one
        ``(window_size, n_channels)`` block per window, ``labels`` the most
        frequent 'label' value of each window. Both arrays are empty when
        ``data`` has fewer than ``window_size`` rows.

    Raises:
        ValueError: if ``window_size`` is not positive or ``overlap`` is not
            smaller than ``window_size``.
    """
    if window_size <= 0:
        raise ValueError(f"window_size must be positive, got {window_size}")
    if overlap >= window_size:
        # A non-positive step would never advance (or silently yield nothing).
        raise ValueError(
            f"overlap ({overlap}) must be smaller than window_size ({window_size})"
        )

    step = window_size - overlap
    # Extract the columns once instead of re-slicing the DataFrame per window.
    channel_values = data.iloc[:, 1:9].values  # Take columns channel1 to channel8
    label_column = data['label']

    segments = []
    labels = []
    for start in range(0, len(data) - window_size + 1, step):
        stop = start + window_size
        segments.append(channel_values[start:stop])
        # Take the most frequent label in the window
        labels.append(label_column.iloc[start:stop].mode()[0])
    return np.array(segments), np.array(labels)

# Calculate the lowest sampling frequency
min_fs = min(fs_gesture_datasets.values())

# Calculate consistent window size and overlap using the lowest sampling frequency
window_size = int(window_duration * min_fs)
overlap = int(window_size * overlap_fraction)

# Create segments for each gesture class using consistent window size.
# The windows are cut from the exponential + notch filtered signals so that the
# filtering stage actually feeds the rest of the pipeline (segmenting the raw
# `gesture_datasets` would silently discard it).
gesture_segments = {}
gesture_labels = {}

for gesture_class in notch_filtered_gesture_datasets:
    subset_data = notch_filtered_gesture_datasets[gesture_class]
    segments, labels = create_segments(subset_data, window_size, overlap)
    gesture_segments[gesture_class] = segments
    gesture_labels[gesture_class] = labels

# Combine segments and labels for all gesture classes (classes without any window are skipped)
all_segments = np.concatenate([gesture_segments[gc] for gc in gesture_segments if gesture_segments[gc].size > 0], axis=0)
all_labels = np.concatenate([gesture_labels[gc] for gc in gesture_labels if gesture_labels[gc].size > 0], axis=0)

print(f"Total segments: {all_segments.shape[0]}")
print(f"Total labels: {all_labels.shape[0]}")

# Ensure the dimensions of segments are consistent
segment_lengths = [segments.shape[1] for segments in gesture_segments.values() if segments.size > 0]
print(f"Segment lengths: {segment_lengths}")

# Check if all segment lengths are the same
if len(set(segment_lengths)) != 1:
    print("Error: Not all segment lengths are the same.")
else:
    print("All segment lengths are consistent.")

In [None]:
import matplotlib.pyplot as plt

def plot_segment(segment, fs, channel_names):
    """Plot every channel of one segment against time in seconds.

    Args:
        segment: 2D array indexed as ``segment[sample, channel]``.
        fs: sampling frequency in Hz used to convert sample index to seconds.
        channel_names: legend labels; the i-th name labels column ``i``.
    """
    # Sample k is taken at k / fs seconds, so the last sample sits at
    # (N - 1) / fs. np.linspace(0, N / fs, N) would stretch the axis by one
    # sample period.
    time_axis = np.arange(len(segment)) / fs
    plt.figure(figsize=(15, 5))
    for i, channel in enumerate(channel_names):
        plt.plot(time_axis, segment[:, i], label=channel)
    plt.xlabel('Waktu (detik)')
    plt.ylabel('Amplitudo')
    plt.title('Segmentasi Data')
    plt.legend()
    plt.show()

# Plot the first segment of each gesture class for subject 11.
# For every non-empty class keep only the windows whose stored label (the most
# frequent 'label' value inside the window) equals the subject id.
subject_id = 11
subject_segments = {gesture_class: gesture_segments[gesture_class][gesture_labels[gesture_class] == subject_id] for gesture_class in gesture_segments if gesture_segments[gesture_class].size > 0}
channel_names = ['channel1', 'channel2', 'channel3', 'channel4', 'channel5', 'channel6', 'channel7', 'channel8']

# Classes for which the subject has no window are skipped
for gesture_class in subject_segments:
    if len(subject_segments[gesture_class]) > 0:
        plot_segment(subject_segments[gesture_class][0], fs_gesture_datasets[gesture_class], channel_names)

### Tahap 5: Feature Extraction

In [None]:
# Mean Absolute Value (MAV) of each channel
def calculate_mav(segment):
    """Return the mean of the absolute values along axis 0 of ``segment``."""
    magnitudes = np.abs(segment)
    return magnitudes.mean(axis=0)

# Root Mean Square (RMS) of each channel
def calculate_rms(segment):
    """Return the square root of the mean squared value along axis 0 of ``segment``."""
    mean_square = np.mean(segment ** 2, axis=0)
    return np.sqrt(mean_square)

# Amplitude (v) of each channel
def calculate_amplitude(segment):
    """Return the largest (signed) value along axis 0 of ``segment``."""
    return np.amax(segment, axis=0)

In [None]:
# Per-class feature matrices: one row per segment, one column per EMG channel

# Mean Absolute Value of every segment, grouped by gesture class
mav_features = {
    gesture_class: np.array([calculate_mav(segment) for segment in class_segments])
    for gesture_class, class_segments in gesture_segments.items()
}

# Root Mean Square of every segment, grouped by gesture class
rms_features = {
    gesture_class: np.array([calculate_rms(segment) for segment in class_segments])
    for gesture_class, class_segments in gesture_segments.items()
}

# Amplitude of every segment, grouped by gesture class
amplitude_features = {
    gesture_class: np.array([calculate_amplitude(segment) for segment in class_segments])
    for gesture_class, class_segments in gesture_segments.items()
}

In [None]:
# Convert to DataFrames for easier inspection: one column per gesture class
# ('class_<id>'), holding the feature averaged over all segments of that class
# (the mean is taken along axis 0 of each per-class feature array)
mav_df = pd.DataFrame({f'class_{gc}': mav_features[gc].mean(axis=0) for gc in mav_features})
rms_df = pd.DataFrame({f'class_{gc}': rms_features[gc].mean(axis=0) for gc in rms_features})
amplitude_df = pd.DataFrame({f'class_{gc}': amplitude_features[gc].mean(axis=0) for gc in amplitude_features})

In [None]:
# Display the per-class feature averages in tabular form, one table per feature
print("Mean Absolute Value (MAV):")
print(mav_df)

print("\nRoot Mean Square (RMS):")
print(rms_df)

print("\nAmplitude (v):")
print(amplitude_df)

In [None]:
# Calculate the overall mean of each feature across all gesture classes
# (axis=1 averages over the 'class_<id>' columns, leaving one value per row)
overall_mav = mav_df.mean(axis=1)
overall_rms = rms_df.mean(axis=1)
overall_amplitude = amplitude_df.mean(axis=1)

print("\nOverall Mean Absolute Value (MAV) across all gesture classes:")
print(overall_mav)

print("\nOverall Root Mean Square (RMS) across all gesture classes:")
print(overall_rms)

print("\nOverall Amplitude (v) across all gesture classes:")
print(overall_amplitude)

### Tahap 6: Modeling Multi-Stream 3D CNN

#### Reshape untuk 3D CNN Model Input Layer

In [None]:
# Constants shared by the reshaping, model definition and evaluation cells
num_classes = 8  # Size of the one-hot label vector and of the softmax output
num_channels = 8  # Number of EMG channels

# Reshape a 2D feature matrix into the 5D layout fed to the 3D CNN
def reshape_features(features, num_channels):
    """Reshape ``features`` to ``(rows, cols // num_channels, num_channels, 1, 1)``.

    Args:
        features: 2D array whose second dimension is a multiple of
            ``num_channels``.
        num_channels: size of the channel axis in the result.

    Returns:
        A 5D array with the same elements in the same order; the two
        trailing axes have length 1.
    """
    n_rows = features.shape[0]
    steps_per_row = features.shape[1] // num_channels
    target_shape = (n_rows, steps_per_row, num_channels, 1, 1)
    return np.reshape(features, target_shape)

# Print the shape of every per-class feature array before reshaping
print("Shapes before reshaping:")
print(f"MAV shapes: {[mav_features[gc].shape for gc in mav_features]}")
print(f"RMS shapes: {[rms_features[gc].shape for gc in rms_features]}")
print(f"Amplitude shapes: {[amplitude_features[gc].shape for gc in amplitude_features]}")

In [None]:
# Reshape MAV, RMS, and Amplitude features.
# Classes without any segment are skipped: their feature array is empty and has
# no second axis for reshape_features to split.
mav_reshaped = {gc: reshape_features(mav_features[gc], num_channels) for gc in mav_features if mav_features[gc].size > 0}
rms_reshaped = {gc: reshape_features(rms_features[gc], num_channels) for gc in rms_features if rms_features[gc].size > 0}
amplitude_reshaped = {gc: reshape_features(amplitude_features[gc], num_channels) for gc in amplitude_features if amplitude_features[gc].size > 0}

# Combine all reshaped features (class by class, in dictionary order)
X_mav = np.concatenate([mav_reshaped[gc] for gc in mav_reshaped], axis=0)
X_rms = np.concatenate([rms_reshaped[gc] for gc in rms_reshaped], axis=0)
X_amplitude = np.concatenate([amplitude_reshaped[gc] for gc in amplitude_reshaped], axis=0)

# Labels: the target of the classifier is the hand-gesture class, i.e. the
# dictionary key each feature block belongs to. `gesture_labels` must NOT be
# used here: it stores the 'label' column (the subject id used elsewhere in
# this notebook to select "subject 11"), not the gesture class.
# Iterating over mav_reshaped keeps y aligned row by row with X_mav.
y = np.concatenate([np.full(len(mav_reshaped[gc]), gc, dtype=int) for gc in mav_reshaped], axis=0)

# Check for any labels that are out of bounds
if np.any(y >= num_classes):
    print(f"Warning: Found labels out of bounds: {y[y >= num_classes]}")

In [None]:
# Ensure all labels are within the correct range
# NOTE(review): np.clip does not drop out-of-range labels; it maps them onto the
# nearest boundary value (0 or num_classes - 1). Confirm this is intended.
y = np.clip(y, 0, num_classes - 1)

# Convert labels to categorical (one-hot rows of length num_classes)
y = to_categorical(y, num_classes=num_classes)

# Print shapes to confirm
print(f'X_mav shape: {X_mav.shape}')
print(f'X_rms shape: {X_rms.shape}')
print(f'X_amplitude shape: {X_amplitude.shape}')
print(f'y shape: {y.shape}')

#### Multi-Stream 3D CNN

In [None]:
# Build the 3D CNN feature extractor used by every input stream
def create_cnn(input_layer):
    """Stack the convolutional layers of one stream on top of ``input_layer``.

    Layer order: Conv3D(32) -> Conv3D(64) -> MaxPooling3D -> Conv3D(64) ->
    MaxPooling3D -> Flatten. All convolutions use 3x3x3 kernels, ReLU and
    'same' padding; the pooling layers use a (1, 1, 1) pool size.

    Args:
        input_layer: Keras tensor to attach the stream to.

    Returns:
        The flattened output tensor of the stream.
    """
    # (number of filters, whether a pooling layer follows the convolution)
    layer_plan = ((32, False), (64, True), (64, True))

    tensor = input_layer
    for n_filters, followed_by_pool in layer_plan:
        tensor = Conv3D(n_filters, (3, 3, 3), activation='relu', padding='same')(tensor)
        if followed_by_pool:
            tensor = MaxPooling3D((1, 1, 1))(tensor)
    return Flatten()(tensor)

# Input layers: one per feature type, shaped like a single sample
# (everything after the leading sample axis of the feature arrays)
input_mav = Input(shape=X_mav.shape[1:])
input_rms = Input(shape=X_rms.shape[1:])
input_amplitude = Input(shape=X_amplitude.shape[1:])

# One CNN stream per input
stream_mav = create_cnn(input_mav)
stream_rms = create_cnn(input_rms)
stream_amplitude = create_cnn(input_amplitude)

# Merge the flattened outputs of the three streams
combined = concatenate([stream_mav, stream_rms, stream_amplitude])

# Fully connected head ending in a softmax over the gesture classes
x = Dense(512, activation='relu')(combined)
x = Dense(128, activation='relu')(x)
output = Dense(num_classes, activation='softmax')(x)

# Assemble and compile the multi-input model
model = Model(
    inputs=[input_mav, input_rms, input_amplitude],
    outputs=output,
)
model.compile(
    optimizer=Adam(),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Train the model
# Keras takes the `validation_split` fraction from the LAST samples of the
# arrays, before any shuffling. The arrays are concatenated class by class, so
# without a prior shuffle the validation set would contain only the last
# classes. Apply one fixed-seed permutation to all inputs and labels so the
# split is mixed and reproducible; the original arrays are left untouched.
shuffle_order = np.random.default_rng(42).permutation(len(y))
history = model.fit(
    [X_mav[shuffle_order], X_rms[shuffle_order], X_amplitude[shuffle_order]],
    y[shuffle_order],
    epochs=50,
    batch_size=32,
    validation_split=0.2,
)

### Tahap 7: Evaluation Model

In [None]:
# Run predictions
# NOTE(review): the model predicts on the same arrays that were passed to
# model.fit, so the metrics derived from y_pred include training samples and
# are not a held-out test score.
y_pred = model.predict([X_mav, X_rms, X_amplitude])

# Convert predicted probabilities and one-hot targets to class indices
y_pred_classes = np.argmax(y_pred, axis=1)
y_test_classes = np.argmax(y, axis=1)

#### Classification Report

In [None]:
# Classification report for each hand gesture, computed one-vs-rest:
# class i is the positive label (1), every other class is 0
for i in range(num_classes):
    y_true = (y_test_classes == i).astype(int)
    y_pred_class = (y_pred_classes == i).astype(int)
    print(f'Classification Report for Class {i}:\n')
    print(classification_report(y_true, y_pred_class))

In [None]:
# Overall (multi-class) classification report
print('Overall Classification Report:\n')
print(classification_report(y_test_classes, y_pred_classes))

#### Accuracy

In [None]:
# Accuracy for each hand gesture, computed one-vs-rest: a sample counts as
# correct when the true and predicted labels agree on "is class i / is not class i"
for i in range(num_classes):
    y_true = (y_test_classes == i).astype(int)
    y_pred_class = (y_pred_classes == i).astype(int)
    acc = accuracy_score(y_true, y_pred_class)
    print(f'Accuracy for Class {i}: {acc:.2f}')

In [None]:
# Overall multi-class accuracy, printed as a percentage
overall_accuracy = accuracy_score(y_test_classes, y_pred_classes)
print(f'Overall Accuracy: {overall_accuracy * 100:.2f}%')

In [None]:
# Visualize training & validation accuracy and loss per epoch, side by side,
# using the metrics recorded in the History object returned by model.fit
plt.figure(figsize=(12, 5))

# Accuracy (left panel)
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

# Loss (right panel)
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()

#### Confusion Matrix

In [None]:
# Confusion matrix of true vs. predicted class indices, drawn as an annotated heatmap
# (confusion_matrix is called as (y_true, y_pred): rows = true, columns = predicted)
conf_matrix = confusion_matrix(y_test_classes, y_pred_classes)
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=range(num_classes), yticklabels=range(num_classes))
plt.title('Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.show()

#### ROC curve

In [None]:
# One-vs-rest ROC curve and AUC per class: column i of the one-hot labels `y`
# is the binary target and column i of the predicted probabilities `y_pred`
# is the score
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(num_classes):
    fpr[i], tpr[i], _ = roc_curve(y[:, i], y_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Draw all class curves in one figure
plt.figure(figsize=(10, 8))
for i in range(num_classes):
    plt.plot(fpr[i], tpr[i], label=f'ROC curve (area = {roc_auc[i]:.2f}) for class {i}')
# Dashed diagonal from (0, 0) to (1, 1) as a visual reference
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC)')
plt.legend(loc='lower right')
plt.show()