<a href="https://colab.research.google.com/github/hiu04/NLP-Deep-Learning/blob/main/A3_Anomaly_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Extract the frames from video and store as JPEG images

In [None]:
import os
import cv2
def convert_video_to_images(img_folder, filename='assignment3_video.avi'):
    """
    Converts the video file (assignment3_video.avi) to JPEG images.
    Once the video has been converted to images, then this function doesn't
    need to be run again.

    Arguments
    ---------
    filename : (string) file name (absolute or relative path) of video file.
    img_folder : (string) folder where the video frames will be
                 stored as JPEG images.
    """
    # Make the img_folder if it doesn't exist.
    try:
        if not os.path.exists(img_folder):
            os.makedirs(img_folder)
    except OSError:
        print('Error creating the image folder')

    # Make sure that the absence/presence of path
    # separator doesn't throw an error.
    img_folder = f'{img_folder.rstrip(os.path.sep)}{os.path.sep}'

    # Instantiate the video object.
    video = cv2.VideoCapture(filename)

    # Check if the video is opened successfully
    if not video.isOpened():
        print("Error opening video file")
        return

    i = 0
    while video.isOpened():
        ret, frame = video.read()
        if ret:
            im_fname = f'{img_folder}frame{i:0>4}.jpg'
            print('Captured...', im_fname)
            cv2.imwrite(im_fname, frame)
            i += 1
        else:
            break

    video.release()
    cv2.destroyAllWindows()

    if i:
        print(f'Video converted\n{i} images written to {img_folder}')



In [None]:
# Create a img_folder to store the extracted images
convert_video_to_images("/content/img_folder",
                        filename = "/content/assignment3_video.avi")

## Load the extracted image files

In [None]:
from PIL import Image
from glob import glob
import numpy as np
import os

def load_images(img_dir, im_width=60, im_height=44):
    """
    Reads, resizes, and normalizes the extracted image frames from a folder.
    The images are returned both as a Numpy array of flattened images
    (i.e., the images with the 3-d shape (im_width, im_height, num_channels)
    are reshaped into the 1-d shape (im_width x im_height x num_channels))
    and a list with the images with their original number of dimensions
    suitable for display.

    Arguments
    ---------
    img_dir : (string) the directory where the images are stored.
    im_width : (int) The desired width of the image.
              The default value works well.
    im_height : (int) The desired height of the image.
               The default value works well.

    Returns
    -------
    X : (numpy.array) An array of the flattened images.
    images : (list) A list of the resized images.
    """
    images = []
    fnames = glob(f'{img_dir}{os.path.sep}frame*.jpg')
    fnames.sort()

    for fname in fnames:
        im = Image.open(fname)
        # Resize the image to im_width and im_height.
        im_array = np.array(im.resize((im_width, im_height)))
        # Convert uint8 to decimal and normalize to 0 - 1.
        images.append(im_array.astype(np.float32) / 255.)
        # Close the PIL image once converted and stored.
        im.close()

    # Flatten the images to a single vector
    X = np.array(images).reshape(-1, np.prod(images[0].shape))
    return X, images


In [None]:
X, images = load_images("/content/img_folder", im_width = 60, im_height = 44)

## Train the model

In [None]:
import keras
from keras import layers

In [None]:
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D
from keras.models import Model

# Image dimensions
image_shape = (44, 60, 3)  # height, width, channels
input_img = Input(shape = image_shape)

# Encoder
x = Conv2D(64, (3, 3), activation = 'relu', padding = 'same')(input_img)
x = MaxPooling2D((2, 2), padding = 'same')(x)
x = Conv2D(32, (3, 3), activation = 'relu', padding = 'same')(x)
encoded = MaxPooling2D((2, 2), padding = 'same')(x)

# Decoder
x = Conv2D(32, (3, 3), activation = 'relu', padding = 'same')(encoded)
x = UpSampling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation = 'relu', padding = 'same')(x)
x = UpSampling2D((2, 2))(x)
decoded = Conv2D(3, (3, 3), activation = 'sigmoid', padding = 'same')(x)

autoencoder = Model(input_img, decoded)
autoencoder.compile(optimizer = 'adam', loss = 'binary_crossentropy')

In [None]:
from sklearn.model_selection import train_test_split

x_train, x_test = train_test_split(X, test_size = 0.2, random_state = 42)

In [None]:
import numpy as np
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = np.reshape(x_train, (len(x_train), 44, 60, 3))
x_test = np.reshape(x_test, (len(x_test), 44, 60, 3))

In [None]:
from keras.callbacks import TensorBoard

autoencoder.fit(x_train, x_train,
                epochs = 50,
                batch_size = 128,
                shuffle = True,
                validation_data = (x_test, x_test),
                callbacks = [TensorBoard(log_dir = '/tmp/autoencoder')]
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.src.callbacks.History at 0x7da1a86c9f90>

In [None]:
import tensorflow as tf

In [None]:
# # Calculating Reconstruction Loss
# reconstructed_images = autoencoder.predict(x_train)
# reconstruction_loss = np.mean(np.abs(x_train - reconstructed_images),
                              # axis = (1, 2, 3))

# # Determine a threshold for anomaly detection
# threshold = np.percentile(reconstruction_loss, 95)



In [None]:
# threshold

0.0008118338766507805

In [None]:
# threshold = np.mean(reconstruction_loss) + np.std(reconstruction_loss)
# print("Threshold: ", threshold)

Threshold:  0.0008060304


## Predict using the autoencoder

In [None]:
from keras.preprocessing.image import load_img, img_to_array



def predict(frame):
    # Load and preprocess the image
    img = load_img(frame, target_size = (44, 60))
    img = img_to_array(img)
    img = img / 255.0
    frame_processed = img.reshape(1, 44, 60, 3)  # Reshape for the model

    # Predict using the autoencoder
    reconstructed_frame = autoencoder.predict(frame_processed)
    loss = np.mean(np.abs(frame_processed - reconstructed_frame))

    return loss > 0.43


## Display result

In [None]:
frame = "/content/img_folder/frame0111.jpg"
predict(frame)



False

In [None]:
import os

# Define the directory containing the image frames
img_dir = "/content/img_folder"

# List all files in the directory
img_files = os.listdir(img_dir)

# Initialize a list to store the file names of non-anomalous images
anomalous_files = []

# Iterate through the image files
for img_file in img_files:
    # Check if the file is a JPG image
    if img_file.endswith(".jpg"):
        # Construct the full path to the image
        img_path = os.path.join(img_dir, img_file)

        # Call the predict function for this image
        anomaly = predict(img_path)

        # Check if the frame is not anomalous (anomaly is False)
        if anomaly:
            anomalous_files.append(img_file)

# # Print the file names of non-anomalous images
# print("Anomalous Images:")
# for file_name in anomalous_files:
#     print(file_name)
len(anomalous_files)



393

## Save Model

In [None]:
# Save the trained autoencoder model to a file
autoencoder.save("/content/model")

In [None]:
# from google.colab import files
# import shutil

# # Create a zip archive of the model folder
# shutil.make_archive("/content/model_archive", 'zip', "/content/model")

# # Download the created zip file
# files.download("/content/model_archive.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>