In [None]:
import numpy as np
from scipy.stats import skew, kurtosis, entropy
from scipy.signal import welch
from scipy.linalg import norm
from statsmodels.tsa.stattools import acf
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from sklearn.model_selection import train_test_split

## Data Preprocessing

In [None]:
# Locations of the raw training arrays, relative to the notebook's working directory.
trainMSAccelerometer = "TrainingData/trainMSAccelerometer.npy"
trainMSGyroscope = "TrainingData/trainMSGyroscope.npy"
trainLabels = "TrainingData/trainLabels.npy"

# Load accelerometer signals, gyroscope signals and labels, in that order.
tr_msAcc, tr_msGyr, tr_labels = (
    np.load(npy_path)
    for npy_path in (trainMSAccelerometer, trainMSGyroscope, trainLabels)
)

In [None]:
# Integer label codes of the three activities kept for this experiment.
OPEN_DOOR, RUB_HANDS, CLOSE_DOOR = 20, 36, 9

# One boolean mask per activity, marking the training samples that carry that label.
tr_labels_OPEN_DOOR_idx = np.equal(tr_labels, OPEN_DOOR)
tr_labels_RUB_HANDS_idx = np.equal(tr_labels, RUB_HANDS)
tr_labels_CLOSE_DOOR_idx = np.equal(tr_labels, CLOSE_DOOR)

# OPEN_DOOR samples: accelerometer, gyroscope, labels.
tr_msAcc_OPEN_DOOR = tr_msAcc[tr_labels_OPEN_DOOR_idx]
tr_msGyr_OPEN_DOOR = tr_msGyr[tr_labels_OPEN_DOOR_idx]
tr_labels_OPEN_DOOR = tr_labels[tr_labels_OPEN_DOOR_idx]

# RUB_HANDS samples.
tr_msAcc_RUB_HANDS = tr_msAcc[tr_labels_RUB_HANDS_idx]
tr_msGyr_RUB_HANDS = tr_msGyr[tr_labels_RUB_HANDS_idx]
tr_labels_RUB_HANDS = tr_labels[tr_labels_RUB_HANDS_idx]

# CLOSE_DOOR samples.
tr_msAcc_CLOSE_DOOR = tr_msAcc[tr_labels_CLOSE_DOOR_idx]
tr_msGyr_CLOSE_DOOR = tr_msGyr[tr_labels_CLOSE_DOOR_idx]
tr_labels_CLOSE_DOOR = tr_labels[tr_labels_CLOSE_DOOR_idx]

# Stack the three subsets along the sample axis, always in the order
# OPEN_DOOR, RUB_HANDS, CLOSE_DOOR so signals and labels stay aligned.
tr_msAcc_Three_Activities = np.concatenate(
    [tr_msAcc_OPEN_DOOR, tr_msAcc_RUB_HANDS, tr_msAcc_CLOSE_DOOR]
)
tr_msGyr_Three_Activities = np.concatenate(
    [tr_msGyr_OPEN_DOOR, tr_msGyr_RUB_HANDS, tr_msGyr_CLOSE_DOOR]
)
tr_labels_Three_Activities = np.concatenate(
    [tr_labels_OPEN_DOOR, tr_labels_RUB_HANDS, tr_labels_CLOSE_DOOR]
)

# Persist the filtered subset.
# NOTE(review): the file names mention only OpenDoor/RubHands although the arrays
# also contain CLOSE_DOOR samples; left unchanged in case other code reads these paths.
np.save("TrainingData/train_MSAccelerometer_OpenDoor_RubHands.npy", tr_msAcc_Three_Activities)
np.save("TrainingData/train_MSGyroscope_OpenDoor_RubHands.npy", tr_msGyr_Three_Activities)
np.save("TrainingData/train_labels_OpenDoor_RubHands.npy", tr_labels_Three_Activities)

In [None]:
# Locations of the raw test arrays, relative to the notebook's working directory.
testMSAccelerometer = "TestingData/testMSAccelerometer.npy"
testMSGyroscope = "TestingData/testMSGyroscope.npy"
testLabels = "TestingData/testLabels.npy"

