In [None]:
import json
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn import metrics
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, Flatten, LSTM, Dense, concatenate, Reshape
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.utils import to_categorical

In [None]:
# Colab-only step: mount Google Drive at /content/drive so that later cells
# can read the dataset files through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# List the project folder on the mounted Drive to see which data files are available.
!ls "/content/drive/My Drive/FYP"

'Copy of Fixed 5 second window raw1.0.ipynb'	      testPostureAtomicdata1D_502.json
'plot graphs with dimension reduction - Copy.ipynb'   testPostureAtomicdata1D_530.json
 Sync_502					      testPostureAtomicdata1D_v.json
 Sync_530					     'Training model.ipynb'
 Synchronized					      trainPostureAtomicdata1D.json
 test_541.json					      Untitled1.ipynb


In [None]:
# Load the dataset used for training in this notebook.
# `json` is already imported in the first cell, so it is not re-imported here.
# NOTE(review): despite the "test_" prefix, this file is what the model is fitted on below.
json_file_path = "/content/drive/My Drive/FYP/test_541.json"  # Update path if inside a folder

# Read with an explicit encoding so the result does not depend on the
# platform's default locale (JSON files are UTF-8 by convention).
with open(json_file_path, 'r', encoding='utf-8') as json_file:
    data = json.load(json_file)

print("File loaded successfully!")

File loaded successfully!


In [None]:
# Number of entries stored under the 'GlassAccelerometer_train' key of the loaded JSON.
len(data['GlassAccelerometer_train'])

305

In [None]:
# Build a DataFrame from the loaded dict: one column per JSON key, one row per entry.
df = pd.DataFrame(data)

In [None]:
# Row count of the DataFrame, i.e. the number of samples available for training.
len(df)

305

In [None]:
def min_list_length(column):
    """Return the length of the shortest non-empty list found in ``column``.

    Entries that are not lists, as well as empty lists, are skipped.
    When no entry qualifies, 0 is returned.
    """
    # Lazily yield the length of every qualifying entry; `default` covers
    # the case where nothing qualifies.
    candidate_lengths = (
        len(entry)
        for entry in column
        if isinstance(entry, list) and len(entry) > 0
    )
    return min(candidate_lengths, default=0)

# DataFrame.apply calls min_list_length once per column, producing a Series
# indexed by column name.
min_lengths = df.apply(min_list_length)

# Shortest non-empty list length per column; a column with no list entries
# (such as the label column) reports 0.
print(min_lengths)

GlassAccelerometer_train      48
GlassGyroscope_train          48
GlassMagnetometer_train       48
PhoneAccelerometer_train    1200
PhoneGyroscope_train        5880
PhoneMagnetometer_train     1200
WatchAccelerometer_train    1176
WatchGyroscope_train        1176
WatchMagnetometer_train     1176
Y_train                        0
dtype: int64


In [None]:
# Define other constants
feature_dimension = 1  # Since each column contains a 1D list (not referenced by the other visible cells)
# num_classes is the width of the softmax output layer. With integer labels and
# sparse categorical cross-entropy it must be larger than the largest label id.
# NOTE(review): the labels printed in the next cell are 1..22, so 61 is not the
# number of unique values here; "61,55,6" presumably lists sizes used for other
# label sets -- confirm that 61 is intended for this dataset.
num_classes = 61  # Size of the output layer (historical values: 61, 55, 6)
num_features = 9  # Number of sensor columns; also the number of steps the Reshape layer produces
flatten_size =10  # Units of the Dense layer that ends each per-sensor CNN branch

# Prepare data and train the model
# Integer activity labels, one per row.
y_train = np.array(df['Y_train'])

# One array per sensor column (every column except the last, which is assumed
# to be the label column), in DataFrame column order.
X_features = [np.array(df[column].to_list()) for column in df.columns[:-1]]


In [None]:
# Inspect which distinct label values occur in the training labels.
y_train = np.array(df['Y_train'])
values = np.unique(y_train)  # sorted unique label ids
print(values)

[ 1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22]


In [None]:

