<a href="https://colab.research.google.com/github/cbalkig/Anomaly_Detection_in_Videos/blob/master/train_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
from os import listdir
from os.path import join, isdir
import numpy as np
from PIL import Image
import keras
from keras.layers import Conv2DTranspose, ConvLSTM2D, TimeDistributed, Conv2D
from keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv2D, Conv3D, BatchNormalization
import matplotlib.pyplot as plt
import time
from numpy import save, load

In [None]:
working_directory = '/content/drive/MyDrive/AnomalyDetectionInVideos'

In [None]:
class Config:
    DATASET_PATH = os.path.join(working_directory, "files")
    BATCH_SIZE = 4
    EPOCHS = 5
    MODEL_PATH = os.path.join(working_directory, "model.hdf5")
    FRAME_SIZE = 159
    SAMPLES_COUNT = 36
    FILTERS_COUNT = 5
    IMAGE_SIZE = 128

In [None]:
def get_samples_data():
    threshold = 1000
    sample_count = 0
    # Calculate the min number of frames in a video
    for f in sorted(listdir(Config.DATASET_PATH)):
        directory_path = join(Config.DATASET_PATH, f)
        if isdir(directory_path):
            sample_count = sample_count + 1
            count = len(listdir(directory_path))
            print(f, count)
            if count < threshold:
              threshold = count
            for c in listdir(directory_path):
              if len(c) < 7:
                os.rename(join(directory_path, c), join(directory_path, c.zfill(7)))
    print("Min folder threshold will be", threshold)
    print("Number of samples", sample_count)
    Config.FRAME_SIZE = threshold
    Config.SAMPLES_COUNT = sample_count

In [None]:
#get_samples_data()

In [None]:
def get_training_set():
    # Process all videos and generate videos np array with shape: (Config.SAMPLES_COUNT, Config.FRAME_SIZE, Config.IMAGE_SIZE, Config.IMAGE_SIZE)
    videos = np.zeros((Config.SAMPLES_COUNT, Config.FRAME_SIZE, Config.IMAGE_SIZE, Config.IMAGE_SIZE, 1))
    sample_count = 0
    for f in sorted(listdir(Config.DATASET_PATH)):
        directory_path = join(Config.DATASET_PATH, f)
        if isdir(directory_path):
            print("Processing", directory_path)
            count = 0
            for c in sorted(listdir(directory_path)):
                #print("Frame #", count)
                img_path = join(directory_path, c)
                if str(img_path)[-3:] == "tif":
                    img = Image.open(img_path).resize((Config.IMAGE_SIZE, Config.IMAGE_SIZE))
                    img = np.array(img, dtype=np.float32) / 256.0
                    img = np.reshape(img, img.shape + (1,))
                    videos[sample_count][count] = img
                count = count + 1
                if count == Config.FRAME_SIZE:
                    break
            sample_count = sample_count + 1
    return videos

In [None]:
#training_set = get_training_set()
#training_set = np.array(training_set)
#save(join(Config.DATASET_PATH, 'train_data_' + str(Config.IMAGE_SIZE) + '.npy'), training_set)
#print("Shape of training set", training_set.shape)

In [None]:
training_set = load(join(Config.DATASET_PATH, 'train_data_' + str(Config.IMAGE_SIZE) + '.npy'))

In [None]:
def train_model(reload_model=True):
    if not reload_model:
        return load_model(Config.MODEL_PATH,custom_objects={'BatchNormalization': BatchNormalization})
    model = Sequential()
    model.add(ConvLSTM2D(filters=Config.FILTERS_COUNT, kernel_size=(3, 3), input_shape=(Config.FRAME_SIZE, Config.IMAGE_SIZE, Config.IMAGE_SIZE, 1), padding="same", return_sequences=True))
    model.add(BatchNormalization())
    model.add(ConvLSTM2D(filters=Config.FILTERS_COUNT, kernel_size=(3, 3), padding="same", return_sequences=True))
    model.add(BatchNormalization())
    model.add(ConvLSTM2D(filters=Config.FILTERS_COUNT, kernel_size=(3, 3), padding="same", return_sequences=True))
    model.add(BatchNormalization())
    model.add(Conv3D(filters=1, kernel_size=(3, 3, 3), activation="sigmoid", padding="same", data_format="channels_last"))
    model.compile(loss="binary_crossentropy", optimizer="adadelta", metrics=["accuracy"])
    model.summary()
    model.fit(training_set, training_set,
            batch_size=Config.BATCH_SIZE, epochs=Config.EPOCHS, shuffle=False, verbose=1, 
            callbacks=[keras.callbacks.TensorBoard(log_dir="logs/final", histogram_freq=1, write_graph=True, write_images=True)])
    return model

In [None]:
model = train_model()
print("Model is ready.")
model.save(Config.MODEL_PATH)
print("Model saved : ", Config.MODEL_PATH)