In [None]:
import torch
import numpy as np
import torch.optim as optim
from torch.utils import data
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import os
import cv2
import h5py
import re
from glob import glob
from dataset_loader import create_pytorch_dataset
import parameters

# Local aliases for the values configured in parameters.py, grouped by role.

# Windowing / evaluation options forwarded to create_pytorch_dataset.
window_len, stride = parameters.window_len, parameters.stride
fair_comparison, TOD = parameters.fair_comparison, parameters.TOD
# Split size used along dim 1 when the sample is fed to the model in chunks.
forward_chunk_size = parameters.forward_chunk_size

# Compute device plus the preprocessing / model switches reported by the demo.
device = parameters.device
key_frame_extraction, key_frame_extraction_algorithm = (
    parameters.key_frame_extraction,
    parameters.key_frame_extraction_algorithm,
)
feature_extraction = parameters.feature_extraction
background_subtraction, background_subtraction_algorithm = (
    parameters.background_subtraction,
    parameters.background_subtraction_algorithm,
)
data_augmentation = parameters.data_augmentation
anomaly_detection_model = parameters.anomaly_detection_model

# Dataset selection and on-disk locations.
frame_rate_adjusted_dataset = parameters.frame_rate_adjusted_dataset
dataset_category = parameters.dataset_category
project_directory, dataset_directory = (
    parameters.project_directory,
    parameters.dataset_directory,
)
# Frame height / width used when reshaping the reconstructed windows.
ht, wd = parameters.ht, parameters.wd

In [None]:
def _to_rgb_panel(frame, display_size):
    """Resize ``frame`` and return it as a float32, 3-channel image.

    Args:
        frame: A single image (2-D greyscale or 3-channel).
        display_size: Target size as ``(width, height)``, the order
            ``cv2.resize`` expects.

    Returns:
        A float32 array of shape ``(height, width, 3)``.
    """
    panel = np.array(cv2.resize(frame, display_size), dtype=np.float32)
    # Greyscale frames have no channel axis; replicate to 3 channels so every
    # panel can be concatenated into one mosaic.
    if panel.ndim == 2:
        panel = cv2.cvtColor(panel, cv2.COLOR_GRAY2RGB)
    return panel


