Define method for calculating optical flow using the Horn-Shunck algorithm

In [1]:
import cv2
from scipy import signal
import numpy

X_FILTER = [[-1, 1], [-1, 1]]
Y_FILTER = [[-1, -1], [1, 1]]
T_FILTER = [[-1, -1],[-1, -1]]
LAP_FILTER = [[0, -0.25, 0], [-0.25, 1, -0.25], [0, -0.25, 0]]

# Calculate optical flow between two images (frames) using the Horn-Shunck algorithm
#   alpha is a parameter
#   k is the number of iterations until 'convergence' (lazy lazy...)
def calcflowHS(frame1, frame2, alpha, k):
    # Calculate derivatives over images
    #   filter over each frame and sum them for change between frames
    fx = signal.convolve(frame1, X_FILTER) + signal.convolve(frame2, X_FILTER)
    fy = signal.convolve(frame1, Y_FILTER) + signal.convolve(frame2, Y_FILTER)
    ft = signal.convolve(frame1, T_FILTER) + signal.convolve(frame2, -T_FILTER)

    # Initialize flows at zero
    u = np.zeros(frame1.shape[0], frame1.shape[1])
    v = np.zeros(frame2.shape[0], frame2.shape[1])

    # Iterate until error measure specifies convergence
    #   (I am being lazy with the error measure for now and just performing a set num iterations)
    d = alpha + fx**2 + fy**2
    for i in range(k):
        #   Calculate averages across last iteration's estimate
        u_avg = signal.convolve(u, LAP_FILTER)
        v_avg = signal.convolve(v, LAP_FILTER)

        p = numpy.dot(fx, u_avg) + numpy.dot(fy, v_avg) + ft

        #   Calculate new estimates
        u = u_avg - numpy.dot(fx, numpy.divide(p, d))
        v = v_avg - numpy.dot(fy, numpy.divide(p, d))

    return (u, v)

Use James' code for HoF

In [2]:
import cv2
import numpy as np
from matplotlib import pyplot as plt
from scipy import sqrt, pi, arctan2, cos, sin
from scipy.ndimage import uniform_filter

# Gets the optical flow [<dx,dy>] from two frames
def getOpticalFlow(imPrev, imNew):
    flow = cv2.calcOpticalFlowFarneback(imPrev, imNew, flow=None, pyr_scale=.5, levels=3, winsize=9, iterations=1, poly_n=3, poly_sigma=1.1, flags=cv2.OPTFLOW_FARNEBACK_GAUSSIAN)
    return flow

