In [None]:
from datetime import time
from datetime import date
import math
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pickle as pkl
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn import preprocessing
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from sklearn.model_selection import TimeSeriesSplit

In [None]:
# For reading in the pickled files

subjects_dir = '' # Directory where the pickled user files are located
subjects = os.listdir(subjects_dir)
subjects_data = []
s = []
file_names = [] # Specify here the files to read from. Should be comma seperated in list. Removed for privacy.

file_map = {index: file_name for index, file_name in enumerate(file_names)}

# The following subjects have insufficient amount of data, so ignore them
leave_out = [] # Here you can specify which users to leave out from further use

# Iterate over file_names and load subject data for valid indices
for index, file_name in file_map.items():
    if index not in leave_out:
        subject_file = os.path.join(subjects_dir, file_name)
        # Load subject data from file
        with open(subject_file, 'rb') as f:
            subject_data = pkl.load(f)
            s.append(file_name)
            subjects_data.append(subject_data) # Array of data per subject. Index is used as ID of the user.

In [None]:
# Function to make sliding window from array
def sliding_window (xs, window_size, stride):
    ps = []
    if len(xs) <= window_size:
        return []
    for i in range(len(xs) - window_size + 1):
        if i % stride == 0:
            ps.append(xs[i: i + window_size])
    return ps