def display_videos(name, dset, path, vid_folder, modified_video, reconstructed_video, labels):
    """Play the original, preprocessed, model-input and reconstructed videos side by side.

    The four streams are shown as a 2x2 mosaic in a single OpenCV window
    (top row: original | preprocessed, bottom row: modified | output).
    A message is printed for every frame whose label equals 1, and playback
    stops early when the Esc key is pressed.

    Args:
        name: Dataset name; top-level group of the HDF5 file.
        dset: Modality folder under ``Dataset/Fall-Data/<dataset_category>``.
        path: Path to the HDF5 file that holds the preprocessed frames.
        vid_folder: Video folder name. Its leading alphabetic run selects the
            parent directory (fall folder or ADL folder).
        modified_video: Per-frame (non-windowed) sequence of 2-D frames that
            was fed to the model.
        reconstructed_video: Model output of shape
            ``[1, n_windows, window_len, ht, wd]`` (batch size 1).
        labels: Per-frame labels; a value of 1 marks a fall frame.

    Raises:
        IndexError: If ``vid_folder`` contains no alphabetic characters or
            ``reconstructed_video`` contains no windows.
    """
    display_ht = 450
    display_wd = 450
    # cv2.resize takes its target size as (width, height).
    display_size = (display_wd, display_ht)
    # Video can be 4fps / 8fps / 20fps. So each frame can be displayed for 250 ms / 125 ms / 50 ms
    ms_per_frame = 125  # millisecond per frame
    # Background subtraction history; the first frames of every stream are dropped
    # when background subtraction is enabled because they are black.
    background_history = 120

    # Extract if fall folder or ADL folder
    dir_type = re.findall("[a-zA-Z]+", vid_folder)[0]

    # Original Video. os.path.join avoids the unrecognised backslash escapes of a
    # hand-written Windows path and works on every platform.
    frame_dir = os.path.join(
        dataset_directory, "Dataset", "Fall-Data", dataset_category, dset, dir_type, vid_folder
    )
    frame_paths = glob(os.path.join(frame_dir, "*.jpg")) + glob(os.path.join(frame_dir, "*.png"))
    # Natural sort so that e.g. frame 10 comes after frame 9.
    frame_paths.sort(key=lambda var: [int(x) if x.isdigit() else x for x in re.findall(r"[^0-9]|[0-9]+", var)])
    original_video = []
    for filename in frame_paths:
        img = cv2.imread(filename, cv2.IMREAD_ANYCOLOR)
        if img is not None:  # skip files OpenCV could not decode
            original_video.append(img)

    # Preprocessed Video
    with h5py.File(path, "r") as hf:
        data_dict = hf["{}/Processed/Split_by_video".format(name)]
        preprocessed_video = data_dict[vid_folder]["Data"][:]

    # Output Video.
    # Original Shape - [batch_size, no.of windows, window_len, ht, wd].
    # New Shape - [no.of windows, window_len, ht, wd] (batch_size is 1)
    windows = reconstructed_video.reshape(reconstructed_video.shape[1], window_len, ht, wd)
    # Reconstructed Video. It is in window format, convert to frames.
    # If length is 510, it means 510 windows are there.
    # So from the 1st to 509th window, take the first frame. For the 510th window,
    # take all the frames. Total - 509 + 8 = 517 frames
    output_video = [window[0] for window in windows[:-1]]
    output_video.extend(windows[-1])
    # Windowing code creates 1 window less than required. So duplicate last frame
    output_video.append(output_video[-1])
    output_video = np.array(output_video)

    if background_subtraction:
        # [-1,1] Normalisation. Only apply if background_subtraction is turned on.
        value_range = np.ptp(output_video)
        # A constant reconstruction has zero range; skip the rescale instead of
        # dividing by zero and filling the video with NaN.
        if value_range > 0:
            output_video = 2.0 * (output_video - np.min(output_video)) / value_range - 1

        # Remove the first `background_history` elements of every stream so they stay aligned.
        original_video = original_video[background_history:]
        preprocessed_video = preprocessed_video[background_history:]
        modified_video = modified_video[background_history:]
        output_video = output_video[background_history:]
        labels = labels[background_history:]

    # Only play frames that exist in every stream; indexing by the original
    # video's length alone raises IndexError when another stream is shorter.
    stream_lengths = (
        len(original_video),
        len(preprocessed_video),
        len(modified_video),
        len(output_video),
        len(labels),
    )
    frame_count = min(stream_lengths)
    if len(set(stream_lengths)) > 1:
        print("Stream lengths differ {}; displaying the first {} frames".format(stream_lengths, frame_count))

    try:
        for index in range(frame_count):
            # uint8 to float32, scale down by 255
            original_frame = _to_rgb_panel(original_video[index], display_size) / 255
            preprocessed_frame = _to_rgb_panel(preprocessed_video[index], display_size)
            modified_frame = _to_rgb_panel(modified_video[index], display_size)
            output_frame = _to_rgb_panel(output_video[index], display_size)

            top_row = np.concatenate([original_frame, preprocessed_frame], axis=1)
            bottom_row = np.concatenate([modified_frame, output_frame], axis=1)
            mosaic = np.concatenate([top_row, bottom_row], axis=0)

            cv2.imshow("Original, Preprocessed, Modified, Output", mosaic)

            if labels[index] == 1:
                print("Fall at Frame - {}".format(index))

            k = cv2.waitKey(ms_per_frame) & 0xFF
            # Exit on 'esc' key
            if k == 27:
                break
    finally:
        # Always close the window, even if a frame fails to render.
        cv2.destroyAllWindows()


