In [2]:
import os
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.regularizers import l2
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import keras_tuner as kt

# set random seed for reproducibility
np.random.seed(69)
tf.random.set_seed(69)

In [3]:
## Load and Process Data ##
# function to load and process data (removes first 1/4)
def load_augmented_sequences(folders):
    """ 
    load csvs as sequences, remove first 1/4 of frames, 
    split the remaining into 3 smaller examples, and remove examples where >50% of columns are all zeros 
    """
    data, labels = [], []

    for folder, label in folders:
        if os.path.exists(folder):
            for file in os.listdir(folder):
                if file.endswith('.csv'):
                    file_path = os.path.join(folder, file)
                    df = pd.read_csv(file_path)

                    # remove 'frame' column and keep only movement data
                    features = df.iloc[:, 2:].values  

                    # remove first 1/4 of the sequence
                    num_rows = len(features)
                    start_idx = num_rows // 4  # remove first 25% of rows
                    features = features[start_idx:]

                    # split remaining data into 3 equal parts
                    num_splits = 3
                    split_size = len(features) // num_splits

                    for i in range(num_splits):
                        sub_features = features[i * split_size: (i + 1) * split_size]

                        # check if more than 50% of the columns are completely zero
                        zero_columns = np.sum(sub_features == 0, axis=0) == sub_features.shape[0]
                        if np.mean(zero_columns) > 0.5:
                            continue  # skip this example

                        data.append(sub_features)
                        labels.append(label)
        else:
            print(f"warning folder {folder} not found")

    return np.array(data), np.array(labels)

# define training folders
train_folders = [
    ('../rat_dance_csv/train', 1),
    ('../neg_control_csv/train', 0)
]

# load dataset
X, y = load_augmented_sequences(train_folders)

# normalize features
scaler = StandardScaler()
X = np.array([scaler.fit_transform(sample) for sample in X])  # normalize each sequence separately

# shuffle dataset
indices = np.random.permutation(len(X))
X, y = X[indices], y[indices]

# reshape X for RNN (samples, timesteps, features)
timesteps, features = X.shape[1], X.shape[2]
X = X.reshape(len(X), timesteps, features)

In [4]:
## Define Optimized RNN ##
# function to build an optimized rnn model
def build_model(hp):
    model = keras.Sequential([
        # bidirectional lstm for better sequence learning
        keras.layers.Bidirectional(keras.layers.LSTM(
            units=hp.Int("units", min_value=32, max_value=96, step=16),
            return_sequences=False,
            recurrent_dropout=hp.Float("recurrent_dropout", 0.1, 0.4, step=0.1),
            input_shape=(timesteps, features),
        )),
        keras.layers.Dropout(hp.Float("dropout", 0.3, 0.5, step=0.1)),

        # fully connected dense layer
        keras.layers.Dense(
            hp.Int("dense_units", 16, 64, step=16), activation="relu", kernel_regularizer=l2(0.02)
        ),
        keras.layers.BatchNormalization(),
        keras.layers.Dropout(hp.Float("dense_dropout", 0.3, 0.5, step=0.1)),

        # output layer
        keras.layers.Dense(1, activation="sigmoid"),
    ])

    # compile model with fixed learning rate
    model.compile(
        optimizer=keras.optimizers.Adam(
            learning_rate=hp.Choice("learning_rate", [0.0005, 0.0001, 0.00005])
        ),
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )

    return model

In [5]:
## Run Hyperparam Tuning
# use keras tuner to search for best hyperparameters
tuner = kt.tuners.RandomSearch(
    hypermodel=build_model,
    objective="val_loss", 
    max_trials=10, 
    executions_per_trial=2,
    directory="delete_me_post_search",
    project_name="lstm_tuning",
)

# split data for tuning
kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=69)
train_index, val_index = next(kf.split(X, y)) 
X_train, X_val = X[train_index], X[val_index]
y_train, y_val = y[train_index], y[val_index]

# search for best hyperparameters
tuner.search(X_train, y_train, epochs=10, batch_size=64, validation_data=(X_val, y_val), verbose=1)

# get best hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

# print best values
print(f"Best LSTM units: {best_hps.get('units')}")
print(f"Best dropout: {best_hps.get('dropout')}")
print(f"Best recurrent dropout: {best_hps.get('recurrent_dropout')}")
print(f"Best dense units: {best_hps.get('dense_units')}")
print(f"Best learning rate: {best_hps.get('learning_rate')}")

Trial 10 Complete [00h 00m 10s]
val_loss: 1.8584233522415161

