In [1]:
import os
import pandas as pd
import numpy as np
from scipy import stats
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from datetime import datetime

In [2]:
UUNs = ['s2106809', 's2100273', 's2104454']
cols = ['accel_x', 'accel_y', 'accel_z', 'gyro_x', 'gyro_y', 'gyro_z', 'activity']
data_file_name = f"all_data{datetime.now()}.csv"
if len(UUNs) < 3:
    name_prefix = '_'.join(UUNs)
    data_file_name = f"{name_prefix}_data{datetime.now()}.csv)"

all_data = os.path.join(os.getcwd(), data_file_name)
pd.DataFrame(columns=cols).to_csv(all_data, mode='w', header=True, index=False)

In [3]:
def is_req_file(filename):
    return ("Respeck" in filename and "clean" in filename and
            filename.split("_")[1] in UUNs)

data_dir = os.path.join(os.getcwd(), 'Respeck')

activities = []
for subdir in os.listdir(data_dir):
    if os.path.isdir(os.path.join(data_dir, subdir)):
        for file in os.listdir(os.path.join(data_dir, subdir)):
            df = pd.read_csv(os.path.join(data_dir, subdir, file))
            file_info = file.split('.')[0].split('_')
            act, subact = file_info[2:4]
            df['activity'] = pd.Series(f"{act} {subact}", index=df.index)
            df = df[cols]
            df.to_csv(all_data, mode='a', header=False, index=False)
            activities.append(f"{act} {subact}")

In [16]:
random_seed = 42
n_time_steps = 50
n_features = 6
step = 10
n_epochs = 10
batch_size = 64
learning_rate = 0.0015
l2_loss = 0.0015

segments = []
labels = []

In [5]:
activities = np.array(activities).reshape(-1, 1)
enc = OneHotEncoder(handle_unknown='ignore')
enc.fit(activities)

In [6]:
all_data = os.path.join(os.getcwd(), data_file_name)
df = pd.read_csv(all_data)

for i in range(0, len(df) - n_time_steps, step):
    xs = df['accel_x'].values[i: i + n_time_steps]
    ys = df['accel_y'].values[i: i + n_time_steps]
    zs = df['accel_z'].values[i: i + n_time_steps]
    gx = df['gyro_x'].values[i: i + n_time_steps]
    gy = df['gyro_y'].values[i: i + n_time_steps]
    gz = df['gyro_z'].values[i: i + n_time_steps]
    label = stats.mode(df['activity'][i: i + n_time_steps])[0][0]
    segments.append([xs, ys, zs, gx, gy, gz])
    labels.append(label)

  label = stats.mode(df['activity'][i: i + n_time_steps])[0][0]
  label = stats.mode(df['activity'][i: i + n_time_steps])[0][0]


In [7]:
np.array(segments).shape, np.array(labels).shape

((234780, 6, 50), (234780,))

In [8]:
reshaped_segments = np.asarray(segments, dtype= np.float32).reshape(-1, n_time_steps, n_features)
labels = np.asarray(pd.get_dummies(labels), dtype = np.float32)

In [9]:
reshaped_segments.shape, labels.shape

((234780, 50, 6), (234780, 44))

In [10]:
x_train, x_test, y_train, y_test = train_test_split(reshaped_segments, labels, test_size=0.2, random_state=random_seed)

In [11]:
x_train.shape, y_train.shape, x_test.shape, y_test.shape

((187824, 50, 6), (187824, 44), (46956, 50, 6), (46956, 44))

In [12]:
from keras import Sequential
from keras.layers import LSTM, Dense, Dropout

In [17]:
model = Sequential()
model.add(LSTM(100, input_shape=(n_time_steps, n_features)))
model.add(Dropout(0.5))
model.add(Dense(100, activation='relu'))
model.add(Dense(44, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [18]:
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_1 (LSTM)               (None, 100)               42800     
                                                                 
 dropout_1 (Dropout)         (None, 100)               0         
                                                                 
 dense_2 (Dense)             (None, 100)               10100     
                                                                 
 dense_3 (Dense)             (None, 44)                4444      
                                                                 
Total params: 57344 (224.00 KB)
Trainable params: 57344 (224.00 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [19]:
model.fit(x_train, y_train, epochs=n_epochs, batch_size=batch_size, verbose=1, validation_data=(x_test, y_test))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x2874ff9d0>

In [None]:
model.evaluate(x_test, y_test)

In [None]:
# save model
model.save('model1011.h5')

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.target_spec.supported_ops = [
  tf.lite.OpsSet.TFLITE_BUILTINS, # enable TensorFlow Lite ops.
  tf.lite.OpsSet.SELECT_TF_OPS # enable TensorFlow ops.
]
tflite_model = converter.convert()

# save tflite model
with open('model2510.tflite', 'wb') as f:
    f.write(tflite_model)

In [None]:
# Load the TFLite model and allocate tensors.
interpreter = tf.lite.Interpreter(model_path="model.tflite")
interpreter.allocate_tensors()

# Get input and output tensors
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Test the model on one instance
interpreter.set_tensor(input_details[0]['index'], x_test[0:1])
interpreter.invoke()
output_data = interpreter.get_tensor(output_details[0]['index'])
print(output_data)

In [None]:
correct_predictions = 0
total_predictions = 0

for i in range(len(x_test)):
    interpreter.set_tensor(input_details[0]['index'], x_test[i:i+1])
    interpreter.invoke()
    output_data = interpreter.get_tensor(output_details[0]['index'])
    total_predictions += 1
    if np.argmax(output_data) == np.argmax(y_test[i]):
        correct_predictions += 1

accuracy = correct_predictions / total_predictions
print('Test accuracy:', accuracy)

In [None]:
test_case = x_test[0:1]
interpreter.set_tensor(input_details[0]['index'], test_case)
interpreter.invoke()
output_data = interpreter.get_tensor(output_details[0]['index'])
argmax = output_data.argmax()
print('Predicted activity:', enc.categories_[0][argmax])
print('Expected activity:' , enc.categories_[0][np.argmax(y_test[0])])