In [1]:
import os
import cv2
import math
import random
import numpy as np
import datetime as dt
import tensorflow as tf
from collections import deque
import matplotlib.pyplot as plt
%matplotlib inline
 
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import * 
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import plot_model
from keras.applications import ResNet50
from keras.models import Model
from keras.optimizers import SGD
from keras.preprocessing.image import ImageDataGenerator

In [2]:
seed_constant = 27
np.random.seed(seed_constant)
random.seed(seed_constant)
tf.random.set_seed(seed_constant)

In [3]:
SEQUENCE_LENGTH = 20
epochs = 25
batch_size = 64
# image_height, image_width = 256, 256
# image_height, image_width = 128, 128
image_height, image_width = 64, 64

dataset_directory = 'datasets/activity_dataset_v3'
classes_list = ["normal", "suspicious"]


In [4]:
def frames_extraction(video_path):
    # Empty List declared to store video frames
    frames_list = []
     
    # Reading the Video File Using the VideoCapture
    video_reader = cv2.VideoCapture(video_path)
    
    video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
    
    skip_frames_window = max(int(video_frames_count/SEQUENCE_LENGTH), 1) 
 
    # Iterating through Video Frames
    for fram_counter in range(SEQUENCE_LENGTH):
        video_reader.set(cv2.CAP_PROP_POS_FRAMES , fram_counter * skip_frames_window)
        
        success, frame = video_reader.read() 
        
        if not success:
            break
 
        resized_frame = cv2.resize(frame, (image_height, image_width))
         
        normalized_frame = resized_frame / 255
         
        frames_list.append(normalized_frame)
      
    video_reader.release()
 
    return frames_list

In [5]:
def create_dataset(train_test_path):
    '''
    It will extract the data of selected classes and create a dataset:-
    
    features:         list of extracted frames of videos
    labels:           list of classes of those videos
    video_file_path:  A list containing paths of video files in disl
    '''

    features = []
    labels = []
    video_file_paths = []
     
    # Iterating through all the classes mentioned in the classes list
    for class_index, class_name in enumerate(classes_list):
        print(f'Extracting Data of Class: {class_name}')
         
        files_list = os.listdir(os.path.join(dataset_directory + train_test_path, class_name))
 
        for file_name in files_list:
 
            video_file_path = os.path.join(dataset_directory + train_test_path, class_name, file_name)
 
            frames = frames_extraction(video_file_path)
            
            if len(frames) == SEQUENCE_LENGTH:
                features.append(frames)
                labels.append(class_index)
                video_file_paths.append(video_file_path)
 
    # Converting the features and labels lists to numpy arrays
    features = np.asarray(features)
    labels = np.array(labels)  
 
    return features, labels, video_file_paths

In [6]:
features_train, label_train, video_file_paths = create_dataset('/training_set')

Extracting Data of Class: normal
Extracting Data of Class: suspicious


In [7]:
features_test, label_test, video_file_paths = create_dataset('/testing_set')

Extracting Data of Class: normal
Extracting Data of Class: suspicious


In [8]:
hot_encoded_labels_train = to_categorical(label_train)
hot_encoded_labels_test = to_categorical(label_test)

In [9]:
hot_encoded_labels_train[:5]

array([[1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.]], dtype=float32)

In [10]:
features_train.shape

(235, 20, 64, 64, 3)

In [11]:
# features_train, features_test, labels_train, labels_test = train_test_split(features, hot_encoded_labels,test_size=0.25, shuffle=True, random_state= seed_constant)

In [12]:
# test_set = np.concatenate((features_test, hot_encoded_labels_test), axis=1)

In [16]:
def create_model(): # LNRC Model
    model = Sequential()
    
    # Layer: 1
    model.add(TimeDistributed(Conv2D(filters = 16, kernel_size = 3,padding='same', activation = 'relu'), input_shape = (SEQUENCE_LENGTH,image_height, image_width, 3)))
    model.add(TimeDistributed(MaxPool2D(4,4)))
    model.add(TimeDistributed(Dropout(0.25)))
    