Best val_loss So Far: 1.2285157442092896
Total elapsed time: 00h 01m 36s
Best LSTM units: 64
Best dropout: 0.3
Best recurrent dropout: 0.2
Best dense units: 16
Best learning rate: 0.0005


In [6]:
## Train and Validate Final Model
# create final model using best hyperparameters
final_model = tuner.hypermodel.build(best_hps)

# define early stopping
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=5, restore_best_weights=True
)

# define reduce learning rate callback
reduce_lr = keras.callbacks.ReduceLROnPlateau(
    monitor="val_loss", factor=0.5, patience=3, min_lr=1e-6
)

# train final model on full dataset
kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=69)
cv_accuracies = []

for train_index, val_index in kf.split(X, y):
    X_train, X_val = X[train_index], X[val_index]
    y_train, y_val = y[train_index], y[val_index]

    final_model.fit(X_train, y_train, epochs=15, batch_size=64, verbose=0, validation_data=(X_val, y_val), callbacks=[early_stopping, reduce_lr])

    # evaluate the model
    val_loss, val_acc = final_model.evaluate(X_val, y_val, verbose=0)
    cv_accuracies.append(val_acc)

# print cross validation accuracy
cross_val_acc = np.mean(cv_accuracies)
print(f"final cross validation accuracy: {cross_val_acc:.4f}")


final cross validation accuracy: 0.7700


In [7]:
## Validation set for further Tuning
# load validation dataset
val_folders = [
    ('../rat_dance_csv/val', 1),
    ('../neg_control_csv/val', 0)
]

X_val, y_val = load_augmented_sequences(val_folders)
X_val = np.array([scaler.transform(sample) for sample in X_val])  
X_val = X_val.reshape(len(X_val), timesteps, features)  

# evaluate final model on validation set
y_pred_prob = final_model.predict(X_val)
y_pred = (y_pred_prob > 0.5).astype(int)  

accuracy = accuracy_score(y_val, y_pred)
class_report = classification_report(y_val, y_pred, target_names=["negative control (0)", "ratdance (1)"])
conf_matrix = confusion_matrix(y_val, y_pred)

# show results
print(f"\nValidation Performance:")
print(f"Accuracy: {accuracy:.4f}")
print("\nClassification Report:")
print(class_report)
print("\nConfusion Matrix:")
print(conf_matrix) 


[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 245ms/step

Validation Performance:
Accuracy: 0.5641

Classification Report:
                      precision    recall  f1-score   support

negative control (0)       0.58      0.71      0.64        21
        ratdance (1)       0.54      0.39      0.45        18

            accuracy                           0.56        39
           macro avg       0.56      0.55      0.54        39
        weighted avg       0.56      0.56      0.55        39


Confusion Matrix:
[[15  6]
 [11  7]]


In [8]:
## Final Test Set (No more changes can be made after running this block)
# load test dataset
test_folders = [
    ('../rat_dance_csv/test', 1),
    ('../neg_control_csv/test', 0)
]

X_test, y_test = load_augmented_sequences(test_folders)
X_test = np.array([scaler.transform(sample) for sample in X_test])  
X_test = X_test.reshape(len(X_test), timesteps, features)  

# evaluate final model on test set
y_test_pred_prob = final_model.predict(X_test)
y_test_pred = (y_test_pred_prob > 0.5).astype(int)  

# compute evaluation metrics
test_accuracy = accuracy_score(y_test, y_test_pred)
test_class_report = classification_report(y_test, y_test_pred, target_names=["negative control (0)", "ratdance (1)"])
test_conf_matrix = confusion_matrix(y_test, y_test_pred)

# display results
print(f"\nTest Set Performance:")
print(f"Accuracy: {test_accuracy:.4f}")
print("\nClassification Report:")
print(test_class_report)
print("\nConfusion Matrix:")
print(test_conf_matrix)


[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step

Test Set Performance:
Accuracy: 0.6216

Classification Report:
                      precision    recall  f1-score   support

negative control (0)       0.61      0.85      0.71        20
        ratdance (1)       0.67      0.35      0.46        17

            accuracy                           0.62        37
           macro avg       0.64      0.60      0.58        37
        weighted avg       0.63      0.62      0.59        37


Confusion Matrix:
[[17  3]
 [11  6]]


In [None]:
import joblib

final_model.save('rnn_quarter_model.keras')
joblib.dump(scaler, 'rnn_quarter_scaler.pkl')

['rnn_quater_scaler.pkl']

In [12]:
final_model.summary()