In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Library

In [None]:
import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from scipy.stats import pearsonr
from scipy.signal import find_peaks
import seaborn as sns
%matplotlib inline

font_size = 16
# Specify the path to save containing data CSV files
path = '/content/drive/MyDrive/Gait_phase_analysis_DH803/healthy gait/Moin'
fs = 104
cutoff_frequency = 2  # Hz
filter_types = ['butterworth']
# Specify the path to save the result CSV file
result_path = '/content/drive/MyDrive/Gait_phase_analysis_DH803/healthy gait/Result'

# Functions

In [None]:
def apply_lowpass_filters(data, sampling_frequency, cutoff_frequency, filter_types, filter_order=4):
    """
    Apply multiple lowpass filters (including moving average) to the input data and calculate correlation coefficients.

    Parameters:
    - data (array-like): The input data to filter.
    - sampling_frequency (float): The sampling frequency of the input data (in Hz).
    - cutoff_frequency (float): The cutoff frequency of the lowpass filters (in Hz).
    - filter_types (list of str): A list of filter types to use ('butterworth', 'chebyshev1', 'chebyshev2', 'moving_average').
    - filter_order (int): The order of the filters (only applicable to certain filter types).

    Returns:
    - results (dict): A dictionary containing filtered data and correlation coefficients for each filter type.
    """
    nyquist_frequency = 0.5 * sampling_frequency
    results = {}

    for filter_type in filter_types:
        if filter_type == 'butterworth':
            b, a = signal.butter(filter_order, cutoff_frequency / nyquist_frequency, btype='low')
            filtered_data = signal.filtfilt(b, a, data)
        elif filter_type == 'chebyshev1':
            b, a = signal.cheby1(filter_order, 1, cutoff_frequency / nyquist_frequency, btype='low')
            filtered_data = signal.filtfilt(b, a, data)
        elif filter_type == 'chebyshev2':
            b, a = signal.cheby2(filter_order, 30, cutoff_frequency / nyquist_frequency, btype='low')
            filtered_data = signal.filtfilt(b, a, data)
        elif filter_type == 'moving_average':
            filter_window = int(sampling_frequency / cutoff_frequency)  # Window size for moving average
            filtered_data = np.convolve(data, np.ones(filter_window) / filter_window, mode='same')
        else:
            raise ValueError("Invalid filter_type. Supported types: 'butterworth', 'chebyshev1', 'chebyshev2', 'moving_average'")

        correlation_coefficient, _ = pearsonr(data, filtered_data)

        results[filter_type] = {
            'filtered_data': filtered_data,
            'correlation_coefficient': correlation_coefficient
        }

    return results

# Data Read