#     # Layer: 2
#     model.add(TimeDistributed(Conv2D(filters= 16, kernel_size=3, padding='same', activation='relu')))
#     model.add(TimeDistributed(MaxPool2D(4,4)))
#     model.add(TimeDistributed(Dropout(0.25)))
    
    # Layer: 3
    model.add(TimeDistributed(Conv2D(filters= 32, kernel_size=3,padding='same', activation='relu')))
    model.add(TimeDistributed(MaxPool2D(2,2)))
    model.add(TimeDistributed(Dropout(0.25)))
    
    
    # Layer: 3
    model.add(TimeDistributed(Conv2D(filters= 64, kernel_size=3,padding='same', activation='relu')))
    model.add(TimeDistributed(MaxPool2D(2,2)))
    model.add(TimeDistributed(Dropout(0.25)))
    
    # Layer: 4
    model.add(TimeDistributed(Conv2D(filters= 128, kernel_size=3, padding='same',activation='relu')))
    model.add(TimeDistributed(MaxPool2D(2,2)))
    model.add(TimeDistributed(Dropout(0.25)))
    
    # Flatten Layer
    model.add(TimeDistributed(Flatten()))
    
    # Connecting to LSTM
    model.add(LSTM(32))
    
    # Final Classification Layer
    model.add(Dense(len(classes_list), activation='softmax'))
    
    model.summary()
    return model

In [14]:
def create_cnn_lstm():
    model = Sequential()
    
    # Layer - 1
    model.add(ConvLSTM2D(filters = 16, kernel_size = 3, activation = 'relu' ,data_format='channels_last', recurrent_dropout=0.2, return_sequences=True,  input_shape = (SEQUENCE_LENGTH,image_height, image_width, 3)))
    model.add(MaxPooling3D(pool_size=(1,2,2), padding='same', data_format='channels_last'))
    model.add(TimeDistributed(Dropout(0.2)))
    
    # Layer - 1
    model.add(ConvLSTM2D(filters = 32, kernel_size = 3, activation = 'relu' ,data_format='channels_last', recurrent_dropout=0.2, return_sequences=True))
    model.add(MaxPooling3D(pool_size=(1,2,2), padding='same', data_format='channels_last'))
    model.add(TimeDistributed(Dropout(0.2)))
    
    # Layer - 1
    model.add(ConvLSTM2D(filters = 64, kernel_size = 3, activation = 'relu' ,data_format='channels_last', recurrent_dropout=0.2, return_sequences=True))
    model.add(MaxPooling3D(pool_size=(1,2,2), padding='same', data_format='channels_last'))
    model.add(TimeDistributed(Dropout(0.2)))
        
    # Layer - 1
    model.add(ConvLSTM2D(filters = 128, kernel_size = 3, activation = 'relu' ,data_format='channels_last', recurrent_dropout=0.2, return_sequences=True))
    model.add(MaxPooling3D(pool_size=(1,2,2), padding='same', data_format='channels_last'))
    
    # Flatten
    model.add(Flatten())
    
    # Final Layer
    model.add(Dense(len(classes_list) , activation='softmax'))
    
    model.summary()
    return model

In [17]:
model = create_model()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 time_distributed_14 (TimeDi  (None, 20, 64, 64, 16)   448       
 stributed)                                                      
                                                                 
 time_distributed_15 (TimeDi  (None, 20, 16, 16, 16)   0         
 stributed)                                                      
                                                                 
 time_distributed_16 (TimeDi  (None, 20, 16, 16, 16)   0         
 stributed)                                                      
                                                                 
 time_distributed_17 (TimeDi  (None, 20, 16, 16, 32)   4640      
 stributed)                                                      
                                                                 
 time_distributed_18 (TimeDi  (None, 20, 8, 8, 32)    

In [18]:
# model plot
# plot_model(model, to_file='cnn_lstm_model.png', show_shapes=True, show_layer_names=True, show_layer_activations=True)

In [19]:
# Adding Early Stopping Callback
early_stopping_callback = EarlyStopping(monitor = 'val_loss', patience = 5, mode = 'min', restore_best_weights = True)

# Adding loss, optimizer and metrics values to the model.
model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ["accuracy"])
 
