Main source:
https://opencv-python-tutroals.readthedocs.io/en/latest/py_tutorials/py_video/py_lucas_kanade/py_lucas_kanade.html#lucas-kanade

In [1]:
import os
import numpy as np
import cv2
import time
from imageio import imwrite
from datetime import datetime

In [2]:
from motion_sig_utils import array_to_image, end_loop_util, writeAnotationsToCV

In [3]:
# in-notebook display
import IPython.display

### define paths

In [4]:
data_path = r"C:\Users\alexa\Documents\data\mlproj_data"

In [5]:
vid_folder='merayxu/campus'

In [6]:
folders = os.listdir(os.path.join(data_path, vid_folder))
folders = [f for f in folders if '.' not in f]
vids = {}
for f in folders:
    vids[f] = os.listdir(os.path.join(data_path, vid_folder, f))
    vids[f] = [v for v in vids[f] if 'mp4' in v]

In [7]:
f = folders[0]
v = 2
vid_file = vids[f][v]
vid_path = os.path.join(data_path, vid_folder, f, vid_file)
print(vid_path)

C:\Users\alexa\Documents\data\mlproj_data\merayxu/campus\Auditorium\view-IP2.mp4


In [8]:
vid_file = r"Test video for Object Detection __ TRIDE.mp4"
vid_file = r"view-Contour2.mp4"
vid_path = os.path.join(data_path,vid_file)
print(vid_path)

C:\Users\alexa\Documents\data\mlproj_data\view-Contour2.mp4


### video properties

In [9]:
cap = cv2.VideoCapture(vid_path)

In [10]:
total_frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
fps = cap.get(cv2.CAP_PROP_FPS)
dur = total_frame_count / fps
print('{:.2f} seconds'.format(dur))
print('{:.0f} fps'.format(fps))
print('{:.0f} frames'.format(total_frame_count))
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
print('{:.0f}x{:.0f}'.format(width,height))

99.53 seconds
30 fps
2983 frames
1920x1080


### run GF OF

In [11]:
cap = cv2.VideoCapture(vid_path)

# set parameters
sample_freq = 3 # seconds
max_frame_count = 600
OF_flag = True
OF_frame_freq = int(fps * sample_freq)
save_masks_flag = True
save_frame_freq = OF_frame_freq # int(dur)
save_raw_flag = False
display_flag = False
display_only_mask_flag = True
reduce_reso_factor = 1.5
reduce_reso_flag = True
new_width = int(width/reduce_reso_factor)
new_height = int(height/reduce_reso_factor)
reset_mask = True
write_csv_flag = True

# initialize vars
if display_flag: dipy = IPython.display.display("", display_id=1)
frame_count, img_count = 0, 0
now_string = datetime.now().strftime(r'%Y%m%d_%H%M%S')
t0 = time.time()

ret, frame1 = cap.read()
if reduce_reso_flag: frame1 = cv2.resize(frame1,(new_width,new_height),fx=0,fy=0, interpolation = cv2.INTER_CUBIC)
prvs = cv2.cvtColor(frame1,cv2.COLOR_BGR2GRAY)
hsv = np.zeros_like(frame1)
hsv[...,1] = 255

print('running through video...')
while(True):
    try:
        # read frame
        frame_count += 1
        ret, frame = cap.read()
       
        # check stream finished or not
        if not ret:
            tend = end_loop_util(cap, "stream finished\n")
            break
        
        # optional: reduce resolution
        if reduce_reso_flag:
            frame = cv2.resize(frame,(new_width,new_height),fx=0,fy=0, interpolation = cv2.INTER_CUBIC)
        
        # get gray color space
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
           
        # apply optical flow
        if OF_flag&(frame_count %  OF_frame_freq == 0):
        
            # calculate optical flow
            flow = cv2.calcOpticalFlowFarneback(
                prvs
                ,frame_gray
                ,flow=None
                ,pyr_scale=0.5
                ,levels=3
                ,winsize=15
                ,iterations=3
                ,poly_n=5
                ,poly_sigma=1.2
                ,flags=0
            )

            # draw motion signature
            mag, ang = cv2.cartToPolar(flow[...,0], flow[...,1])
            hsv[...,0] = ang*180/np.pi/2
            hsv[...,2] = cv2.normalize(mag,None,0,255,cv2.NORM_MINMAX)
            mask = cv2.cvtColor(hsv,cv2.COLOR_HSV2BGR)
            
            # define output image
            img = mask
            if not display_only_mask_flag:
                img = cv2.add(frame,img)
        else:
            # no optical flow: output image is original frame
            img = frame
            
        # update previous frame once processed
        prvs = frame_gray.copy()
        
        # display output image (mask / original frame / both)
        if display_flag:
            im = array_to_image(img)
            dipy.update(im)
        
        # save mask
        if (save_masks_flag)&(frame_count %  save_frame_freq == 0):
            mask_amend = mask.copy()
            #mask_amend[mask_amend == 0] = 255 # convert background from black to white
            imwrite('masks/{}_mask_{}.jpg'.format(now_string,img_count+1), mask_amend)
            img_count += 1
            if reset_mask:
                mask = np.zeros_like(mask)
                
            # save raw
            if (save_raw_flag)&(frame_count % save_frame_freq == 0):
                imwrite('masks/{}_raw_{}.jpg'.format(now_string,img_count), frame)
                
            # write log in csv
            if write_csv_flag:
                writeAnotationsToCV(vid_file,frame_count,'{}_mask_{}.jpg'.format(now_string,img_count),'masks/',now_string)
        
        # stop running if max_frame_count reached
        if frame_count > max_frame_count:
            tend = end_loop_util(cap, "reached max frame count\n")
            break
            
    except KeyboardInterrupt: # manual interruption
        tend = end_loop_util(cap, "stream interrupted\n")
        break

tend = time.time()
print('normal speed: {:.2f}s'.format(max_frame_count / fps))
print('run speed: {:.2f}s'.format(tend - t0))

stream interrupted

normal speed: 20.02s
run speed: 9.22s
