In [30]:
import json
import numpy as np
import pandas as pd
import plotly.express as px
import tensorflow as tf
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras import layers

---
### Load the dataset and split train test split
---

In [31]:
# Load the data
with open('../data/curated-data-train.json') as f:
    data_train = json.load(f)
df_train = pd.DataFrame(data_train).T
# Filter dataframe to only include the columns we want
df_train = df_train[['back', 'left', 'right']]
# Separate the three data into the same label
for i in range(3):
    for pose in ['back', 'left', 'right']:
        df_train[f'{pose}_{i}'] = df_train[pose].apply(lambda x: x[i])
# Drop the original columns
df_train = df_train.drop(columns=['back', 'left', 'right'])
# Expand the data into a single column
df_train = df_train.melt()
# Rename the 'variable' values to only include the pose
df_train['variable'] = df_train['variable'].apply(lambda x: x.split('_')[0])
# Rename the columns
df_train.columns = ['posture', 'reading']
df_train.head()

Unnamed: 0,posture,reading
0,back,"[[23.64, 23.91, 23.95, 23.82, 24.64, 24.24, 25..."
1,back,"[[24.09, 24.33, 24.21, 24.42, 25.24, 24.97, 25..."
2,back,"[[24.92, 24.97, 25.39, 25.1, 26.22, 26.13, 26...."
3,back,"[[26.39, 25.85, 26.58, 26.84, 27.47, 27.24, 27..."
4,back,"[[25.03, 24.98, 24.68, 24.7, 25.71, 25.73, 25...."


In [32]:
# Load the data
with open('../data/curated-data-test.json') as f:
    data_test = json.load(f)
df_test = pd.DataFrame(data_test).T
# Filter dataframe to only include the columns we want
df_test = df_test[['back', 'left', 'right']]
# Separate the three data into the same label
for i in range(3):
    for pose in ['back', 'left', 'right']:
        df_test[f'{pose}_{i}'] = df_test[pose].apply(lambda x: x[i])
# Drop the original columns
df_test = df_test.drop(columns=['back', 'left', 'right'])
# Expand the data into a single column
df_test = df_test.melt()
# Rename the 'variable' values to only include the pose
df_test['variable'] = df_test['variable'].apply(lambda x: x.split('_')[0])
# Rename the columns
df_test.columns = ['posture', 'reading']
df_test.head()

Unnamed: 0,posture,reading
0,back,"[[24.34, 24.45, 24.23, 24.19, 24.62, 24.79, 25..."
1,back,"[[23.11, 23.47, 23.46, 23.26, 24.27, 24.1, 24...."
2,back,"[[23.27, 23.49, 23.71, 23.37, 24.37, 24.4, 25...."
3,back,"[[24.16, 23.52, 24.36, 23.67, 24.85, 24.1, 24...."
4,back,"[[23.32, 22.9, 23.3, 22.98, 23.96, 23.38, 24.0..."


In [33]:
# Split the data into training and testing sets
X_train = df_train['reading']
y_train = df_train['posture']
X_test = df_test['reading']
y_test = df_test['posture']
# One-hot encode the labels
y_train = pd.get_dummies(y_train).values
y_test = pd.get_dummies(y_test).values

In [34]:
X_train = np.array([np.array(x) for x in X_train])
X_train = np.expand_dims(X_train, -1)
X_test = np.array([np.array(x) for x in X_test])
X_test = np.expand_dims(X_test, -1)
y_train = np.array([np.array(x) for x in y_train])
y_test = np.array([np.array(x) for x in y_test])
# Convert the data to tensors
X_train = tf.convert_to_tensor(X_train)
X_test = tf.convert_to_tensor(X_test)
y_train = tf.convert_to_tensor(y_train)
y_test = tf.convert_to_tensor(y_test)

---
### Traininig a generic CNN model
---

In [35]:
def create_model():
    model = keras.Sequential(
        [
            layers.Conv2D(8, kernel_size=(3, 3), activation='relu', padding='same', input_shape=X_train.shape[1:]),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.Conv2D(4, kernel_size=(3, 3), activation='relu', padding='same'),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.Flatten(),  # Flatten the output of the CNN
            layers.Dense(32, activation='relu'),
            layers.Dense(16, activation='relu'),
            layers.Dense(3, activation='softmax'),
        ]
    )
    model.compile(
        loss='categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
    )
    return model

