In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import imutils
import os
from glob import glob
import re
from sklearn.cluster import AgglomerativeClustering
from sklearn.cluster import DBSCAN
import math

In [4]:
def motion_comp(prev_frame, curr_frame, num_points=500, points_to_use=500, transform_type='affine'):
    """ Obtains new warped frame1 to account for camera (ego) motion
        Inputs:
            prev_frame - first image frame
            curr_frame - second sequential image frame
            num_points - number of feature points to obtain from the images
            points_to_use - number of point to use for motion translation estimation 
            transform_type - type of transform to use: either 'affine' or 'homography'
        Outputs:
            A - estimated motion translation matrix or homography matrix
            prev_points - feature points obtained on previous image
            curr_points - feature points obtaine on current image
        """
    transform_type = transform_type.lower()
    assert(transform_type in ['affine', 'homography'])

    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_RGB2GRAY)
    curr_gray = cv2.cvtColor(curr_frame, cv2.COLOR_RGB2GRAY)

    # get features for first frame
    corners = cv2.goodFeaturesToTrack(prev_gray, num_points, qualityLevel=0.01, minDistance=10)

    # get matching features in next frame with Sparse Optical Flow Estimation
    matched_corners, status, _ = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, corners, None)

    # reformat previous and current corner points
    prev_points = corners[status==1]
    curr_points = matched_corners[status==1]

    # sub sample number of points so we don't overfit
    if points_to_use > prev_points.shape[0]:
        points_to_use = prev_points.shape[0]

    index = np.random.choice(prev_points.shape[0], size=points_to_use, replace=False)
    prev_points_used = prev_points[index]
    curr_points_used = curr_points[index]

    # find transformation matrix from frame 1 to frame 2
    if transform_type == 'affine':
        A, _ = cv2.estimateAffine2D(prev_points_used, curr_points_used, method=cv2.RANSAC)
    elif transform_type == 'homography':
        A, _ = cv2.findHomography(prev_points_used, curr_points_used)

    return A, prev_points, curr_points

In [49]:
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))

def player_pipeline(path):
    # frame_files = sorted([f for f in os.listdir(path) if f.endswith('.jpg')])


    # first_frame_path = os.path.join(path, frame_files[0])
    # prev = cv2.imread(first_frame_path)
    
    #get frames from video
    cap = cv2.VideoCapture(path)
    ret, prev = cap.read()
    print(ret)

    while ret:
        
        # Read the frame
        #frame_path = os.path.join(path, frame_file)
        ret, frame = cap.read()

        A, prev_points, curr_points = motion_comp(prev, frame, num_points=10000, points_to_use=10000, transform_type='affine')
        transformed1 = cv2.warpAffine(prev, A, dsize=(prev.shape[:2][::-1]))
        comped_delta = cv2.subtract(frame, transformed1)
        comped_delta = cv2.cvtColor(comped_delta, cv2.COLOR_BGR2GRAY)
        ret, thresh = cv2.threshold(comped_delta, 70, 250, cv2.THRESH_BINARY)
        dilated = cv2.dilate(thresh, kernel, iterations=1)

        numLabels, labels, stats, centroids = cv2.connectedComponentsWithStats(dilated, connectivity=8, ltype=cv2.CV_32S)
        
        if len(centroids)>1:

            # Perform hierarchical clustering on component centers
            clusterer = AgglomerativeClustering(n_clusters=None, distance_threshold=52, linkage='single')
            clusters = clusterer.fit_predict(centroids)

            # Draw bounding boxes around clustered component centers
            output = frame.copy()
            #create a data structure to store the bounding boxes
            bounding_boxes = []
            for cluster_id in np.unique(clusters):
                cluster_indices = np.where(clusters == cluster_id)[0]
                cluster_centers = centroids[cluster_indices]
                min_x = int(np.min(cluster_centers[:, 0]))
                min_y = int(np.min(cluster_centers[:, 1]))
                max_x = int(np.max(cluster_centers[:, 0]))
                max_y = int(np.max(cluster_centers[:, 1]))

                # Calculate area of the bounding box
                area = (max_x - min_x) * (max_y - min_y)
                

                if area > 1000:
                    cv2.rectangle(output, (min_x - 5, min_y - 5), (max_x + 5, max_y + 5), (255, 0, 0), 2)
                    #store bounding boxes in the data array
                    bounding_boxes.append([min_x, min_y, max_x, max_y])
                    # Draw center
                    center_x, center_y = (min_x + max_x) // 2, (min_y + max_y) // 2
                    cv2.circle(output, (center_x, center_y), 4, (0, 255, 0), -1)

                    # Add label
                    cv2.putText(output, 'Player', (min_x, min_y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            cv2.imshow("Output", output)
        else:
            cv2.imshow("Output", frame)
        prev = frame.copy()

        if cv2.waitKey(5) & 0xFF == ord('q'):
            break

    cv2.destroyAllWindows()

In [50]:
player_pipeline('/Users/Gloria/Desktop/LABS/TennisBounceDetector/videoin/veryshorttest.mp4')

True


error: OpenCV(4.9.0) /Users/xperience/GHA-OpenCV-Python2/_work/opencv-python/opencv-python/opencv/modules/imgproc/src/color.cpp:196: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'
