In [1]:
from detectionmodel2 import Processor
from augmentation import SingleFootageAugmenter
from utils import get_subdirectories, get_filenames
from pathlib import Path
import cv2
import pandas as pd
import numpy as np
import random
import os

In [2]:
mode_mapping = {
    0: '',
    1: 'brightness',
    2: 'img_noise',
    3: 'img_blur',
    4: 'cam_rotation',
    5: 'brightness + img_noise',
    6: 'brightness + img_blur',
    7: 'brightness + cam_rotation',
    8: 'img_noise + img_blur',
    9: 'img_noise + cam_rotation',
    10: 'img_blur + cam_rotation',
    11: 'brightness + img_noise + img_blur',
    12: 'brightness + img_noise + cam_rotation',
    13: 'brightness + img_blur + cam_rotation',
    14: 'img_noise + img_blur + cam_rotation',
    15: 'brightness + img_noise + img_blur + cam_rotation'
    
}

In [3]:
def video_fall_detection(video_path, model, mode=0, frame_lag=False, frame_loss=False, 
                         connection_err=False, display_video=False, save_video=False, output_filename='output.avi'):
    cap = cv2.VideoCapture(str(video_path))

    if not cap.isOpened():
        print("Cannot open video")
        exit()

    # Initialize the VideoWriter object only if save_video is True
    if save_video:
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(output_filename, fourcc, 20.0, (int(cap.get(3)), int(cap.get(4))))

    # Create an instance of Processor
    processor = Processor(model)
    
    test_mode = mode_mapping[mode]

    while True:
        success, frame = cap.read()
        if not success:
            break
        
        single_frame_augmentor = SingleFootageAugmenter(frame)
        
        if 'brightness' in test_mode:
            single_frame_augmentor.augment_brightness()
        
        if 'img_noise' in test_mode:
            single_frame_augmentor.add_image_noise()
        
        if 'img_blur' in test_mode:
            kernel_size = random.randrange(1, 16, 2)
            single_frame_augmentor.adjust_image_blur(kernel_size=(kernel_size,kernel_size))
        
        if 'cam_rotation' in test_mode:
            angle  = random.randrange(10, 360, 20)
            updated_frames = single_frame_augmentor.adjust_camera_rotation(angle=angle)
            
        frame = single_frame_augmentor.output_augmented_frame()
        # Process the frame using the Processor class
        frame = processor.process(frame)

        # Write the processed frame into the output video if save_video is True
        if save_video:
            out.write(frame)

        # Display the processed frame if display_video is True
        if display_video:
            cv2.imshow("Fall Detection", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    # Release resources depending on the options chosen
    cap.release()
    if save_video:
        out.release()
    if display_video:
        cv2.destroyAllWindows()

    # Return the statistics using the output_stats method from the Processor instance
    return processor.output_stats()


In [4]:
def get_test_results(model,video_dir,fall_patterns_dir, mode=0, frame_lag=False, frame_loss=False, 
                         connection_err=False, display_video=False, save_video=False):
    complete_test_results = []
    for fall_pattern_path in fall_patterns_dir:
        print(fall_pattern_path)
        fall_pattern_full_path = Path(video_dir,fall_pattern_path)
        cam_angles_paths = get_filenames(fall_pattern_full_path)
        for cam_angles_path in cam_angles_paths:
            print(cam_angles_path)
            file_name = os.path.basename(cam_angles_path)
            output_filename = f'Processed mode {mode} {file_name}.avi'
            cam_angle_full_path = Path(fall_pattern_full_path,cam_angles_path)
            output_filename = str(Path(video_dir,'Output',output_filename))
            test_result = video_fall_detection(cam_angle_full_path,model, mode, frame_lag, frame_loss,
                                               connection_err, display_video, save_video, output_filename)
            test_result['Angle'] = cam_angles_path[:-4]
            test_result['Fall Type'] = fall_pattern_path
            complete_test_results.append(test_result)
    df_results = pd.DataFrame(complete_test_results)
    df_results['model'] = model
    return df_results

# How to Use

In [5]:
video_dir = Path(os.getcwd(),'./Videos')

### In the List below enter the name of the subdirectories in Videos you would like to test

In [6]:
# fall_patterns_dir = ['wheelchair','baseline'] for e.g.

fall_patterns_dir = ['wheelchair']

### In the List below enter the name of the models you would like to test

In [7]:
# models = ['yolov8s-pose.pt']  for e.g.

models = ['yolov8s-pose.pt']

### Tester Parameters

In [8]:
# modes = [1,2,3,4] look at mode_mapping to choose your mode and include them in the list
# save_video = True Boolean to save video
# display_video = False Boolean to save video

modes = [1,2,3,4]
save_video = True
display_video = False

In [9]:
complete_df_results = pd.DataFrame()
backup_counter =  0
for model in models:
    for mode in modes:
        print(mode)
        print(model)
        df_result = get_test_results(model,video_dir,fall_patterns_dir,mode=mode,save_video=save_video,display_video=display_video)
        df_result.to_pickle(f'Data/df_result_{backup_counter}.pkl')
        complete_df_results = pd.concat([complete_df_results,df_result]).reset_index(drop=True)
        backup_counter+=1
complete_df_results.to_pickle(f'Data/complete_results_df.pkl')

1
yolov8s-pose.pt
wheelchair
wheelchair_calvin - 01.mp4
Wheelchair_Daniel_1.mov
2
yolov8s-pose.pt
wheelchair
wheelchair_calvin - 01.mp4
Wheelchair_Daniel_1.mov
3
yolov8s-pose.pt
wheelchair
wheelchair_calvin - 01.mp4
Wheelchair_Daniel_1.mov
4
yolov8s-pose.pt
wheelchair
wheelchair_calvin - 01.mp4
Wheelchair_Daniel_1.mov
