# Importing Libraries

In [9]:
import numpy as np
from fitizens_libraries.load_and_process_training_data import load_training_data
import os
import pandas as pd
from math import ceil

# Loading Data

In [2]:
folder_path = "labeled"
os.makedirs(folder_path, exist_ok=True)
file_names = [f"{folder_path}/{name}" for name in os.listdir(folder_path)]
signals = ["accX", "accY", "accZ", "gyroX", "gyroY", "gyroZ", "magnX", "magnY", "magnZ", "linAccX", "linAccY", "linAccZ"]

data, wk = load_training_data(filelist=file_names,
                         signals= signals,
                          target_exercise="SQUAT", other_exercises=[], is_peak_minima=True)
data[0]

{'series':                          accX_orig  accY_orig  accZ_orig  gyroX_orig  \
 time                                                                   
 1970-01-01 00:00:01.651  -1.143810   2.380944  -1.104601  150.289993   
 1970-01-01 00:00:01.660  -0.997843   2.416837  -1.353463  150.009995   
 1970-01-01 00:00:01.673  -0.624549   2.357014  -1.415679  147.279999   
 1970-01-01 00:00:01.682  -0.296721   2.479053  -1.406107  147.069992   
 1970-01-01 00:00:01.691   0.210576   2.826024  -1.657362  144.059998   
 ...                            ...        ...        ...         ...   
 1970-01-01 00:00:02.595  -0.210576   4.582419  -2.159873  -95.480003   
 1970-01-01 00:00:02.604  -0.160325   4.266555  -2.279519  -98.420006   
 1970-01-01 00:00:02.613  -0.167504   4.374236  -3.054821 -104.230003   
 1970-01-01 00:00:02.622  -0.114860   3.661149  -2.978248 -110.670006   
 1970-01-01 00:00:02.631   0.011965   2.931312  -1.913404 -100.310005   
 
                          gyroY_orig  g

# Data Exploration

In [114]:
squats = [element["series"] for element in list(filter(lambda info: info["target"] == "SQUAT", data))]
no_exercise = [element["series"] for element in list(filter(lambda info: info["target"] == "NO_EXERCISE", data))]

In [115]:
print("Squats: ", len(squats))
print("No exercise: ", len(no_exercise))

Squats:  846
No exercise:  2680


In [116]:
import plotly.graph_objects as go

# Sample data
categories = list( range(len(squats)) )
values = sorted([info.shape[0] for info in squats])

# Create a figure
fig = go.Figure()

# Add a bar trace to the figure
fig.add_trace(go.Bar(x=categories, y=values))

# Update layout (optional but recommended for clarity)
fig.update_layout(
    title='Squats points data',
    xaxis_title='Categories',
    yaxis_title='Values'
)

# Show the plot
fig.show()


In [117]:
import plotly.graph_objects as go

# Sample data
categories = list( range(len(no_exercise)) )
values = sorted([info.shape[0] for info in no_exercise])

# Create a figure
fig = go.Figure()

# Add a bar trace to the figure
fig.add_trace(go.Bar(x=categories, y=values))

# Update layout (optional but recommended for clarity)
fig.update_layout(
    title='No exercise points data',
    xaxis_title='Categories',
    yaxis_title='Values'
)

# Show the plot
fig.show()


# Resampling time series data

### Look for a number to standard all the sample sizes

In [118]:
median_squats = int(np.median([info.shape[0] for info in squats]))
median_no_exercise = int(np.median([info.shape[0] for info in no_exercise]))
median_mean = int(ceil( (median_squats+median_no_exercise) / 2 ))
print("Median squats: ", median_squats)
print("Median no exercise: ", median_no_exercise)
print("Median mean: ", median_mean)

Median squats:  109
Median no exercise:  132
Median mean:  121


In [119]:
def resampling_data(df_input, desired_rows, columns):
    df = df_input[columns]
    if df.shape[0] == desired_rows:
        return df    
    # Resample the DataFrame
    additional_index = pd.date_range(start=df.index.min(),  
                                     end=df.index.max(),
                                     periods=desired_rows)
    resampled_df = df.reindex(additional_index).interpolate()
    
    return resampled_df

In [120]:
columns_selected = ["linAccX", "linAccY", "linAccZ", "gyroX", "gyroY", "gyroZ", "magnX", "magnY", "magnZ"]
resampled_squats = [resampling_data(info, median_mean, columns_selected) for info in squats]
resampled_no_exercise = [resampling_data(info, median_mean, columns_selected) for info in no_exercise]

In [121]:
resampled_squats_labeled = [1] * len(resampled_squats)
resampled_no_exercise_labeled = [0] * len(resampled_no_exercise)

In [122]:
X = resampled_squats + resampled_no_exercise
X = np.stack([df.to_numpy() for df in X])
Y = np.array(resampled_squats_labeled + resampled_no_exercise_labeled)

In [199]:
import tensorflow as tf
from tensorflow.keras.metrics import Metric
# Define a custom F1 score metric
@tf.keras.utils.register_keras_serializable()
class F1Score(Metric):
    def __init__(self, name='f1_score', **kwargs):
        super(F1Score, self).__init__(name=name, **kwargs)
        self.precision = tf.keras.metrics.Precision()
        self.recall = tf.keras.metrics.Recall()
    
    def update_state(self, y_true, y_pred, sample_weight=None):
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)
    
    def result(self):
        p = self.precision.result()
        r = self.recall.result()
        return 2 * ((p * r) / (p + r + tf.keras.backend.epsilon()))
    
    def reset_states(self):
        self.precision.reset_states()
        self.recall.reset_states()

