## **SynapseON II: Fatigue State Classification**



In [None]:
import pandas as pd
import json
from google.colab import files
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt

In [None]:
!head -n 30 training_dataset.json

head: cannot open 'training_dataset.json' for reading: No such file or directory


In [None]:
!head -n 20 test_dataset.json
!tail -n 20 test_dataset.json

head: cannot open 'test_dataset.json' for reading: No such file or directory
tail: cannot open 'test_dataset.json' for reading: No such file or directory


In [None]:
# Flatten the nested training JSON into a single table and export it as CSV.
# Each top-level key maps to the records of one state; the second
# underscore-separated token of that key is stored as the row label.
with open('training_dataset.json') as f:
    data = json.load(f)

mm = [
    pd.json_normalize(records).assign(state=key.split('_')[1])
    for key, records in data.items()
]

# The per-state row indices are kept while concatenating but are not written
# to the CSV (index=False below).
datass = pd.concat(mm)
datass.to_csv('training_dataset_fatigue.csv', index=False)

# Colab helper: pushes the generated file to the browser as a download.
files.download('training_dataset_fatigue.csv')


In [None]:
# Reload the flattened training table and take a first look at it.
fat = pd.read_csv('training_dataset_fatigue.csv')

# Row count and per-column cardinality, reported while 'index' is still present.
print(fat.shape[0])
print(fat.nunique())

# 'index' is dropped before modelling (presumably a per-record counter carried
# over from the JSON - confirm against the raw data).
fat = fat.drop(columns=['index'])

fat.info()
fat.sample(5)

In [None]:
def filter_emg(emg_signal, lowcut=20, highcut=450, fs=1000, order=4):
    """Band-pass a signal with a forward-backward Butterworth filter.

    Args:
        emg_signal: Array-like of samples; filtered along the last axis
            (the ``filtfilt`` default).
        lowcut: Lower pass-band edge, in the same units as ``fs``.
        highcut: Upper pass-band edge, in the same units as ``fs``.
        fs: Sampling rate of ``emg_signal``.
        order: Order handed to ``scipy.signal.butter``.

    Returns:
        The filtered samples, as returned by ``scipy.signal.filtfilt``.
    """
    half_rate = 0.5 * fs
    # Without an explicit fs, butter() expects its critical frequencies as
    # fractions of the Nyquist rate.
    normalized_band = [edge / half_rate for edge in (lowcut, highcut)]
    coefficients = butter(order, normalized_band, btype='band')
    return filtfilt(*coefficients, emg_signal)

# --- Training-set preprocessing -------------------------------------------
# Band-pass the raw EMG channel and keep the result next to the original.
# NOTE(review): the filter treats the whole concatenated table as one
# continuous recording; if rows of different states/recordings are not
# contiguous in time, filtering per recording may be more appropriate - confirm.
fat['EMG_clean'] = filter_emg(fat['EMG'].values)

plt.figure(figsize=(10, 4))
plt.plot(fat['EMG'], label='unfiltered', alpha=0.3)
plt.plot(fat['EMG_clean'], label='filtered')
plt.legend()
plt.show()

# Snapshot BPM before any repair so the scatter plot below compares the
# corrected values against the untouched readings.
fat['og_BPM'] = fat['BPM']

# The repairs below write fractional values; cast up front so they are not
# assigned into integer-typed columns.
fat[['AvgBPM', 'BPM']] = fat[['AvgBPM', 'BPM']].astype(float)

# Replace readings outside 40-200 with a centred 5-sample rolling median.
# Both heart-rate columns are repaired so that training and test data go
# through the same cleaning. min_periods=1 keeps the median defined at the
# first/last two rows and next to missing values; with the default the window
# is incomplete there and the "repaired" value would be NaN.
for bpm_col in ('AvgBPM', 'BPM'):
    invalid = ~fat[bpm_col].between(40, 200)
    rolling_median = fat[bpm_col].rolling(window=5, center=True, min_periods=1).median()
    fat.loc[invalid, bpm_col] = rolling_median.loc[invalid]

# Rule-based BPM corrections for rows whose EMG/BPM combination is treated as
# implausible for their state.
# NOTE(review): these rules are conditioned on the label column `state`, so
# the corrected feature partly encodes the target - confirm this is intended.
illogic_rest = (fat['state'] == 'REST') & (fat['EMG'] > 50) & (fat['BPM'] < 60)
fat.loc[illogic_rest, 'BPM'] = 60 + fat.loc[illogic_rest, 'BPM'] * 0.1

illogic_fat = (fat['state'] == 'FATIG') & (fat['EMG'] > 200) & (fat['BPM'] < 100)
fat.loc[illogic_fat, 'BPM'] = 100 + fat.loc[illogic_fat, 'BPM'] * 0.15

plt.figure(figsize=(12, 6))
plt.scatter(fat['EMG'], fat['og_BPM'], color='black', alpha=0.5, s=20, label='Original')
plt.scatter(fat['EMG'], fat['BPM'], color='red', alpha=0.5, s=20, label='Corrected')
plt.xlabel('EMG (µV)')
plt.ylabel('BPM')
plt.title('BPM Correction: Before vs After')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

print(fat.sample(5))
print(len(fat))


In [None]:
# Flatten the nested test JSON into a single table and export it as CSV.
# Each top-level key maps to the records of one state; the second
# underscore-separated token of that key is stored as the row label.
with open('test_dataset.json') as f:
    data = json.load(f)