# Load accelerometer signals, gyroscope signals and labels, in that order.
ts_msAcc, ts_msGyr, ts_labels = (
    np.load(npy_path)
    for npy_path in (testMSAccelerometer, testMSGyroscope, testLabels)
)

In [None]:
# Integer label codes of the three activities kept for this experiment.
OPEN_DOOR, RUB_HANDS, CLOSE_DOOR = 20, 36, 9

# One boolean mask per activity, marking the test samples that carry that label.
ts_labels_OPEN_DOOR_idx = np.equal(ts_labels, OPEN_DOOR)
ts_labels_RUB_HANDS_idx = np.equal(ts_labels, RUB_HANDS)
ts_labels_CLOSE_DOOR_idx = np.equal(ts_labels, CLOSE_DOOR)

# OPEN_DOOR samples: accelerometer, gyroscope, labels.
ts_msAcc_OPEN_DOOR = ts_msAcc[ts_labels_OPEN_DOOR_idx]
ts_msGyr_OPEN_DOOR = ts_msGyr[ts_labels_OPEN_DOOR_idx]
ts_labels_OPEN_DOOR = ts_labels[ts_labels_OPEN_DOOR_idx]

# RUB_HANDS samples.
ts_msAcc_RUB_HANDS = ts_msAcc[ts_labels_RUB_HANDS_idx]
ts_msGyr_RUB_HANDS = ts_msGyr[ts_labels_RUB_HANDS_idx]
ts_labels_RUB_HANDS = ts_labels[ts_labels_RUB_HANDS_idx]

# CLOSE_DOOR samples.
ts_msAcc_CLOSE_DOOR = ts_msAcc[ts_labels_CLOSE_DOOR_idx]
ts_msGyr_CLOSE_DOOR = ts_msGyr[ts_labels_CLOSE_DOOR_idx]
ts_labels_CLOSE_DOOR = ts_labels[ts_labels_CLOSE_DOOR_idx]

# Stack the three subsets along the sample axis, always in the order
# OPEN_DOOR, RUB_HANDS, CLOSE_DOOR so signals and labels stay aligned.
ts_msAcc_Three_Activities = np.concatenate(
    [ts_msAcc_OPEN_DOOR, ts_msAcc_RUB_HANDS, ts_msAcc_CLOSE_DOOR]
)
ts_msGyr_Three_Activities = np.concatenate(
    [ts_msGyr_OPEN_DOOR, ts_msGyr_RUB_HANDS, ts_msGyr_CLOSE_DOOR]
)
ts_labels_Three_Activities = np.concatenate(
    [ts_labels_OPEN_DOOR, ts_labels_RUB_HANDS, ts_labels_CLOSE_DOOR]
)

# Persist the filtered subset.
# NOTE(review): the file names mention only OpenDoor/RubHands although the arrays
# also contain CLOSE_DOOR samples; left unchanged in case other code reads these paths.
np.save("TestingData/test_MSAccelerometer_OpenDoor_RubHands.npy", ts_msAcc_Three_Activities)
np.save("TestingData/test_MSGyroscope_OpenDoor_RubHands.npy", ts_msGyr_Three_Activities)
np.save("TestingData/test_labels_OpenDoor_RubHands.npy", ts_labels_Three_Activities)

In [None]:
# Sanity check: shapes of the filtered test arrays, one per line
# (accelerometer, gyroscope, labels).
print(
    *(
        filtered.shape
        for filtered in (
            ts_msAcc_Three_Activities,
            ts_msGyr_Three_Activities,
            ts_labels_Three_Activities,
        )
    ),
    sep="\n",
)