# Test [ChatGPT Neural Network](https://docs.google.com/document/d/1h1toV2v1K-cyVkZUMM5qgS_ETnDb4z8INJg7L9F9isE/edit?usp=sharing)

In [208]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.metrics import AUC


# First, split the data into a combined train-test set and a production set
X_train_test, X_prod, y_train_test, y_prod = train_test_split(X, Y, test_size=0.1)  
# Now split the train-test set into separate training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_train_test, y_train_test, test_size=0.2)
# Apply one-hot encoding to the target variables
y_train = to_categorical(y_train, num_classes=2)
y_test = to_categorical(y_test, num_classes=2)
y_prod = to_categorical(y_prod, num_classes=2)

num_channels = len(columns_selected)
# Build the CNN model
model = Sequential([
    Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=(median_mean, num_channels)),
    MaxPooling1D(pool_size=2),
    Flatten(),
    Dense(50, activation='relu'),
    Dense(2, activation='softmax')
])

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=[F1Score(), AUC()])

# Train the model
model.fit(X_train, y_train, epochs=10, batch_size=32, validation_split=0.2)

# Evaluate the model
loss, f1, auc = model.evaluate(X_test, y_test)
print(f'Loss: {loss}, F1 Score: {f1}, AUC: {auc}')

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Loss: 0.08495312929153442, F1 Score: 0.9889762997627258, AUC: 0.9932593703269958


In [209]:
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
import numpy as np
from tensorflow.keras.models import load_model

#Save and load model
model.save('model_chatgpt.keras')
model = load_model('model_chatgpt.keras', custom_objects={'F1Score': F1Score})
# Use the model to make predictions
y_pred = model.predict(X_prod)
# Convert predictions and true labels back to label format if necessary
y_pred_label = np.argmax(y_pred, axis=1)
y_true_label = np.argmax(y_prod, axis=1)
# Calculate F1 score
f1 = f1_score(y_true_label, y_pred_label)
# Calculate AUC
# For AUC, you need the prediction probabilities for the positive class
y_pred_prob = y_pred[:, 1]  # Assuming class 1 is the 'positive' class
auc = roc_auc_score(y_true_label, y_pred_prob)
# Calculate Accuracy
accuracy = accuracy_score(y_true_label, y_pred_label)
# Print the metrics
print(f'F1 Score: {f1}')
print(f'AUC: {auc}')
print(f'Accuracy: {accuracy}')


F1 Score: 0.9659863945578231
AUC: 0.9951080112370435
Accuracy: 0.9858356940509915


# Model From [Automatic Classification of Squat Posture Using Inertial Sensors: Deep Learning Approach](https://www.mdpi.com/1424-8220/20/2/361)

In [216]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, LSTM, Dense, Dropout, TimeDistributed
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# First, split the data into a combined train-test set and a production set
X_train_test, X_prod, y_train_test, y_prod = train_test_split(X, Y, test_size=0.1)  
# Now split the train-test set into separate training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_train_test, y_train_test, test_size=0.2)
# Apply one-hot encoding to the target variables
y_train = to_categorical(y_train, num_classes=2)
y_test = to_categorical(y_test, num_classes=2)
y_prod = to_categorical(y_prod, num_classes=2)

# Number of time steps and features
time_steps = 121
features = 9

