# Video Processing for BT Lab
Functions to check whether a run is good
<br>
<br>
![UofC logo](./pictures/uofc_logo-black.jpg)

In [82]:
#import libraries
import os
import re
import json
from pathlib import Path
import numpy as np
import pandas as pd
import traceback
from timeit import default_timer as timer
import cv2

# plot
from matplotlib import pyplot as plt
import matplotlib.patches as patches

# faster r-cnn
import torch
import torchvision.transforms as T
from torchvision.models.detection import fasterrcnn_resnet50_fpn

In [44]:
# import sk libraries
from sklearn.model_selection import train_test_split

In [45]:
# make sure to update path
# Ask for the drive letter interactively and build the root video directory from it.
user_drive = input("Enter user drive: ").upper()
video_path = f"{user_drive}:/Christian/DI_centre_structured"
# Shows the resulting path for manual confirmation; the typed answer is not
# stored or checked, so this only pauses execution until the user presses Enter.
input(f"Is this the right directory - {video_path}?")

''

In [46]:
# Lookup table from recording-condition names (long and abbreviated spellings)
# to the short codes that get_id() places in a patient/video identifier.
VIDEO_CHARACTERISTICS = {
    # blanket condition
    "With Blankets" : "WB",
    "B" : "WB",
    "Without Blankets" : "WOB",
    "WOB": "WOB",
    # camera distance
    "3 Meters" : "3m",
    "2 Meters" : "2m",
    # breathing condition (both "Breath" and "Breathe" spellings are accepted)
    "Hold Breath" : "HB",
    "Hold Breathe" : "HB",
    "H" : "HB",
    "Relaxed" : "rel",
    "R": "rel",
}

In [47]:
# local dirs
# All paths are derived from the current working directory at the time this
# cell runs, so the notebook must be started from the repository root.
repo_dir = os.getcwd()
json_dir = repo_dir + "/records/JSON"  # passed to load_json() below
log_dir = repo_dir + "/records/logs"

In [48]:
# create the folder if it does not exist; otherwise delete every file in it
def set_folder(save_folder:str) -> None:
    """Make sure ``save_folder`` exists and holds no files.

    If the folder is missing it is created, including any missing parent
    folders. If it already exists, every entry directly inside it is deleted
    with ``os.remove``.

    Args:
        save_folder: Path of the folder to create or empty.

    Raises:
        OSError: If the folder cannot be created, or if an entry cannot be
            removed (``os.remove`` raises when the entry is a directory).
    """
    if not os.path.isdir(save_folder):
        # makedirs, unlike mkdir, also creates missing parent folders
        os.makedirs(save_folder, exist_ok=True)
        return

    for entry in os.listdir(save_folder):
        os.remove(os.path.join(save_folder, entry))

In [49]:
# converts video into frames
def run_video_to_frame(video_path:str, save_folder:str, frame_frequency: dict, new_fps:int, patient_id:str) -> list[int]:
    """Export selected frames of a video as PNG files.

    The video is processed one whole second ("set") at a time. Inside each
    set the frames are numbered 0 .. vid_fps-1; a frame whose position is a
    key of ``frame_frequency`` is written that many times, every other frame
    is skipped. A trailing partial second is ignored.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        save_folder: Output folder; created or emptied through ``set_folder``.
        frame_frequency: Mapping of in-second frame position to the number of
            copies to save. Any object with ``.items()`` is accepted.
        new_fps: Number of frames expected to be saved per whole second.
        patient_id: Prefix of the output files, ``{patient_id}_{n}.png``.

    Returns:
        ``[sets processed, frames saved, frames read]``.

    Raises:
        AssertionError: If the video cannot be opened.
        ValueError: If the reported FPS is not positive, or if the number of
            saved frames differs from ``new_fps * whole seconds``.
        OSError: If a frame cannot be written to disk.
    """
    # open video file
    video = cv2.VideoCapture(video_path)
    if not video.isOpened():
        video.release()
        # raised explicitly (instead of `assert`) so the check survives `python -O`
        raise AssertionError(f"Could not open video: {video_path}")

    try:
        # unpack metadata
        vid_fps = int(video.get(cv2.CAP_PROP_FPS))
        if vid_fps <= 0:
            raise ValueError(f"Invalid FPS ({vid_fps}) reported for video: {video_path}")
        number_of_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        vid_length_floored = number_of_frames // vid_fps
        expected_frames = new_fps*vid_length_floored

        # one dict lookup per frame instead of scanning a list of positions
        copies_per_position = dict(frame_frequency.items())

        # make folder if it doesn't exist
        set_folder(save_folder)

        pick_counter = save_counter = set_counter = true_frames = 0

        # start video processing
        start_time = timer()
        while set_counter < vid_length_floored:
            success, frame = video.read()
            if not success:
                break

            for _ in range(copies_per_position.get(pick_counter, 0)):
                frame_name = f"{patient_id}_{save_counter}.png"
                frame_path = os.path.join(save_folder, frame_name)
                # imwrite reports failure through its return value, not an exception
                if not cv2.imwrite(frame_path, frame):
                    raise OSError(f"Could not write frame: {frame_path}")
                save_counter += 1

            # update counters
            pick_counter += 1
            true_frames += 1
            if pick_counter == vid_fps:
                # a full second has been consumed; restart the in-second position
                set_counter += 1
                pick_counter = 0
    finally:
        # release the capture even when reading or writing fails
        video.release()

    # get timing for one export
    time_delta = timer() - start_time
    print(f"\nDone in {time_delta} seconds.")

    if save_counter != expected_frames:
        raise ValueError(f"Expected {expected_frames} frames, but got {save_counter} frames")

    return [set_counter, save_counter, true_frames]

