In [1]:
#!/usr/bin/env python3
import argparse
import os
import sys

import keras
import numpy as np
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.layers import ConvLSTM2D, Dense, Dropout, Flatten, Input, MaxPooling3D, TimeDistributed
from keras.models import Model
from keras.optimizers import Adam
from keras.preprocessing.image import ImageDataGenerator, image_utils
from loguru import logger
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split

# Default hyperparameters and run settings
SEQUENCE_LENGTH = 15  # number of consecutive frames in one input sequence
IMAGE_HEIGHT = 480 // 8  # 60; presumably 480-pixel-high frames downscaled 8x (TODO confirm)
IMAGE_WIDTH = 640 // 8  # 80; presumably 640-pixel-wide frames downscaled 8x (TODO confirm)
BATCH_SIZE = 8
EPOCHS = 10
LEARNING_RATE = 1e-4
PATIENCE = 3  # epochs without val_loss improvement tolerated by the callbacks
# The weights file name encodes the input geometry it was trained for
FILENAME = "weights-{}_{}_{}.h5".format(SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH)
DEBUG = False

def create_model(input_shape):
    """Build the ConvLSTM binary classifier and print its summary.

    The network is a single ConvLSTM2D layer that returns the full sequence,
    flattened and fed to one sigmoid unit. The model is returned uncompiled.
    """
    frames = Input(shape=input_shape)

    conv_lstm = ConvLSTM2D(
        filters=4,
        kernel_size=(3, 3),
        activation="tanh",
        recurrent_dropout=0.2,
        return_sequences=True,
    )
    sequence_features = conv_lstm(frames)

    flat_features = Flatten()(sequence_features)
    probability = Dense(1, activation="sigmoid")(flat_features)

    network = Model(inputs=frames, outputs=probability)
    network.summary()
    return network


def load_data(images_path, seq_length, image_height, image_width):
    """Load frames from a directory and group them into sliding-window sequences.

    Only files whose name ends in ``.jpg`` are read. They are ordered by the
    integer found before the first ``.`` / ``-`` of the file name, loaded as
    grayscale at ``(image_height, image_width)`` and divided by 255.
    Every run of ``seq_length`` consecutive frames becomes one sample; the
    window advances one frame at a time, so neighbouring samples share
    ``seq_length - 1`` frames. A sample is labelled 1 when the file name of its
    last frame contains ``"sleep"`` and 0 otherwise.

    Args:
        images_path: Directory containing the frame images.
        seq_length: Number of consecutive frames per sample (at least 1).
        image_height: Target height passed to the image loader.
        image_width: Target width passed to the image loader.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays: ``X`` stacks one frame window per
        sample and ``y`` holds the matching labels. Both are empty when fewer
        than ``seq_length`` frames are available.

    Raises:
        ValueError: If ``seq_length`` is smaller than 1, or if a ``.jpg`` file
            name does not start with an integer.
    """
    from collections import deque

    if seq_length < 1:
        raise ValueError(f"seq_length must be at least 1, got {seq_length}")

    def frame_number(name):
        return int(name.split(".")[0].split("-")[0])

    # Filter before sorting: the sort key calls int() on the file name, so a
    # stray non-image file in the directory must never reach it.
    frame_files = sorted(
        (name for name in os.listdir(images_path) if name.endswith(".jpg")),
        key=frame_number,
    )

    X, y = [], []
    # A bounded deque drops the oldest frame automatically on append, which
    # slides the window forward by exactly one frame.
    window = deque(maxlen=seq_length)
    for file in frame_files:
        logger.debug(f"Loading {file}")
        image = image_utils.load_img(
            os.path.join(images_path, file),
            keep_aspect_ratio=True,
            target_size=(image_height, image_width),
            color_mode="grayscale",
        )
        window.append(image_utils.img_to_array(image) / 255.0)
        if len(window) == seq_length:
            X.append(np.array(list(window)))
            # The sequence takes the label of its most recent frame.
            y.append(1 if "sleep" in file else 0)

    return np.array(X), np.array(y)


# Load the data and labels
DATA_DIR = "./data"
RANDOM_SEED = 42
X, y = load_data(DATA_DIR, SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH)
if len(X) == 0:
    raise ValueError(f"No complete sequences of {SEQUENCE_LENGTH} frames found in {DATA_DIR!r}")

