<a href="https://colab.research.google.com/github/ABHISHEK17042003/Classification-of-Emg-using-cnn/blob/main/EMG_Classification_with_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES
# TO THE CORRECT LOCATION (/kaggle/input) IN YOUR NOTEBOOK,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = 'emg-signal-for-gesture-recognition:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F756231%2F1306261%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240203%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240203T135816Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D6712b2d068d18a396874b7e52f551443c98d4bdf43f9a2c86a043fe092b6de510655f0fff1aa084fdc3aa7e2c13312618cbb7e3995547281cf53ac002cf4220c48e701aeaa460c60f416d7ac1f5d204248aca3b5c5f9f06537c47826832a4e758a4e467b87fcee22e114ab1f5e91af026692ed6747b493d3b32bd21446da1bd7eb0f81ff7663f38bd68f4ed8980fe3be6f5f43d20b849ceb466312d6ce60a936aec0b74b3e9470853c2f7dfcfe17d540459377b7ebaa7bb38b479b46239745358a92d9ffedaf686c13fb93dad289ef2215b3c32f8fecc7d164860e04a3e26599d8308df988ef28328f8884d773dbd913aa7bfee53dd2b8f9895db8adb0984684'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

# Download every configured data source and unpack it below KAGGLE_INPUT_PATH.
# Each entry of DATA_SOURCE_MAPPING has the form "<directory>:<url-encoded URL>".
for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    directory, download_url_encoded = data_source_mapping.split(':')
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            print(f'Downloading {directory}, {total_length} bytes compressed')
            # The Content-Length header may be missing; in that case the bar
            # stays empty and only the byte counter advances.
            expected_bytes = int(total_length) if total_length else 0
            dl = 0
            data = fileres.read(CHUNK_SIZE)
            while len(data) > 0:
                dl += len(data)
                tfile.write(data)
                done = int(50 * dl / expected_bytes) if expected_bytes else 0
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
                data = fileres.read(CHUNK_SIZE)
            # Push buffered bytes out and rewind so the archive readers see the
            # complete download from its first byte.
            tfile.flush()
            tfile.seek(0)
            if filename.endswith('.zip'):
                with ZipFile(tfile) as zfile:
                    zfile.extractall(destination_path)
            else:
                # Bind the opened archive to its own name: reusing the name
                # `tarfile` would replace the module and break the next
                # iteration of this loop.
                with tarfile.open(fileobj=tfile) as tar_archive:
                    tar_archive.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError:
        # HTTPError is handled before OSError so expired signed URLs get the
        # more specific message.
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


In [None]:
import pandas as pd
import random
import csv
import numpy as np

In [None]:
project_dataset_dir = '/kaggle/input/emg-signal-for-gesture-recognition/EMG-data.csv'
df = pd.read_csv(project_dataset_dir)

In [None]:
df.info()

In [None]:
df.drop('time', inplace=True, axis=1)

df.head()

In [None]:
df.info()
len(df)

In [None]:
df.isna().sum() # check null dataset

In [None]:
print(df['class'].value_counts())

In [None]:
df.drop(df[df['class'] == 0].index, inplace=True)

In [None]:
df.head()

In [None]:
all_labels_present = df.groupby('class')['label'].nunique() == 36
# use .nunique() == 36 to check whether every class contains 36 unique labels
all_labels_present

In [None]:
# check how many label in class = 7
class_7_data = df[df['class'] == 7]
class_7_label_counts = class_7_data.groupby('label').size().reset_index(name='count')
class_7_label_counts

In [None]:
# Keep every class except 7. Take an explicit copy so that in-place edits of
# `data` (such as dropping a column) act on an independent frame, not on a
# slice of `df`.
data = df[df['class'] != 7].copy()

In [None]:
data['class'].value_counts()

In [None]:
subjects_data = data.groupby(['label','class'])

In [None]:
import pandas as pd

# Count the rows of each class in the filtered DataFrame `data`
class_counts = data.groupby('class').size().reset_index(name='count')

print('Class distribution:')
print(class_counts)

In [None]:
data

In [None]:
data.drop('label', inplace=True, axis=1)



In [None]:
data.info()

# ***NORMALIZE PROCESS***

In [None]:
def normalized(data):
    """Min-max scale ``data`` into [0, 1] using its overall minimum and maximum.

    Args:
        data: Array-like of numbers. The minimum and maximum are taken with
            ``np.min`` / ``np.max`` over the whole input.

    Returns:
        ``(data - min) / (max - min)``. When every value is identical the
        range is zero, so an all-zero float array of the same shape is
        returned instead of dividing by zero.
    """
    min_value = np.min(data)
    max_value = np.max(data)
    value_range = max_value - min_value
    # Only guard the scalar case; a non-scalar range (e.g. per-column results
    # from a pandas object) keeps the plain element-wise division.
    if np.ndim(value_range) == 0 and value_range == 0:
        return np.zeros_like(data, dtype=float)
    return (data - min_value) / value_range

# ***THE SLIDING WINDOW - CNN1***

In [None]:
def sliding_window(data, window_size, stride):
    """Cut a DataFrame into fixed-length, possibly overlapping row windows.

    Args:
        data: DataFrame whose last column is the label and whose other
            columns are the features.
        window_size: Number of consecutive rows per window.
        stride: Row offset between the starts of two successive windows.

    Returns:
        A tuple ``(windows, labels)`` of NumPy arrays. Each window holds the
        feature values of ``window_size`` consecutive rows; its label is the
        value of the last column in the window's final row.

    Note:
        Windows are cut over consecutive rows only, so a window may contain
        rows whose label differs from the label assigned to it.
    """
    # Convert to NumPy once; slicing the DataFrame with .iloc for every
    # window repeats the same conversion work on each iteration.
    features = data.iloc[:, :-1].values  # every column except the label
    targets = data.iloc[:, -1].values    # last column is the label

    starts = range(0, len(data) - window_size + 1, stride)
    windowed_data = [features[start:start + window_size] for start in starts]
    labels = [targets[start + window_size - 1] for start in starts]

    return np.array(windowed_data), np.array(labels)

In [None]:
# Window length and step (in rows) used to cut the signal into samples.
window_size = 150
stride = 30
# Pass the named settings so changing them above actually changes the windows.
X, Y = sliding_window(data, window_size, stride)

In [None]:
X = normalized(X)

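> The class labels must start from 0. With `sparse_categorical_crossentropy`, Keras expects integer labels in the range [0, num_classes), i.e. [0, 6) here. With the original labels 1–6, class 6 falls outside that range and the model does not work for it, so 1 is subtracted from every label.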

In [None]:
Y = Y - 1

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense

def CNN_model(input_shape, num_classes):
    """Build, compile and summarise a small 1-D convolutional classifier.

    Args:
        input_shape: Shape of one input sample, passed to the first Conv1D.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The compiled ``Sequential`` model (its summary is printed first).
    """
    layer_stack = [
        # Two convolution + max-pooling stages
        Conv1D(32, kernel_size=3, activation='relu', input_shape=input_shape),
        MaxPooling1D(pool_size=2),
        Conv1D(64, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=2),
        # Flatten the feature maps for the dense head
        Flatten(),
        # Fully connected head ending in a softmax over the classes
        Dense(128, activation='relu'),
        Dense(num_classes, activation='softmax'),
    ]
    model = Sequential(layer_stack)

    # Sparse cross-entropy: targets are integer class indices
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    model.summary()
    return model

In [None]:
# define the CNN model
input_shape = X_train.shape[1:]  # set input size
num_classes = len(np.unique(Y))   # set class size

model = CNN_model(input_shape, num_classes)

In [None]:
history = model.fit(X_train, y_train, epochs=10, validation_data=(X_test, y_test))

In [None]:
from sklearn.metrics import accuracy_score, f1_score

# make predictions on training data
y_pred_train = model.predict(X_train)
y_pred_train = np.argmax(y_pred_train, axis=1)  # pick the highest-scoring class index for each sample

# make predictions on test data
y_pred_test = model.predict(X_test)
y_pred_test = np.argmax(y_pred_test, axis=1)  # pick the highest-scoring class index for each sample

# calculate accuracy
accuracy_training = accuracy_score(y_train, y_pred_train)
accuracy_test = accuracy_score(y_test, y_pred_test)

# calculate F1 score
f1_training = f1_score(y_train, y_pred_train, average='weighted')
f1_test = f1_score(y_test, y_pred_test, average='weighted')

print("Training Accuracy:", accuracy_training)
print("Test Accuracy:", accuracy_test)
print("Training F1 Score:", f1_training)
print("Test F1 Score:", f1_test)

In [None]:
import matplotlib.pyplot as plt

# training and testing loss graph
plt.plot(history.history['loss'], label='Eğitim Kaybı')
plt.plot(history.history['val_loss'], label='Test Kaybı')
plt.legend()
plt.show()

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

cm = confusion_matrix(y_test, y_pred_test)
plt.figure(figsize=(6, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=np.unique(y_test), yticklabels=np.unique(y_test))
plt.title('Confusion Matrix SW')
plt.xlabel('Predicted Labels SW')
plt.ylabel('Real Labels SW')
plt.show()

In [None]:
from sklearn.metrics import classification_report

# accuracy and F1 score of classes
rapor = classification_report(y_test, y_pred_test, target_names=[str(i) for i in np.unique(y_test)])
print(rapor)

# ***Fourier Transform (FT) - CNN2***

In [None]:
import numpy as np
from scipy.fft import fft
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense

def fourier_transform(X):
    """Return the FFT magnitude spectrum of every channel of every window.

    Args:
        X: Array-like of shape (n_windows, window_length, n_channels).

    Returns:
        Array of shape (n_windows, n_channels, window_length) holding the
        absolute value of the FFT taken along each window's length. Phase
        information is discarded.

    Raises:
        ValueError: If a non-empty ``X`` is not three-dimensional.
    """
    signals = np.asarray(X)
    if signals.size == 0:
        # No samples to transform: return an empty array.
        return np.array([])
    if signals.ndim != 3:
        raise ValueError(
            "X must have shape (n_windows, window_length, n_channels), "
            f"got {signals.shape}"
        )

    # One vectorized FFT along the window axis replaces the per-window,
    # per-channel Python loops; only the amplitude is kept.
    magnitude = np.abs(fft(signals, axis=1))

    # Put channels before frequency bins: (n_windows, n_channels, window_length).
    return np.ascontiguousarray(magnitude.transpose(0, 2, 1))

In [None]:
# apply fourier transform
transformed_X = fourier_transform(X)

In [None]:
transpoze_X = np.transpose(transformed_X, (0, 2, 1))

In [None]:
# split the data into training and testing sets
X_train_ft, X_test_ft, y_train_ft, y_test_ft = train_test_split(transpoze_X, Y, test_size=0.2, random_state=42)

In [None]:
# define CNN model
input_shape = X_train_ft.shape[1:]  # set input size
num_classes = len(np.unique(Y))   # set class size

model_ft = CNN_model(input_shape, num_classes)

In [None]:
history_ft = model_ft.fit(X_train_ft, y_train_ft, epochs=10, validation_data=(X_test_ft, y_test_ft))

In [None]:
from sklearn.metrics import accuracy_score, f1_score

# make predictions on training data
y_pred_train_ft = model_ft.predict(X_train_ft)
y_pred_train_ft = np.argmax(y_pred_train_ft, axis=1)  # convert from one-hot encoding to tags

# make predictions on test data
y_pred_test_ft = model_ft.predict(X_test_ft)
y_pred_test_ft = np.argmax(y_pred_test_ft, axis=1)  # convert from one-hot encoding to tags

# calculate accuracy
accuracy_training_ft = accuracy_score(y_train_ft, y_pred_train_ft)
accuracy_test_ft= accuracy_score(y_test_ft, y_pred_test_ft)

# calculate F1 score
f1_training_ft = f1_score(y_train_ft, y_pred_train_ft, average='weighted')
f1_test_ft = f1_score(y_test_ft, y_pred_test_ft, average='weighted')

print("Eğitim Doğruluğu FT:", accuracy_training_ft)
print("Test Doğruluğu FT:", accuracy_test_ft)
print("Eğitim F1 Skoru FT:", f1_training_ft)
print("Test F1 Skoru FT:", f1_test_ft)

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Compare the FT model's predictions with the labels of its own test split
# (y_test_ft) rather than the labels of the sliding-window split.
cm_ft = confusion_matrix(y_test_ft, y_pred_test_ft)
plt.figure(figsize=(6, 6))
sns.heatmap(cm_ft, annot=True, fmt="d", cmap="Blues", xticklabels=np.unique(y_test_ft), yticklabels=np.unique(y_test_ft))
plt.title('Confusion Matrix FT')
plt.xlabel('Predicted Labels FT')
plt.ylabel('Real Labels FT')
plt.show()

In [None]:
from sklearn.metrics import classification_report

# Per-class precision, recall and F1 of the FT model, scored against the
# labels of its own test split (y_test_ft).
rapor_ft = classification_report(y_test_ft, y_pred_test_ft, target_names=[str(i) for i in np.unique(y_test_ft)])
print(rapor_ft)

# ***Root Mean Square (RMS) - CNN3***

In [None]:
X.shape

In [None]:
import numpy as np

# apply RMS
rms_data = np.sqrt(np.mean(np.square(X), axis=1))

In [None]:
rms_data.shape

In [None]:
# making data three-dimensional
rms_data = np.expand_dims(rms_data, axis=-1)

In [None]:
rms_data.shape

In [None]:
# check the shape
print("Original shape:", X.shape)
print("Shape After RMS Processing:", rms_data.shape)

In [None]:
# split the RMS features and labels into training (80%) and test (20%) sets
X_train_rms, X_test_rms, y_train_rms, y_test_rms = train_test_split(rms_data, Y, test_size=0.2, random_state=42)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense

def CNN_model_rms(input_shape, num_classes):
    """Build, compile and summarise the 1-D CNN used for the RMS features.

    Args:
        input_shape: Shape of one input sample, passed to the first Conv1D.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The compiled ``Sequential`` model (its summary is printed first).
    """
    model = Sequential()

    stages = (
        # Convolutional part; the second pooling uses pool_size=1
        Conv1D(32, kernel_size=3, activation='relu', input_shape=input_shape),
        MaxPooling1D(pool_size=2),
        Conv1D(64, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=1),
        # Flatten before the dense head
        Flatten(),
        # Fully connected head with softmax output
        Dense(128, activation='relu'),
        Dense(num_classes, activation='softmax'),
    )
    for stage in stages:
        model.add(stage)

    # Sparse cross-entropy: targets are integer class indices
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
    )
    model.summary()
    return model

In [None]:
X_train_rms.shape

In [None]:
# define CNN model
input_shape_rms = X_train_rms.shape[1:]  # set input size
num_classes_rms = len(np.unique(Y))   # set class size

model_rms = CNN_model_rms(input_shape_rms, num_classes_rms)

In [None]:
history_rms = model_rms.fit(X_train_rms, y_train_rms, epochs=10, validation_data=(X_test_rms, y_test_rms))

In [None]:
from sklearn.metrics import accuracy_score, f1_score

# Predict on training data
y_pred_train_rms = model_rms.predict(X_train_rms)
y_pred_train_rms = np.argmax(y_pred_train_rms, axis=1)  # pick the highest-scoring class index for each sample

# Predict on test data
y_pred_test_rms = model_rms.predict(X_test_rms)
y_pred_test_rms = np.argmax(y_pred_test_rms, axis=1)  # pick the highest-scoring class index for each sample

# calculate accuracy
accuracy_training_rms = accuracy_score(y_train_rms, y_pred_train_rms)
accuracy_test_rms = accuracy_score(y_test_rms, y_pred_test_rms)

# calculate F1 score
f1_training_rms = f1_score(y_train_rms, y_pred_train_rms, average='weighted')
f1_test_rms = f1_score(y_test_rms, y_pred_test_rms, average='weighted')

print("Training Accuracy RMS:", accuracy_training_rms)
print("Test Accuracy RMS:", accuracy_test_rms)
print("Training F1 Score RMS:", f1_training_rms)
print("Test F1 Score RMS:", f1_test_rms)

In [None]:
from sklearn.metrics import classification_report

# Per-class precision, recall and F1 of the RMS model, scored against the
# labels of its own test split (y_test_rms).
rapor_rms = classification_report(y_test_rms, y_pred_test_rms, target_names=[str(i) for i in np.unique(y_test_rms)])
print(rapor_rms)

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Compare the RMS model's predictions with the labels of its own test split
# (y_test_rms) rather than the labels of the sliding-window split.
cm_rms = confusion_matrix(y_test_rms, y_pred_test_rms)
plt.figure(figsize=(6, 6))
sns.heatmap(cm_rms, annot=True, fmt="d", cmap="Blues", xticklabels=np.unique(y_test_rms), yticklabels=np.unique(y_test_rms))
plt.title('Confusion Matrix RMS')
plt.xlabel('Predicted Labels RMS')
plt.ylabel('Real Labels RMS')
plt.show()