In [50]:
# gets folder paths
def get_frames_path(local_path: str, level:str) -> list["str"]:
    """Derive the frames folder and the containing folder of a video file.

    Backslashes are turned into forward slashes first. The frames folder is
    ``<video folder>/frames_<name>_<level>``, where ``<name>`` is the file
    name up to its first dot.

    Returns:
        ``[frames folder path, video folder path]``.
    """
    normalised = "/".join(local_path.split("\\"))
    parent_dir, _, file_name = normalised.rpartition("/")
    stem = file_name.partition(".")[0]
    return [f"{parent_dir}/frames_{stem}_{level}", parent_dir]

In [51]:
# reads JSON file
def load_json(json_dir:str, filename:str) -> dict:
    """Parse the JSON file ``filename`` found in ``json_dir``.

    The two parts are joined with a single "/" to form the file path.
    """
    with open(f"{json_dir}/{filename}", "r") as handle:
        return json.load(handle)

In [52]:
# resamples the number of frames
def up_sample(old_fps: int, new_fps: int, start:int) -> list[int]:
    """Pick ``new_fps`` frame numbers out of the range ``start .. old_fps-1``.

    ``new_fps`` evenly spaced values between ``start`` and ``old_fps-1`` are
    rounded to integers and used as indices into ``arange(start, old_fps)``.
    Indices past the end wrap around to the beginning, so the same frame
    number may appear more than once.
    """
    candidates = np.arange(start, old_fps, dtype=int)
    positions = np.linspace(start, old_fps-1, new_fps).round().astype(int)
    # mode='wrap' maps out-of-range positions back into the candidate array
    return candidates.take(positions, mode='wrap').tolist()

In [53]:
# patient_id
def get_id(patient_data: dict, video_count:int) -> str:
    """Build the identifier ``{alias}_{video_count}-{distance}-{blanket}-{breathing}``.

    The blanket, distance and breathing entries of ``patient_data`` are
    translated to short codes through ``VIDEO_CHARACTERISTICS``; a value with
    no entry in that table becomes "?". Only the distance value is
    title-cased before the lookup.

    Args:
        patient_data: Must contain the keys "alias", "blanket", "distance"
            and "breathing".
        video_count: Running number placed after the alias.

    Raises:
        KeyError: If one of the required keys is missing.
    """
    alias = patient_data["alias"]
    blanket = VIDEO_CHARACTERISTICS.get(patient_data["blanket"], "?")
    distance = VIDEO_CHARACTERISTICS.get(patient_data["distance"].title(), "?")
    breathing = VIDEO_CHARACTERISTICS.get(patient_data["breathing"], "?")
    return f"{alias}_{video_count}-{distance}-{blanket}-{breathing}"