In [36]:
early_stopping = keras.callbacks.EarlyStopping(
    patience=20,
    min_delta=0.001,
    restore_best_weights=True,
)
reduce_lr = keras.callbacks.ReduceLROnPlateau(
    patience=5
)

In [37]:
# Cross-validation
kf = KFold(n_splits=5, shuffle=True)
scores = []
models = []
for train_index, test_index in kf.split(X_train):
    X_train_cv, X_test_cv = tf.gather(X_train, train_index), tf.gather(X_train, test_index)
    y_train_cv, y_test_cv = tf.gather(y_train, train_index), tf.gather(y_train, test_index)
    model = create_model()
    model.fit(
        X_train_cv,
        y_train_cv,
        validation_data=(X_test_cv, y_test_cv),
        epochs=1000,
        batch_size=16,
        callbacks=[early_stopping, reduce_lr],
        verbose=0,
    )
    score = model.evaluate(X_test_cv, y_test_cv)
    scores.append(score[1])
    models.append(model)
    print(score)
# Print the scores
print(scores)

[1.0043184757232666, 0.6666666865348816]
[1.0802910327911377, 0.4444444477558136]
[0.728689968585968, 0.7777777910232544]
[0.411985844373703, 0.9259259104728699]
[0.8543956875801086, 0.7407407164573669]
[0.6666666865348816, 0.4444444477558136, 0.7777777910232544, 0.9259259104728699, 0.7407407164573669]


In [38]:
# Select the best model
best_model = models[np.argmax([score for score in scores])]
# Reset learning rate
best_model.optimizer.lr = 0.001
# Train the best model on the entire training set
hist = best_model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=100,
    batch_size=16,
)
# Evaluate the model
best_loss, best_score = best_model.evaluate(X_test, y_test)
print(best_score)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [39]:
px.line(
    pd.DataFrame(hist.history),
    y=['loss', 'val_loss'],
    labels={'index': 'epoch', 'value': 'loss'},
    title='Loss over time',
).show()

In [40]:
preds = model.predict(X_test)
preds = np.argmax(preds, axis=1)
y_test_max = np.argmax(y_test, axis=1)
# Create a confusion matrix
confusion_matrix = tf.math.confusion_matrix(labels=y_test_max, predictions=preds).numpy()
# Create a dataframe from the confusion matrix
confusion_matrix = pd.DataFrame(confusion_matrix, index=['back', 'left', 'right'], columns=['back', 'left', 'right'])
# Plot the confusion matrix
px.imshow(confusion_matrix, color_continuous_scale='Blues')



In [41]:
values = {
    0: 'back',
    1: 'left',
    2: 'right',
}

# Get the X inputs that were misclassified
misclassified = X_test[y_test_max != preds]
# Get the y inputs that were misclassified
real_labels = y_test_max[y_test_max != preds]
pred_labels = preds[y_test_max != preds]
# Plot the misclassified inputs
for i in range(len(misclassified)):
    fig = px.imshow(
        img = misclassified[i, :, :, 0]
    )
    fig.update_layout(
        title=f'Predicted: {values[pred_labels[i]]}, '
        f'Actual: {values[real_labels[i]]}'
    )
    fig.show()

---
### Predicting for whole data
---

In [43]:
X = np.concatenate([X_train, X_test])
y = np.concatenate([y_train, y_test])
X_total = tf.convert_to_tensor(X)
y_total = tf.convert_to_tensor(y)

In [44]:
y_pred = model.predict(X_total)
y_pred_max = np.argmax(y_pred, axis=1)
y_total_max = np.argmax(y_total, axis=1)



In [45]:
# Create a confusion matrix
confusion_matrix = tf.math.confusion_matrix(labels=y_total_max, predictions=y_pred_max).numpy()
# Create a dataframe from the confusion matrix
confusion_matrix = pd.DataFrame(confusion_matrix, index=['back', 'left', 'right'], columns=['back', 'left', 'right'])
# Plot the confusion matrix
fig = px.imshow(confusion_matrix, color_continuous_scale='Blues')
fig.update_layout(
    title='Confusion Matrix for CNN',
    xaxis_title='Predicted',
    yaxis_title='Actual',
)

In [46]:
print(f'Accuracy: {np.sum(y_pred_max == y_total_max) / len(y_total_max)}')

Accuracy: 0.7777777777777778


In [None]:
model.save_weights('./cross-validation/best_model')
model.save('./posture_classifier_cv.keras')