## Importing modules

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import cv2
import os
import PIL # image library in python
import tensorflow as tf
from skimage.io import imread,imshow

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential

## Importing dataset

In [None]:
# Fall - Train
# Gather the path of every file found anywhere under the fall/train folder.
fall_train = [
    os.path.join(folder, name)
    for folder, _, names in os.walk('FALL-vs-NORMAL/FALL/train')
    for name in names
]

fall_train_count = len(fall_train)
print(fall_train_count)
print(fall_train[:5])

In [None]:
# Fall - Val
# Gather the path of every file found anywhere under the fall/val folder.
fall_val = [
    os.path.join(folder, name)
    for folder, _, names in os.walk('FALL-vs-NORMAL/FALL/val')
    for name in names
]

fall_val_count = len(fall_val)
print(fall_val_count)
print(fall_val[:5])

In [None]:
# Normal - Train
# Gather the path of every file found anywhere under the normal/train folder.
normal_train = [
    os.path.join(folder, name)
    for folder, _, names in os.walk('FALL-vs-NORMAL/NORMAL/train')
    for name in names
]

normal_train_count = len(normal_train)
print(normal_train_count)
print(normal_train[:5])

In [None]:
# Normal - Val
# Gather the path of every file found anywhere under the normal/val folder.
normal_val = [
    os.path.join(folder, name)
    for folder, _, names in os.walk('FALL-vs-NORMAL/NORMAL/val')
    for name in names
]

normal_val_count = len(normal_val)
print(normal_val_count)
print(normal_val[:5])

## View data

In [None]:
# Preview the first fall/train image; the bare last expression makes the
# notebook render it.
sample_image = PIL.Image.open(fall_train[0])
sample_image

## Creating dictionaries

In [None]:
# Class name -> list of image paths, one mapping per data split.
train_dict = dict(fall=fall_train, normal=normal_train)
val_dict = dict(fall=fall_val, normal=normal_val)

# Class name -> integer label used as the training target.
class_dict = dict(fall=1, normal=0)

## Preparing data as arrays

In [None]:
# Training
# Read every training image, resize it to 128x128 and record the integer
# label of the class it was listed under.
x_train, y_train = [], []

for class_name, images in train_dict.items():
    for image in images:
        img = cv2.imread(image)
        if img is None:
            # cv2.imread does not raise on files it cannot decode (e.g. a
            # stray non-image file picked up by os.walk); it returns None,
            # which would make cv2.resize fail. Skip such files instead.
            print("Skipping unreadable file:", image)
            continue
        x_train.append(cv2.resize(img, (128, 128)))
        y_train.append(class_dict[class_name])

In [None]:
# Validation
# Read every validation image, resize it to 128x128 and record the integer
# label of the class it was listed under.
x_val, y_val = [], []

for class_name, images in val_dict.items():
    for image in images:
        img = cv2.imread(image)
        if img is None:
            # cv2.imread does not raise on files it cannot decode (e.g. a
            # stray non-image file picked up by os.walk); it returns None,
            # which would make cv2.resize fail. Skip such files instead.
            print("Skipping unreadable file:", image)
            continue
        x_val.append(cv2.resize(img, (128, 128)))
        y_val.append(class_dict[class_name])

In [None]:
# Turn the Python lists built above into numpy arrays so they can be
# scaled and passed to the model directly.
x_train, y_train = np.array(x_train), np.array(y_train)
x_val, y_val = np.array(x_val), np.array(y_val)

## Scaling

In [None]:
# Divide the pixel values of both splits by 255.
x_train_scaled, x_val_scaled = x_train / 255, x_val / 255

## Number of training and validation samples

In [None]:
# Report image and label counts per split, with a blank line after each split.
for split_report in (
    (("Training images: ", x_train_scaled), ("Training labels: ", y_train)),
    (("Validation images: ", x_val_scaled), ("Validation labels: ", y_val)),
):
    for caption, samples in split_report:
        print(caption, len(samples))
    print()

## Data augmentation

In [None]:
# The Random* augmentation layers are exposed directly on keras.layers in
# current releases; layers.experimental.preprocessing is kept only as a
# fallback for older TensorFlow versions that do not have them there yet.
_augmentation_layers = (
    layers if hasattr(layers, "RandomZoom") else layers.experimental.preprocessing
)

