In [None]:
import cv2
import imutils
import numpy as np
from pathlib import Path
from natsort import natsorted
import time
import multiprocessing.pool
import pandas as pd

import matplotlib.pyplot as plt

%reload_ext autoreload
%autoreload 1
from main import (resize, getROIFromVideo, cropWithROI, getTemplatesFromVideo, 
                  SelectionWindow, CalibWindow, getTemplateMatches, getOutputVidFrameSize)

# Crop Video

In [None]:
vidPath = "screw.mp4"
crop_roi = getROIFromVideo(vidPath)
cv2.destroyAllWindows()

# Perspective

In [None]:
vidPath = "screw.mp4"
vs = cv2.VideoCapture(vidPath)
ret, frame = vs.read()
frame = cropWithROI(frame, crop_roi)

perspectiveWindow = SelectionWindow("Perspective", frame)
perspectiveWindow.displayWindow()

In [None]:
perspectiveWindow.selectionPts

In [None]:
objLength = 1.83
objWidth = 0.6
imgWidth = 200
imgHeight = round(objLength/objWidth*imgWidth)

srcPts = np.float32(perspectiveWindow.selectionPts)
dstPts = np.float32([(0, 0), (imgWidth, 0), (imgWidth, imgHeight), (0, imgHeight)])
M = cv2.getPerspectiveTransform(srcPts, dstPts)
dst = cv2.warpPerspective(frame, M, (imgWidth,imgHeight))
cv2.imshow("Transformed", dst)
cv2.waitKey(0)
cv2.destroyAllWindows()

In [None]:
plt.imshow(cv2.cvtColor(dst, cv2.COLOR_BGR2RGB))

# Get Template(s)

In [None]:
# pipeline = lambda frame: cv2.warpPerspective(cropWithROI(frame, crop_roi), M, (imgWidth, imgHeight))
pipeline = lambda frame: cropWithROI(frame, crop_roi)
templates = getTemplatesFromVideo(vidPath, pipeline, templateWidth=100, templateHeight=100)

# Save Template(s)

In [None]:
templatesDir = Path("templates_screw")
imgPaths = natsorted([str(path) for path in templatesDir.glob("*.jpg")])

if imgPaths:
    latestImgN = int(Path(imgPaths[-1]).stem)+1
else:
    latestImgN = 0
    
for i in range(len(templates)):
    cv2.imwrite(str(templatesDir/f"{latestImgN+i}.jpg"), templates[i])

templates.clear()


# Load Template(s)

In [None]:
templatesDir = Path("templates_screw")
imgPaths = templatesDir.glob("*.jpg")

templates = []
for imgPath in imgPaths:
    template = cv2.imread(str(imgPath))
    templates.append(template)
    cv2.imshow("Template", template)
    cv2.waitKey(0)

cv2.waitKey(0)
cv2.destroyAllWindows()

# Match Templates

In [None]:
VID_PATH = "screw.mp4"
OUTPUT_VID_PATH = "screw_output.mp4"
OUTPUT_HEIGHT = 800

NMS_THRESHOLD = 0.7
CONFIDENCE_THRESHOLD = 0.8

IMSHOW_FLAG = True
WRITE_FLAG = True

num_cpu = multiprocessing.cpu_count() - 1
pool = multiprocessing.pool.ThreadPool(processes=num_cpu)

cap = cv2.VideoCapture(VID_PATH)

totalFrames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
frameCount = 0
startTime = time.time()

pipeline = lambda frame: cropWithROI(frame, crop_roi)
# pipeline = lambda frame: cv2.warpPerspective(cropWithROI(frame, crop_roi), M, (imgWidth, imgHeight))

frameWidth, frameHeight = getOutputVidFrameSize(VID_PATH, pipeline, OUTPUT_HEIGHT)
out = cv2.VideoWriter(OUTPUT_VID_PATH, cv2.VideoWriter_fourcc(*"mp4v"), cap.get(cv2.CAP_PROP_FPS), (frameWidth,frameHeight))
print(f"Output frame width: {frameWidth}, frame height: {frameHeight}")

saveFrames = []
saveBoxes = []

ret, frame = cap.read()
while ret: 
    if frameCount % 100 == 0 and frameCount != 0:
        elapsedTime = time.time()-startTime
        estTimeLeft = (totalFrames-frameCount)/frameCount*elapsedTime
        print(f"Frame {frameCount} out of {round(totalFrames)}.")
        print(f"\tTime taken: {round(elapsedTime)}s. Est. time left: {round(estTimeLeft)}s")

    img_rgb = pipeline(frame)

    # Multithreading
    mapIterable = []
    for i in range(len(templates)):
        template = templates[i]
        mapIterable.append((img_rgb, template, CONFIDENCE_THRESHOLD))
    results = pool.starmap(func=getTemplateMatches, iterable=mapIterable)

    boxes, confidences = [], []
    for result in results:
        boxes.extend(result[0])
        confidences.extend(result[1])

    # Serial
    # boxes, confidences = [], []
    # for template in templates:
    #     val = getTemplateMatches(img_rgb, template, CONFIDENCE_THRESHOLD)
    #     boxes.extend(val[0])
    #     confidences.extend(val[1])

    indices = cv2.dnn.NMSBoxes(boxes, confidences, CONFIDENCE_THRESHOLD, NMS_THRESHOLD)
    boxes = [boxes[idx] for idx in indices]
    confidences = [confidences[idx] for idx in indices]
    
    if boxes:
        saveFrames.append(frameCount)
        saveBoxes.append(boxes[0].copy())

    for box in boxes:
        cv2.rectangle(img_rgb, box[:2], box[2:], (0,0,255), 2)
    
    img_rgb = imutils.resize(img_rgb, height=OUTPUT_HEIGHT)

    if WRITE_FLAG:
        out.write(img_rgb)

    if IMSHOW_FLAG:
        cv2.imshow("Detections", img_rgb)
        key = cv2.waitKey(1)
        if key == ord('q') or key == ord('Q'):
            break
 
    ret, frame = cap.read()
    frameCount += 1

