In [193]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import time

In [194]:
class CentroidTracker:
    def __init__(self, max_disappeared=50):
        self.next_object_id = 1
        self.objects = {}
        self.disappeared = {}
        self.max_disappeared = max_disappeared

    def register(self, centroid):
        self.objects[self.next_object_id] = centroid
        self.disappeared[self.next_object_id] = 0
        self.next_object_id += 1

    def deregister(self, object_id):
        del self.objects[object_id]
        del self.disappeared[object_id]

    def update(self, input_objects):
        if len(input_objects) == 0:
            for object_id in list(self.disappeared.keys()):
                self.disappeared[object_id] += 1

                if self.disappeared[object_id] > self.max_disappeared:
                    self.deregister(object_id)

            return self.objects, list(self.objects.keys())  # Return both objects and their IDs

        input_centroids = np.array([centroid for _, _, _, _, centroid in input_objects], dtype="int")

        if len(self.objects) == 0:
            for i in range(0, len(input_centroids)):
                self.register(input_centroids[i])

        else:
            object_ids = list(self.objects.keys())
            object_centroids = list(self.objects.values())

            D = cv2.spatial.distance.cdist(np.array(object_centroids), input_centroids)

            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]

            used_rows = set()
            used_cols = set()

            for (row, col) in zip(rows, cols):
                if row in used_rows or col in used_cols:
                    continue

                object_id = object_ids[row]
                self.objects[object_id] = input_centroids[col]
                self.disappeared[object_id] = 0

                used_rows.add(row)
                used_cols.add(col)

            unused_rows = set(range(0, D.shape[0])).difference(used_rows)
            unused_cols = set(range(0, D.shape[1])).difference(used_cols)

            for row in unused_rows:
                object_id = object_ids[row]
                self.disappeared[object_id] += 1

                if self.disappeared[object_id] > self.max_disappeared:
                    self.deregister(object_id)

            for col in unused_cols:
                self.register(input_centroids[col])

        return self.objects, list(self.objects.keys())  # Return both objects and their IDs

In [195]:
net = cv2.dnn.readNet("yolov3.weights", "yolov3.cfg")
layer_names = net.getUnconnectedOutLayersNames()
print(layer_names)

('yolo_82', 'yolo_94', 'yolo_106')


In [196]:
ct = CentroidTracker()

In [197]:
# Set the desired frame rate
desired_frame_rate = 60
delay = int(1000 / desired_frame_rate)  # Delay in milliseconds

In [198]:
new_width = 640
new_height = 480

In [199]:
# Save the processed video
output_path = "processed_video.avi"
fourcc = cv2.VideoWriter_fourcc(*"XVID")
output_video = cv2.VideoWriter(output_path, fourcc, desired_frame_rate, (new_width, new_height))

In [200]:
objects = []
trajectories = {}
velocities = {}
object_ids = []

In [201]:
confidence_threshold = 0

In [202]:
# Read input video
video_path = "Test_sample2.mp4"
cap = cv2.VideoCapture(video_path)

In [203]:
while True:
    start_time = time.time()

    ret, frame = cap.read()
    if not ret or frame is None:
        break

    # Perform object detection
    blob = cv2.dnn.blobFromImage(frame, 1/255.0, (320, 320), swapRB=True, crop=False)
    net.setInput(blob)
    detections = net.forward(layer_names)

    # Process detections and update tracker
    objects = []
    object_ids = []  # Initialize object_ids list
    for layer_name, detection in zip(layer_names, detections):
        for obj in detection:
            scores = obj[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]

            if confidence >= 0:
                # YOLO returns bounding box coordinates as a ratio of the image size
                box = obj[0:4] * np.array([frame.shape[1], frame.shape[0], frame.shape[1], frame.shape[0]])
                (startX, startY, endX, endY) = box.astype("int")

                # Extract centroid from bounding box
                centroid = (int((startX + endX) / 2), int((startY + endY) / 2))

                # Update tracker with the bounding box and centroid
                objects.append((startX, startY, endX, endY, centroid))

                # Update trajectory and velocity information
                if object_ids:
                    object_id = object_ids[0]  # Assuming only one object is detected
                    if object_id not in trajectories:
                        trajectories[object_id] = [centroid]
                        velocities[object_id] = [0, 0]  # Initialize velocity
                    else:
                        # Calculate velocity
                        prev_x, prev_y = trajectories[object_id][-1]
                        velocity_x = centroid[0] - prev_x
                        velocity_y = centroid[1] - prev_y
                        velocities[object_id].append((velocity_x, velocity_y))

                        # Update trajectory
                        trajectories[object_id].append(centroid)
    # Update the centroid tracker
    objects, object_ids = ct.update(objects)

    # Draw bounding boxes and object IDs on the frame
    for i in range(len(objects)):
        if isinstance(objects[i], tuple):  # Check if the object is a tuple
            startX, startY, endX, endY, centroid = objects[i]
            object_id = object_ids[i]

            cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)
            cv2.circle(frame, centroid, 2, (0, 255, 0), -1)
            cv2.putText(frame, f"ID {object_id}", (centroid[0] - 10, centroid[1] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)



    # Save the frame to the output video
    output_video.write(frame)

    # Display the results
    cv2.imshow("Frame", frame)

    # Calculate the time elapsed and sleep to achieve the desired frame rate
    elapsed_time = time.time() - start_time
    sleep_time = max(0, (delay - int(elapsed_time * 1000)))  # Ensure non-negative sleep time
    time.sleep(sleep_time / 1000)  # Sleep in seconds

    key = cv2.waitKey(1) & 0xFF
    if key == ord("q"):
        break


KeyError: 0

In [None]:
cap.release()
if output_video is not None:
    output_video.release()
cv2.destroyAllWindows()

# Plot trajectory and velocity
for object_id, trajectory in trajectories.items():
    plt.figure(figsize=(10, 5))

    # Plot Trajectory
    plt.subplot(1, 2, 1)
    trajectory = np.array(trajectory)
    plt.plot(trajectory[:, 0], trajectory[:, 1], label=f'Object {object_id}')
    plt.title('Trajectory')
    plt.xlabel('X-coordinate')
    plt.ylabel('Y-coordinate')
    plt.legend()

    # Plot Velocity
    plt.subplot(1, 2, 2)
    velocity = np.array(velocities[object_id])
    plt.plot(velocity[:, 0], label='Velocity X')
    plt.plot(velocity[:, 1], label='Velocity Y')
    plt.title('Velocity')
    plt.xlabel('Frame')
    plt.ylabel('Velocity')
    plt.legend()

    plt.show()

In [None]:
velocities

{}

In [None]:
objects

{}