In this task, We are going to implement optical flow with lucas kanade method. In the start, we will import all the necessay libraries and also include the display function.

In [1]:
import cv2
import numpy as np
from matplotlib import pyplot as plt


# Define our imshow function 
def imshow(title = "Image", image = None, size = 10):
    w, h = image.shape[0], image.shape[1]
    aspect_ratio = w/h
    plt.figure(figsize=(size * aspect_ratio,size))
    plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    plt.title(title)
    plt.show()

Now we will implement the Lucas Kanade with following steps 

1- Video Stream Initialization: First we will load a video stream from a file. There are two options available: a short clip or a long clip.

2- ShiTomasi Corner Detection Parameters: Then we will set the parameters for ShiTomasi corner detection. These parameters will define the maximum number of corners to detect (maxCorners), the quality level for accepting corners (qualityLevel), the minimum distance between corners (minDistance), and the block size for corner detection (blockSize).

3- Lucas-Kanade Optical Flow Parameters: Then code will set the parameters for the Lucas-Kanade optical flow algorithm. These parameters will include the window size (winSize) for tracking, the maximum pyramid level (maxLevel), and the termination criteria (criteria) for the iterative optimization process.

4- Random Colors Generation: Then we will generates random colors to use for drawing the trails of object movement in the image. These colors will be stored in the color array.

5- Initialization of the First Frame: The code will read the first frame of the video and then applies the ShiTomasi corner detection on the first frame to find the initial corner locations. These corners are stored in the prev_corners variable.

6- Main Loop for Optical Flow Calculation: The code woll then enter into a loop where it will read frames from the video stream using. For each frame, it will convert it to grayscale and apply the Lucas-Kanade optical flow algorithm. This function will estimate the optical flow between the previous frame and the current frame, providing the new corner locations (new_corners), status of tracked points (status), and tracking errors (errors).

7- Selecting Good Points: The code will then select the tracked points that will have a status of 1 (successfully tracked) and stores them in the good_new and good_old variables.

8- Drawing Optical Flow Visualization: Then it will iterate over the selected points and draw the trails, circles, and direction arrows on the mask and frame. The trails and arrows will be drawn. The mask and frame are combined.

9- Saving and Updating Frames: The code will then save the resulting image img to the output video. It will then update the previous frame (prev_gray) with the current frame (frame_gray) and update the previous corners (prev_corners) with the newly tracked points.

10- Video Cleanup: Once the video stream ends, the code will release the video capture and writer objects.

In [3]:
# Load video stream, short clip
#cap = cv2.VideoCapture('walking_short_clip.mp4')

# Load video stream, long clip
cap = cv2.VideoCapture('walking.avi')

# Get the height and width of the frame (required to be an interger)
width = int(cap.get(3)) 
height = int(cap.get(4))

# Define the codec and create VideoWriter object. The output is stored in '*.avi' file.
out = cv2.VideoWriter('optical_flow_walking.avi', cv2.VideoWriter_fourcc('M','J','P','G'), 30, (width, height))

# Set parameters for ShiTomasi corner detection
feature_params = dict( maxCorners = 100,
                       qualityLevel = 0.3,
                       minDistance = 7,
                       blockSize = 7 )

# Set parameters for lucas kanade optical flow
lucas_kanade_params = dict( winSize  = (15,15),
                  maxLevel = 2,
                  criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Create some random colors
# Used to create our trails for object movement in the image 
color = np.random.randint(0,255,(100,3))

# Take first frame and find corners in it
ret, prev_frame = cap.read()
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

# Find inital corner locations
prev_corners = cv2.goodFeaturesToTrack(prev_gray, mask = None, **feature_params)

# Create a mask image for drawing purposes
mask = np.zeros_like(prev_frame)

while(1):
    ret, frame = cap.read()

    if ret == True:
      frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

      # calculate optical flow
      new_corners, status, errors = cv2.calcOpticalFlowPyrLK(prev_gray, 
                                                            frame_gray, 
                                                            prev_corners, 
                                                            None, 
                                                            **lucas_kanade_params)

      # Select and store good points
      good_new = new_corners[status==1]
      good_old = prev_corners[status==1]

      # Draw the tracks
      for i, (new, old) in enumerate(zip(good_new, good_old)):
        a, b = new.ravel()
        c, d = old.ravel()

        u = good_new[:,0] - good_old[:,0]
        v = good_new[:,1] - good_old[:,1]

        # Calculate the end points of the arrows
        arrow_len = 10
        arrow_scale = 3
        points = np.int32(good_new)
        endpoints = points + arrow_scale*np.int32([u, v]).T


        # Draw the line and arrow on the mask
        mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), color[i].tolist(), 2)
        
        '''
        uncomment below Mask to show direction arrow 
        '''
        mask = cv2.arrowedLine(mask, (int(a), int(b)), tuple(endpoints[i]), (0, 0, 255), thickness=2, tipLength=0.5)

        # Draw the circle on the frame
        frame = cv2.circle(frame, (int(a), int(b)), 5, color[i].tolist(), -1)
          
      img = cv2.add(frame,mask)

      # Save Video
      out.write(img)
      # Show Optical Flow
      #imshow('Optical Flow - Lucas-Kanade',img)

      # Now update the previous frame and previous points
      prev_gray = frame_gray.copy()
      prev_corners = good_new.reshape(-1,1,2)

    else:
      break
    
cap.release()
out.release()

In [7]:
from IPython.display import HTML
from base64 import b64encode

mp4 = open('optical_flow_walking.avi','rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()

In [10]:
HTML("""
<video controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)