cv2.destroyAllWindows()
out.release()
cap.release()

Multithreading Time (1100 frames, 5 threads, 5 templates): 75s  
Serial Time (1100 frames, 5 templates): 193s

TODO: try with map_async

In [None]:
# import pickle
# f = open("temp.pkl", "wb")
# pickle.dump([saveFrames, saveBoxes], f)

# f = open("temp.pkl", "rb")
# pickle.load(f)

# Scale Calibration

In [None]:
cap = cv2.VideoCapture(VID_PATH)
ret, frame = cap.read()

img_rgb = pipeline(frame)

# CalibWin = CalibWindow("Calibration", img_rgb)
# CalibWin.displayWindow()
# calibPoints = CalibWin.calibPoints
# calibLength = 0.60
# scale = CalibWin.getCalibScale(calibLength)
# key = cv2.waitKey(0)

scale = img_rgb.shape[1]/objWidth
scale

In [None]:
img_rgb.shape

In [None]:
coords = np.array([[(b[0]+b[2])/2, (b[1]+b[3])/2] for b in saveBoxes])/scale
plt.scatter(saveFrames[::1], coords[:,1][::1], s=1)

# Write to CSV

In [None]:
csvPath = Path(VID_PATH).stem + ".csv"
df = pd.DataFrame([saveFrames, coords[:,0], coords[:,1]]).T
df.to_csv(csvPath, index=None, header=["frame","x","y"])

In [None]:
cap = cv2.VideoCapture(VID_PATH)
h = int(cap.get(cv2.CAP_PROP_FOURCC))
codec = chr(h&0xff) + chr((h>>8)&0xff) + chr((h>>16)&0xff) + chr((h>>24)&0xff)
codec

cap.get(cv2.CAP_PROP_FPS)

# Junk

In [None]:
cap = cv2.VideoCapture("video.mov")
ret, frame = cap.read()
while ret:    
    ret, frame = cap.read()

    frame = frame[int(roi[1]):int(roi[1]+roi[3]), int(roi[0]):int(roi[0]+roi[2])]

    cv2.imshow("RGB", frame)

    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    cv2.imshow("HSV", hsv)
    h,s,v = cv2.split(hsv)

    ret, th = cv2.threshold(v, 0, 255, cv2.THRESH_BINARY+cv2.THRESH_OTSU)

    cv2.imshow("OTSU", th)
    cv2.imshow("V", v)

    rows = v.shape[0]
    circles = cv2.HoughCircles(v, cv2.HOUGH_GRADIENT_ALT, 1, rows/128,
                               param1=500, param2=0.8,
                               minRadius=1, maxRadius=200)
    
    if circles is not None:
        circles = np.uint16(np.around(circles))
        for i in circles[0, :]:
            center = (i[0], i[1])
            # circle center
            cv2.circle(frame, center, 1, (0, 100, 100), 1)
            # circle outline
            radius = i[2]
            cv2.circle(frame, center, radius, (255, 0, 255), 1)
    
    cv2.imshow("detected circles", frame)
    key = cv2.waitKey(0)

    if key == ord('q') or key == ord('Q'):
        break

cv2.destroyAllWindows()

In [None]:
import pysift
sift = cv2.SIFT_create()
def getTemplateMatchesSIFT(frame, template, confidenceThresh):
    # find the keypoints and descriptors with SIFT
    kp1, des1 = sift.detectAndCompute(frame, None)
    kp2, des2 = sift.detectAndCompute(template, None)

    FLANN_INDEX_KDTREE = 0
    index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=20)
    search_params = dict(checks=150)

    flann = cv2.FlannBasedMatcher(index_params, search_params)

    # find matches by knn which calculates point distance in 128 dim
    matches = flann.knnMatch(des1, des2, k=2)

    # store all the good matches as per Lowe's ratio test.
    good = []
    good_matches = [[0, 0] for i in range(len(matches))]
    for i, (m, n) in enumerate(matches):
        if m.distance < confidenceThresh*n.distance:
            good.append(m)
            good_matches[i] = [1, 0]    

    Matched = cv2.drawMatchesKnn(frame,
                             kp1,
                             template,
                             kp2,
                             matches,
                             outImg=None,
                             matchColor=(0, 155, 0),
                             singlePointColor=(0, 255, 255),
                             matchesMask=good_matches,
                             flags=0
                             )

    return good, Matched

In [None]:
import cv2
vidPath = "screw.mp4"
cap = cv2.VideoCapture(vidPath)
cap.set(cv2.CAP_PROP_POS_FRAMES, 100)
res, frame = cap.read()

cv2.imshow("frame", templates[0])
cv2.waitKey(0)
cv2.destroyAllWindows()

template = templates[3]
vidPath = "screw.mp4"
cap = cv2.VideoCapture(vidPath)
cap.set(cv2.CAP_PROP_POS_FRAMES, 100)
res, frame = cap.read()

# frame = cv2.imread("geeks-full.png")
# template = cv2.imread("geeks-half.jpg")

good, matched = getTemplateMatchesSIFT(frame, template, 0.5)
cv2.imshow("test", matched)
cv2.waitKey(0)
cv2.destroyAllWindows()