In [None]:
def compute_features(data,tr_msAcc_Three_Activities):
    """Fill ``data`` with 18 per-axis summary statistics for every sample.

    Every statistic is computed along axis 0 of a sample, so each one yields
    one value per column (sensor axis). ``data[i]`` receives an array of shape
    ``(18, n_columns)`` whose rows are, in order:

    0 maximum, 1 minimum, 2 mean, 3 standard deviation, 4 50th percentile,
    5 80th percentile, 6 norm of the mean vector (repeated for every column),
    7 mean again (kept so the feature layout stays unchanged),
    8 interquartile range, 9 mean of squares, 10 skewness,
    11 norm of the mean-of-squares vector (repeated), 12 zero-crossing count,
    13 kurtosis, 14 spectral energy (sum of the Welch PSD),
    15 20th percentile, 16 lag-1 autocorrelation, 17 spectral entropy.

    Parameters
    ----------
    data : numpy.ndarray
        Pre-allocated output buffer, written in place. ``data[i]`` must be
        able to hold an ``(18, n_columns)`` array.
    tr_msAcc_Three_Activities : numpy.ndarray
        Signals indexed by sample along axis 0. Each sample is assumed to be
        2-D, (time steps, sensor axes) -- TODO confirm against the data files.
        Despite its name, this parameter is also used for gyroscope and test
        arrays.

    Returns
    -------
    numpy.ndarray
        The same ``data`` object, returned for convenience.

    Notes
    -----
    For a column with no variation (zero variance / zero spectral power) the
    lag-1 autocorrelation and the spectral entropy are reported as 0.0 rather
    than the NaN a 0/0 division would produce, so such a column does not
    inject NaNs into the feature matrix. Skewness and kurtosis are left
    exactly as SciPy returns them.
    """
    for i in range(tr_msAcc_Three_Activities.shape[0]):
        sample = tr_msAcc_Three_Activities[i]
        n_steps = sample.shape[0]

        # Moments that are reused by several features below.
        mean_val = np.mean(sample, axis=0)
        squared_mean = np.mean(np.square(sample), axis=0)

        # All percentiles in one call; each result has one value per column.
        p20, p25, p50, p75, p80 = np.percentile(sample, [20, 25, 50, 75, 80], axis=0)

        # Sign changes between consecutive time steps (a step onto or off an
        # exact zero also counts as a change).
        zero_crossings = np.sum(np.diff(np.sign(sample), axis=0) != 0, axis=0)

        # Welch PSD. Capping nperseg at the window length is what welch does
        # itself for short inputs, but without emitting a warning per sample.
        _, power_spectral_density = welch(sample, nperseg=min(256, n_steps), axis=0)
        spectral_energy = np.sum(power_spectral_density, axis=0)

        # Lag-1 autocorrelation with the biased (non-adjusted) estimator,
        # sum(x0[t] * x0[t+1]) / sum(x0 ** 2) on the demeaned signal x0 --
        # the quantity statsmodels' acf(x, nlags=1)[1] returns by default --
        # computed for all columns at once. Zero-variance columns get 0.0.
        centered = sample - mean_val
        variance_sum = np.sum(centered * centered, axis=0)
        lagged_sum = np.sum(centered[:-1] * centered[1:], axis=0)
        autocorr = np.divide(
            lagged_sum,
            variance_sum,
            out=np.zeros_like(variance_sum, dtype=float),
            where=variance_sum > 0,
        )

        # Spectral entropy of the PSD normalised to sum to 1 per column.
        # Columns without any power are skipped (entropy stays 0.0) instead of
        # dividing by zero.
        spectral_entropy = np.zeros_like(spectral_energy, dtype=float)
        has_power = spectral_energy > 0
        if np.any(has_power):
            normalised_psd = power_spectral_density[:, has_power] / spectral_energy[has_power]
            spectral_entropy[has_power] = entropy(normalised_psd, axis=0)

        # Row order defines the feature layout consumed downstream; keep it.
        stats = [
            np.max(sample, axis=0),                          # 0  maximum
            np.min(sample, axis=0),                          # 1  minimum
            mean_val,                                        # 2  first-order mean
            np.std(sample, axis=0),                          # 3  standard deviation
            p50,                                             # 4  percentile 50
            p80,                                             # 5  percentile 80
            np.full(mean_val.shape, norm(mean_val)),         # 6  norm of the first-order mean
            mean_val,                                        # 7  average (same as mean)
            p75 - p25,                                       # 8  interquartile range
            squared_mean,                                    # 9  second-order mean
            skew(sample, axis=0),                            # 10 skewness
            np.full(squared_mean.shape, norm(squared_mean)), # 11 norm of the second-order mean
            zero_crossings,                                  # 12 zero-crossing
            kurtosis(sample, axis=0),                        # 13 kurtosis
            spectral_energy,                                 # 14 spectral energy
            p20,                                             # 15 percentile 20
            autocorr,                                        # 16 auto-correlation (lag 1)
            spectral_entropy,                                # 17 spectral entropy
        ]

        # Store the (18, n_columns) block in the pre-allocated buffer.
        data[i] = np.array(stats)

    return data