# Start Training
model_training_history = model.fit(x = features_train, y = hot_encoded_labels_train, epochs = epochs,validation_split = 0.2, batch_size = batch_size, shuffle = True, callbacks = [early_stopping_callback])


Epoch 1/25
Epoch 2/25
Epoch 3/25
Epoch 4/25
Epoch 5/25
Epoch 6/25
Epoch 7/25
Epoch 8/25
Epoch 9/25
Epoch 10/25
Epoch 11/25
Epoch 12/25
Epoch 13/25
Epoch 14/25
Epoch 15/25
Epoch 16/25
Epoch 17/25
Epoch 18/25
Epoch 19/25
Epoch 20/25
Epoch 21/25
Epoch 22/25
Epoch 23/25
Epoch 24/25
Epoch 25/25


In [20]:
model_evaluation_history = model.evaluate(features_test, hot_encoded_labels_test)



In [21]:
model.save('models/cnn_lstm_64_v3.h5')

In [22]:
def plot_metric(metric_name_1, metric_name_2, plot_name):
  # Get Metric values using metric names as identifiers
  metric_value_1 = model_training_history.history[metric_name_1]
  metric_value_2 = model_training_history.history[metric_name_2]
 
  # Constructing a range object which will be used as time 
  epochs = range(len(metric_value_1))
   
  # Plotting the Graph
  plt.plot(epochs, metric_value_1, 'blue', label = metric_name_1)
  plt.plot(epochs, metric_value_2, 'red', label = metric_name_2)
   
  # Adding title to the plot
  plt.title(str(plot_name))
 
  # Adding legend to the plot
  plt.legend()

In [23]:
# loss graph
plot_metric('loss', 'val_loss', 'Total Loss vs Total Validation Loss')

KeyError: 'val_loss'

In [24]:
# accuracy graph
plot_metric('accuracy', 'val_accuracy', 'Total Accuracy vs Total Validation Accuracy')

KeyError: 'val_accuracy'

In [24]:
model = load_model('models/LNRC_MODEL_128_v1.h5')

In [25]:
def predict_on_video(video_file_path, output_file_path, SEQUENCE_LENGTH):
     
    video_reader = cv2.VideoCapture(video_file_path)
    
    orignal_video_width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    orignal_video_height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    video_writer = cv2.VideoWriter(output_file_path, cv2.VideoWriter_fourcc('M', 'P', '4', 'V'),
                                 video_reader.get(cv2.CAP_PROP_FPS), (orignal_video_width, orignal_video_height))
    
    frames_queue = deque(maxlen = SEQUENCE_LENGTH)
    predictions= []
    
    predicted_class_name = ''
    
    while video_reader.isOpened():
        ok , frame = video_reader.read()
        cv2.imshow('fram',frame)
        
        if not ok:
            break
            
        if cv2.waitKey(1) & 0xFF == ord('q'): # taking keyboard input 'q' to brake it
            cv2.destroyAllWindows()
            break
        
        resized_frame = cv2.resize(frame, (image_height, image_width))
        normalized_frame = resized_frame / 255
        
        frames_queue.append(normalized_frame)
        
        if len(frames_queue) == SEQUENCE_LENGTH:
            predicted_label_probabilities = model.predict(np.expand_dims(frames_queue, axis=0))[0]
            predicted_label = np.argmax(predicted_label_probabilities)
            
            predictions.append(predicted_label)
            predicted_class_name = classes_list[predicted_label]
        
            print(predicted_class_name)
            
            if(predicted_class_name == 'normal'):
                cv2.putText(frame, f'Activity: {predicted_class_name}', (0, 20), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
            elif(predicted_class_name == 'suspicious'):
                cv2.putText(frame, f'Activity: {predicted_class_name}', (0, 20), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
            
            
        
        video_writer.write(frame)
        
        
        
    video_reader.release()
    video_writer.release()

In [None]:
predict_on_video('testing_data/fight.mp4', 'testing_data/result.mp4', SEQUENCE_LENGTH)