def demo_pipeline_unimodality(name, dset, path, modelpath):
    """Reconstruct every demo test video with a saved model and play the result.

    Builds the test dataset, loads the saved weights into the first model in
    ``parameters.models`` (Base_3DCAE according to the original comment), runs
    each video through the model in chunks of ``forward_chunk_size`` windows,
    and hands the reconstruction to ``display_videos``.

    Args:
        name: Dataset name passed to ``create_pytorch_dataset`` and used as
            the HDF5 group name.
        dset: Modality folder name.
        path: Path to the HDF5 dataset file.
        modelpath: File name of the saved weights inside
            ``<project_directory>/Output/Models/Demo``.
    """
    (
        _test_dataset,
        test_dataloader,
        x_data_test,
        y_data_test,
        x_info_test,
    ) = create_pytorch_dataset(name, dset, path, window_len, fair_comparison, stride, TOD)

    # os.path.join avoids the unrecognised backslash escapes of a hand-written
    # Windows path and works on every platform.
    filepath_model = os.path.join(project_directory, "Output", "Models", "Demo", modelpath)

    # Prepare GPU
    torch.cuda.empty_cache()
    # Load the model. Using Base_3DCAE
    model = parameters.models[0]().to(device)
    # map_location lets weights saved on a GPU load when `device` is the CPU.
    model.load_state_dict(torch.load(filepath_model, map_location=device))  # Load saved model weights
    model.eval()  # Sets the model in testing mode.

    # format() works whether `device` is a string or a torch.device.
    print("Device Used - {}".format(device))
    print("Key Frame Extraction - {}".format(key_frame_extraction))
    if key_frame_extraction:
        print("Key Frame Extraction Algorithm - {}".format(key_frame_extraction_algorithm))
    print("Feature Extraction - {}".format(feature_extraction))
    if feature_extraction:
        print("Background Subtraction - {}".format(background_subtraction))
        if background_subtraction:
            print("Background Subtraction Algorithm - {}".format(background_subtraction_algorithm))
    print("Data Augmentation - {}".format(data_augmentation))
    print("Frame rate adjusted dataset - {}".format(frame_rate_adjusted_dataset))
    print("Window Length = {}\n".format(window_len))
    print("{} Demo Test Videos - {}\n".format(dset, len(test_dataloader)))

    with torch.no_grad():
        # The windowed labels from the loader are not needed here; the per-frame
        # labels in y_data_test are what gets displayed.
        for i, (sample, _windowed_labels) in enumerate(test_dataloader):
            # NOTE(review): assumes the loader yields one video per batch in the
            # same order as x_info_test / x_data_test (no shuffling) - confirm.
            vid_folder = x_info_test[i]
            print(vid_folder)

            # forward pass to get output
            print("Forward Pass Initiated")

            torch.cuda.empty_cache()

            sample = sample.to(device, dtype=torch.float)
            recon_vid = []

            for chunk in torch.split(sample, forward_chunk_size, dim=1):
                output = model(chunk).permute(1, 0, 2, 3, 4)
                # Move each reconstructed chunk off the device right away so
                # device memory stays bounded by one chunk.
                recon_vid.append(output.detach().cpu())
                torch.cuda.empty_cache()

            # convert tensor to numpy array for easy manipulations
            output = torch.cat(recon_vid, dim=1).numpy()

            print("Forward Pass Completed")

            # modified_video and original labels are not windowed. output is in windowed format
            modified_video = x_data_test[i]
            original_labels = y_data_test[i]

            display_videos(name, dset, path, vid_folder, modified_video, output, original_labels)


# Unimodality
# Parallel lists: position k holds the modality folder, dataset name and saved
# model for the same modality.
list_of_files = ["Thermal", "ONI_IR", "IP"]
list_of_datasets = ["Thermal_T3", "ONI_IR_T", "IP_T"]

# These are file paths for Base_3DCAE model only. Cannot be used for other Unimodal models
if background_subtraction:
    # Default GMG with L1loss
    thermal_model = "Thermal_T3_2024-03-13-08-07-43"  # Trial-6
    # Default GMG with L1loss()
    oni_ir_model = "ONI_IR_T_2024-03-15-04-14-00"  # Trial-2
    # Default GMG with MSEloss()
    ip_model = "IP_T_2024-03-15-23-52-18"  # Trial-3
else:
    # Baseline Models
    thermal_model = "Thermal_T3_2024-03-11-21-35-40"  # Trial-2
    oni_ir_model = "ONI_IR_T_2024-02-25-14-02-21"  # Trial-1
    ip_model = "IP_T_2024-02-24-15-11-13"  # Trial-1

list_of_models = [thermal_model, oni_ir_model, ip_model]

# Select which modality to demo: 0 = Thermal, 1 = ONI_IR, 2 = IP
modality_index = 0  # 0 to 2

dset = list_of_files[modality_index]
name = list_of_datasets[modality_index]
modelpath = list_of_models[modality_index]
# os.path.join avoids the unrecognised backslash escapes of a hand-written
# Windows path and works on every platform.
# NOTE(review): the file name hard-codes a 64x64 image size - confirm it matches ht/wd.
path = os.path.join(
    project_directory,
    "Dataset",
    "H5PY",
    "{}_Data_set-{}-imgdim64x64.h5".format(dataset_category, name),
)

demo_pipeline_unimodality(name, dset, path, modelpath)