### NOTE: Neither the lines of code nor text have been proofread

In [1]:
import cv2 as cv
import numpy as np

### Optical Flow

This enables one see the apparent motion of an object between successive frames. The theory behind it can be read at:
<br>
https://docs.opencv.org/3.4/d4/dee/tutorial_optical_flow.html

Achieving optical motion requires solving an optical flow equation, but this equation has two unknowns. To get round this, assumptions are made; several methods subsequently exist to solve the optical flow equation. Two methods are demonstrated

**Lucas-Kanade Method**

Quite simply, this method takes a 3 &times; 3 patch around each point in the object's motion such that for each point we have (<em>f<sub>x</sub></em>, <em>f<sub>y</sub></em>, <em>f<sub>t</sub></em>), 't' meaning time since optical flow is a 2D vector. The previous points, previous frame, and next frame are passed on to <code>cv.calcOpticalFlowPyrLK() </code> which returns the next points (for the next frame). See the aforementioned webpage for more details of the theory.

An implementation is below

In [2]:
capture = cv.VideoCapture('bank_videos/M6 Motorway Traffic_Extract.mp4')

# parameters for ShiTomasi corner detection
feature_params = dict( maxCorners = 100,
                       qualityLevel = 0.3,
                       minDistance = 7,
                       blockSize = 7 )

# Parameters for lucas kanade optical flow
lk_params = dict( winSize  = (15, 15),
                  maxLevel = 2,
                  criteria = (cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT, 10, 0.03))

# Create some random colors
color = np.random.randint(0, 255, (100, 3))

# Take first frame and find corners in it
ret, old_frame = capture.read()
old_gray = cv.cvtColor(old_frame, cv.COLOR_BGR2GRAY)
p0 = cv.goodFeaturesToTrack(old_gray, mask = None, **feature_params)

# Create a mask image for drawing purposes
mask = np.zeros_like(old_frame)

while capture.isOpened():
    ret, frame = capture.read()
    # if frame is read correctly ret is True
    if not ret:
        print("Can't receive frame (stream end?). Exiting ...")
        break
    else:
        frame_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        
       
        # calculate optical flow
        p1, st, err = cv.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)

        # Select good points
        if p1 is not None:
            good_new = p1[st==1]
            good_old = p0[st==1]

        # draw the tracks
        for i, (new, old) in enumerate(zip(good_new, good_old)):
            a, b = new.ravel()
            c, d = old.ravel()
            mask = cv.line(mask, (int(a), int(b)), (int(c), int(d)), color[i].tolist(), 2)
            frame = cv.circle(frame, (int(a), int(b)), 5, color[i].tolist(), -1)
        img = cv.add(frame, mask)
        old_gray = frame_gray.copy()
        p0 = good_new.reshape(-1, 1, 2)
        cv.imshow('frame', img)
        if cv.waitKey(12) == ord('q'):
            break

Can't receive frame (stream end?). Exiting ...


In [3]:
capture.release()

In [4]:
cv.destroyAllWindows()

In [5]:
del(feature_params,lk_params,capture)

Let's save this

In [6]:
# Read video
capture = cv.VideoCapture('bank_videos/M6 Motorway Traffic_Extract.mp4')

# Get its size
width = capture.get(cv.CAP_PROP_FRAME_WIDTH)
height = capture.get(cv.CAP_PROP_FRAME_HEIGHT)
fps = capture.get(cv.CAP_PROP_FPS)
count = capture.get(cv.CAP_PROP_FRAME_COUNT)

properties = [int(width), int(height), int(fps), int(count)]
properties

[1280, 720, 30, 503]

In [10]:
capture = cv.VideoCapture('bank_videos/M6 Motorway Traffic_Extract.mp4')

# Define the codec and create VideoWriter object
codec = cv.VideoWriter_fourcc(*"mp4v")
out = cv.VideoWriter('bank_videos/M6 Motorway Traffic_Extract_OpticalFlowLK.mp4', codec, 30.0, (1280, 720))

# parameters for ShiTomasi corner detection
feature_params = dict( maxCorners = 100,
                       qualityLevel = 0.3,
                       minDistance = 7,
                       blockSize = 7 )

