# EMG Signal Classification
Authors: Karla Avilés & George Gaibor

In this notebook, you will find two feature extraction techniques: <br>
- Custom features: a set of statistical features. <br>
- RMS features: RMS value calculated per window. <br>

In [None]:
import pandas as pd
import numpy as np
import os
import re
import matplotlib.pyplot as plt
import glob

In [None]:
# Path where the emg data is stored, in one folder per class (Task1, Task2, ...)
MAIN_PATH = "../Data/RmsNorm_Reg/two_sec"
# Path where the generated feature files are saved
FEATURES_PATH = "../Features"

In [None]:
# Folder names, one per class (motor task)
MOTOR_TASKS = ["Task1", "Task2", "Task3", "Task4", "Task5"]
# Integer label of each class: the text before the first '.' of the folder
# name is the key, numbered from 1 in list order
MOTOR_TASKS_DIC = {
    folder.split('.')[0]: position
    for position, folder in enumerate(MOTOR_TASKS, start=1)
}

# Features to compute. Supported names: MeanAbsoluteValue, MeanAbsWeightI,
# MeanAbsWeightII, MedianAbsValue, Variance, StdDeviation.
# To add a feature, write the function computing it and handle its name in main()
FEATURES_LIST = [
    "MeanAbsoluteValue",
    "MeanAbsWeightI",
    "MeanAbsWeightII",
    "MedianAbsValue",
    "Variance",
    "StdDeviation",
]

# Emg sensors (column names of the emg files) used in the experiment
EMG_SENSORS = ["MW1", "MW2", "MW3"]


## Custom Features

In [None]:
def mean_abs_value(data: pd.Series) -> float:
  """
  Average the absolute values of the samples.
  Params:
    data (pd.Series): samples of one signal
  Returns:
    (float): mean of the absolute values
  """
  rectified = data.abs()
  return rectified.mean()

def mean_abs_weightI(data: pd.Series) -> float:
  """
  Calculate the mean absolute value of a group of
  data with a given weight. Centered data (between 
  0.25 and 0.75 of the window) receives a weight of
  1, otherwise weight is 0.5.
  Params:
    data (pd.Series): contains data
  Returns:
    mean_abs_w_val (float): mean absolute weighted value
  """
  values_list = []
  for row in range(len(data)):
    row += 1
    if row <= 0.75*len(data) and row >= 0.25*len(data):
      w = 1
    else:
      w = 0.5
    values_list.append(abs(data.iloc[row-1])*w)
  mean_abs_w_val = np.mean(values_list)
  return mean_abs_w_val

def mean_abs_weightII(data: pd.Series) -> float:
  """
  Weighted mean of the absolute values of the samples.
  With N the window length and i the 1-based position of
  a sample, the weight is:
    - 1          if 0.25*N <= i <= 0.75*N
    - 4*i/N      if i < 0.25*N
    - 4*(N-i)/N  if i > 0.75*N
  Params:
    data (pd.Series): samples of one signal
  Returns:
    mean_abs_w_val (float): mean absolute weighted value,
      nan when data is empty
  """
  n_samples = len(data)
  # An empty window has no mean; return nan instead of failing
  if n_samples == 0:
    return float("nan")
  # 1-based position of every sample inside the window
  positions = np.arange(1, n_samples + 1)
  rising = 4*positions/n_samples
  falling = 4*(n_samples - positions)/n_samples
  weights = np.where(
    positions < 0.25*n_samples,
    rising,
    np.where(positions <= 0.75*n_samples, 1.0, falling),
  )
  # The mean is computed once, over the whole window
  mean_abs_w_val = float(np.mean(data.abs().to_numpy() * weights))
  return mean_abs_w_val

def median_abs_value(data: pd.Series) -> float:
  """
  Calculate the median of the absolute values of a
  group of data. The absolute value is taken sample
  by sample before the median, in the same way
  mean_abs_value() does before the mean.
  Params:
    data (pd.Series): contains data
  Returns:
    median_abs_val (float): median absolute value
  """
  median_abs_val = data.abs().median()
  return median_abs_val

def variance(data: pd.Series):
  """
  Compute the variance of the samples with
  pandas' default estimator (ddof=1).
  Params:
    data (pd.Series): samples of one signal
  Returns:
    variance of the samples
  """
  return data.var(ddof=1)

def std_deviation(data: pd.Series):
  """
  Compute the standard deviation of the samples
  with pandas' default estimator (ddof=1).
  Params:
    data (pd.Series): samples of one signal
  Returns:
    standard deviation of the samples
  """
  return data.std(ddof=1)

In [None]:
# Column names of the custom features table: the class label first, then
# one "<feature>_<sensor>" column per feature and sensor (grouped by feature)
FEATURES_HEADER = ["Label"] + [
    f"{feature_name}_{sensor_name}"
    for feature_name in FEATURES_LIST
    for sensor_name in EMG_SENSORS
]