# Build one small 1-D CNN branch per sensor column. Every column except the
# last (assumed to be the label column) gets its own Input and its own weights.
input_layers = []  # one Keras Input per sensor, in DataFrame column order
cnn_outputs = []   # one flatten_size-wide embedding per sensor
for feature in df.columns[:-1]:
    # Sequence length of this sensor, read from the first row by *position*.
    # (The previous `df[feature].apply(len)[0]` measured every row just to
    # read one value and looked it up by the index label 0, which only works
    # with a default RangeIndex.)
    length = len(df[feature].iloc[0])
    input_layer = Input(shape=(length, 1))
    input_layers.append(input_layer)

    # Two conv + max-pool stages, then flatten to a vector.
    cnn = Conv1D(32, kernel_size=3, activation='relu')(input_layer)
    cnn = MaxPooling1D(pool_size=2)(cnn)
    cnn = Conv1D(16, kernel_size=3, activation='relu')(cnn)
    cnn = MaxPooling1D(pool_size=2)(cnn)
    cnn = Flatten()(cnn)
    print(cnn)  # shows the flattened size, which differs per sensor

    # Project every branch to the same width so the branches can be stacked later.
    cnn = Dense(flatten_size, activation='relu')(cnn)
    cnn_outputs.append(cnn)

# Concatenate the CNN outputs into one vector per sample
merged_cnn = concatenate(cnn_outputs)

<KerasTensor shape=(None, 160), dtype=float32, sparse=False, name=keras_tensor_5>
<KerasTensor shape=(None, 160), dtype=float32, sparse=False, name=keras_tensor_12>
<KerasTensor shape=(None, 160), dtype=float32, sparse=False, name=keras_tensor_19>
<KerasTensor shape=(None, 4768), dtype=float32, sparse=False, name=keras_tensor_26>
<KerasTensor shape=(None, 23488), dtype=float32, sparse=False, name=keras_tensor_33>
<KerasTensor shape=(None, 4768), dtype=float32, sparse=False, name=keras_tensor_40>
<KerasTensor shape=(None, 4672), dtype=float32, sparse=False, name=keras_tensor_47>
<KerasTensor shape=(None, 4672), dtype=float32, sparse=False, name=keras_tensor_54>
<KerasTensor shape=(None, 4672), dtype=float32, sparse=False, name=keras_tensor_61>


In [None]:
# Inspect the symbolic tensor produced by concatenating all CNN branch outputs.
merged_cnn

<KerasTensor shape=(None, 90), dtype=float32, sparse=False, name=keras_tensor_63>

In [None]:
# Inspect the per-sensor branch outputs (one symbolic tensor per sensor column).
cnn_outputs

[<KerasTensor shape=(None, 10), dtype=float32, sparse=False, name=keras_tensor_6>,
 <KerasTensor shape=(None, 10), dtype=float32, sparse=False, name=keras_tensor_13>,
 <KerasTensor shape=(None, 10), dtype=float32, sparse=False, name=keras_tensor_20>,
 <KerasTensor shape=(None, 10), dtype=float32, sparse=False, name=keras_tensor_27>,
 <KerasTensor shape=(None, 10), dtype=float32, sparse=False, name=keras_tensor_34>,
 <KerasTensor shape=(None, 10), dtype=float32, sparse=False, name=keras_tensor_41>,
 <KerasTensor shape=(None, 10), dtype=float32, sparse=False, name=keras_tensor_48>,
 <KerasTensor shape=(None, 10), dtype=float32, sparse=False, name=keras_tensor_55>,
 <KerasTensor shape=(None, 10), dtype=float32, sparse=False, name=keras_tensor_62>]

In [None]:
# Reshape the concatenated CNN outputs to match the LSTM input shape:
# the flat vector is split into num_features steps, with -1 letting Keras
# infer the per-step width from the concatenated size.
reshaped_cnn = Reshape((num_features, -1))(merged_cnn)

# LSTM branch: the first layer returns the full sequence so that the second
# LSTM can consume it; the second returns only its final output.
lstm = LSTM(128, return_sequences=True)(reshaped_cnn)
lstm = LSTM(64)(lstm)

# Fully connected layers ending in a softmax over num_classes outputs
dense1 = Dense(128, activation='relu')(lstm)
output = Dense(num_classes, activation='softmax')(dense1)


In [None]:
# Training hyper-parameters
number_of_epoch = 25
batch_size = 32