# Split the data into training and validation sets.
# The split is seeded so that it is the same on every run; without a fixed
# seed, weights cached from an earlier run would be scored against a
# different validation set than the one they were trained with.
# NOTE(review): load_data emits windows that advance one frame at a time, so a
# shuffled split places heavily overlapping sequences on both sides and the
# validation metrics are optimistic. Consider a chronological or grouped split.
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.2,
    shuffle=True,
    random_state=RANDOM_SEED,
)


2023-03-16 23:56:09.829073: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-16 23:56:09.900936: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-03-16 23:56:09.900952: I tensorflow/compiler/xla/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-03-16 23:56:10.381924: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-

In [5]:
# Build the ConvLSTM network for single-channel frame sequences
input_shape = (
    SEQUENCE_LENGTH,  # frames per sequence
    IMAGE_HEIGHT,
    IMAGE_WIDTH,
    1,  # one channel
)
model = create_model(input_shape)

# Binary classification: cross-entropy loss, Adam optimizer, accuracy metric
optimizer = Adam(learning_rate=LEARNING_RATE)
model.compile(
    optimizer=optimizer,
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

# Define the callbacks.
# Early stopping waits twice as long as the learning-rate scheduler: when both
# use the same patience on the same metric they trigger on the same epoch, so
# training would end just as the rate is first lowered and the reduced rate
# would never be used.
callbacks = [
    ReduceLROnPlateau(
        monitor="val_loss",
        factor=0.1,
        patience=PATIENCE,
        verbose=1,
        mode="auto",
        min_delta=0.0001,
    ),
    EarlyStopping(
        monitor="val_loss",
        patience=2 * PATIENCE,
        verbose=1,
        mode="auto",
        min_delta=0.0001,
        # On an early stop, keep the best epoch's weights rather than the last.
        restore_best_weights=True,
    ),
]

# Reuse the cached weights when the file exists; otherwise train and cache them
if os.path.exists(FILENAME):
    model.load_weights(FILENAME)
else:
    history = model.fit(
        x=X_train,
        y=y_train,
        validation_data=(X_val, y_val),
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        callbacks=callbacks,
    )

    # Persist the trained weights so later runs can skip training
    model.save_weights(FILENAME)


Model: "model_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 15, 60, 80, 1)]   0         
                                                                 
 conv_lstm2d_3 (ConvLSTM2D)  (None, 15, 58, 78, 4)     736       
                                                                 
 flatten_3 (Flatten)         (None, 271440)            0         
                                                                 
 dense_3 (Dense)             (None, 1)                 271441    
                                                                 
Total params: 272,177
Trainable params: 272,177
Non-trainable params: 0
_________________________________________________________________


In [7]:

# Score the model on the validation split (there is no separate test set here)
loss, acc = model.evaluate(x=X_val, y=y_val, verbose=0)
logger.info(f"Validation accuracy: {acc:.4f}, loss: {loss:.4f}")

In [None]:

# Predict on the validation split and round the sigmoid outputs to class labels
y_pred = model.predict(x=X_val)
y_pred_classes = np.round(y_pred)

# Summarise how the predicted classes compare with the validation labels
print("\nConfusion matrix:", confusion_matrix(y_val, y_pred_classes), sep="\n")
print("\nClassification report:", classification_report(y_val, y_pred_classes), sep="\n")

In [None]:
# Classify every loaded sequence (training and validation alike) by
# thresholding the sigmoid output at 0.5
model_predictions = (model.predict(X) > 0.5).astype(int)

# Report each sequence, flagging the ones the model got wrong.
# Iterating over the flattened predictions yields plain scalars, so the
# comparison and the conditional expressions do not depend on truth-testing
# one-element arrays.
for i, (predicted, actual) in enumerate(zip(model_predictions.ravel(), y)):
    predicted_name = "sleep" if predicted else "not sleep"
    if predicted != actual:
        actual_name = "sleep" if actual else "not sleep"
        logger.warning(f"{i} was predicted {predicted_name} and is actually {actual_name}")
    else:
        logger.info(f"{i} was predicted {predicted_name}")