In [54]:
# checks if folders have the correct number of frames
def run_folder_check(video_path:str, save_folder:str, frame_frequency: dict, new_fps:int) -> "str | None":
    """Check that ``save_folder`` holds the expected number of exported frames.

    The expected count is ``new_fps`` times the number of whole seconds in
    the video (frame count floor-divided by the reported FPS).

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        save_folder: Folder that should contain the exported frames.
        frame_frequency: Not used by the check; kept so the signature matches
            ``run_video_to_frame``.
        new_fps: Number of frames expected per whole second.

    Returns:
        ``save_folder`` when the folder is missing or its entry count differs
        from the expected count (i.e. the export should be rerun), otherwise
        ``None``.

    Raises:
        AssertionError: If the video cannot be opened.
        ValueError: If the reported FPS is not positive.
    """
    # open video file; only its metadata is needed, so release it right away
    video = cv2.VideoCapture(video_path)
    try:
        if not video.isOpened():
            # raised explicitly (instead of `assert`) so the check survives `python -O`
            raise AssertionError(f"Could not open video: {video_path}")
        vid_fps = int(video.get(cv2.CAP_PROP_FPS))
        number_of_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        video.release()

    if vid_fps <= 0:
        raise ValueError(f"Invalid FPS ({vid_fps}) reported for video: {video_path}")

    vid_length_floored = number_of_frames // vid_fps
    expected_frames = int(new_fps*vid_length_floored)

    # test for the folder BEFORE listing it, otherwise os.listdir raises first
    if not os.path.isdir(save_folder):
        print(f"{save_folder} does not exists.")
        return save_folder

    number_of_files_in_folder = len(os.listdir(save_folder))
    if number_of_files_in_folder != expected_frames:
        print(f"{save_folder} - Not enough frames in folder. Only found {number_of_files_in_folder} files. Expecting {expected_frames}.")
        return save_folder

    return None

In [55]:
# driver code for checking empty folders
def check_if_empty_folders(all_patients:dict, level: str, new_fps:int, user_drive:str) -> list:
    """Check the frames folder of every video and collect those needing a rerun.

    For each entry of ``all_patients`` the frames folder is derived from its
    "local path", and ``run_folder_check`` decides whether the folder holds
    the expected number of frames. An error for one video is printed with its
    traceback and does not stop the remaining videos.

    Args:
        all_patients: Mapping whose values are per-video dicts. The keys
            "local path" and "old fps" are read here, "filename" is used in
            error messages, and ``get_id`` reads its own keys.
        level: Suffix used in the frames folder name.
        new_fps: Target number of frames per second.
        user_drive: Not used by this function.

    Returns:
        ``[rerun_folders, frames_folders]``: the folders flagged for a rerun
        and every frames folder that was visited.
    """
    visited_folders = {}
    rerun_folders = []
    frames_folders = []

    for patient_info in all_patients.values():
        try:
            video_path = patient_info["local path"]
            old_fps = int(patient_info["old fps"])

            frames_folder, video_folder = get_frames_path(video_path, level)
            # number the videos that live in the same folder: 1, 2, 3, ...
            visited_folders[video_folder] = visited_folders.get(video_folder, 0) + 1
            frames_folders.append(frames_folder)
            video_count = visited_folders[video_folder]

            frames_to_pick = up_sample(old_fps, new_fps, 1)
            frames_idx = pd.Index(frames_to_pick, name="frames")
            frame_frequency = frames_idx.value_counts()

            if len(frames_to_pick) != new_fps:
                raise ValueError("Number of frames to pick is not equal to new fps")

            # get id (only needed by the disabled re-export below; still
            # evaluated so that incomplete metadata is reported)
            patient_id = get_id(patient_info, video_count)

            # check the frames folder
            rerun_folder = run_folder_check(video_path, frames_folder, frame_frequency, new_fps)

            if rerun_folder is not None:
                print("Rerunning folder: ", rerun_folder)
                rerun_folders.append(rerun_folder)
                # run_video_to_frame(video_path, frames_folder, frame_frequency, new_fps, patient_id)

        except Exception as e:
            traceback.print_exc()
            # .get() so a missing "filename" cannot raise inside the handler
            video_name = patient_info.get("filename", "<unknown>")
            print(f'''{type(e)}: {e} for video {video_name}''')

    return [rerun_folders, frames_folders]

In [56]:
# export to json

def export_to_json(filename: str, all_patient_info: dict) -> None:
    """Write ``all_patient_info`` to ``filename`` as JSON indented by 2 spaces.

    The data is serialised before the file is opened, so a serialisation
    error leaves an existing file untouched.
    """
    serialized = json.dumps(all_patient_info, indent=2)

    with open(filename, mode="w") as handle:
        handle.write(serialized)

# Run tests

In [57]:
""" local vars"""

