MODEL 1: ReID + YOLO

In [2]:
import cv2
import torch
import numpy as np
from ultralytics import YOLO
import torchreid
from torchreid import models as md
from torchreid import utils as ut
from torchvision import transforms
from scipy.spatial.distance import cosine

# Paths to input videos
video_path_a = "C:/Users/kaust/Desktop/Assignment/tacticam.mp4"
video_path_b = "C:/Users/kaust/Desktop/Assignment/broadcast.mp4"

# Initialize YOLO detector from the local weights file
detector = YOLO("best.pt")

# Initialize the Re-ID backbone (OSNet x1.0). The checkpoint loaded below is the
# ImageNet one (1000 classes), not a model fine-tuned on a person Re-ID dataset.
reid_model = md.osnet_x1_0(num_classes=1000)
ut.load_pretrained_weights(reid_model, 'C:/Users/kaust/Desktop/Assignment/osnet_x1_0_imagenet.pth')
# Use the GPU when one is available; otherwise stay on the CPU instead of crashing.
reid_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
reid_model.eval().to(reid_device)

# Transformation for Re-ID input: 256x128 (H x W) tensor, ImageNet mean/std
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((256, 128)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Video capture setup
cap_a = cv2.VideoCapture(video_path_a)
cap_b = cv2.VideoCapture(video_path_b)

# Fail fast with a clear message: an unopened capture reports a 0x0 frame size
# and the processing loop would otherwise exit silently without any output.
if not (cap_a.isOpened() and cap_b.isOpened()):
    cap_a.release()
    cap_b.release()
    raise IOError(f"Could not open input videos: {video_path_a!r}, {video_path_b!r}")

# Frame properties of both streams
width_a = int(cap_a.get(cv2.CAP_PROP_FRAME_WIDTH))
height_a = int(cap_a.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps_a = cap_a.get(cv2.CAP_PROP_FPS)

width_b = int(cap_b.get(cv2.CAP_PROP_FRAME_WIDTH))
height_b = int(cap_b.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps_b = cap_b.get(cv2.CAP_PROP_FPS)

# Video B frames are resized to Video A's resolution whenever the two differ
resize_video_b = (width_a, height_a) != (width_b, height_b)

# Output writer: two panels side by side. Because B is brought to A's size
# before stacking, every written frame is exactly (2 * width_a, height_a);
# the writer must be created with that size or the frames do not match it.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out_width = 2 * width_a
out_height = height_a
# Use the slower of the two frame rates; fall back to 20 fps if either is unreported (0)
out_fps = min(fps_a, fps_b) if fps_a and fps_b else 20.0
out = cv2.VideoWriter('output_matched_video.mp4', fourcc, out_fps, (out_width, out_height))

def extract_embedding(image):
    """Compute the Re-ID feature vector for a single image crop.

    Args:
        image: H x W x 3 array in OpenCV's BGR channel order (the callers in
            this notebook pass crops of frames read with cv2.VideoCapture).

    Returns:
        1-D NumPy array holding the flattened output of ``reid_model``.
    """
    # OpenCV delivers BGR, while ToPILImage/Normalize in `transform` treat the
    # channels as RGB; flip the channel axis so the model sees correct colours.
    rgb_image = np.ascontiguousarray(image[:, :, ::-1])
    # Follow whatever device the model currently lives on instead of assuming CUDA.
    device = next(reid_model.parameters()).device
    img_tensor = transform(rgb_image).unsqueeze(0).to(device)
    with torch.no_grad():
        embedding = reid_model(img_tensor)
    return embedding.cpu().numpy().flatten()

def draw_boxes_with_labels(frame, detections, labels, color=(0, 255, 0)):
    """Draw one rectangle and an "ID: <label>" caption per detection, in place.

    Args:
        frame: image the boxes are drawn onto (modified in place).
        detections: iterable of 4-value boxes (x1, y1, x2, y2).
        labels: iterable of labels, paired with ``detections`` by position;
            pairing stops at the shorter of the two.
        color: colour tuple passed to the OpenCV drawing calls.
    """
    for box, tag in zip(detections, labels):
        left, top, right, bottom = (int(value) for value in box)
        top_left = (left, top)
        bottom_right = (right, bottom)
        cv2.rectangle(frame, top_left, bottom_right, color, 2)
        # Caption sits 10 px above the box's top-left corner
        caption_origin = (left, top - 10)
        cv2.putText(frame, f"ID: {tag}", caption_origin, cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

# Process both videos frame by frame. The try/finally guarantees that the
# captures, the writer and the preview window are released even when a frame
# raises part-way through.
try:
    while cap_a.isOpened() and cap_b.isOpened():
        ret_a, frame_a = cap_a.read()
        ret_b, frame_b = cap_b.read()

        # Stop as soon as either stream has no more frames
        if not ret_a or not ret_b:
            break

        if resize_video_b:
            frame_b = cv2.resize(frame_b, (width_a, height_a))

        # YOLO detection on both frames
        results_a = detector.predict(source=frame_a, conf=0.3, verbose=False)[0]
        results_b = detector.predict(source=frame_b, conf=0.3, verbose=False)[0]

        detections_a = results_a.boxes.xyxy.cpu().numpy()
        detections_b = results_b.boxes.xyxy.cpu().numpy()

        # One entry per detection. An empty crop is recorded as None so it can
        # be left out of matching; a zero-vector placeholder would turn the
        # cosine similarity into NaN, which np.argmax then picks as "best".
        embeddings_per_view = []
        for frame, detections in ((frame_a, detections_a), (frame_b, detections_b)):
            view_embeddings = []
            for box in detections:
                x1, y1, x2, y2 = map(int, box)
                # Clamp at 0 so a negative coordinate cannot wrap around as a slice index
                crop = frame[max(y1, 0):max(y2, 0), max(x1, 0):max(x2, 0)]
                view_embeddings.append(extract_embedding(crop) if crop.size else None)
            embeddings_per_view.append(view_embeddings)
        embeddings_a, embeddings_b = embeddings_per_view

        # Label semantics: for a matched pair, the label shown on a box is the
        # index of its partner detection in the other view; -1 means unmatched.
        matched_labels_a = [-1] * len(embeddings_a)
        matched_labels_b = [-1] * len(embeddings_b)

        valid_a = [i for i, emb in enumerate(embeddings_a) if emb is not None]
        valid_b = [j for j, emb in enumerate(embeddings_b) if emb is not None]

        if valid_a and valid_b:
            feats_a = np.stack([embeddings_a[i] for i in valid_a]).astype(np.float64)
            feats_b = np.stack([embeddings_b[j] for j in valid_b]).astype(np.float64)
            # L2-normalise rows so the matrix product is the cosine similarity;
            # the floor on the norm avoids a division by zero.
            feats_a /= np.maximum(np.linalg.norm(feats_a, axis=1, keepdims=True), 1e-12)
            feats_b /= np.maximum(np.linalg.norm(feats_b, axis=1, keepdims=True), 1e-12)
            similarity_matrix = feats_a @ feats_b.T

            # Keep only mutual best matches: A's best B must pick that A back
            best_b_for_a = similarity_matrix.argmax(axis=1)
            best_a_for_b = similarity_matrix.argmax(axis=0)
            for row, col in enumerate(best_b_for_a):
                if best_a_for_b[col] == row:
                    matched_labels_a[valid_a[row]] = valid_b[col]
                    matched_labels_b[valid_b[col]] = valid_a[row]

        # Draw matched boxes
        draw_boxes_with_labels(frame_a, detections_a, matched_labels_a, color=(0, 255, 0))
        draw_boxes_with_labels(frame_b, detections_b, matched_labels_b, color=(0, 0, 255))

        # Combine frames side by side for visualization
        combined_frame = np.hstack((frame_a, frame_b))
        out.write(combined_frame)
        cv2.imshow("Matched Frames (Left: Video A, Right: Video B)", combined_frame)
        # Pressing 'q' in the preview window stops processing early
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Clean up
    cap_a.release()
    cap_b.release()
    out.release()
    cv2.destroyAllWindows()


Successfully loaded imagenet pretrained weights from "C:\Users\kaust/.cache\torch\checkpoints\osnet_x1_0_imagenet.pth"
Successfully loaded pretrained weights from "C:/Users/kaust/Desktop/Assignment/osnet_x1_0_imagenet.pth"


MODEL 2: ReID + YOLO + DeepSORT

In [3]:
import cv2
import torch
import numpy as np
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
import torchreid
from torchreid import models as md
from torchreid import utils as ut
from torchvision import transforms
from scipy.spatial.distance import cosine

# ==== CONFIGURATION ====
video_path_a = "C:/Users/kaust/Desktop/Assignment/tacticam.mp4"
video_path_b = "C:/Users/kaust/Desktop/Assignment/broadcast.mp4"
output_a = "output_video_a_with_ids.mp4"
output_b = "output_video_b_with_ids.mp4"
yolo_model_path = "best.pt"
reid_weights_path = "C:/Users/kaust/Desktop/Assignment/osnet_x1_0_imagenet.pth"
confidence_threshold = 0.5  # Minimum YOLO confidence for a detection to be kept
SIMILARITY_THRESHOLD = 0.6  # Adjust for tighter or looser matching

# ==== INITIALIZE MODELS ====
detector = YOLO(yolo_model_path)
# One independent tracker per video so track ids never mix between the views
tracker_a = DeepSort(max_age=30)
tracker_b = DeepSort(max_age=30)

# OSNet x1.0 backbone with the ImageNet checkpoint named in reid_weights_path
reid_model = md.osnet_x1_0(num_classes=1000)
ut.load_pretrained_weights(reid_model, reid_weights_path)
# Use the GPU when one is available; otherwise stay on the CPU instead of crashing.
reid_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
reid_model.eval().to(reid_device)

# Re-ID preprocessing: 256x128 (H x W) tensor, ImageNet mean/std
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((256, 128)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# ==== VIDEO I/O ====
cap_a = cv2.VideoCapture(video_path_a)
cap_b = cv2.VideoCapture(video_path_b)

# Fail fast with a clear message: an unopened capture reports a 0x0 frame size
# and the processing loop would otherwise exit silently without any output.
if not (cap_a.isOpened() and cap_b.isOpened()):
    cap_a.release()
    cap_b.release()
    raise IOError(f"Could not open input videos: {video_path_a!r}, {video_path_b!r}")

# Named property ids instead of the bare 3 / 4; FPS falls back to 20 when unreported (0)
width_a = int(cap_a.get(cv2.CAP_PROP_FRAME_WIDTH))
height_a = int(cap_a.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps_a = cap_a.get(cv2.CAP_PROP_FPS) or 20.0

width_b = int(cap_b.get(cv2.CAP_PROP_FRAME_WIDTH))
height_b = int(cap_b.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps_b = cap_b.get(cv2.CAP_PROP_FPS) or 20.0

# Each view is written to its own file at its native size and frame rate
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out_a = cv2.VideoWriter(output_a, fourcc, fps_a, (width_a, height_a))
out_b = cv2.VideoWriter(output_b, fourcc, fps_b, (width_b, height_b))

def _to_tracker_detections(prediction):
    """Convert one YOLO result into the detection list passed to DeepSort.

    YOLO reports corner boxes (x1, y1, x2, y2); the tracker takes
    ([left, top, width, height], confidence, class), so the corners are
    converted to a width/height form here. The class is always 0, as before.
    """
    if prediction.boxes.xyxy is None:
        return []
    corner_boxes = prediction.boxes.xyxy.cpu().numpy()
    if prediction.boxes.conf is not None:
        confidences = prediction.boxes.conf.cpu().numpy()
    else:
        confidences = np.ones(len(corner_boxes))
    return [
        [[float(x1), float(y1), float(x2 - x1), float(y2 - y1)], float(conf), 0]
        for (x1, y1, x2, y2), conf in zip(corner_boxes, confidences)
    ]


def _embed_confirmed_tracks(tracks, frame):
    """Return (embeddings, track_ids, boxes) for the confirmed tracks of one frame.

    Boxes are clipped to the frame; tracks whose clipped box is empty are skipped.
    """
    frame_h, frame_w = frame.shape[:2]
    device = next(reid_model.parameters()).device
    embeddings, track_ids, boxes = [], [], []
    for track in tracks:
        if not track.is_confirmed():
            continue
        x1, y1, x2, y2 = map(int, track.to_ltrb())
        # Slice ends are exclusive, so the valid upper bounds are the frame width/height
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(frame_w, x2), min(frame_h, y2)
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            continue
        # OpenCV frames are BGR; `transform` treats channels as RGB, so flip them
        rgb_crop = np.ascontiguousarray(crop[:, :, ::-1])
        batch = transform(rgb_crop).unsqueeze(0).to(device)
        with torch.no_grad():
            feature = reid_model(batch)
        embeddings.append(feature.cpu().numpy().flatten())
        track_ids.append(track.track_id)
        boxes.append((x1, y1, x2, y2))
    return embeddings, track_ids, boxes


def _match_across_views(emb_a, ids_a, emb_b, ids_b):
    """Mutual-best cosine matching; returns (labels_a, labels_b), -1 = unmatched.

    A pair is kept only when each side is the other's best match and the
    similarity exceeds SIMILARITY_THRESHOLD. Each label is the partner's track id.
    """
    labels_a, labels_b = [-1] * len(emb_a), [-1] * len(emb_b)
    if not emb_a or not emb_b:
        return labels_a, labels_b
    feats_a = np.stack(emb_a).astype(np.float64)
    feats_b = np.stack(emb_b).astype(np.float64)
    # L2-normalise rows so the matrix product is the cosine similarity
    feats_a /= np.maximum(np.linalg.norm(feats_a, axis=1, keepdims=True), 1e-12)
    feats_b /= np.maximum(np.linalg.norm(feats_b, axis=1, keepdims=True), 1e-12)
    sim_matrix = feats_a @ feats_b.T
    best_b_for_a = sim_matrix.argmax(axis=1)
    best_a_for_b = sim_matrix.argmax(axis=0)
    for i, j in enumerate(best_b_for_a):
        if best_a_for_b[j] == i and sim_matrix[i, j] > SIMILARITY_THRESHOLD:
            labels_a[i] = ids_b[j]
            labels_b[j] = ids_a[i]
    return labels_a, labels_b


def _draw_tracks(frame, boxes, track_ids, match_ids, color):
    """Draw each box with "T:<track id>" and, when matched, " M:<partner id>"."""
    for (x1, y1, x2, y2), tid, mid in zip(boxes, track_ids, match_ids):
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        caption = f"T:{tid} M:{mid}" if mid != -1 else f"T:{tid}"
        cv2.putText(frame, caption, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)


# The try/finally guarantees that captures, writers and preview windows are
# released even when a frame raises part-way through.
try:
    while cap_a.isOpened() and cap_b.isOpened():
        ret_a, frame_a = cap_a.read()
        ret_b, frame_b = cap_b.read()
        # Stop as soon as either stream has no more frames
        if not ret_a or not ret_b:
            break

        # ==== DETECTION ====
        preds_a = detector.predict(frame_a, conf=confidence_threshold, verbose=False)[0]
        preds_b = detector.predict(frame_b, conf=confidence_threshold, verbose=False)[0]

        # ==== TRACKING (one tracker per view) ====
        tracks_a = tracker_a.update_tracks(_to_tracker_detections(preds_a), frame=frame_a)
        tracks_b = tracker_b.update_tracks(_to_tracker_detections(preds_b), frame=frame_b)

        # ==== EXTRACT EMBEDDINGS ====
        emb_a, ids_a, boxes_a = _embed_confirmed_tracks(tracks_a, frame_a)
        emb_b, ids_b, boxes_b = _embed_confirmed_tracks(tracks_b, frame_b)

        # ==== CROSS-VIDEO MATCHING with SIMILARITY THRESHOLD ====
        match_labels_a, match_labels_b = _match_across_views(emb_a, ids_a, emb_b, ids_b)

        # ==== DRAW RESULTS ====
        _draw_tracks(frame_a, boxes_a, ids_a, match_labels_a, (0, 255, 0))
        _draw_tracks(frame_b, boxes_b, ids_b, match_labels_b, (0, 0, 255))

        out_a.write(frame_a)
        out_b.write(frame_b)
        cv2.imshow("Video A", frame_a)
        cv2.imshow("Video B", frame_b)
        # Pressing 'q' in a preview window stops processing early
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap_a.release()
    cap_b.release()
    out_a.release()
    out_b.release()
    cv2.destroyAllWindows()



Successfully loaded imagenet pretrained weights from "C:\Users\kaust/.cache\torch\checkpoints\osnet_x1_0_imagenet.pth"
Successfully loaded pretrained weights from "C:/Users/kaust/Desktop/Assignment/osnet_x1_0_imagenet.pth"