# Test case
print(sliding_window([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 4 , 1))

In [None]:
# Function to average the handcrafted features of multiple arrays
def handcraft(xs):
    s = [0, 0, 0, 0]
    n = len(xs)
    for i in xs:
        h = handcraft_simple(i)
        for j in range(4):
            s[j] += h[j]
    new_s = np.array(s) / n
    return new_s

# Function to simply create handcrafted features for a single array
def handcraft_simple(hr):
    min_hr = min(hr)
    max_hr = max(hr)
    mean_hr = np.mean(hr)
    var_hr = np.var(hr)
        
    return [min_hr, max_hr, mean_hr, var_hr]
# Test case
print(handcraft([[2,4, 3], [2,6], [2], [2]]))

In [None]:
# Function to get the mean and standard deviation of an array
def normalize_vars(xs):
    valid_values = xs[~np.isnan(xs)]
    mean = np.mean(valid_values)
    sd = np.std(valid_values)
    return(mean, sd)

# Function to normalize array based on mean and standard deviation calculated before.
def normalize(xs, mean, sd):
    templist = []
    for i in range(len(xs)):
        templist.append((xs[i] - mean) / sd)
    return templist

# Test case
test1 = np.array([1, 2, 3 ,4, 5, 6])
vrs1 = normalize_vars(test1)
test2 = np.array([np.NaN, np.NaN, np.NaN, 1, np.NaN, np.NaN])
vrs2 = normalize_vars(test2)
print(vrs1)
print(vrs2)
print(normalize(test1, vrs1[0], vrs1[1]))

In [None]:
# Filter from 0:00 - 6:00

subjects = len(subjects_data) # 54
subjects_data_night = []
for i in range(subjects):
    df = subjects_data[i]
    df['time'] = pd.to_datetime(df['time']).dt.time
    out = df[df['time'].between(time(0, 0), time(6, 0))]
    subjects_data_night.append(out)
    
print(len(subjects_data_night[0]))
print(len((subjects_data[0])))

In [None]:
# Split data into days

subjects = len(subjects_data) # 54
subjects_data_by_day = []
for i in range(subjects): # splits dataframe for each user into days
    df = subjects_data[i]
    day = df['time'].dt.floor("D")
    agg = df.groupby([day])
    smallList = []
    for i in agg:
        # Take only days of which there are enough readings! 14000 is approximately 20 hours.
        if (len(i[1]) >= 14000): 
            steps = i[1].steps
            newsteps = steps[~np.isnan(steps)]
            smallList.append(i[1].head(14000)) 
    subjects_data_by_day.append(smallList)
print(subjects_data_by_day[0]) # [0] refers to data from user 0, [0][0] refers to first window of user 0.

In [None]:
# Find how many days there are for each user
for i in range(subjects):
    l = len(subjects_data_by_day[i])
    print(l, i)

In [None]:
balance = 330  # amount of days to take for every user
identity = np.identity(subjects) # used for one hot encoding

# 0. Convert dataframe to np heart rate
subjects_data_hr = []
# Used for experiment 3
# for i in range(subjects):
#     if (i == 4):
#         smallList = []
#         for j in range(len(subjects_data_by_day[i][:445])): # Take all possible windows for this user!
#             smallList.append(subjects_data_by_day[i][j].hr)
#         subjects_data_hr.append(smallList)
#     else:
#         smallList = []
#         for j in range(len(subjects_data_by_day[i][:balance])):
#             smallList.append(subjects_data_by_day[i][j].hr)
#         subjects_data_hr.append(smallList)
# Alternative setup
for i in range(subjects):
    smallList = []
    for j in range(len(subjects_data_by_day[i][:balance])):
        smallList.append(subjects_data_by_day[i][j].hr)
    subjects_data_hr.append(smallList)


# 1. Convert to train test split
X_train_users_temp = []
X_test_users_temp = []
Y_train_users = []
Y_test_users = []
for i in range(subjects):
    X_temp = np.array(subjects_data_hr[i])
    Y_temp = np.full(shape=len(X_temp), fill_value=i)
    X_train_temp, X_test_temp, Y_train_temp, Y_test_temp = train_test_split(X_temp, Y_temp, test_size=0.2, shuffle=False)
    
# Used in experiment 3
#     if i == 4:
#         X_train_sliced = X_train_temp[92:356]
#         print(len(X_train_sliced))
#         print(handcraft(X_train_sliced))
#         print(handcraft(X_test_temp))
#         X_train_users_temp.append(X_train_sliced)
#         X_test_users_temp.append(X_test_temp) # 66 is taken anyways when making X_train 
#     else: 
#         X_train_sliced = X_train_temp[0:264] # 
#         X_train_users_temp.append(X_train_sliced)
#         X_test_users_temp.append(X_test_temp)
# Alternative setup
    X_train_sliced = X_train_temp[0:264] # 
    X_train_users_temp.append(X_train_sliced)
    X_test_users_temp.append(X_test_temp)
    

# 2. normalize on variables from train set
mean, sd = normalize_vars(np.concatenate(np.concatenate(X_train_users_temp)))
print(mean)
print(sd)
X_train_users_hr = []
X_test_users_hr = []
for i in range(subjects):
    X_train_temp = np.array(normalize(X_train_users_temp[i], mean, sd))
    X_test_temp = np.array(normalize(X_test_users_temp[i], mean, sd))
    
    X_train_users_hr.append(X_train_temp)
    X_test_users_hr.append(X_test_temp)


In [None]:
# Convert windows to average age to start of test set (in days)
s = []
for i in range(subjects):
    f = subjects_data_by_day[i][0].time.head(1).to_numpy()[0]
    t = subjects_data_by_day[i][264].time.head(1).to_numpy()[0]
    dif = int((t-f)/86400000000000) # Convert from nanoseconds to days
    s.append(dif)
print(np.mean(s))
print(s)

In [None]:
# Convert windows to age to start of test set (in days), for experiment 3
f = subjects_data_by_day[1][0].time.head(1).to_numpy()[0]
t = subjects_data_by_day[1][373].time.head(1).to_numpy()[0]
dif = int((t-f)/86400000000000) # Convert from nanoseconds to days
print(dif)

In [None]:
# 0. Convert dataframe to np steps
subjects_data_steps = []
# Used for experiment 3
# for i in range(subjects):
#     if (i == 6):
#         smallList = []
#         for j in range(len(subjects_data_by_day[i][:467])):
#             smallList.append(subjects_data_by_day[i][j].steps)
#         subjects_data_steps.append(smallList)
#     else:
#         smallList = []
#         for j in range(len(subjects_data_by_day[i][:balance])):
#             smallList.append(subjects_data_by_day[i][j].steps)
#         subjects_data_steps.append(smallList)

# Alternative setup
for i in range(subjects):
    smallList = []
    for j in range(len(subjects_data_by_day[i][:balance])):
        smallList.append(subjects_data_by_day[i][j].steps)
    subjects_data_steps.append(smallList)

# 1. Convert to train test
X_train_users_temp = []
X_test_users_temp = []
Y_train_users = []
Y_test_users = []
for i in range(subjects):
    X_temp = np.array(subjects_data_steps[i])
    Y_temp = np.full(shape=len(X_temp), fill_value=i)
    X_train_temp, X_test_temp, Y_train_temp, Y_test_temp = train_test_split(X_temp, Y_temp, test_size=0.2, shuffle=False)
# Used for experiment 3
#     if i == 6:
#         X_train_sliced = X_train_temp[109:373] # possible 352
#         X_train_users_temp.append(X_train_sliced)
#         X_test_users_temp.append(X_test_temp)
#     else: 
#         X_train_sliced = X_train_temp[0:264] # 
#         X_train_users_temp.append(X_train_sliced)
#         X_test_users_temp.append(X_test_temp)

# Alternative setup
    X_train_sliced = X_train_temp[0:264] # 
    X_train_users_temp.append(X_train_sliced)
    X_test_users_temp.append(X_test_temp)

# 3. normalize on variables from train set
mean, sd = normalize_vars(np.concatenate(np.concatenate(X_train_users_temp)))
print(mean)
print(sd)
X_train_users_steps_temp = []
X_test_users_steps_temp = []
for i in range(subjects):
    X_train_temp = np.array(normalize(X_train_users_temp[i], mean, sd))
    X_test_temp = np.array(normalize(X_test_users_temp[i], mean, sd))
    
    X_train_users_steps_temp.append(X_train_temp)
    X_test_users_steps_temp.append(X_test_temp)

# 1.5 convert nan to unrealistic value
X_train_users_steps = []
X_test_users_steps = []
for i in range(subjects):
    tempList = []
    for j in range(len(X_train_users_steps_temp[i])):
        window = X_train_users_steps_temp[i][j]
        window = np.where(np.isnan(window), mean, window)
        tempList.append(window)
    X_train_users_steps.append(tempList)
for i in range(subjects):
    tempList = []
    for j in range(len(X_test_users_steps_temp[i])):
        window = X_test_users_steps_temp[i][j]
        window = np.where(np.isnan(window), mean, window)
        tempList.append(window)
    X_test_users_steps.append(tempList)

In [None]:
# 4. apply sliding window
X_train = []
X_test = []
Y_train = []
Y_test = [] #
train_len = int(balance * 0.8) # 264
test_len = int(balance * 0.2) # 66
for j in range(train_len):
    for i in range(subjects):
        windows_train_hr = X_train_users_hr[i][j]
        windows_train_steps = X_train_users_steps[i][j]

        X_train.append(np.array([windows_train_hr, windows_train_steps]))
        Y_train.append(identity[i])
    print(j)
print("done with making training windows!")
for j in range(test_len):
    for i in range(subjects):
        windows_test_hr = X_test_users_hr[i][j]
        windows_test_steps = X_test_users_steps[i][j]

        X_test.append(np.array([windows_test_hr, windows_test_steps]))
        Y_test.append(identity[i])
    print(j)
print("done with making testing windows!")


In [None]:
class CNN_LSTM(nn.Module):
    def __init__(self, out_channels1, out_channels2, kernel_size1, kernel_size2, stride1, stride2, lstm_hidden, lstm_layers, subjects):
        super(CNN_LSTM, self).__init__()
        
        self.conv1 = nn.Conv1d(2, out_channels1, kernel_size1, stride1)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(0.1)
        self.maxpool1 = nn.MaxPool1d(kernel_size=2)
        
        self.conv2 = nn.Conv1d(out_channels1, out_channels2, kernel_size2, stride2)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(0.1)
        self.maxpool2 = nn.MaxPool1d(kernel_size=2)
        
        self.flatten = nn.Flatten()
        self.lstm = nn.LSTM(input_size=864, hidden_size=lstm_hidden, num_layers=lstm_layers, batch_first=True)
        self.linear = nn.Linear(lstm_hidden, subjects)
        self.softmax = nn.Softmax(dim=1)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.dropout1(x)
        x = self.maxpool1(x)
        
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.dropout2(x)
        x = self.maxpool2(x)
        
        x = self.flatten(x)
        x = x.view(x.size(0), 1, -1)  # Reshape the tensor to have a sequence dimension

        x, _ = self.lstm(x)
        x = x[:, -1, :]  # Select the last LSTM output of the sequence
        x = self.linear(x)
        x = self.softmax(x)
        
        return x

In [None]:
input_size = 14000  # data for one day
# Hyperparameters
lr = 0.0005
epochs = 100
batch_size = 64

# Device to run DNN on GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Reshape the training and testing data to match the expected input shape of the model
X_train_new = np.reshape(X_train, (len(X_train), 2, input_size))  # Should be (samples, timestamps, features)
X_test_new = np.reshape(X_test, (len(X_test), 2, input_size))


# Define the model architecture

# Model with hyperparameters included
model = CNN_LSTM(
    out_channels1=4, 
    out_channels2=8, 
    kernel_size1=5, 
    kernel_size2=64, 
    stride1=1, 
    stride2=32, 
    lstm_hidden=128,
    lstm_layers=1,
    subjects=subjects).to(device)

# Optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=lr)
loss_fn = nn.CrossEntropyLoss()

accuracy_test = []
accuracy_train = []
f1_score_test = []
losses = []

# Training loop
for epoch in range(epochs):
    correct_class_train = np.zeros(subjects)
    total_class_train = np.zeros(subjects)
    count_train = 0

    # Training
    model.train()
    for i in range(0, len(X_train_new), batch_size):
        remaining_samples = min(batch_size, len(X_train_new) - i)
        X_batch = torch.tensor(np.array(X_train_new[i: i + remaining_samples]), dtype=torch.float32)
        Y_batch = torch.tensor(np.array(Y_train[i: i + remaining_samples]))
        y_pred = model(X_batch)

        loss = loss_fn(y_pred, Y_batch)
        optimizer.zero_grad()

        y_correct_train = np.argmax(Y_batch.detach().cpu().numpy(), axis=1)
        correct_class_train += np.sum(np.argmax(y_pred.detach().cpu().numpy(), axis=1) == y_correct_train)

        count_train += np.sum(np.argmax(y_pred.detach().cpu().numpy(), axis=1) == y_correct_train)

        loss.backward()
        optimizer.step()

    losses.append(loss.item())
    accuracy_train.append(count_train / len(X_train_new))

    # Testing
    model.eval()
    with torch.no_grad():
        correct_class_test = np.zeros(subjects)
        total_class_test = np.zeros(subjects)
        count_test = 0
        predicted_labels = []
        true_labels = []

        for k in range(len(X_test_new)):
            X_batch = np.reshape(X_test_new[k], (1, 2, input_size))
            tensor = torch.tensor(X_batch, dtype=torch.float32).to(device)
            y_pred = model(tensor)
            y_correct = np.argmax(Y_test[k])
            total_class_test[y_correct] += 1

            if np.argmax(y_pred.detach().cpu().numpy()) == y_correct:
                correct_class_test[y_correct] += 1
                count_test += 1

            predicted_labels.append(np.argmax(y_pred.detach().cpu().numpy()))
            true_labels.append(y_correct)

        accuracy_test.append(count_test / len(X_test_new))
        f1 = f1_score(true_labels, predicted_labels, average='weighted')
        f1_score_test.append(f1)

    final_class_test = correct_class_test / total_class_test


    print(f"Epoch {epoch + 1}/{epochs}:")
    print(f"  Train Loss: {losses[-1]:.4f} - Train Accuracy: {accuracy_train[-1]*100:.2f}%")
    print(f"  Test Accuracy: {accuracy_test[-1]*100:.2f}% - Test F1 Score: {f1_score_test[-1]:.4f}")
    print()
# Output e.g. 
#  Epoch 1/100:
#    Train Loss: 2.2145 - Train Accuracy: 21.69%
#    Test Accuracy: 36.23% - Test F1 Score: 0.2620



print("Done")

In [None]:
# Plotting
plt.figure(figsize=(18, 12))
print(np.max(accuracy_test))

# Training, and Test Accuracy plot
plt.subplot(2, 3, 1)
plt.plot(accuracy_train, label='Training Accuracy', color='blue')
plt.plot(accuracy_test, label='Test Accuracy', color='red')
plt.title("Training and Test Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.ylim(0, 1)
plt.yticks(np.arange(0, 1.1, 0.1))
plt.legend()

plt.subplot(2, 3, 4)
plt.plot(accuracy_test, label='Test Accuracy', color='red')
plt.title("Test Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.ylim(0,1)
plt.yticks(np.arange(0, 1.1, 0.1))
plt.legend()
print(np.max(final_class_test[6]))

# Training Loss plot
plt.subplot(2, 3, 2)
plt.plot(losses)
plt.title("Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")

# Final Testing Accuracy per subject plot
plt.subplot(2, 3, 5)
plt.bar(range(subjects), final_class_test)
plt.title("Final Testing Accuracy Per Subject")
plt.xlabel("Class")
plt.ylabel("Accuracy")
plt.ylim(0, 1)
plt.yticks(np.arange(0, 1.1, 0.1))
plt.xticks(range(subjects), range(0, subjects))  # Add class numbering

plt.tight_layout()
plt.show()

In [None]:
# To print the model architecture
print(model)