In [1]:
%pip install opencv-python

Note: you may need to restart the kernel to use updated packages.


In [2]:
import cv2      # OpenCV for detection of objects

In [41]:
# The "tracker.py"
import math


class EuclideanDistTracker:
    def __init__(self):
        # Store the center positions of the objects
        self.center_points = {}
        # Keep the count of the IDs
        # each time a new object id detected, the count will increase by one
        self.id_count = 0


    def update(self, objects_rect):
        # Objects boxes and ids
        objects_bbs_ids = []

        # Get center point of new object
        for rect in objects_rect:
            x, y, w, h = rect
            cx = (x + x + w) // 2
            cy = (y + y + h) // 2

            # Find out if that object was detected already
            same_object_detected = False
            for id, pt in self.center_points.items():
                #dist = math.hypot(cx - pt[0], cy - pt[1])
                distsq = (cx - pt[0])**2 + (cy - pt[1])**2

                #if dist < 25:
                if distsq < 625:
                #if distsq < 10300:
                    self.center_points[id] = (cx, cy)
                    print(self.center_points)
                    objects_bbs_ids.append([x, y, w, h, id])
                    same_object_detected = True
                    break

            # New object is detected we assign the ID to that object
            if same_object_detected is False:
                self.center_points[self.id_count] = (cx, cy)
                objects_bbs_ids.append([x, y, w, h, self.id_count])
                self.id_count += 1

        # Clean the dictionary by center points to remove IDS not used anymore
        new_center_points = {}
        for obj_bb_id in objects_bbs_ids:
            _, _, _, _, object_id = obj_bb_id
            center = self.center_points[object_id]
            new_center_points[object_id] = center

        # Update dictionary with IDs not used removed
        self.center_points = new_center_points.copy()
        return objects_bbs_ids


In [17]:
#importing the video
cap = cv2.VideoCapture("luxonis_task_video.mp4")

In [19]:
# Object detection from Stable camera
object_detector = cv2.createBackgroundSubtractorMOG2(history=100, varThreshold=40)


In [31]:
#importing the video, has to be in same cell
cap = cv2.VideoCapture("luxonis_task_video.mp4")

# Object detection from Stable camera
object_detector = cv2.createBackgroundSubtractorMOG2(history=1000, varThreshold=500)

