In [None]:
import cv2
import math
import numpy as np

DEBUG = False

In [None]:
def open_video(video_filepath):
    """Open a video file and return its ``cv2.VideoCapture`` handle.

    The caller owns the returned handle and must call ``release()`` on it once done.

    Args:
        video_filepath: Path of the video, passed straight to ``cv2.VideoCapture``.

    Returns:
        The opened ``cv2.VideoCapture`` handle.

    Raises:
        Exception: If the capture reports that it is not opened.
    """
    vid_cap = cv2.VideoCapture(video_filepath)

    if not vid_cap.isOpened():
        # Free the failed capture before reporting the error.
        vid_cap.release()
        # f-string so the offending path actually appears in the message.
        raise Exception(f'Error opening video {video_filepath}')

    return vid_cap

In [None]:
def get_event_frame_index(video_filepath, event_timestamp_millis):
    """Return the index of the video frame displayed at the given timestamp.

    Args:
        video_filepath: Path of the video; it is opened only to read its frame rate.
        event_timestamp_millis: Event time in milliseconds from the start of the video.

    Returns:
        ``floor(event_timestamp_millis * fps / 1000)`` as an ``int``.

    Raises:
        Exception: If the video cannot be opened (raised by ``open_video``).
        ValueError: If the video reports a frame rate that is not positive.
    """
    vid_cap = open_video(video_filepath)
    try:
        vid_fps = vid_cap.get(cv2.CAP_PROP_FPS)
    finally:
        # Release the handle even if reading the property fails.
        vid_cap.release()

    # `not (x > 0)` also rejects NaN, which a plain `<= 0` comparison would let through.
    if not vid_fps > 0:
        raise ValueError(f'Video {video_filepath} reports an invalid frame rate: {vid_fps}')

    # Multiply before dividing: `t / 1000 * fps` rounds twice and can land just below an
    # exact frame boundary (e.g. 290 / 1000 * 100 == 28.999999999999996), which floor()
    # would turn into the previous frame.
    return math.floor(event_timestamp_millis * vid_fps / 1000)