# Parameters for lucas kanade optical flow
lk_params = dict( winSize  = (15, 15),
                  maxLevel = 2,
                  criteria = (cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT, 10, 0.03))

# Create some random colors
color = np.random.randint(0, 255, (100, 3))

# Take first frame and find corners in it
ret, old_frame = capture.read()
old_gray = cv.cvtColor(old_frame, cv.COLOR_BGR2GRAY)
p0 = cv.goodFeaturesToTrack(old_gray, mask = None, **feature_params)

# Create a mask image for drawing purposes
mask = np.zeros_like(old_frame)

while capture.isOpened():
    ret, frame = capture.read()
    # if frame is read correctly ret is True
    if not ret:
        print("Can't receive frame (stream end?). Exiting ...")
        break
    else:
        frame_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        
       
        # calculate optical flow
        p1, st, err = cv.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)

        # Select good points
        if p1 is not None:
            good_new = p1[st==1]
            good_old = p0[st==1]

        # draw the tracks
        for i, (new, old) in enumerate(zip(good_new, good_old)):
            a, b = new.ravel()
            c, d = old.ravel()
            mask = cv.line(mask, (int(a), int(b)), (int(c), int(d)), color[i].tolist(), 2)
            frame = cv.circle(frame, (int(a), int(b)), 5, color[i].tolist(), -1)
        img = cv.add(frame, mask)
        old_gray = frame_gray.copy()
        p0 = good_new.reshape(-1, 1, 2)
        out.write(img)
        cv.imshow('frame', img)
        if cv.waitKey(12) == ord('q'):
            break

Can't receive frame (stream end?). Exiting ...


In [11]:
capture.release()
out.release()

In [12]:
cv.destroyAllWindows()

Done!

**Dense Optical Flow**

Alternatively, we could compute the optical flow for all the points in the frame; see https://docs.opencv.org/3.4/d4/dee/tutorial_optical_flow.html

In [14]:
capture = cv.VideoCapture('bank_videos/M6 Motorway Traffic_Extract.mp4')

ret, frame = capture.read()
prvs = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
hsv = np.zeros_like(frame)
hsv[..., 1] = 255

while capture.isOpened():
    ret, frame = capture.read()
    if not ret:
        print('No frames grabbed!')
        break
    else:
        next = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        flow = cv.calcOpticalFlowFarneback(prvs, next, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        mag, ang = cv.cartToPolar(flow[..., 0], flow[..., 1])
        hsv[..., 0] = ang*180/np.pi/2
        hsv[..., 2] = cv.normalize(mag, None, 0, 255, cv.NORM_MINMAX)
        frame = cv.cvtColor(hsv, cv.COLOR_HSV2BGR)

        cv.imshow('frame', frame)
        if cv.waitKey(12) == ord('q'):
            break
        prvs = next

No frames grabbed!


In [15]:
capture.release()

In [16]:
cv.destroyAllWindows()

Here, as the direction changes, so does the hue.

Let's save this!

In [17]:
capture = cv.VideoCapture('bank_videos/M6 Motorway Traffic_Extract.mp4')

# Define the codec and create VideoWriter object
codec = cv.VideoWriter_fourcc(*"mp4v")
out = cv.VideoWriter('bank_videos/M6 Motorway Traffic_Extract_OpticalFlowDense.mp4', codec, 30.0, (1280, 720))

ret, frame = capture.read()
prvs = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
hsv = np.zeros_like(frame)
hsv[..., 1] = 255

while capture.isOpened():
    ret, frame = capture.read()
    if not ret:
        print('No frames grabbed!')
        break
    else:
        next = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        flow = cv.calcOpticalFlowFarneback(prvs, next, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        mag, ang = cv.cartToPolar(flow[..., 0], flow[..., 1])
        hsv[..., 0] = ang*180/np.pi/2
        hsv[..., 2] = cv.normalize(mag, None, 0, 255, cv.NORM_MINMAX)
        frame = cv.cvtColor(hsv, cv.COLOR_HSV2BGR)

        out.write(frame)
        cv.imshow('frame', frame)
        if cv.waitKey(12) == ord('q'):
            break
        prvs = next

No frames grabbed!


In [18]:
capture.release()
out.release()

In [19]:
cv.destroyAllWindows()

Done