In [None]:
csv_files = [file for file in os.listdir(path) if file.endswith('.csv')]
dataframes = []
# Loop through each CSV file and read it into a separate dataframe
for csv_file in csv_files:
  # print(csv_file)
  # Construct the full file path
  file_path = os.path.join(path, csv_file)

  # Read the CSV file into a dataframe
  data = pd.read_csv(file_path)
  # # Store the dataframe in the dictionary with the file name as the key
  # dataframes[csv_file] = data

  HeelADCRaw = data['Heel ADC0']
  MidFootADCRaw = data['MidFoot ADC1']
  ToeADCRaw = data['Toe ADC2']

  GyroXRaw = data[' GyroX (deg/s)']
  GyroXRaw = GyroXRaw - np.min(GyroXRaw)
  GyroYRaw = data[' GyroY (deg/s)']
  GyroYRaw = GyroYRaw - np.min(GyroYRaw)
  GyroZRaw = data[' GyroZ (deg/s)']
  GyroZRaw = GyroZRaw - np.min(GyroZRaw)

  AccXRaw = data[' AccX (g)']
  AccYRaw = data[' AccY (g)']
  AccZRaw = data[' AccZ (g)']

  start, end = 0, len(GyroZRaw)
  GyroXRawSeg = GyroXRaw[start: end]
  GyroYRawSeg = GyroYRaw[start: end]
  GyroZRawSeg = GyroZRaw[start: end]

  results1 = apply_lowpass_filters(AccXRaw, fs, cutoff_frequency, filter_types, 2)
  for filter_type, result in results1.items():
    AccXFiltSeg = result['filtered_data']
  results1 = apply_lowpass_filters(AccYRaw, fs, cutoff_frequency, filter_types, 2)
  for filter_type, result in results1.items():
    AccYFiltSeg = result['filtered_data']
  results1 = apply_lowpass_filters(AccZRaw, fs, cutoff_frequency, filter_types, 2)
  for filter_type, result in results1.items():
    AccZFiltSeg = result['filtered_data']

  results1 = apply_lowpass_filters(GyroXRawSeg, fs, cutoff_frequency, filter_types, 2)
  for filter_type, result in results1.items():
    GyroXFiltSeg = result['filtered_data']
  results1 = apply_lowpass_filters(GyroYRawSeg, fs, cutoff_frequency, filter_types, 2)
  for filter_type, result in results1.items():
    GyroYFiltSeg = result['filtered_data']
  results1 = apply_lowpass_filters(GyroZRawSeg, fs, cutoff_frequency, filter_types, 2)
  for filter_type, result in results1.items():
    GyroZFiltSeg = result['filtered_data']



  ## Find max of the given window
  max_window = max(GyroZFiltSeg)
  # print(max_window)

  ## Find the lower threshold based on max found
  thr = 0.32
  low_thr = int(max_window - (0.32 * max_window))
  # print(max_window, low_thr)

  ## Begin MSw Finding based on Max peak detetion and locallized peak detection
  ## Find the local peaks based on lower threshold
  GyroZPksPos, _ = find_peaks(GyroZFiltSeg, height=low_thr, distance=fs//1)
  GyroZPksVal = GyroZFiltSeg[GyroZPksPos]


  ## Begin finding the TO (towards Left of MSw) and IC (towards Right of MSw)
  GyroZPksPosLeftMin, GyroZPksPosRightMin = [], []
  for i in range(len(GyroZPksPos)):
    if (i == 0):
      low, high = (GyroZPksPos[i]), (GyroZPksPos[i]+50)
      b = GyroZPksPos[i] + np.argmin(GyroZFiltSeg[low : high])
      #print(b, GyroZPksPos[i])
      GyroZPksPosRightMin.append(b)
    else:
      #print(i)
      low, high = (GyroZPksPos[i]-50), (GyroZPksPos[i])
      a = GyroZPksPos[i] - 50 + np.argmin(GyroZFiltSeg[low : high])
      #print(a, GyroZPksPos[i])
      GyroZPksPosLeftMin.append(a)

      if ((GyroZPksPos[i]+50) > len(GyroZFiltSeg)):
        continue
      else:
        low, high = (GyroZPksPos[i]), (GyroZPksPos[i]+50)
        b = GyroZPksPos[i] + np.argmin(GyroZFiltSeg[low : high])
        #print(b, GyroZPksPos[i])
        GyroZPksPosRightMin.append(b)

  ## Begin Finding MSt (Between IC and TO)
  if (GyroZPksPosLeftMin > GyroZPksPosRightMin):
    loop_len = len(GyroZPksPosLeftMin)
  else:
    loop_len = len(GyroZPksPosRightMin)

  GyroZPksPosMSt = []
  for i in range(loop_len):
    low, high = (GyroZPksPosRightMin[i]), (GyroZPksPosLeftMin[i])
    a = GyroZPksPosRightMin[i] + np.argmax(GyroZFiltSeg[low : high])
    #print(a, GyroZPksPos[i])
    GyroZPksPosMSt.append(a)



  label = []
  for n in range(0, GyroZPksPos[0]):
    label.append(4)

  for idx in range(len(GyroZPksPos)-1):
    for m in range(GyroZPksPos[idx], GyroZPksPosRightMin[idx]):
      label.append(3)
    for j in range(GyroZPksPosRightMin[idx], GyroZPksPosMSt[idx]):
      label.append(0)
    for k in range(GyroZPksPosMSt[idx], GyroZPksPosLeftMin[idx]):
      label.append(1)
    for i in range(GyroZPksPosLeftMin[idx], GyroZPksPos[idx+1]):
      label.append(2)
    # print(GyroZPksPos[idx], GyroZPksPosRightMin[idx], GyroZPksPosMSt[idx], GyroZPksPosLeftMin[idx])

  for o in range(GyroZPksPos[len(GyroZPksPos)-1], len(GyroZFiltSeg)):
    label.append(4)

  # for i in range(len(label)):
  #   label[i] = label[i]*100 + int(np.mean(GyroZFiltSeg))
  fig, ax1 = plt.subplots(figsize=(20, 5))
  plt.title('Peak Finding [Major-Swing (MSw), Toe-Off (TO), Initial Contact (IC), Major-Stance (MSt)]', fontsize=font_size)
  # Plotting on the first y-axis
  ax1.plot(GyroZFiltSeg, label='Filtered Data', alpha=0.6, linewidth=2)
  ax1.plot(GyroZPksPos, GyroZFiltSeg[GyroZPksPos], "x", label='MSw')
  ax1.plot(GyroZPksPosLeftMin, GyroZFiltSeg[GyroZPksPosLeftMin], "x", label='TO')
  ax1.plot(GyroZPksPosRightMin, GyroZFiltSeg[GyroZPksPosRightMin], "x", label='IC')
  ax1.plot(GyroZPksPosMSt, GyroZFiltSeg[GyroZPksPosMSt], "x", label='MSt')
  ax1.set_xlabel('Samples', fontsize=font_size)
  ax1.set_ylabel('Amplitude', fontsize=font_size)
  ax1.legend(loc='upper left', fontsize=font_size)
  # Creating a second y-axis
  ax2 = ax1.twinx()
  ax2.plot(label, "--", label='Label', color='orange')
  ax2.set_ylabel('Label', fontsize=font_size)
  # Manually setting tick labels
  tick_labels = ['Heel Strike', 'Mid-Stance', 'Toe-Off', 'Mid Swing', 'N/A']
  ax2.set_yticks([0, 1, 2, 3, 4])
  ax2.set_yticklabels(tick_labels, fontsize=font_size)
  ax2.legend(loc='upper right', fontsize=font_size)
  plt.grid(True)
  plt.tight_layout()
  plt.show()

  # Create a dataframe with the specified columns
  df = pd.DataFrame({
        'AccX (g)': AccXFiltSeg,
        'AccY (g)': AccYFiltSeg,
        'AccZ (g)': AccZFiltSeg,
        'GyroX (deg/s)': GyroXFiltSeg,
        'GyroY (deg/s)': GyroYFiltSeg,
        'GyroZ (deg/s)': GyroZFiltSeg,
        'Heel ADC0': HeelADCRaw,
        'MidFoot ADC1': MidFootADCRaw,
        'Toe ADC2': ToeADCRaw,
        'LABEL': label
    })
  df['LABEL'].value_counts().plot(kind='bar') #Plot freuqncy of class

  dataframes.append(df)


Output hidden; open in https://colab.research.google.com to view.

# Save Labelled data into CSV file

In [None]:
# # This code will store the result in same columns.
# # Ensure the result directory exists
# os.makedirs(result_path, exist_ok=True)

# # Concatenate all dataframes in the list
# result_df = pd.concat(dataframes, ignore_index=True)

# # Save the concatenated dataframe to a CSV file
# result_file_path = os.path.join(result_path, 'result.csv')
# result_df.to_csv(result_file_path, index=False)

# print(f"Result CSV file saved at: {result_file_path}")

In [None]:
# # This code will store the result in different columns.
# # Set the path to save the result CSV file
# result_file = 'result.csv'

# # Create the result directory if it doesn't exist
# if not os.path.exists(result_path):
#     os.makedirs(result_path)

# # Assuming 'dataframes' is the list of dataframes from the previous code
# # Create a dictionary to store dataframes as columns
# df_dict = {'df{}'.format(i): df for i, df in enumerate(dataframes)}

# # Create a single dataframe with columns as dataframes
# result_df = pd.concat(df_dict, axis=1)

# # Save the result dataframe to a CSV file
# result_df.to_csv(os.path.join(result_path, result_file), index=False)

In [None]:
# This code will store the result in different CSV files.

# Create the result directory if it doesn't exist
if not os.path.exists(result_path):
    os.makedirs(result_path)
else:
    # Delete all files in the result_path directory
    files_to_delete = os.listdir(result_path)
    for file_to_delete in files_to_delete:
        file_path = os.path.join(result_path, file_to_delete)
        os.remove(file_path)

# Assuming 'dataframes' is the list of dataframes from the previous code
# Save each dataframe to a separate CSV file
for i, df in enumerate(dataframes):
    csv_file = f'df_{i}.csv'
    csv_path = os.path.join(result_path, csv_file)
    df.to_csv(csv_path, index=False)


# Machine Learning Section

In [None]:
import tensorflow as tf
print(tf.version.VERSION)

2.12.0


## Loading the dataset

In [None]:
new_csv_files = [file for file in os.listdir(result_path) if file.endswith('.csv')]
print(new_csv_files)

recordings = []
for csv_file in new_csv_files:
    file_path = os.path.join(result_path, csv_file)
    df_data = pd.read_csv(file_path)
    recordings.append(df_data.iloc[:].values)

recordings = np.array(recordings).reshape(len(recordings), -1, 10)
print("recordings shape:", recordings.shape)

['df_0.csv', 'df_1.csv', 'df_2.csv', 'df_3.csv', 'df_4.csv', 'df_5.csv', 'df_6.csv', 'df_7.csv', 'df_8.csv', 'df_9.csv', 'df_10.csv', 'df_11.csv', 'df_12.csv', 'df_13.csv', 'df_14.csv']
recordings shape: (15, 5000, 10)


## Frame data

In [None]:
def frame(x, frame_len, hop_len):
    ''' Slice a 3D data array into (overlapping) frames. '''

    assert(x.shape == (len(x), 10))
    assert(x.shape[0] >= frame_len)
    assert(hop_len >= 1)

    n_frames = 1 + (x.shape[0] - frame_len) // hop_len
    shape = (n_frames, frame_len, x.shape[1])
    strides_x = ((hop_len * x.strides[0],) + x.strides)

    return np.lib.stride_tricks.as_strided(x, shape=shape, strides=strides_x)

all_frames = []
for i in range(recordings.shape[0]):
    # frames = frame(recordings[i], 26, 26) # no overlap
    frames = frame(recordings[i], 24, 12) # 50% overlap
    all_frames.append(frames)

print(np.array(all_frames).shape)
all_frames = np.concatenate(all_frames)
print(all_frames.shape)

(15, 415, 24, 10)
(6225, 24, 10)


In [None]:
# # Split the last dimension of 'all_frames' into 'x_frames' and 'y_frames'
# x_frames = all_frames[:, :, :9]
# y_frames = all_frames[:, :, 9:]

# # Print the shapes of the resulting arrays
# print("x_frames shape:", x_frames.shape)
# print("y_frames shape:", y_frames.shape)

# Split the 'all_frames' array into 'x_frames' and 'y_frames'
x_frames = recordings[:, :, :-1]  # Take the first 9 columns for each frame
y_frames = recordings[:, :, -1]   # Take the last column for each frame
# Reshape x_recording to (75000, 9)
x_reshaped = np.reshape(x_frames, (75000, 9))

# Reshape y_recording to (75000, 1)
y_reshaped = np.reshape(y_frames, (75000, 1))
print("x_recording shape:", x_frames.shape)
print("y_recording shape:", y_frames.shape)

x_recording shape: (15, 5000, 9)
y_recording shape: (15, 5000)


## Preprocessing the dataset

In [None]:
# Normalize x_frames between [-1;1]
min_val = np.min(x_reshaped)
max_val = np.max(x_reshaped)
x_frames_normed = -1 + 2 * (x_reshaped - min_val) / (max_val - min_val)

print("Normalized x_frame shape:", x_frames_normed.shape)

Normalized x_frame shape: (75000, 9)


## Preparing the dataset (train, test, split)

In [None]:
from sklearn.model_selection import train_test_split
import tensorflow as tf
from sklearn.feature_extraction.text import TfidfVectorizer
# initializing TfidfVectorizer
x_train, x_test, y_train, y_test = train_test_split(x_frames_normed, y_reshaped, test_size=0.25, shuffle=True)



print("X-Training samples:", x_train.shape)
print("X-Testing samples:", x_test.shape)
print("Y-Training samples :", y_train.shape)
print("Y-Testing samples:", y_test.shape)

# Filter the training set
indices_to_remove = [id for id in range(y_train.shape[0]) if y_train[id] == 4]
x_train_filtered = np.delete(x_train, indices_to_remove, axis=0)
y_train_filtered = np.delete(y_train, indices_to_remove)

# Filter the test set
indices_to_remove = [id for id in range(y_test.shape[0]) if y_test[id] == 4]
x_test_filtered = np.delete(x_test, indices_to_remove, axis=0)
y_test_filtered = np.delete(y_test, indices_to_remove)
print("X-Training samples(filtered):", x_train_filtered.shape)
print("X-Testing samples(filtered):", x_test_filtered.shape)
print("Y-Training samples(filtered) :", y_train_filtered.shape)
print("Y-Testing samples(filtered):", y_test_filtered.shape)

X-Training samples: (56250, 9)
X-Testing samples: (18750, 9)
Y-Training samples : (56250, 1)
Y-Testing samples: (18750, 1)
X-Training samples(filtered): (52972, 9)
X-Testing samples(filtered): (17648, 9)
Y-Training samples(filtered) : (52972,)
Y-Testing samples(filtered): (17648,)


## Creating the model

In [None]:
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy

def make_model(input_shape):
    # Reshape input to add a time dimension
    input_layer = tf.keras.layers.Input(input_shape)
    reshape = tf.keras.layers.Reshape((input_shape[0], 1))(input_layer)

    conv1 = tf.keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(reshape)
    conv1 = tf.keras.layers.BatchNormalization()(conv1)
    conv1 = tf.keras.layers.ReLU()(conv1)

    conv2 = tf.keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(conv1)
    conv2 = tf.keras.layers.BatchNormalization()(conv2)
    conv2 = tf.keras.layers.ReLU()(conv2)

    # Apply Batch Normalization after the last convolutional layer
    conv2 = tf.keras.layers.BatchNormalization()(conv2)

    gap = tf.keras.layers.GlobalAveragePooling1D()(conv2)
    num_classes = 4
    output_layer = tf.keras.layers.Dense(num_classes, activation="softmax")(gap)

    return tf.keras.models.Model(inputs=input_layer, outputs=output_layer)

# Assuming x_train_filtered, y_train_filtered, x_test_filtered, y_test_filtered are defined
input_shape = x_train_filtered.shape[1:]

# Adjust learning rate
optimizer = Adam(learning_rate=0.001)

model = make_model(input_shape)

# Compile the model with the optimizer
model.compile(optimizer=optimizer, loss=SparseCategoricalCrossentropy(), metrics=['accuracy'])

# Reshape data to have a third dimension
x_train_reshaped = x_train_filtered[:, :, tf.newaxis]
x_test_reshaped = x_test_filtered[:, :, tf.newaxis]

# Custom data augmentation function for time series
def time_series_augmentation(x):
    # Apply random flip horizontally to each time series independently
    flipped_x = tf.image.random_flip_left_right(x)
    return flipped_x

# Create a data augmentation layer using the custom function
data_augmentation = tf.keras.layers.Lambda(time_series_augmentation)

# Combine data augmentation with the model
augmented_model = tf.keras.Sequential([
    data_augmentation,
    model
])

# Compile augmented model before training
augmented_model.compile(optimizer=optimizer, loss=SparseCategoricalCrossentropy(), metrics=['accuracy'])

# Train the model with augmented data
history = augmented_model.fit(x_train_reshaped, y_train_filtered, epochs=30, batch_size=128, validation_data=(x_test_reshaped, y_test_filtered))

# Evaluate the model on the original test set
test_loss, test_accuracy = model.evaluate(x_test_reshaped, y_test_filtered)

print(f'Test Loss: {test_loss:.4f}')
print(f'Test Accuracy: {test_accuracy:.4f}')


Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
Test Loss: 7.9734
Test Accuracy: 0.2774


In [None]:
test_loss, test_acc = model.evaluate(x_test,  y_test, verbose=2)

print("Test loss:", test_loss)
print("Test acc:", test_acc)
model.summary()

ValueError: ignored

In [None]:
Y_pred = model.predict(x_test)
y_pred = np.argmax(Y_pred, axis=1)
confusion_matrix = tf.math.confusion_matrix(y_test, y_pred)

plt.figure()
sns.heatmap(confusion_matrix,
            annot=True,
            xticklabels=labels,
            yticklabels=labels,
            cmap=plt.cm.Blues,
            fmt='d', cbar=False)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

In [None]:
# Save the model into an HDF5 file ‘model.h5’
model.save('model.h5')