# Random zoom, contrast, rotation and horizontal flip, applied as the
# first stage of the model.
data_augmentation = keras.Sequential([
    _augmentation_layers.RandomZoom(0.3),
    _augmentation_layers.RandomContrast(0.3),
    _augmentation_layers.RandomRotation(0.1),
    _augmentation_layers.RandomFlip("horizontal")
])

## Creating CNN model

In [None]:
# Import callbacks from the same Keras that builds the model (tf.keras)
# rather than from the standalone `keras` package.
from tensorflow.keras import callbacks

# Seed both NumPy and TensorFlow before the model is declared and trained.
# Seeding NumPy alone does not seed TensorFlow's own random operations.
np.random.seed(42)
tf.random.set_seed(42)

# A single sigmoid output unit, paired with binary cross-entropy below.
num_classes = 1

model = Sequential([
                   data_augmentation,
                   layers.Conv2D(16, 3, padding='same', activation='relu'),
                   layers.MaxPooling2D(),
                   layers.Dropout(0.1),
                   layers.Conv2D(32, 3, padding='same', activation='relu'),
                   layers.MaxPooling2D(),
                   layers.Dropout(0.1),
                   layers.Conv2D(64, 3, padding='same', activation='relu'),
                   layers.MaxPooling2D(),
                   layers.Dropout(0.1), # going to drop 10% of neurons at random in each pass to give us better generalization
                   layers.Flatten(),
                   layers.Dense(128,activation='relu'),
                   layers.Dense(num_classes, activation='sigmoid')
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=['accuracy'])

# Stop once val_loss has not improved for 3 epochs and roll the model back
# to the weights of its best epoch.
earlystopping = callbacks.EarlyStopping(monitor ="val_loss",
                                        mode ="min", patience = 3,
                                        restore_best_weights = True)

history = model.fit(x_train_scaled, y_train, batch_size = 64,
                    epochs = 100, validation_data =(x_val_scaled, y_val),
                    callbacks =[earlystopping])

## Evaluate model

In [None]:
# Score the trained model on the validation split.
model.evaluate(x=x_val_scaled, y=y_val, batch_size=64)

## Saving model with best weights

In [None]:
# Write the trained model to an HDF5 file in the working directory.
model.save(filepath="model.h5")

## Load the saved model

In [None]:
# load model
# Read the model back from the file written by the save cell.
from tensorflow import keras

load_model = keras.models.load_model(filepath='model.h5')

## Prediction on validation set

In [None]:
# Run the reloaded model over the validation images and display its raw outputs.
predictions = load_model.predict(x=x_val_scaled)
predictions

## Evaluation metrics - validation set

In [None]:
from matplotlib import pyplot as plt
from sklearn.metrics import confusion_matrix , classification_report
import pandas as pd

In [None]:
import seaborn as sns

# Source code credit for this function: https://gist.github.com/shaypal5/94c53d765083101efc0240d776a23823
def print_confusion_matrix(confusion_matrix, class_names, figsize = (10,7), fontsize=14):
    """Draw a confusion matrix, as returned by sklearn.metrics.confusion_matrix, as a heatmap.

    Arguments
    ---------
    confusion_matrix: numpy.ndarray
        The numpy.ndarray object returned from a call to sklearn.metrics.confusion_matrix.
        Similarly constructed ndarrays can also be used.
    class_names: list
        An ordered list of class names, in the order they index the given confusion matrix.
    figsize: tuple
        A 2-long tuple, the first value determining the horizontal size of the outputted figure,
        the second determining the vertical size. Defaults to (10,7).
    fontsize: int
        Font size for the tick labels. Defaults to 14.

    Returns
    -------
    None
        The heatmap is drawn on a newly created matplotlib figure; nothing is returned.

    Raises
    ------
    ValueError
        If seaborn rejects the matrix while annotating it with the integer format "d".
    """
    df_cm = pd.DataFrame(confusion_matrix, index=class_names, columns=class_names)
    # Open a fresh figure so the heatmap does not draw over an existing plot.
    plt.figure(figsize=figsize)
    try:
        heatmap = sns.heatmap(df_cm, annot=True, fmt="d")
    except ValueError as err:
        raise ValueError("Confusion matrix values must be integers.") from err
    heatmap.yaxis.set_ticklabels(heatmap.yaxis.get_ticklabels(), rotation=0, ha='right', fontsize=fontsize)
    heatmap.xaxis.set_ticklabels(heatmap.xaxis.get_ticklabels(), rotation=45, ha='right', fontsize=fontsize)
    plt.ylabel('Truth')
    plt.xlabel('Prediction')

In [None]:
# Ground-truth validation labels as a plain Python list.
truth = [label for label in y_val.tolist()]

In [None]:
# The model ends in a single sigmoid unit, so each prediction is one value
# between 0 and 1 for the class labelled 1 ("fall"). Softmax over a single
# value is always 1.0 and its argmax is always 0, which would label every
# sample "normal". Threshold the probability at 0.5 instead.
pred_list = (np.asarray(predictions).reshape(-1) > 0.5).astype(int).tolist()

In [None]:
# evaluating the validation results
# Names are listed in label order: index 0 is "normal", index 1 is "fall".
class_labels = ["Not a Fall","Fall"]
cm = confusion_matrix(y_true=truth, y_pred=pred_list)
print_confusion_matrix(cm, class_labels)

## Code to detect fall or oversleep from video

In [None]:
def classify(num):
    """Translate a numeric result code into its text label.

    0 -> "no fall", 1 -> "fall", 2 -> "overtime sleep".
    Any other value yields None.
    """
    code_labels = (
        (0, "no fall"),
        (1, "fall"),
        (2, "overtime sleep"),
    )
    for code, label in code_labels:
        if num == code:
            return label
    return None
    
def demo_func(filename):
    """Classify a video as "no fall", "fall" or "overtime sleep".

    Every 30th frame is shown with matplotlib and passed to the model saved
    in 'model.h5'. The per-frame labels (1 = fall, 0 = normal) feed two
    counters: 30 on the fall counter reports a fall, 700 on the normal
    counter reports overtime sleep. If neither limit is reached before the
    video ends, the result is "no fall".

    Arguments
    ---------
    filename: str
        Path of the video file, passed to cv2.VideoCapture.

    Returns
    -------
    list
        A one-element list [(code, label)] where code is 0, 1 or 2 and label
        is the matching text from classify().
    """
    sample_every = 30  # classify one frame out of every 30 frames read
    th = 30            # fall-counter value that reports a fall
    th_z = 700         # normal-counter value that reports overtime sleep

    # load model
    load_model = keras.models.load_model('model.h5')

    cap = cv2.VideoCapture(filename)
    frameCount = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(frameCount)

    fc = 0       # index of the frame being read
    pfp = 0      # previous sampled-frame prediction
    count = 0    # fall counter
    zeros = 0    # normal counter
    n = 1        # running number of the sampled frame, for the log line
    output = None

    try:
        while fc < frameCount:
            ret, image = cap.read()
            if not ret or image is None:
                # The reported frame count can exceed what is actually
                # decodable; stop instead of passing None to OpenCV.
                break

            if fc % sample_every == 0:
                # RGB is used for display only.
                plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
                plt.axis('off')
                plt.show()

                # Prepare the model input the same way the training cells
                # do: BGR frame as decoded by OpenCV, resized to 128x128,
                # then divided by 255.
                r_img = cv2.resize(image, (128, 128)) / 255
                x = np.expand_dims(r_img, axis=0)
                prediction = load_model.predict(x)

                # The model has a single sigmoid output, so softmax/argmax
                # would always give 0; threshold the probability instead.
                cfp = int(np.ravel(prediction)[0] > 0.5)  # current frame prediction
                print(n," : ",cfp)
                n += 1

                if cfp == 1:
                    if pfp == 0:
                        count = 0
                    else:
                        count += 1
                elif cfp == 0:
                    if pfp == 1:
                        zeros = 0
                    else:
                        zeros += 1

                if count == th:
                    #print("!!! FALL DETECTED !!!")
                    output = classify(1)
                    break
                if zeros == th_z:
                    #print("!!! OVERTIME SLEEP !!!")
                    output = classify(2)
                    break
                pfp = cfp
            fc += 1
    finally:
        # Always free the video handle, even if prediction fails.
        cap.release()

    if output is None:
        #print("!!! Fall NOT Detected OR Normal Sleep !!!")
        output = classify(0)

    if output == "no fall":
        return [(0,output)]
    elif output == "fall":
        return [(1,output)]
    else :
        return [(2,output)]

## User interface with gradio

In [None]:
import gradio as gr

# Use the top-level component (as already done for gr.Video) instead of the
# legacy gr.outputs namespace, which newer Gradio releases no longer provide.
op = gr.HighlightedText(color_map={ "no fall":"green","fall":"red","overtime sleep":"yellow" })

# live=True re-runs demo_func whenever the video input changes.
demo = gr.Interface(demo_func, gr.Video(), outputs = op, live =True)
demo.launch(debug=True)