# Training Data reshape and concatenate


In [None]:
# Statistical features of the training accelerometer signals.
# compute_features writes one (18, 3) block per sample into the buffer in place.
data = np.empty((tr_msAcc_Three_Activities.shape[0], 18, 3))
compute_features(data,tr_msAcc_Three_Activities)
# Reshape so that each sample is a single row holding all 18 * 3 = 54 feature
# values (x-axis, y-axis, z-axis); the result has shape (n_samples, 1, 54).
data = np.reshape(data,(tr_msAcc_Three_Activities.shape[0],1,-1))
print(data.shape)
# NOTE: this rebinds the raw-signal name to the feature array, so the cell is
# not re-runnable on its own -- re-run the filtering cell above first.
tr_msAcc_Three_Activities = data

In [None]:
# Statistical features of the training gyroscope signals.
# compute_features writes one (18, 3) block per sample into the buffer in place.
data = np.empty((tr_msGyr_Three_Activities.shape[0], 18, 3))
compute_features(data,tr_msGyr_Three_Activities)
# Reshape so that each sample is a single row holding all 18 * 3 = 54 feature
# values; the result has shape (n_samples, 1, 54).
data = np.reshape(data,(tr_msGyr_Three_Activities.shape[0],1,-1))
print(data.shape)
# NOTE: this rebinds the raw-signal name to the feature array, so the cell is
# not re-runnable on its own -- re-run the filtering cell above first.
tr_msGyr_Three_Activities = data

In [None]:
# One row per sample: the accelerometer features followed by the gyroscope
# features, joined along the feature axis and then reduced to 2-D.
train_data = np.concatenate((tr_msAcc_Three_Activities, tr_msGyr_Three_Activities), axis=2)
train_data = np.squeeze(train_data, axis=1)

# Original labels    new labels
# OPEN_DOOR  = 20 --> 0
# RUB_HANDS  = 36 --> 1
# CLOSE_DOOR = 9  --> 2
#
# The remap is done on a copy, with masks taken from the untouched source
# labels, so `tr_labels_Three_Activities` keeps its original codes and this
# cell gives the same result when it is re-run.
train_labels = tr_labels_Three_Activities.copy()
for class_index, original_label in enumerate((OPEN_DOOR, RUB_HANDS, CLOSE_DOOR)):
    train_labels[tr_labels_Three_Activities == original_label] = class_index
train_labels = train_labels[:, np.newaxis]  # column vector, one label per row

print(train_data.shape)
print(train_labels.shape)

# Shuffle samples and labels with the same permutation. The generator is
# seeded so the sample order (and hence training) is reproducible.
indices = np.random.default_rng(1234).permutation(train_data.shape[0])
train_data = train_data[indices]
train_labels = train_labels[indices]

# Test Data reshape and concatenate

In [None]:
# Statistical features of the test accelerometer signals.
# compute_features writes one (18, 3) block per sample into the buffer in place.
data = np.empty((ts_msAcc_Three_Activities.shape[0], 18, 3))
compute_features(data,ts_msAcc_Three_Activities)
# Reshape so that each sample is a single row holding all 18 * 3 = 54 feature
# values (x-axis, y-axis, z-axis); the result has shape (n_samples, 1, 54).
data = np.reshape(data,(ts_msAcc_Three_Activities.shape[0],1,-1))
print(data.shape)
# NOTE: this rebinds the raw-signal name to the feature array, so the cell is
# not re-runnable on its own -- re-run the filtering cell above first.
ts_msAcc_Three_Activities = data