# going through the frames and extracting it
while True:
    ret, frame = cap.read()

    mask = object_detector.apply(frame)
    _, mask = cv2.threshold(mask, 254, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    #detections = []
    
    for cnt in contours:
        cv2.drawContours(frame, [cnt], -1, (255, 0, 255), 2)

    # This shows each frame
    #cv2.imshow("roi", roi)
    cv2.imshow("Frame", frame)
    cv2.imshow("Mask", mask)

    # key 27 is ESC, this will break the loop
    key = cv2.waitKey(30)
    if key == 27:
        break

cap.release()
cv2.destroyAllWindows()

In [44]:
# Version 2, with tracker

# Create tracker object
tracker = EuclideanDistTracker()

#importing the video, has to be in same cell
cap = cv2.VideoCapture("luxonis_task_video.mp4")

# Object detection from Stable camera
object_detector = cv2.createBackgroundSubtractorMOG2(history=1000, varThreshold=100)

# going through the frames and extracting it
while True:
    ret, frame = cap.read()

    mask = object_detector.apply(frame)

    _, mask = cv2.threshold(mask, 254, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    
    detections = []
    
    for cnt in contours:
        #cv2.drawContours(frame, [cnt], -1, (255, 0, 255), 2)

        x, y, w, h = cv2.boundingRect(cnt)
        detections.append([x, y, w, h])

    # 2. Object Tracking
    boxes_ids = tracker.update(detections)

    # we extract each object in this frame from the boxes_ids
    for box_id in boxes_ids:
        x, y, w, h, id = box_id

        cv2.putText(frame, str(id), (x, y - 15), cv2.FONT_HERSHEY_PLAIN, 2, (255, 0, 0), 2)
        
        # Plots the rectangle of each bounding box
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 3)
    

    # This shows each frame
    #cv2.imshow("roi", roi)
    cv2.imshow("Frame", frame)
    cv2.imshow("Mask", mask)

    # key 27 is ESC, this will break the loop
    key = cv2.waitKey(30)
    if key == 27:
        break

cap.release()
cv2.destroyAllWindows()

ValueError: operands could not be broadcast together with shapes (0,1) (1,196,2) 

In [45]:
import cv2
import numpy as np

# Version 3, with improved detection

# Create tracker object
tracker = EuclideanDistTracker()

# Importing the video, has to be in the same cell
cap = cv2.VideoCapture("luxonis_task_video.mp4")

# Object detection from Stable camera
object_detector = cv2.createBackgroundSubtractorMOG2(history=1000, varThreshold=100)

def detect_circles(frame):
    """Detect circles in the frame using HoughCircles."""
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blurred = cv2.medianBlur(gray, 5)

    # Detect circles using HoughCircles
    circles = cv2.HoughCircles(blurred, cv2.HOUGH_GRADIENT, dp=1.2, minDist=20,
                               param1=50, param2=30, minRadius=10, maxRadius=100)
    detections = []
    if circles is not None:
        circles = np.round(circles[0, :]).astype("int")
        for (x, y, r) in circles:
            detections.append([x-r, y-r, 2*r, 2*r])  # x, y, width, height
    return detections

def detect_rectangles(mask):
    """Detect rectangles using contours."""
    contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    detections = []
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        if w > 20 and h > 20:  # Filter out small noise
            detections.append([x, y, w, h])
    return detections

# Going through the frames and extracting them
while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Apply background subtraction and threshold
    mask = object_detector.apply(frame)
    _, mask = cv2.threshold(mask, 254, 255, cv2.THRESH_BINARY)

    # Detect circles
    circle_detections = detect_circles(frame)
    
    # Detect rectangles (potential squares)
    rect_detections = detect_rectangles(mask)

    # Combine both detections
    detections = circle_detections + rect_detections

    # Object Tracking
    boxes_ids = tracker.update(detections)

    # We extract each object in this frame from the boxes_ids
    for box_id in boxes_ids:
        x, y, w, h, id = box_id
        cv2.putText(frame, f'ID: {id}', (x, y - 15), cv2.FONT_HERSHEY_PLAIN, 2, (255, 0, 0), 2)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 3)

    # Show each frame
    cv2.imshow("Frame", frame)
    cv2.imshow("Mask", mask)

    # key 27 is ESC, this will break the loop
    key = cv2.waitKey(30)
    if key == 27:
        break

cap.release()
cv2.destroyAllWindows()


ValueError: operands could not be broadcast together with shapes (0,1) (1,5,2) 

In [49]:
# New tracker with Kalman Filter and Hungarian Algorithm
import cv2
import numpy as np
from scipy.optimize import linear_sum_assignment

class KalmanFilter:
    def __init__(self, x, y, w, h):
        self.kalman = cv2.KalmanFilter(4, 2)
        self.kalman.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)
        self.kalman.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32)
        self.kalman.processNoiseCov = np.eye(4, dtype=np.float32) * 0.03

        self.kalman.statePre = np.array([[x], [y], [0], [0]], np.float32)
        self.kalman.statePost = np.array([[x], [y], [0], [0]], np.float32)

        self.predicted = (x, y)
        self.w = w
        self.h = h
        self.missed_frames = 0

    def predict(self):
        pred = self.kalman.predict()
        self.predicted = (int(pred[0]), int(pred[1]))
        return self.predicted

    def correct(self, x, y):
        self.kalman.correct(np.array([[x], [y]], np.float32))