def get_frame_indexes_surrounding_event(video_filepath, event_timestamp_millis, sequence_length, frame_step):
    """Build every frame-index sequence that contains the event frame.

    Each sequence has ``sequence_length`` indexes spaced ``frame_step`` frames apart,
    stays inside the video, and has the event frame at one of its positions.

    Args:
        video_filepath: Path of the video.
        event_timestamp_millis: Event time in milliseconds from the start of the video.
        sequence_length: Number of frames per sequence (at least 1).
        frame_step: Distance in frames between consecutive entries (at least 1).

    Returns:
        A tuple ``(frame_indexes, labels)`` of 2-D arrays with one row per sequence.
        ``labels`` is ``int8`` and holds 1 where the frame index is at or after the
        event frame index, 0 otherwise.

    Raises:
        ValueError: If ``sequence_length`` or ``frame_step`` is smaller than 1, or if
            the event frame lies outside the video.
        Exception: If the video cannot be opened, or if no sequence of the requested
            length and step fits inside the video.
    """
    if sequence_length < 1:
        raise ValueError(f'sequence_length must be at least 1, got {sequence_length}')
    if frame_step < 1:
        raise ValueError(f'frame_step must be at least 1, got {frame_step}')

    vid_cap = open_video(video_filepath)
    try:
        # OpenCV reports properties as floats; the count is used as an integer below.
        vid_frame_count = int(vid_cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        vid_cap.release()

    event_frame_index = get_event_frame_index(video_filepath, event_timestamp_millis)

    # Without this check an out-of-range event silently yields empty 1-D arrays.
    if not 0 <= event_frame_index < vid_frame_count:
        raise ValueError(
            f'Event frame index {event_frame_index} is outside the video, '
            f'which has {vid_frame_count} frames.'
        )

    max_frame_steps_event_to_beginning = event_frame_index // frame_step
    # Minus 1 because this is about the last valid index, not the frame count.
    max_frame_steps_event_to_end = (vid_frame_count - 1 - event_frame_index) // frame_step
    # Plus 1 to include the event frame itself.
    max_possible_sequence_length = max_frame_steps_event_to_end + max_frame_steps_event_to_beginning + 1

    if sequence_length > max_possible_sequence_length:
        raise Exception(f"Not possible for frame step {frame_step} and sequence length {sequence_length}. Maximum possible sequence length is {max_possible_sequence_length}")

    # Min and max start frame in which the event frame is still included at the exact point
    # considering the requested sequence length and frame step size.
    min_start_frame_idx = event_frame_index - frame_step * min(sequence_length - 1, max_frame_steps_event_to_beginning)
    max_start_frame_idx = event_frame_index - frame_step * max(0, sequence_length - 1 - max_frame_steps_event_to_end)

    # One row per start position, one column per step offset (broadcast sum).
    start_frame_idxs = np.arange(min_start_frame_idx, max_start_frame_idx + frame_step, frame_step)
    step_offsets = np.arange(sequence_length) * frame_step
    frame_indexes = start_frame_idxs[:, np.newaxis] + step_offsets

    labels = (frame_indexes >= event_frame_index).astype(np.int8)

    return frame_indexes, labels

# test
video_filepath = 'data/hand_collission.mp4'

sample_result = get_frame_indexes_surrounding_event(video_filepath, event_timestamp_millis=1000, sequence_length=6, frame_step=13)
assert sample_result[0].shape == (3, 6)

if DEBUG:
    print(f'Data shape: {sample_result[0].shape}')
    print(f'Label shape: {sample_result[1].shape}')
    print(sample_result)

# A sequence this long does not fit in the test video, so the call must raise.
# The failure check lives in the `else` branch: an `assert False` inside the `try`
# raises AssertionError, which `except Exception` would catch and silently ignore.
try:
    get_frame_indexes_surrounding_event(video_filepath, event_timestamp_millis=1000, sequence_length=14, frame_step=13)
except Exception as exc:
    if DEBUG:
        print(f'Received expected exception with message "{exc}".')
else:
    raise AssertionError('Expected an exception for a sequence length that does not fit in the video.')

sample_result = get_frame_indexes_surrounding_event(video_filepath, event_timestamp_millis=1000, sequence_length=8, frame_step=13)
assert sample_result is not None

if DEBUG:
    print(f'Data shape: {sample_result[0].shape}')
    print(f'Label shape: {sample_result[1].shape}')
    print(sample_result)

print('All tests OK.')

In [None]:
def get_image_frames(video_filepath, frame_index_sequences):
    """Read the requested frames from a video, arranged like the requested sequences.

    Several frame-index sequences can be requested at once; each row of
    ``frame_index_sequences`` is one sequence. Every distinct frame is decoded only
    once, even when it appears in several sequences.

    Args:
        video_filepath: Path of the video.
        frame_index_sequences: Iterable of sequences of frame indexes (for example a
            list of lists or a 2-D integer array). Sequences may differ in length.

    Returns:
        A list with one numpy array per requested sequence, each holding that
        sequence's frames stacked along the first axis, in RGB channel order.
        Each array is a fresh copy built by ``np.array``: the arrays do not share
        memory with each other.

    Raises:
        Exception: If the video cannot be opened, or if seeking to or reading a
            requested frame fails.
    """
    # Sorted so the capture is stepped through the file in increasing frame order
    # instead of the arbitrary order of a set. int() turns numpy integer scalars into
    # plain ints before they are handed to OpenCV.
    unique_frame_indexes = sorted({int(idx) for sequence in frame_index_sequences for idx in sequence})

    frames_map = {}

    vid_cap = open_video(video_filepath)
    try:
        for frame_idx in unique_frame_indexes:
            if not vid_cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx):
                raise Exception('Failed to set the frame position for the VideoCapture.')

            ret, frame = vid_cap.read()
            if not ret:
                raise Exception(f'Failed to read image frame index {frame_idx}.')

            # OpenCV decodes to BGR; callers get RGB.
            frames_map[frame_idx] = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    finally:
        # Release the handle even when a seek or read fails.
        vid_cap.release()

    # np.array copies the decoded frames into one new array per sequence.
    vid_frames = [np.array([frames_map[int(frame_idx)] for frame_idx in sequence]) for sequence in frame_index_sequences]

    return vid_frames

# test: two sequences of different lengths read from the sample video
video_filepath = 'data/hand_collission.mp4'

result = get_image_frames(video_filepath, [[1,2,3], [2,3,4,5]])

# One stacked array per requested sequence: (frames, height, width, channels).
assert [sequence_frames.shape for sequence_frames in result] == [
    (3, 720, 1280, 3),
    (4, 720, 1280, 3),
]

if DEBUG:
    print(result)

print('All tests OK.')

Below I show a GIF of the filtered frames. In the test video I simulate a "collision" of my hands, which happens at around timestamp 1500 msec. The code extracts a set of frames that includes the exact frame at timestamp 1500 msec, so the sequence contains the frame at the exact moment my hands "collided".

In [None]:
# Example frames in GIF

import imageio
from IPython.display import Image
from pathlib import Path

# Input video and output location for the example GIF.
tmp_dir = './tmp'
video_filepath = 'data/hand_collission.mp4'
gif_out_file_path = f'{tmp_dir}/test.gif'