# Compute the Histogram of Optical Flow (HoF) from the given optical flow
def hof(flow, orientations=9, pixels_per_cell=(10, 10),
        cells_per_block=(4, 3), normalise=False, motion_threshold=1.):
    flow = np.atleast_2d(flow)

    if flow.ndim < 3:
        raise ValueError("Requires dense flow in both directions")

    if normalise:
        flow = sqrt(flow)

    if flow.dtype.kind == 'u':
        flow = flow.astype('float')

    gx = np.zeros(flow.shape[:2])
    gy = np.zeros(flow.shape[:2])

    gx = flow[:,:,1]
    gy = flow[:,:,0]

    magnitude = sqrt(gx**2 + gy**2)
    orientation = arctan2(gy, gx) * (180 / pi) % 180

    sy, sx = flow.shape[:2]
    cx, cy = pixels_per_cell
    bx, by = cells_per_block

    n_cellsx = int(np.floor(sx // cx))
    n_cellsy = int(np.floor(sy // cy))

    orientation_histogram = np.zeros((n_cellsy, n_cellsx, orientations))
    subsample = np.index_exp[cy / 2:cy * n_cellsy:cy, cx / 2:cx * n_cellsx:cx]
    for i in range(orientations-1):
        temp_ori = np.where(orientation < 180 / orientations * (i + 1),
                            orientation, -1)
        temp_ori = np.where(orientation >= 180 / orientations * i,
                            temp_ori, -1)

        cond2 = (temp_ori > -1) * (magnitude > motion_threshold)
        temp_mag = np.where(cond2, magnitude, 0)

        temp_filt = uniform_filter(temp_mag, size=(cy, cx))
        orientation_histogram[:, :, i] = temp_filt[subsample]

    temp_mag = np.where(magnitude <= motion_threshold, magnitude, 0)

    temp_filt = uniform_filter(temp_mag, size=(cy, cx))
    orientation_histogram[:, :, -1] = temp_filt[subsample]

    n_blocksx = (n_cellsx - bx) + 1
    n_blocksy = (n_cellsy - by) + 1
    normalised_blocks = np.zeros((n_blocksy, n_blocksx,
                                  by, bx, orientations))

    for x in range(n_blocksx):
        for y in range(n_blocksy):
            block = orientation_histogram[y:y+by, x:x+bx, :]
            eps = 1e-5
            normalised_blocks[y, x, :] = block / sqrt(block.sum()**2 + eps)

    return normalised_blocks.ravel()

FIXED_WIDTH = 160
FIXED_HEIGHT = 120
def normalizeFrame(frame_original):
    frame_gray = cv2.cvtColor(frame_original,cv2.COLOR_BGR2GRAY)
    frame_gray_resized = cv2.resize(frame_gray, (FIXED_WIDTH, FIXED_HEIGHT))
    return frame_gray_resized

Redefine HoF descriptor method to be able to use either flow method

In [3]:
# get the Histogram of Optical Flows of a video grouped sequentially in a 1D array
def getSequentialHoF(video_path, hs):
    hofs = []
    cap = cv2.VideoCapture(video_path)
    ret1, frame1 = cap.read()
    frame1 = normalizeFrame(frame1)
    while(cap.isOpened()):
        ret2, frame2 = cap.read()
        if ret2 == True:
            frame2 = normalizeFrame(frame2)
            
            # Gut original getHoF method and put it here, with option to switch flow types
            flow = calcFlowHS(frame1, frame2, 1, 100) if hs else getOpticalFlow(frame1, frame2)
            hof_array = hof(flow, pixels_per_cell=(20,20), cells_per_block=(5,5))
            
            hofs = np.concatenate((hofs, hof_array),axis=0)
            frame1 = frame2
        else:
            break
    return hofs

Create features from videos

In [4]:
from sklearn import svm
import os

# Collect the file path of all the running and walking videos,
# we will only be using these 2 classes
RUN_DIR = "./hof/walk"
RUN_FILES = os.listdir(RUN_DIR)
RUN_FILES = [RUN_DIR + f for f in RUN_FILES]
WALK_DIR = "./hof/run"
WALK_FILES = os.listdir(WALK_DIR)
WALK_FILES = [WALK_DIR + f for f in WALK_FILES]

# Use equal number of data from each class
nc = min(len(RUN_FILES), len(WALK_FILES))
print "nc:", nc
RUN_FILES = RUN_FILES[0:nc]
WALK_FILES = WALK_FILES[0:nc]

RATIO = 0.9
offset = int(np.floor(nc*RATIO))
print "offset:", offset

# Split test and training at a ratio of 1:9
train_files = RUN_FILES[0:offset] + WALK_FILES[0:offset]
test_files = RUN_FILES[offset:nc] + WALK_FILES[offset:nc]

# Put the labels in vectors
train_labels = np.zeros(offset*2, int)
train_labels[0:offset] = 1 #RUN=1
train_labels[offset:offset*2] = 2 #WALK=2

test_len = nc-offset
test_labels = np.zeros(test_len*2, int)
test_labels[0:test_len] = 1 #RUN=1
test_labels[test_len:test_len*2] = 2 #WALK=2

print "train files:", len(train_files)
print "train labels:", len(train_labels)
print "test files:", len(test_files)
print "test labels:", len(test_labels)

trainHS = [getSequentialHoF(p, True) for p in train_files]
testHS = [getSequentialHoF(p, True) for p in test_files]
train = [getSequentialHoF(p, False) for p in train_files]
test = [getSequentialHoF(p, False) for p in test_files]

max_width = max(np.array([len(i) for i in train]).max(),np.array([len(i) for i in test]).max())
def numpy_fillna(data, width):
    # Get lengths of each row of data
    lens = np.array([len(i) for i in data])

    # Mask of valid places in each row
    mask = np.arange(width) < lens[:,None]

    # Setup output array and put elements from data into masked positions
    out = np.zeros(mask.shape, dtype=data.dtype)
    out[mask] = np.concatenate(data)
    return out

train_pad = numpy_fillna(np.array(train), max_width)
test_pad = numpy_fillna(np.array(test), max_width)
train_pad_HS = numpy_fillna(np.array(train), max_width)
test_pad_HS = numpy_fillna(np.array(test), max_width)

nc: 232
offset: 208
train files: 416
train labels: 416
test files: 48
test labels: 48


error: /build/opencv/src/opencv-3.2.0/modules/imgproc/src/color.cpp:9748: error: (-215) scn == 3 || scn == 4 in function cvtColor


Create and test SVM model

In [None]:
from sklearn import svm

clf = svm.SVC()
clf.fit(train_pad, train_labels)
predict = clf.predict(test_pad)

from sklearn import metrics
print 'cv2: %f' % metrics.accuracy_score(predict, test_labels)

clf = svm.SVC()
clf.fit(train_pad_HS, train_labels)
predict = clf.predict(test_pad_HS)
print 'MyHS: %f' % metrics.accuracy_score(predict, test_labels)

Create and test decision tree model

In [None]:
from sklearn import tree
tree_clf = tree.DecisionTreeClassifier()
tree_clf.fit(train_pad, train_labels)
predict = tree_clf.predict(test_pad)
print 'cv2: %f' % metrics.accuracy_score(predict, test_labels)

tree_clf = tree.DecisionTreeClassifier()
tree_clf.fit(train_pad_HS, train_labels)
predict = tree_clf.predict(test_pad_HS)
print 'MyHS: %f' % metrics.accuracy_score(predict, test_labels)

Create and test random forest model

In [None]:
from sklearn import ensemble
rf_clf = ensemble.RandomForestClassifier()
rf_clf.fit(train_pad, train_labels)
predict = rf_clf.predict(test_pad)
print 'cv2: %f' % metrics.accuracy_score(predict, test_labels)

rf_clf = ensemble.RandomForestClassifier()
rf_clf.fit(train_pad_HS, train_labels)
predict = rf_clf.predict(test_pad_HS)
print 'MyHS: %f' % metrics.accuracy_score(predict, test_labels)

Create and test logistic regression

In [None]:
from sklearn import linear_model
lr_clf = linear_model.LogisticRegression()
lr_clf.fit(train_pad, train_labels)
predict = lr_clf.predict(test_pad)
print 'cv2: %f' % metrics.accuracy_score(predict, test_labels)

lr_clf = linear_model.LogisticRegression()
lr_clf.fit(train_pad_HS, train_labels)
predict = lr_clf.predict(test_pad_HS)
print 'MyHS: %f' % metrics.accuracy_score(predict, test_labels)