In [22]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten
from sklearn.preprocessing import LabelEncoder, StandardScaler

from scipy import stats
from sklearn.metrics import classification_report

In [2]:
# Load the train, validation and test CSV files (paths are relative to the notebook)
train_df, validation_df, test_df = (
    pd.read_csv(csv_path)
    for csv_path in ('../train_data.csv', '../validation_data.csv', '../test_data.csv')
)

In [3]:
# BUILD SLIDING WINDOW
# df - dataframe used
# window_size - size of the sliding window, by default 11s if not mentioned otherwise
# step_size - starting point for the current window given the previous, by default 5
# feature_cols - features to be used in the sliding window
def create_windows(dataset, window_size=11, step_size=5, feature_cols=['ACC_X', 'ACC_Y', 'ACC_Z']):
    X = []
    y = []
    window = []

    for person_id in dataset['PERSON_ID'].unique():
        person_data = dataset[dataset['PERSON_ID'] == person_id]
        feature_values = person_data[feature_cols].values
        activity = person_data['ACTIVITY']

        max_window_end = len(person_data)

        for i in range(0, max_window_end - window_size, step_size):
            window = feature_values[i:i+window_size]
            window_label = activity[i:i+window_size].mode(dropna=False).iloc[0]

            # Ensure the window is of the correct size
            if len(window) != window_size:
                continue  # Skip this window if it's the wrong shape

            X.append(window)
            y.append(window_label)

    print(len(X))

    return np.array(X), np.array(y)

In [7]:
# train_dataset = pd.read_csv('../train_data.csv')
# validation_dataset = pd.read_csv('../validation_data.csv')
# test_dataset = pd.read_csv('../test_data.csv')

# min_count_train = min(train_dataset['ACTIVITY'].value_counts())
# min_count_validation = min(validation_dataset['ACTIVITY'].value_counts())
# min_count_test = min(test_dataset['ACTIVITY'].value_counts())

# balanced_train_data = train_dataset.groupby('ACTIVITY').head(min_count_train).reset_index(drop=True)
# balanced_validation_data = validation_dataset.groupby('ACTIVITY').head(min_count_validation).reset_index(drop=True)
# balanced_test_data = test_dataset.groupby('ACTIVITY').head(min_count_test).reset_index(drop=True)

In [4]:
# Sliding-window settings: rows per window and rows between consecutive window starts
window_size, step_size = 60, 15

In [None]:
# Window every split with the same settings so all three share one window length
window_args = {'window_size': window_size, 'step_size': step_size}
X_train, y_train = create_windows(train_df, **window_args)
X_val, y_val = create_windows(validation_df, **window_args)
X_test, y_test = create_windows(test_df, **window_args)

In [None]:
# Inspect the training windows: the shape is what matters downstream, so report
# it explicitly and show just the first window instead of an elided full dump.
print(X_train.shape)
print(X_train[:1])

In [None]:
# Count how many training windows carry each label, then show the classes
# from most to least frequent.
unique_values, counts = np.unique(y_train, return_counts=True)
count_dict = dict(zip(map(str, unique_values), map(int, counts)))
count_dict_sorted = dict(sorted(count_dict.items(), key=lambda pair: pair[1], reverse=True))
print(count_dict_sorted)

In [None]:
# Balanced class weights:
#   weight_for_class_i = total_windows / (number_of_classes * windows_with_class_i)
# so rarer classes receive proportionally larger weights.
n_windows = np.size(y_train)
n_classes = len(count_dict)
weights = {
    class_name: n_windows / (n_classes * class_count)
    for class_name, class_count in count_dict.items()
}
# NOTE(review): keys here are the label strings taken from count_dict; Keras'
# `class_weight` argument expects integer class indices - confirm/convert before use.

print(weights)

In [9]:
# NORMALIZE DATA FOR THIS MODEL
# Remember the training tensor's dimensions (windows, rows per window, features)
# so the scaled data can be folded back into the same layout later.
n_samples, n_timesteps, n_features = X_train.shape

scaler = StandardScaler()

In [10]:
# Stack every timestep of every window into one 2-D (rows, features) matrix,
# which is the layout the scaler is applied to in the scaling step.
X_train_flat = np.reshape(X_train, (-1, X_train.shape[-1]))
X_val_flat = np.reshape(X_val, (-1, X_val.shape[-1]))
X_test_flat = np.reshape(X_test, (-1, X_test.shape[-1]))

In [None]:
# Inspect the flattened training matrix: report its shape and show only the
# first few rows instead of an elided full dump.
print(X_train_flat.shape)
print(X_train_flat[:5])