# Target frame rates for the RGB videos. The rgb check loop below passes each
# key as `level` and each value as `new_fps` to check_if_empty_folders().
rgb_fps = {
    "lower_bound": 10,
    "upper_bound": 20
}

# Target frame rates for the thermal videos, in the same key/value layout
# (the thermal check cell is currently commented out).
thermal_fps = {
    "lower_bound": 5,
    "upper_bound": 10
}

In [58]:
""" load JSON files """

# load_json() already inserts the "/" between folder and file name, so the
# file names are given without a leading slash (avoids a doubled separator).
metadata_rgb = load_json(json_dir, "rgb_complete.json")
metadata_thermal = load_json(json_dir, "thermal_complete.json")

In [59]:
""" check videos (rgb) """
# One entry is appended per fps level, in the iteration order of rgb_fps, so
# both lists end up as lists of lists.
all_rgb_folders = []
all_rgb_rerun_folders = []

for level, new_fps in rgb_fps.items():
    print(f"\nAdjusting FPS to {new_fps}\n" + "="*50)
    # check_if_empty_folders returns [folders to rerun, all frames folders visited]
    rerun_folders, visited_folders = check_if_empty_folders(metadata_rgb, level, new_fps, user_drive)
    all_rgb_rerun_folders.append(rerun_folders)
    all_rgb_folders.append(visited_folders)


Adjusting FPS to 10
C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Hold Breath/frames_Arun2_lower_bound - Not enough frames in folder. Only found 17 files. Expecting 350.
Rerunning folder:  C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Hold Breath/frames_Arun2_lower_bound
C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Relaxed/frames_Arun2_lower_bound - Not enough frames in folder. Only found 17 files. Expecting 20.
Rerunning folder:  C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Relaxed/frames_Arun2_lower_bound
C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/Without Blankets/Hold Breath/frames_short_lower_bound - Not enough frames in folder. Only found 17 files. Expecting 290.
Rerunning folder:  C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/Without Blankets/Hold Breath/frames_short_lower_boun

C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Hold Breath/frames_Arun2_upper_bound - Not enough frames in folder. Only found 34 files. Expecting 700.
Rerunning folder:  C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Hold Breath/frames_Arun2_upper_bound
C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Relaxed/frames_Arun2_upper_bound - Not enough frames in folder. Only found 34 files. Expecting 40.
Rerunning folder:  C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Relaxed/frames_Arun2_upper_bound
C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/Without Blankets/Hold Breath/frames_short_upper_bound - Not enough frames in folder. Only found 34 files. Expecting 580.
Rerunning folder:  C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/Without Blankets/Hold Breath/frames_short_upper_bound
C:/Christian/DI_cen

In [60]:
""" check videos (thermal) """
# all_thermal_folders = []
# all_thermal_rerun_folders = []

# for level, new_fps in thermal_fps.items():
#     print(f"\nAdjusting FPS to {new_fps}\n" + "="*50)
#     rerun_folders, visited_folders = check_if_empty_folders(metadata_thermal, level, new_fps, user_drive)
#     all_thermal_folders.append(visited_folders)
#     all_thermal_rerun_folders.append(rerun_folders)

' check videos (thermal) '

# Crop Image

In [77]:
def detect_objects(img, model, threshold=0.5):
    """Return the bounding boxes of people detected in an image.

    Args:
        img: Image in BGR channel order (it is converted with
            ``cv2.COLOR_BGR2RGB`` before inference).
        model: Detection model that is called with a list of image tensors
            and returns one dict per image with the keys 'boxes', 'scores'
            and 'labels'. It is switched to evaluation mode by this call.
        threshold: A detection is kept only if its score is strictly greater
            than this value.

    Returns:
        List of numpy arrays, one per kept detection whose label is 1.
    """
    person_label = 1  # COCO Dataset label for 'person'

    # Convert the image to RGB and then to a tensor
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_tensor = T.ToTensor()(img_rgb)

    # Put the model in evaluation mode and apply to the image
    model.eval()
    with torch.no_grad():
        detections = model([img_tensor])[0]

    # Select confident 'person' detections with one mask instead of
    # converting every score and label element by element
    keep = (detections['scores'] > threshold) & (detections['labels'] == person_label)
    return [box.cpu().numpy() for box in detections['boxes'][keep]]

In [69]:
# Load a pre-trained Faster R-CNN model
model = fasterrcnn_resnet50_fpn(pretrained=True)