# Make sure the output directory exists before writing into it.
Path(tmp_dir).mkdir(exist_ok=True, parents=True)

# All 11-frame sequences (10 frames apart) that contain the frame at 1500 msec.
frame_index_sequences, labels = get_frame_indexes_surrounding_event(
    video_filepath,
    event_timestamp_millis=1500,
    sequence_length=11,
    frame_step=10,
)
vid_frames = get_image_frames(video_filepath, frame_index_sequences)

# Render the first sequence as a GIF and display it as the cell output.
imageio.mimsave(gif_out_file_path, vid_frames[0], fps=10)
Image(filename=gif_out_file_path)

In [None]:
def get_index_proportion_of_ones(labels, min_proportion_of_ones, max_proportion_of_ones):
    """Flag the rows whose share of ones lies within the given inclusive bounds.

    Args:
        labels: Array of labels; the proportion is taken along the last axis.
        min_proportion_of_ones: Smallest accepted proportion (inclusive).
        max_proportion_of_ones: Largest accepted proportion (inclusive).

    Returns:
        A boolean mask with the last axis of ``labels`` reduced away, True where
        ``min_proportion_of_ones <= proportion <= max_proportion_of_ones``.
    """
    row_length = labels.shape[-1]
    ones_share = np.sum(labels, axis=-1) / row_length

    at_least_min = ones_share >= min_proportion_of_ones
    at_most_max = ones_share <= max_proportion_of_ones

    return at_least_min & at_most_max

# test: keep only the sequences where 50%-80% of the frames are at or after the event
video_filepath = 'data/hand_collission.mp4'
frame_index_sequences, labels = get_frame_indexes_surrounding_event(
    video_filepath,
    event_timestamp_millis=1500,
    sequence_length=7,
    frame_step=10,
)
indexes = get_index_proportion_of_ones(labels, 0.5, 0.8)

# The same mask must select two 7-frame rows from both arrays.
assert frame_index_sequences[indexes].shape == (2, 7)
assert labels[indexes].shape == (2, 7)

if DEBUG:
    print(frame_index_sequences[indexes], labels[indexes], sep='\n')

print('All tests OK.')

In [None]:
def get_frames_surrounding_event(video_filepath, event_timestamp_millis, sequence_length, frame_step, min_proportion_of_after_event_frames, max_proportion_of_after_event_frames):
    """Load the image sequences around an event, filtered by their share of post-event frames.

    Chains the helpers of this notebook: builds the candidate frame-index sequences
    with ``get_frame_indexes_surrounding_event``, keeps the ones accepted by
    ``get_index_proportion_of_ones``, and reads their frames with ``get_image_frames``.

    Args:
        video_filepath: Path of the video.
        event_timestamp_millis: Event time in milliseconds.
        sequence_length: Number of frames per sequence.
        frame_step: Distance in frames between consecutive entries of a sequence.
        min_proportion_of_after_event_frames: Lower bound passed to the proportion filter.
        max_proportion_of_after_event_frames: Upper bound passed to the proportion filter.

    Returns:
        A tuple ``(frame_imgs, labels)``: the frames of the kept sequences converted
        to a single numpy array, and the label rows of those same sequences.
    """
    candidate_idxs, candidate_labels = get_frame_indexes_surrounding_event(video_filepath, event_timestamp_millis, sequence_length, frame_step)

    keep_mask = get_index_proportion_of_ones(candidate_labels, min_proportion_of_after_event_frames, max_proportion_of_after_event_frames)
    kept_idxs = candidate_idxs[keep_mask]
    kept_labels = candidate_labels[keep_mask]

    # All kept sequences have the same length, so the list stacks into one array.
    stacked_imgs = np.array(get_image_frames(video_filepath, kept_idxs))

    return stacked_imgs, kept_labels

# test: sequences where 40%-60% of the frames are at or after the event at 1500 msec
video_filepath = 'data/hand_collission.mp4'
frame_imgs, labels = get_frames_surrounding_event(
    video_filepath,
    event_timestamp_millis=1500,
    sequence_length=15,
    frame_step=5,
    min_proportion_of_after_event_frames=0.4,
    max_proportion_of_after_event_frames=0.6,
)

# Three sequences of 15 frames each pass the filter for the sample video.
assert frame_imgs.shape == (3, 15, 720, 1280, 3)
assert labels.shape == (3, 15)

if DEBUG:
    print('frame_imgs.shape:', frame_imgs.shape, sep='\n')
    print('labels.shape:', labels.shape, sep='\n')
    print('labels:', labels, sep='\n')

print('All tests OK.')