In [12]:
# Fit the scaler on the training rows only, apply the same fitted transform to
# validation and test, then fold each matrix back into (windows, timesteps, features).
window_dims = (n_timesteps, n_features)
X_train_scaled = scaler.fit_transform(X_train_flat).reshape((n_samples,) + window_dims)
X_val_scaled = scaler.transform(X_val_flat).reshape((len(X_val),) + window_dims)
X_test_scaled = scaler.transform(X_test_flat).reshape((len(X_test),) + window_dims)

In [None]:
# Inspect the scaled test windows: report the shape and show only the first
# window instead of an elided full dump.
print(X_test_scaled.shape)
print(X_test_scaled[:1])

In [14]:
# ENCODE LABELS
# Fit the encoder on the training labels only; validation/test labels are mapped
# with the same fitted encoder (an unseen label raises in `transform`).
le = LabelEncoder()
y_train_enc = le.fit_transform(y_train)
y_val_enc = le.transform(y_val)
y_test_enc = le.transform(y_test)

num_classes = len(le.classes_)  # Number of unique classes seen in training

# Convert labels to one-hot encoding.
# num_classes is passed explicitly: without it, to_categorical sizes the output
# from the largest index present in *that* array, so a split that lacks the
# highest-indexed class would come out narrower than the model's output layer.
y_train_cat = to_categorical(y_train_enc, num_classes=num_classes)
y_val_cat = to_categorical(y_val_enc, num_classes=num_classes)
y_test_cat = to_categorical(y_test_enc, num_classes=num_classes)

In [None]:
# Inspect the one-hot test labels: report the shape and show only the first
# few rows instead of an elided full dump.
print(y_test_cat.shape)
print(y_test_cat[:5])

In [None]:
# # build model
# # Build the LSTM model
# model = Sequential([
#     LSTM(128, input_shape=(X_train_scaled.shape[1], X_train_scaled.shape[2]), return_sequences=False),
#     Dropout(0.5),
#     Dense(64, activation='relu'),
#     Dense(num_classes, activation='softmax')
# ])

In [None]:
# model = Sequential([
#     LSTM(128, input_shape=(X_train_scaled.shape[1], X_train_scaled.shape[2]), return_sequences=True),
#     Dropout(0.3),
#     LSTM(64, return_sequences=False), # Returns only the last output
#     Dropout(0.5),
#     Dense(64, activation='relu'),
#     Dense(num_classes, activation='softmax')
# ])

In [None]:
# Seed NumPy and TensorFlow before the layers are created so weight
# initialisation and dropout masks are repeatable when this cell is re-run.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Conv1D -> Conv1D -> MaxPool front end over the time axis, followed by an LSTM
# that returns only its final output, then a dense softmax classifier with one
# output unit per class.
model = Sequential([
    Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(X_train_scaled.shape[1], X_train_scaled.shape[2])),
    Conv1D(filters=64, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    Dropout(0.3),
    LSTM(128, return_sequences=False),
    Dropout(0.5),
    Dense(64, activation='relu'),
    Dense(num_classes, activation='softmax')
])

In [24]:
# Halt training after 5 consecutive epochs without a val_loss improvement and
# restore the weights from the best epoch.
early_stopping = EarlyStopping(
    restore_best_weights=True,
    verbose=1,
    patience=5,
    monitor='val_loss',
)

In [25]:
# compile model
# Categorical cross-entropy matches the one-hot targets and softmax output.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# train model
# Up to 100 epochs; the early-stopping callback decides when to stop based on
# the validation split passed here.
history = model.fit(
    x=X_train_scaled,
    y=y_train_cat,
    batch_size=64,
    epochs=100,
    verbose=1,
    callbacks=[early_stopping],
    validation_data=(X_val_scaled, y_val_cat),
    # class_weight=weights
)

In [None]:
# evaluate
# evaluate() returns the loss followed by the compiled metrics (accuracy here).
test_metrics = model.evaluate(X_test_scaled, y_test_cat, verbose=0)
test_loss, test_acc = test_metrics
print(f"Test accuracy: {test_acc:.4f} | Test loss: {test_loss:.4f}")

In [None]:
# Turn the predicted probabilities and the one-hot targets back into class
# indices, map both to the original label values, and print per-class metrics.
y_pred = model.predict(X_test_scaled)
predicted_idx = y_pred.argmax(axis=1)
true_idx = y_test_cat.argmax(axis=1)
y_pred_labels = le.inverse_transform(predicted_idx)
y_true_labels = le.inverse_transform(true_idx)

print(classification_report(y_true_labels, y_pred_labels))