In [9]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from concurrent.futures import ThreadPoolExecutor

## Read video file and apply mask

In [2]:
#load mp4 test video
cap = cv2.VideoCapture('test_data/test_pallina_piccola_telefono.mp4')
if not cap.isOpened():
    #fail early with a clear message instead of an obscure error further down
    raise IOError("could not open 'test_data/test_pallina_piccola_telefono.mp4'")

try:
    #set video start position (in milliseconds)
    cap.set(cv2.CAP_PROP_POS_MSEC, 11800)

    #get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    #NOTE(review): int() truncates fractional frame rates (e.g. 29.97 -> 29);
    #confirm this is acceptable for the videos written later with this value
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    #create a mask to get only the tube
    #the frame read here only provides the shape/dtype of the mask:
    #it is consumed and is NOT appended to `frames`
    ret, frame = cap.read()
    if not ret:
        raise IOError("could not read a frame at the requested start position")
    mask = np.zeros_like(frame)
    #white filled rectangle = region kept by the bitwise AND below
    cv2.rectangle(mask, (150, 50), (380, 800), (255, 255, 255), -1)

    #apply mask to video and put frames in a list
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            #end of stream (or read error): stop collecting frames
            break
        frames.append(cv2.bitwise_and(frame, mask))
finally:
    #always free the capture, even if something above raised
    cap.release()


## Compute difference between frames $n$ and $n-1$
We compute the difference and then produce a video to see the result.

In [3]:
#absolute difference of every pair of consecutive frames:
#entry k compares frame k with frame k+1, so there is one entry fewer than frames
diff_frames = [
    cv2.absdiff(earlier, later)
    for earlier, later in zip(frames, frames[1:])
]

In [4]:
#write the difference frames to an XVID-encoded video file
fourcc = cv2.VideoWriter_fourcc(*'XVID')
frame_size = (frame_width, frame_height)
out = cv2.VideoWriter('test_data/diff.avi', fourcc, fps, frame_size)
for diff_frame in diff_frames:
    out.write(diff_frame)
#release the writer so the file is finalized on disk
out.release()


## Create new mask

In [7]:
import warnings

#silence the warning about a KMeans memory leak on Windows;
#`message` is a regular expression matched against the start of the warning text
warnings.filterwarnings("ignore", message="KMeans is known to have a memory leak on Windows")

In [8]:
#convert every difference frame to a single-channel image rescaled to 0-255
bw_frames = []
for diff_frame in diff_frames:
    gray = cv2.cvtColor(diff_frame, cv2.COLOR_BGR2GRAY)
    #min-max normalisation to the 0-255 range
    bw_frames.append(cv2.normalize(gray, None, 0, 255, cv2.NORM_MINMAX))


402


In [10]:
#find centroid of the ball in each frame (mean position of the pixels equal to 255)
def find_centroid(frame, draw_marker=True):
    """Return the centroid of the pixels of ``frame`` whose value is 255.

    The centroid is the arithmetic mean of the pixel coordinates. This is
    the centre that k-means with a single cluster converges to, so it is
    computed directly instead of fitting a clustering model per frame.

    Parameters
    ----------
    frame : 2-D array
        Image to analyse. It is modified in place when ``draw_marker`` is
        true and at least one pixel equals 255.
    draw_marker : bool, default True
        When true, a ring of radius 10 and thickness 5 is drawn on ``frame``
        around the centroid (the original behaviour). Pass ``False`` to leave
        ``frame`` untouched, which makes repeated calls on the same frame
        return the same result.

    Returns
    -------
    numpy.ndarray or None
        Array of shape ``(1, 2)`` holding ``[[x, y]]`` (column, row), or
        ``None`` when no pixel of ``frame`` equals 255.
    """
    y, x = np.where(frame == 255)
    if len(x) == 0:
        #nothing to locate in this frame; callers must handle the missing centroid
        return None

    #keepdims keeps the (1, 2) layout callers index as centroid[0][0] / centroid[0][1]
    centroid = np.column_stack([x, y]).mean(axis=0, keepdims=True)

    if draw_marker:
        #NOTE(review): on a single-channel frame presumably only the first colour
        #component (0) is used, i.e. the ring is drawn in black - confirm intent
        cv2.circle(frame, (int(centroid[0][0]), int(centroid[0][1])), 10, (0, 0, 255), 5)
    return centroid

In [13]:
#run find_centroid on every black-and-white frame using a thread pool;
#executor.map yields the results in the same order as bw_frames
with ThreadPoolExecutor() as executor:
    centroid_results = executor.map(find_centroid, bw_frames)
    centroids = [*centroid_results]


In [27]:
#create a new mask to better isolate the falling ball
#mask is circular and is centered in the centroid of the ball

#last frame uses same centroid as previous frame
#(there is one centroid fewer than frames because centroids come from frame differences);
#the length guard keeps this cell idempotent when it is re-run and avoids
#an IndexError when no centroid was computed at all
if centroids and len(centroids) < len(frames):
    centroids.append(centroids[-1])

frames_msked = []
last_centroid = None  #most recent centroid that is not None
for i in range(len(frames)):
    #find_centroid returns None when a frame has no pixel equal to 255;
    #fall back to the last known position in that case
    centroid = centroids[i] if i < len(centroids) else None
    if centroid is None:
        centroid = last_centroid
    else:
        last_centroid = centroid

    mask = np.zeros_like(frames[i])
    if centroid is not None:
        #filled white disc of radius 20 = region kept by the bitwise AND below
        cv2.circle(mask, (int(centroid[0][0]), int(centroid[0][1])), 20, (255, 255, 255), -1)
    #with no centroid known yet the mask stays empty and the frame becomes black
    frame = cv2.bitwise_and(frames[i], mask)
    frames_msked.append(frame)


In [28]:
#write the masked frames to an XVID-encoded video file
fourcc = cv2.VideoWriter_fourcc(*'XVID')
frame_size = (frame_width, frame_height)
out = cv2.VideoWriter('test_data/masked.avi', fourcc, fps, frame_size)
for masked_frame in frames_msked:
    out.write(masked_frame)
#release the writer so the file is finalized on disk
out.release()