class ObjectTracker:
    def __init__(self):
        self.trackers = {}
        self.id_count = 0

    def update(self, detections):
        # Predict the next position for each tracker
        for obj_id, tracker in list(self.trackers.items()):
            predicted_position = tracker.predict()
            tracker.missed_frames += 1

            # Remove trackers that have missed for too many frames
            if tracker.missed_frames > 5:
                del self.trackers[obj_id]

        # If there are no trackers or detections, skip the distance matrix calculation
        if len(self.trackers) == 0 or len(detections) == 0:
            # If there are no trackers, initialize a new one for each detection
            if len(self.trackers) == 0:
                for detection in detections:
                    self.trackers[self.id_count] = KalmanFilter(detection[0], detection[1], detection[2], detection[3])
                    self.id_count += 1

            # If there are no detections, return an empty list (no updates)
            return []

        # Get current tracker IDs and predicted positions
        object_ids = list(self.trackers.keys())
        predicted_positions = np.array([self.trackers[obj_id].predicted for obj_id in object_ids])

        # Calculate the center of the detected positions
        detected_positions = np.array([(d[0] + d[2] // 2, d[1] + d[3] // 2) for d in detections])

        # Compute the distance matrix
        distance_matrix = np.linalg.norm(predicted_positions[:, None] - detected_positions[None, :], axis=2)

        # Solve the assignment problem using the Hungarian algorithm
        row_ind, col_ind = linear_sum_assignment(distance_matrix)

        assigned_ids = set()

        for r, c in zip(row_ind, col_ind):
            if distance_matrix[r, c] < 100:  # Threshold to consider a match
                self.trackers[object_ids[r]].correct(detected_positions[c][0], detected_positions[c][1])
                self.trackers[object_ids[r]].missed_frames = 0
                assigned_ids.add(c)

        # Add new trackers for unassigned detections
        for i, detection in enumerate(detections):
            if i not in assigned_ids:
                self.trackers[self.id_count] = KalmanFilter(detection[0], detection[1], detection[2], detection[3])
                self.id_count += 1

        # Collect tracking results
        results = []
        for obj_id, tracker in self.trackers.items():
            x, y = tracker.predicted
            w = tracker.w
            h = tracker.h
            results.append([x - w // 2, y - h // 2, w, h, obj_id])

        return results



In [46]:
# version 4, Kalma Filter tracker integration

# Main script with detection and tracking
def detect_circles(frame):
    """Detect circles in the frame using HoughCircles."""
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blurred = cv2.medianBlur(gray, 5)

    # Detect circles using HoughCircles
    circles = cv2.HoughCircles(blurred, cv2.HOUGH_GRADIENT, dp=1.2, minDist=20,
                               param1=50, param2=30, minRadius=10, maxRadius=100)
    detections = []
    if circles is not None:
        circles = np.round(circles[0, :]).astype("int")
        for (x, y, r) in circles:
            detections.append([x-r, y-r, 2*r, 2*r])  # x, y, width, height
    return detections

def detect_rectangles(mask):
    """Detect rectangles using contours."""
    contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    detections = []
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        if w > 20 and h > 20:  # Filter out small noise
            detections.append([x, y, w, h])
    return detections

In [51]:
# Create tracker object
tracker = ObjectTracker()

# Importing the video, has to be in the same cell
cap = cv2.VideoCapture("luxonis_task_video.mp4")

# Object detection from stable camera
object_detector = cv2.createBackgroundSubtractorMOG2(history=1000, varThreshold=100)

# Going through the frames and extracting them
while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Apply background subtraction and threshold
    mask = object_detector.apply(frame)
    _, mask = cv2.threshold(mask, 254, 255, cv2.THRESH_BINARY)

    # Detect circles
    circle_detections = detect_circles(frame)
    
    # Detect rectangles (potential squares)
    rect_detections = detect_rectangles(mask)

    # Combine both detections
    detections = circle_detections + rect_detections

    # Object Tracking
    boxes_ids = tracker.update(detections)

    # We extract each object in this frame from the boxes_ids
    for box_id in boxes_ids:
        x, y, w, h, id = box_id
        cv2.putText(frame, f'ID: {id}', (x, y - 15), cv2.FONT_HERSHEY_PLAIN, 2, (255, 0, 0), 2)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 3)

    # Show each frame
    cv2.imshow("Frame", frame)
    cv2.imshow("Mask", mask)

    # key 27 is ESC, this will break the loop
    key = cv2.waitKey(30)
    if key == 27:
        break

cap.release()
cv2.destroyAllWindows()

  self.predicted = (int(pred[0]), int(pred[1]))


In [54]:
# attempt with reducing noise

import cv2
import numpy as np

# Define kernel for morphological operations
kernel = np.ones((5, 5), np.uint8)

# Create tracker object (assuming you're using the Kalman tracker described earlier)
tracker = ObjectTracker()

cap = cv2.VideoCapture("luxonis_task_video.mp4")

# Object detection using a background subtractor
object_detector = cv2.createBackgroundSubtractorMOG2(history=1000, varThreshold=100)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    mask = object_detector.apply(frame)

    # Apply morphological operations to clean up the mask
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)

    # Threshold the mask
    _, mask = cv2.threshold(mask, 254, 255, cv2.THRESH_BINARY)

    # Find contours in the mask
    contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    detections = []

    # Filter and process contours
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if area > 500:  # Filter out small areas, adjust as needed
            epsilon = 0.02 * cv2.arcLength(cnt, True)
            approx = cv2.approxPolyDP(cnt, epsilon, True)
            if len(approx) == 4:
                x, y, w, h = cv2.boundingRect(cnt)
                detections.append([x, y, w, h])
            elif len(approx) > 8:  # If it looks like a circle
                (x, y), radius = cv2.minEnclosingCircle(cnt)
                x = int(x - radius)
                y = int(y - radius)
                w = h = int(2 * radius)
                detections.append([x, y, w, h])

    # Object tracking
    boxes_ids = tracker.update(detections)

    # Draw results
    for box_id in boxes_ids:
        x, y, w, h, obj_id = box_id
        cv2.putText(frame, str(obj_id), (x, y - 15), cv2.FONT_HERSHEY_PLAIN, 2, (255, 0, 0), 2)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 3)

    cv2.imshow("Frame", frame)
    cv2.imshow("Mask", mask)

    key = cv2.waitKey(30)
    if key == 27:  # ESC to break
        break

cap.release()
cv2.destroyAllWindows()


  self.predicted = (int(pred[0]), int(pred[1]))


In [60]:
# attempt idk, provided with video

import cv2
import numpy as np

# Create tracker object
tracker = ObjectTracker()

cap = cv2.VideoCapture("luxonis_task_video.mp4")

# Define kernel for morphological operations
kernel = np.ones((5, 5), np.uint8)

# Object detection using a background subtractor
object_detector = cv2.createBackgroundSubtractorMOG2(history=1000, varThreshold=100)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    mask = object_detector.apply(frame)

    # Apply morphological operations to clean up the mask
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)

    # Threshold the mask
    _, mask = cv2.threshold(mask, 254, 255, cv2.THRESH_BINARY)

    # Find contours in the mask for square detection
    contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    detections = []

    # Detect squares based on contours
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if area > 500:  # Filter out small areas, adjust as needed
            epsilon = 0.02 * cv2.arcLength(cnt, True)
            approx = cv2.approxPolyDP(cnt, epsilon, True)
            if len(approx) == 4:
                x, y, w, h = cv2.boundingRect(cnt)
                detections.append([x, y, w, h])

    # Detect circles using HoughCircles
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, dp=1.2, minDist=50,
                               param1=50, param2=30, minRadius=10, maxRadius=100)

    if circles is not None:
        circles = np.round(circles[0, :]).astype("int")
        for (x, y, r) in circles:
            detections.append([x - r, y - r, 2 * r, 2 * r])

    # Object tracking
    boxes_ids = tracker.update(detections)

    # Draw results
    for box_id in boxes_ids:
        x, y, w, h, obj_id = box_id
        cv2.putText(frame, str(obj_id), (x, y - 15), cv2.FONT_HERSHEY_PLAIN, 2, (255, 0, 0), 2)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 3)

    cv2.imshow("Frame", frame)
    cv2.imshow("Mask", mask)

    key = cv2.waitKey(30)
    if key == 27:  # ESC to break
        break

cap.release()
cv2.destroyAllWindows()


  self.predicted = (int(pred[0]), int(pred[1]))