In [None]:
def main(path: str) -> pd.DataFrame:
    """
    This function iterates through the path containing all
    emg files and calculates the different features for each
    emg sensor.
    Every csv file found one folder below path produces one row.
    The name of the folder holding the file is looked up in
    MOTOR_TASKS_DIC to get the row label.
    Params:
        path (str): path to the folders containing all emg files.
    Returns:
        df_features (Dataframe): contains features from all files,
        with the columns listed in FEATURES_HEADER
    Raises:
        KeyError: if a file is stored in a folder that is not a key
        of MOTOR_TASKS_DIC, or lacks a column listed in EMG_SENSORS.
    """
    # Function computing each supported feature, keyed by feature name
    feature_functions = {
        "MeanAbsoluteValue": mean_abs_value,
        "MeanAbsWeightI": mean_abs_weightI,
        "MeanAbsWeightII": mean_abs_weightII,
        "MedianAbsValue": median_abs_value,
        "Variance": variance,
        "StdDeviation": std_deviation,
    }
    # One dict per emg file; the dataframe is built once at the end
    # because DataFrame.append is not available in pandas >= 2.0
    rows = []
    # Iterate through emg files in path
    for file in glob.iglob(os.path.join(path, "*/*.csv")):
        emg_file = pd.read_csv(file, index_col=0)
        # The parent folder name is the task name; os.path is used so that
        # the lookup works with the path separator of the running platform
        task = os.path.basename(os.path.dirname(file))
        feature_file = {"Label": MOTOR_TASKS_DIC[task]}
        # Iterate through each emg sensor
        for sensor in EMG_SENSORS:
            signal = emg_file[sensor]
            # Iterate through each feature type
            for feature in FEATURES_LIST:
                compute = feature_functions.get(feature)
                if compute is None:
                    # Unknown feature name: report it and leave the value empty
                    value = np.nan
                    print("Feature doesn't exist")
                else:
                    value = compute(signal)
                # Store calculated feature in dict
                feature_file[f"{feature}_{sensor}"] = value
        rows.append(feature_file)
    df_features = pd.DataFrame(rows, columns=FEATURES_HEADER)
    return df_features

In [None]:
# Compute the custom features of every emg file found under MAIN_PATH
df_custom_features = main(MAIN_PATH)

In [None]:
# Create the output folder when it is missing; to_csv does not create it
os.makedirs(FEATURES_PATH, exist_ok=True)
# Save the custom features without the dataframe index
df_custom_features.to_csv(os.path.join(FEATURES_PATH, "custom_features.csv"), index=False)

## Rms Features

In [None]:
# Number of samples of an emg file; it can be checked by reading a file and calling len()
INITIAL_WINDOW = 40

# Number of samples of each rms window.
# Note: INITIAL_WINDOW must be a multiple of this value
WINDOW_RMS = 10

assert INITIAL_WINDOW%WINDOW_RMS == 0, f"INITIAL_WINDOW should be divisible by WINDOW_RMS"

# Column names of the rms features table: the class label first, then one
# column per sensor and window, tagged with the last sample of the window
# (RMS10_MW1, RMS20_MW1, ...)
features_column = ['Label'] + [
    f"RMS{window_end}_{sensor_name}"
    for sensor_name in EMG_SENSORS
    for window_end in range(WINDOW_RMS, INITIAL_WINDOW + WINDOW_RMS, WINDOW_RMS)
]

def define_feature_df(path: str) -> pd.DataFrame:
    """
    Creates a dataframe to store the calculated features.
    It has one row per csv file found one folder below path and
    the columns listed in features_column.
    Params:
        path (str): path to the folders containing all emg files.
    Returns:
        df (Dataframe): dataframe full of nans to store features later.
    """
    # Count the files with the same glob pattern get_rms_features() uses to
    # read them, so the number of rows always matches the files processed;
    # entries of path that are not folders are simply not matched
    num_files = len(glob.glob(os.path.join(path, "*/*.csv")))
    df = pd.DataFrame(np.nan, index=list(range(num_files)), columns=features_column)
    return df

def get_rms_features(path):
    """
    This function iterates through the path containing all
    emg files and calculates the rms feature per window for each
    emg sensor.
    Each file is split in consecutive windows of WINDOW_RMS samples;
    samples left after the last complete window are not used. The
    name of the folder holding the file is looked up in
    MOTOR_TASKS_DIC to get the row label.
    Params:
        path (str): path to the folders containing all emg files.
    Returns:
        df_rms_features (Dataframe): contains features from all files
    Raises:
        KeyError: if a file is stored in a folder that is not a key
        of MOTOR_TASKS_DIC, or lacks a column listed in EMG_SENSORS.
    """
    num_row = 0
    # Size the dataframe from the folder that is actually being processed
    df_rms_features = define_feature_df(path)
    for file in glob.iglob(os.path.join(path, "*/*.csv")):
        emg_file = pd.read_csv(file, index_col=0)
        for window in range(emg_file.shape[0] // WINDOW_RMS):
            start = window * WINDOW_RMS
            stop = start + WINDOW_RMS
            for column in EMG_SENSORS:
                # Columns are tagged with the last sample of the window
                column_tag = 'RMS' + str(stop) + '_' + column
                samples = emg_file[column].iloc[start:stop]
                df_rms_features.at[num_row, column_tag] = np.sqrt(np.mean(samples**2))
        # The parent folder name is the task name; os.path is used so that
        # the lookup works with the path separator of the running platform
        task = os.path.basename(os.path.dirname(file))
        df_rms_features.at[num_row, 'Label'] = MOTOR_TASKS_DIC[task]
        num_row += 1
    return df_rms_features

In [None]:
# Compute the rms features of every emg file found under MAIN_PATH
df_rms_features = get_rms_features(MAIN_PATH)

In [None]:
# Create the output folder when it is missing; to_csv does not create it
os.makedirs(FEATURES_PATH, exist_ok=True)
# Save the rms features without the dataframe index
df_rms_features.to_csv(os.path.join(FEATURES_PATH, "rms_features.csv"), index=False)