# Tie all per-sensor inputs to the single softmax output
model = Model(inputs=input_layers, outputs=output)

# Integer labels are used directly, hence the sparse variant of the loss
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# Fit on the full training set; X_features is ordered like input_layers
model.fit(
    X_features,
    y_train,
    batch_size=batch_size,
    epochs=number_of_epoch,
)

Epoch 1/25
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 392ms/step - accuracy: 0.0380 - loss: 4.0261
Epoch 2/25
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 294ms/step - accuracy: 0.1176 - loss: 3.5175
Epoch 3/25
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 282ms/step - accuracy: 0.1380 - loss: 3.0278
Epoch 4/25
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 407ms/step - accuracy: 0.3433 - loss: 2.5569
Epoch 5/25
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 275ms/step - accuracy: 0.4717 - loss: 1.9706
Epoch 6/25
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 276ms/step - accuracy: 0.6147 - loss: 1.5291
Epoch 7/25
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 434ms/step - accuracy: 0.6724 - loss: 1.1752
Epoch 8/25
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 274ms/step - accuracy: 0.7533 - loss: 0.9191
Epoch 9/25
[1m10/10[0m [32m━━━━━━━━━

<keras.src.callbacks.history.History at 0x7ea1d90a4590>

In [None]:
# Persist the trained model in the native Keras format (written to the current working directory).
model.save('test_541_22act_trainedModel.keras')

In [None]:
# Drop the references to the training data now that the model has been saved
del df, data, X_features, y_train

# **Loading the Trained Model and Evaluating It on Test Data**

In [None]:
# Reload the model from the file saved earlier, replacing the in-memory `model`.
model = tf.keras.models.load_model('test_541_22act_trainedModel.keras')

In [None]:
# Path to your test JSON file
test_json_file_path = "/content/drive/My Drive/FYP/test_502.json"  # Update path accordingly

# Load test data. The encoding is given explicitly so the result does not
# depend on the platform's default locale.
with open(test_json_file_path, 'r', encoding='utf-8') as json_file:
    test_data = json.load(json_file)

# Convert test data to DataFrame (`pd` is already imported in the first cell)
df_test = pd.DataFrame(test_data)

# Extract features: every column except the last, which is assumed to be the
# label column. The resulting list is passed to the model in this order.
X_test_features = [np.array(df_test[column].to_list()) for column in df_test.columns[:-1]]

# Extract ground-truth labels. They are read from the 'Y_train' column of the
# test file; update the name if your test file uses a different label column.
y_test = np.array(df_test['Y_train'])

In [None]:
# Get model predictions
y_pred_probs = model.predict(X_test_features)

# Convert softmax probabilities to class predictions
y_pred = np.argmax(y_pred_probs, axis=1)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step


In [None]:
# `metrics` (sklearn.metrics) is already imported in the first cell, so no
# additional import is needed in this cell.

# Calculate accuracy
accuracy = metrics.accuracy_score(y_test, y_pred)
print(f"Test Accuracy: {accuracy:.2f}")

# Print classification report.
# A class that is predicted but never occurs in y_test (or the reverse) has an
# undefined precision/recall. zero_division=0 reports those as 0.0, the same
# value the default produces, without emitting the undefined-metric warnings.
print(metrics.classification_report(y_test, y_pred, zero_division=0))

Test Accuracy: 0.83
              precision    recall  f1-score   support

           9       0.00      0.00      0.00         0
          16       1.00      0.83      0.91         6

    accuracy                           0.83         6
   macro avg       0.50      0.42      0.45         6
weighted avg       1.00      0.83      0.91         6



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [None]:
sample_index = 0  # Change this to test other samples

# Extract a single test sample from every sensor array; slicing (rather than
# indexing) keeps the leading batch dimension.
X_sample = [feature[sample_index:sample_index + 1] for feature in X_test_features]

# Predict
sample_pred_probs = model.predict(X_sample)
sample_pred = np.argmax(sample_pred_probs, axis=1)

print(f"Predicted class: {sample_pred[0]}")
# Look up the label of the *selected* sample. This was previously hard-coded
# to y_test[0], which is wrong for any other sample_index.
print(f"Actual Class: {y_test[sample_index]}")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 60ms/step
Predicted class: 16
Actual Class: 16