In [None]:
# Statistical features of the test gyroscope signals.
# compute_features writes one (18, 3) block per sample into the buffer in place.
data = np.empty((ts_msGyr_Three_Activities.shape[0], 18, 3))
compute_features(data,ts_msGyr_Three_Activities)
# Reshape so that each sample is a single row holding all 18 * 3 = 54 feature
# values; the result has shape (n_samples, 1, 54).
data = np.reshape(data,(ts_msGyr_Three_Activities.shape[0],1,-1))
print(data.shape)
# NOTE: this rebinds the raw-signal name to the feature array, so the cell is
# not re-runnable on its own -- re-run the filtering cell above first.
ts_msGyr_Three_Activities = data

In [None]:


# One row per sample: the accelerometer features followed by the gyroscope
# features, joined along the feature axis and then reduced to 2-D.
test_data = np.concatenate((ts_msAcc_Three_Activities, ts_msGyr_Three_Activities), axis = 2)
test_data = np.squeeze(test_data, axis = 1)

# Original labels    new labels
# OPEN_DOOR  = 20 --> 0
# RUB_HANDS  = 36 --> 1
# CLOSE_DOOR = 9  --> 2
#
# The remap is done on a copy, with masks taken from the untouched source
# labels, so `ts_labels_Three_Activities` keeps its original codes and this
# cell gives the same result when it is re-run.
test_labels = ts_labels_Three_Activities.copy()
for class_index, original_label in enumerate((OPEN_DOOR, RUB_HANDS, CLOSE_DOOR)):
    test_labels[ts_labels_Three_Activities == original_label] = class_index
test_labels = test_labels[:, np.newaxis]  # column vector, one label per row

# Shuffle samples and labels with the same permutation. The generator is
# seeded so the sample order is reproducible.
indices = np.random.default_rng(1234).permutation(test_data.shape[0])
test_data = test_data[indices]
test_labels = test_labels[indices]

# Implementation

In [None]:
# Set TensorFlow's global random seed before any layer is created.
tf.random.set_seed(1234)

# Fully connected classifier: 108 input features -> 12 ReLU units with an
# L2 kernel penalty -> 3 linear outputs (one logit per activity class).
model_layers = [
    tf.keras.Input(shape=(108,)),
    Dense(
        units=12,
        activation='relu',
        kernel_regularizer=tf.keras.regularizers.l2(0.1),
    ),
    Dense(units=3, activation='linear'),
]
model = Sequential(model_layers, name="my_model")

# The last layer emits raw logits, hence from_logits=True. No optimizer is
# passed, so compile() falls back to its default.
logits_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(loss=logits_loss, metrics=['accuracy'])

In [None]:
# Early-stopping callback on the training accuracy. It is only constructed
# here: it is NOT passed to fit() below, so training always runs every epoch.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='accuracy',
    patience=100,
    restore_best_weights=True,
)

# Train on the shuffled training set.
# To enable early stopping, add: callbacks=[early_stopping]
model.fit(
    x=train_data,
    y=train_labels,
    batch_size=64,
    epochs=400,
)


In [None]:
# Class probabilities for every training sample: softmax over the model's logits.
train_prediction = tf.nn.softmax(model.predict(train_data)).numpy()
print(train_data.shape)

# Number of training samples whose most probable class equals the true label
# (labels are stored as a single-column array, hence the [:, 0]).
predicted_classes = np.argmax(train_prediction, axis=1)
count = int(np.sum(predicted_classes == train_labels[:, 0]))
print(count)
# Training accuracy in percent.
print((count/train_data.shape[0])*100)

In [None]:
# Class probabilities for every test sample: softmax over the model's logits.
test_prediction = tf.nn.softmax(model.predict(test_data)).numpy()

# Number of test samples whose most probable class equals the true label
# (labels are stored as a single-column array, hence the [:, 0]).
predicted_classes = np.argmax(test_prediction, axis=1)
count = int(np.sum(predicted_classes == test_labels[:, 0]))
print(count)
# Test accuracy in percent.
print((count/test_data.shape[0])*100)