In [74]:
# all_rgb_folders holds one list of frames folders per fps level, so this
# unpacking relies on exactly two levels having been processed.
all_rgb_lower_bound, all_rgb_upper_bound = all_rgb_folders
all_rgb_lower_bound

['C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Hold Breath/frames_Arun2_lower_bound',
 'C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Relaxed/frames_Arun2_lower_bound',
 'C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/Without Blankets/Hold Breath/frames_short_lower_bound',
 'C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/Without Blankets/Relaxed/frames_relax 2meter short_lower_bound']

In [81]:
# Quick check of the detector on a single image: the `break` at the end of
# the loop body means only the first folder is ever processed.
for rgb_folder in all_rgb_lower_bound:
    # uses whichever entry os.listdir() returns first; raises IndexError if the folder is empty
    first_image = rgb_folder + "/" + os.listdir(rgb_folder)[0]
    print(first_image)

    img = cv2.imread(first_image)

    # the code treats a None result from cv2.imread as an unreadable image
    if img is None:
        print("Error: Could not read the image.")
    else:
        bounding_boxes = detect_objects(img, model)
        print("Detected bounding boxes:", bounding_boxes)
    break

C:/Christian/DI_centre_structured/DI_CAMERA_P3225/Final/Arun/2 Meters/With Blankets/Hold Breath/frames_Arun2_lower_bound/15_1-2m-WB-HB_0.png
Detected bounding boxes: [array([  82.82521,  316.22778,  785.2172 , 1029.9728 ], dtype=float32)]


# Getting labels

In [61]:
# gets labels based on breathing

def get_label_breathing(visited_folders: list[str]) -> list[list]:
    """Label each frames folder by the breathing condition named in its path.

    A folder is labelled 0 when its path contains a "Relaxed" or "R" path
    component and 1 when it contains "Hold Breath", "Hold Breathe" or "H";
    the relaxed pattern is tested first. Backslashes are treated as path
    separators for matching only; the returned paths are the original strings.
    Folders matching neither pattern are reported with a warning and left out.

    Args:
        visited_folders: Folder paths to label.

    Returns:
        ``[paths, labels]``: two lists of equal length, in input order.
    """
    hold_breath_pattern = re.compile(r"/Hold Breath/|/Hold Breathe/|/H/")
    relaxed_pattern = re.compile(r"/Relaxed/|/R/")

    all_frames_path = []
    all_labels = []

    for visited_folder in visited_folders:
        # the patterns are written with "/", so normalise Windows separators first
        normalised = visited_folder.replace("\\", "/")

        if relaxed_pattern.search(normalised):
            label = 0 # 0 for relaxed pattern
        elif hold_breath_pattern.search(normalised):
            label = 1 # 1 for hold breath pattern
        else:
            print(f"Warning: No matching breathing pattern for {visited_folder}")
            continue

        all_frames_path.append(visited_folder)
        all_labels.append(label)

    return [all_frames_path, all_labels]

In [62]:
# get labels for rgb

# all_rgb_frames_path = []
# all_rgb_labels = []
# save_data = {}

# for fps_bound in all_rgb_folders:
#     tmp_path_list, tmp_labels_list = get_label_breathing(fps_bound)
#     all_rgb_frames_path += tmp_path_list
#     all_rgb_labels += tmp_labels_list

# save_data["frames_path"] = all_rgb_frames_path
# save_data["labels_path"] = all_rgb_labels

In [63]:
# save test train split in json data (rgb)
# save_rgb_filename = json_dir + "/training_test_split/rgb_labels.json"
# export_to_json(save_rgb_filename, save_data)

# Splitting the data into training and testing

In [64]:
# data splitting

def split_data(all_videos: list[str], all_labels: list[int]):
    """Split videos and labels into train and test parts.

    20% of the samples go to the test part; the random state is fixed at 42
    so the split is reproducible.

    Returns:
        ``(train_videos, test_videos, train_labels, test_labels)``.
    """
    parts = train_test_split(all_videos, all_labels, test_size=0.2, random_state=42)
    train_videos, test_videos, train_labels, test_labels = parts
    return train_videos, test_videos, train_labels, test_labels


In [65]:
# split data (rgb)
# train_rgb_frames, test_rgb_frames, train_rgb_labels, test_rgb_labels = train_test_split(
#     all_rgb_frames_path, all_rgb_labels, test_size=0.2, random_state=42
# )