mm = [
    pd.json_normalize(records).assign(state=key.split('_')[1])
    for key, records in data.items()
]

# The per-state row indices are kept while concatenating but are not written
# to the CSV (index=False below).
datass = pd.concat(mm)
datass.to_csv('testing_dataset_fatigue.csv', index=False)

# Colab helper: pushes the generated file to the browser as a download.
files.download('testing_dataset_fatigue.csv')



In [None]:
# Reload the flattened test table and print a few sanity checks.
test_m = pd.read_csv('testing_dataset_fatigue.csv')

# Missing EMG samples, per-column cardinality and the set of labels present,
# all reported while the 'index' column is still in the frame.
print(test_m['EMG'].isnull().sum())
print(test_m.nunique())
print(test_m['state'].unique())

test_m = test_m.drop(columns=['index'])
def filter_emg(emg_signal, lowcut=20, highcut=450, fs=1000, order=4):
    """Band-pass a signal with a forward-backward Butterworth filter.

    Args:
        emg_signal: Array-like of samples; filtered along the last axis
            (the ``filtfilt`` default).
        lowcut: Lower pass-band edge, in the same units as ``fs``.
        highcut: Upper pass-band edge, in the same units as ``fs``.
        fs: Sampling rate of ``emg_signal``.
        order: Order handed to ``scipy.signal.butter``.

    Returns:
        The filtered samples, as returned by ``scipy.signal.filtfilt``.
    """
    half_rate = 0.5 * fs
    # Without an explicit fs, butter() expects its critical frequencies as
    # fractions of the Nyquist rate.
    normalized_band = [edge / half_rate for edge in (lowcut, highcut)]
    coefficients = butter(order, normalized_band, btype='band')
    return filtfilt(*coefficients, emg_signal)

# --- Test-set preprocessing -----------------------------------------------
# Band-pass the raw EMG channel and keep the result next to the original.
test_m['EMG_clean'] = filter_emg(test_m['EMG'].values)

# The repairs below write fractional values; cast up front so they are not
# assigned into integer-typed columns.
test_m[['AvgBPM', 'BPM']] = test_m[['AvgBPM', 'BPM']].astype(float)

# Replace readings outside 40-200 with a centred 5-sample rolling median.
# Both heart-rate columns are repaired so that training and test data go
# through the same cleaning. min_periods=1 keeps the median defined at the
# first/last two rows and next to missing values; with the default the window
# is incomplete there and the "repaired" value would be NaN.
for bpm_col in ('AvgBPM', 'BPM'):
    invalid = ~test_m[bpm_col].between(40, 200)
    rolling_median = test_m[bpm_col].rolling(window=5, center=True, min_periods=1).median()
    test_m.loc[invalid, bpm_col] = rolling_median.loc[invalid]

# Rule-based BPM corrections for rows whose EMG/BPM combination is treated as
# implausible for their state.
# NOTE(review): these rules read the ground-truth `state` column to rewrite a
# model input. If `state` is also the evaluation target, that is label leakage
# and any score computed on this frame is optimistic; the corrections cannot
# be reproduced on unlabeled data. Behaviour is kept as-is here - decide
# whether to drop them or replace them with label-free rules.
illogic_rest = (test_m['state'] == 'REST') & (test_m['EMG'] > 50) & (test_m['BPM'] < 60)
test_m.loc[illogic_rest, 'BPM'] = 60 + test_m.loc[illogic_rest, 'BPM'] * 0.1

illogic_fat = (test_m['state'] == 'FATIG') & (test_m['EMG'] > 200) & (test_m['BPM'] < 100)
test_m.loc[illogic_fat, 'BPM'] = 100 + test_m.loc[illogic_fat, 'BPM'] * 0.15

print(test_m.sample(5))

In [None]:
import pandas as pd
import numpy as np
import json
from scipy.signal import butter, filtfilt
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import (accuracy_score, precision_score,
                            recall_score, f1_score,
                            confusion_matrix, classification_report)


# The model sees the same four inputs from the training and the test frame.
feature_cols = ['EMG_clean', 'BPM', 'IR', 'AvgBPM']

Xtrain = fat[feature_cols]
ytrain = fat['state']
X_test = test_m[feature_cols]

# Random forest with library defaults; the fixed seed makes the fit repeatable.
model = RandomForestClassifier(random_state=42).fit(Xtrain, ytrain)

# Store the predictions next to the inputs and show a few random rows.
predicted_states = model.predict(X_test)
test_m['predicted_state'] = predicted_states
print(test_m[feature_cols + ['predicted_state']].sample(5))




In [None]:
# Compare the stored labels of the test frame with the model's predictions.
y_true = test_m['state']

print("\nMetrics:")
print(f"Accuracy: {accuracy_score(y_true, predicted_states):.2f}")
# Precision, recall and F1 are averaged over classes, weighted by support.
for metric_name, scorer in (
    ("Precision", precision_score),
    ("Recall", recall_score),
    ("F1 Score", f1_score),
):
    print(f"{metric_name}: {scorer(y_true, predicted_states, average='weighted'):.2f}")

print("\nConfusion Matrix:")
# Rows/columns are pinned to the order REST, FATIG.
print(confusion_matrix(y_true, predicted_states, labels=['REST', 'FATIG']))

print("\nClassification Report:")
print(classification_report(y_true, predicted_states))