In [16]:
import onnx
import torch
from torchvision import transforms
import cv2
import numpy as np
from scipy.spatial.distance import cdist

from mlvot.tp1.KalmanFilter import KalmanFilter
from mlvot.tp2_3.tp2 import get_sim_matrix, get_assignments, update_tracks_with_ids
%matplotlib inline

In [3]:
class FeatureExtractor:
    """Turn image crops into appearance feature vectors using an ONNX network.

    The network is executed through OpenCV's DNN module; preprocessing is done
    with a torchvision transform pipeline.

    Attributes:
        device: torch device the preprocessed tensors are placed on.
        model: network returned by ``cv2.dnn.readNetFromONNX``.
        input_size: size handed to ``transforms.Resize``.
        transform: torchvision pipeline applied to every crop.
    """

    def __init__(self, model_path="reid_osnet_x025_market1501.onnx", input_size=(64, 128)):
        """Load the network and build the preprocessing pipeline.

        Args:
            model_path: Path to the ONNX model file.
            input_size: Target size for ``transforms.Resize``.
        """
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # onnx.load() only parses the file into a ModelProto: that object has no
        # eval() and cannot be called, so it cannot run inference. OpenCV's DNN
        # module (cv2 is already a dependency of this notebook) gives a runnable net.
        self.model = cv2.dnn.readNetFromONNX(model_path)
        # NOTE(review): transforms.Resize reads a 2-tuple as (height, width), so the
        # default produces 64-high, 128-wide inputs - confirm against the network's
        # expected input shape.
        self.input_size = input_size
        self.transform = transforms.Compose([
            # Resize rejects NumPy arrays; convert the crop to a PIL image first.
            transforms.ToPILImage(),
            transforms.Resize(self.input_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def preprocess_patch(self, patch):
        """Convert one crop into a normalized, batched tensor.

        Args:
            patch: Image crop as a NumPy array; it is converted from BGR to RGB
                before the transform pipeline runs.

        Returns:
            Tensor with a leading batch dimension of 1, placed on ``self.device``.
        """
        patch = cv2.cvtColor(patch, cv2.COLOR_BGR2RGB)
        return self.transform(patch).unsqueeze(0).to(self.device)

    def extract_features(self, im_crops):
        """Run the network on every crop and stack the results.

        Args:
            im_crops: Iterable of image crops accepted by ``preprocess_patch``.

        Returns:
            2-D NumPy array with one row per crop. An empty ``(0, 0)`` array is
            returned when ``im_crops`` is empty.
        """
        features = []
        for patch in im_crops:
            # The OpenCV net consumes NumPy blobs, so bring the tensor back to host memory.
            blob = self.preprocess_patch(patch).cpu().numpy()
            self.model.setInput(blob)
            # Flatten to a single row so np.vstack yields one row per crop
            # regardless of trailing singleton dimensions in the network output.
            features.append(np.asarray(self.model.forward()).reshape(1, -1))
        if not features:
            # np.vstack raises on an empty list; a frame without crops is not an error.
            return np.empty((0, 0), dtype=np.float32)
        return np.vstack(features)


In [12]:
def compute_similarity(features1, features2, metric="cosine"):
    """Return the pairwise similarity between two sets of feature vectors.

    Similarity is defined as ``1 - distance`` where the distance comes from
    ``scipy.spatial.distance.cdist``.

    Args:
        features1: 2-D array-like, one feature vector per row.
        features2: 2-D array-like, one feature vector per row.
        metric: Distance metric name forwarded to ``cdist``.

    Returns:
        Array of shape ``(len(features1), len(features2))``.
    """
    pairwise_distance = cdist(features1, features2, metric=metric)
    return 1 - pairwise_distance


In [13]:
def get_combined_sim_matrix(bbox1, features1, bbox2, features2, alpha=0.5, beta=0.5):
    """Blend a box-based similarity matrix with an appearance similarity matrix.

    Args:
        bbox1: Boxes of the first set, passed to ``get_sim_matrix``.
        features1: Feature vectors of the first set, one row per box.
        bbox2: Boxes of the second set, passed to ``get_sim_matrix``.
        features2: Feature vectors of the second set, one row per box.
        alpha: Weight applied to the matrix returned by ``get_sim_matrix``.
        beta: Weight applied to the rescaled cosine-similarity matrix.

    Returns:
        ``alpha * box_similarity + beta * appearance_similarity``. When either
        set is empty, a zero matrix of shape ``(len(bbox1), len(bbox2))``.
    """
    # With no boxes on one side there is nothing to compare, and cdist / .min()
    # below would raise on empty input.
    # NOTE(review): assumes get_sim_matrix would yield the same
    # (len(bbox1), len(bbox2)) shape - confirm.
    if len(bbox1) == 0 or len(bbox2) == 0:
        return np.zeros((len(bbox1), len(bbox2)))

    iou_matrix = get_sim_matrix(bbox1, bbox2)
    feature_similarity_matrix = compute_similarity(features1, features2, metric="cosine")

    lowest = feature_similarity_matrix.min()
    spread = feature_similarity_matrix.max() - lowest
    if spread > 0:
        # Min-max rescale so the appearance term spans [0, 1].
        feature_similarity_matrix = (feature_similarity_matrix - lowest) / spread
    else:
        # All entries are equal (e.g. a single track/detection pair): min-max
        # scaling would be 0/0 and fill the matrix with NaN. Keep the raw cosine
        # similarity, limited to the same [0, 1] range.
        feature_similarity_matrix = np.clip(feature_similarity_matrix, 0.0, 1.0)

    return alpha * iou_matrix + beta * feature_similarity_matrix


In [None]:
def track_objects(video_path, feature_extractor):
    """Skeleton tracking loop combining box similarity and appearance features.

    For every frame the function extracts features for the detections, builds a
    combined similarity matrix against the current tracks, assigns detections
    to tracks and updates the tracks. The detection step is still a placeholder
    that produces no detections.

    Args:
        video_path: Path of the video to read.
        feature_extractor: Object exposing ``extract_features(im_crops)``.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video file.")
        return

    tracked_bboxs = []
    tracked_features = []

    # try/finally so the capture is released even when a step in the loop raises.
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Simulate detection step (Replace with actual detector)
            detections = []  # List of bounding boxes from the detector
            im_crops = []    # Extract image patches for detected bounding boxes

            # Extract features for detections
            detected_features = feature_extractor.extract_features(im_crops)

            # Calculate similarity matrix
            sim_matrix = get_combined_sim_matrix(tracked_bboxs, tracked_features, detections, detected_features)

            # Assign detections to tracked objects
            assignments = get_assignments(sim_matrix)  # Implement your assignment function

            # Update tracks
            update_tracks_with_ids(tracked_bboxs, detections, assignments)  # Implement your update logic

            # Update features for tracked objects
            # NOTE(review): this indexes detected_features with each element of
            # assignments - confirm the format returned by get_assignments.
            tracked_features = [detected_features[i] for i in assignments]

            # Visualize or process frame (optional)
            # ...
    finally:
        cap.release()


# Run the tracker: build the feature extractor with its default model, then
# process the sample video.
if __name__ == "__main__":
    feature_extractor = FeatureExtractor()
    video_path = "randomball.avi"
    track_objects(video_path=video_path, feature_extractor=feature_extractor)

In [None]:
from mlvot.tp2_3.tp2 import save_video
from mlvot.tp1.Detector import detect
# Import libraries
import cv2
from time import sleep

# Main object tracking loop
def main(video_path, output_video_path):
    """Track detected centers with a Kalman filter and save the annotated video.

    Each frame is run through ``detect``; for every returned center the filter
    is predicted and then updated, and the prediction (blue box), estimate
    (red box), trajectory (black dots) and detection (green dot) are drawn on
    the frame. Frames are shown live and collected for ``save_video``.

    Args:
        video_path: Path of the input video.
        output_video_path: Path passed to ``save_video`` for the result.
    """
    # Initialize Kalman Filter
    kf = KalmanFilter(0.1, 1, 1, 1, 0.1, 0.1)
    trajectory = []  # To store trajectory points
    frames = []  # To store video frames with tracking overlays

    # Open video file
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video file.")
        return

    # try/finally so the capture and the display window are cleaned up even if
    # detection, filtering or drawing raises part-way through the video.
    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))

        # Process video frames
        while True:
            sleep(0.1)  # Slow down for visualization purposes
            ret, frame = cap.read()
            if not ret:
                print("End of video stream.")
                break

            # Detect objects in the frame
            centers = detect(frame)

            # Track objects using Kalman Filter
            for center in centers:
                # Kalman Filter Prediction
                kf.predict()
                predicted_x, predicted_y = map(int, kf.x_k[:2].flatten())
                cv2.rectangle(frame, (predicted_x - 10, predicted_y - 10),
                              (predicted_x + 10, predicted_y + 10), (255, 0, 0), 2)

                # Kalman Filter Update with detected center
                kf.update(center)
                estimated_x, estimated_y = map(int, kf.x_k[:2].flatten())
                cv2.rectangle(frame, (estimated_x - 10, estimated_y - 10),
                              (estimated_x + 10, estimated_y + 10), (0, 0, 255), 2)

                # Draw trajectory
                trajectory.append((estimated_x, estimated_y))
                for point in trajectory:
                    cv2.circle(frame, point, 1, (0, 0, 0), -1)

                # Draw detected center
                cv2.circle(frame, (int(center[0]), int(center[1])), 5, (0, 255, 0), -1)

            # Append the processed frame for saving (converted from BGR to RGB)
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

            # Display the frame
            cv2.imshow('Object Tracking', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                print("Terminating video processing...")
                break
    finally:
        # Release resources
        cap.release()
        cv2.destroyAllWindows()

    # Save the output video
    save_video(frames, output_video_path, fps=fps, frame_size=(frame_width, frame_height))
    print(f"Processed video saved to {output_video_path}")

# Run the script: track the sample video and write the annotated copy next to
# this notebook.
if __name__ == "__main__":
    video_input_path = "../tp1/randomball.avi"
    video_output_path = "tracked_output.avi"
    main(video_path=video_input_path, output_video_path=video_output_path)