# Define the input shape for the first TimeDistributed layer
input_shape = (time_steps, features, 1)

# Define the model
model = Sequential()

filters = [8 , 16, 32]
for filter_value in filters:
    # 1D Convolutional layers within TimeDistributed
    model.add(TimeDistributed(Conv1D(filters=filter_value, kernel_size=3, activation='relu', padding='same'), input_shape=input_shape))
    model.add(TimeDistributed(MaxPooling1D(2)))
    model.add(TimeDistributed(Dropout(0.5)))

# Flatten the output for the LSTM layer
model.add(TimeDistributed(Flatten()))

# LSTM layer
model.add(LSTM(64))

# Dense layer
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.5))

# Output layer
model.add(Dense(2, activation='softmax'))  # Replace with your number of classes

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                loss='categorical_crossentropy', metrics=[F1Score(), AUC(), 'accuracy'])

# Model summary
model.summary()

Model: "sequential_88"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 time_distributed_138 (Time  (None, 121, 9, 8)         32        
 Distributed)                                                    
                                                                 
 time_distributed_139 (Time  (None, 121, 4, 8)         0         
 Distributed)                                                    
                                                                 
 time_distributed_140 (Time  (None, 121, 4, 8)         0         
 Distributed)                                                    
                                                                 
 time_distributed_141 (Time  (None, 121, 4, 16)        400       
 Distributed)                                                    
                                                                 
 time_distributed_142 (Time  (None, 121, 2, 16)      

In [217]:
# Train the model
model.fit(X_train, y_train, epochs=500, batch_size=32, validation_split=0.2)

# Evaluate the model
loss, f1, auc, accuracy = model.evaluate(X_test, y_test)
print(f'Loss: {loss}, F1 Score: {f1}, AUC: {auc}, Accuracy: {accuracy}')

Epoch 1/500
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500
Epoch 15/500
Epoch 16/500
Epoch 17/500
Epoch 18/500
Epoch 19/500
Epoch 20/500
Epoch 21/500
Epoch 22/500
Epoch 23/500
Epoch 24/500
Epoch 25/500
Epoch 26/500
Epoch 27/500
Epoch 28/500
Epoch 29/500
Epoch 30/500
Epoch 31/500
Epoch 32/500
Epoch 33/500
Epoch 34/500
Epoch 35/500
Epoch 36/500
Epoch 37/500
Epoch 38/500
Epoch 39/500
Epoch 40/500
Epoch 41/500
Epoch 42/500
Epoch 43/500
Epoch 44/500
Epoch 45/500
Epoch 46/500
Epoch 47/500
Epoch 48/500
Epoch 49/500
Epoch 50/500
Epoch 51/500
Epoch 52/500
Epoch 53/500
Epoch 54/500
Epoch 55/500
Epoch 56/500
Epoch 57/500
Epoch 58/500
Epoch 59/500
Epoch 60/500
Epoch 61/500
Epoch 62/500
Epoch 63/500
Epoch 64/500
Epoch 65/500
Epoch 66/500
Epoch 67/500
Epoch 68/500
Epoch 69/500
Epoch 70/500
Epoch 71/500
Epoch 72/500
Epoch 73/500
Epoch 74/500
Epoch 75/500
Epoch 76/500
Epoch 77/500
Epoch 78

In [218]:
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
import numpy as np
from tensorflow.keras.models import load_model

#Save and load model
model.save('model_uni_ulsan.keras')
model = load_model('model_uni_ulsan.keras', custom_objects={'F1Score': F1Score})
# Use the model to make predictions
y_pred = model.predict(X_prod)
# Convert predictions and true labels back to label format if necessary
y_pred_label = np.argmax(y_pred, axis=1)
y_true_label = np.argmax(y_prod, axis=1)
# Calculate F1 score
f1 = f1_score(y_true_label, y_pred_label)
# Calculate AUC
# For AUC, you need the prediction probabilities for the positive class
y_pred_prob = y_pred[:, 1]  # Assuming class 1 is the 'positive' class
auc = roc_auc_score(y_true_label, y_pred_prob)
# Calculate Accuracy
accuracy = accuracy_score(y_true_label, y_pred_label)
# Print the metrics
print(f'F1 Score: {f1}')
print(f'AUC: {auc}')
print(f'Accuracy: {accuracy}')

F1 Score: 0.76
AUC: 0.9524404464434449
Accuracy: 0.8980169971671388
