In [17]:
!pip install ultralytics




BoT-SORT

In [18]:
!pip install ultralytics opencv-python numpy scipy scikit-learn



In [19]:
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
from scipy.optimize import linear_sum_assignment
from sklearn.metrics.pairwise import cosine_similarity

In [20]:
# Maximum number of recent box centres kept per track (used as the deque maxlen
# of each track's "trajectory").
TRAJ_LEN = 30
# Weight of the stored embedding in the exponential moving average; the new
# observation is weighted by (1 - EMA_ALPHA).
EMA_ALPHA = 0.85
# Relative weights of the appearance and motion distances in the combined
# cross-camera matching cost.
APPEARANCE_WEIGHT = 0.7
MOTION_WEIGHT = 0.3
# An assignment is kept only when its combined cost is strictly below this value.
MATCH_THRESHOLD = 0.45
# Class index passed to model.track(classes=[...]).
# NOTE(review): presumably the "player" class of the custom model — confirm.
PLAYER_CLASS_ID = 2

In [21]:
# NOTE(review): this function is defined again in later cells; the last
# definition executed is the one that is actually used.
def extract_embedding(frame, bbox):
    """Return a colour-histogram appearance embedding for a bounding box.

    The box is clamped to the frame before cropping. Without the clamp a
    negative coordinate is interpreted by NumPy slicing as an offset from the
    far edge, so the histogram would silently be taken from the wrong region.

    Args:
        frame: Image array indexed as ``frame[y, x]``; the histogram is taken
            over channels 0, 1 and 2.
        bbox: ``(x1, y1, x2, y2)`` box; each value is truncated to ``int``.

    Returns:
        1-D array of length 512 (8 x 8 x 8 bins), scaled to approximately unit
        L2 norm. All zeros when the clamped box has no area.
    """
    frame_h, frame_w = frame.shape[:2]
    x1, y1, x2, y2 = map(int, bbox)

    # Clamp every coordinate into [0, width] / [0, height].
    x1 = max(0, min(x1, frame_w))
    x2 = max(0, min(x2, frame_w))
    y1 = max(0, min(y1, frame_h))
    y2 = max(0, min(y2, frame_h))

    crop = frame[y1:y2, x1:x2]

    if crop.size == 0:
        return np.zeros(512)

    hist = cv2.calcHist(
        [crop], [0, 1, 2], None,
        [8, 8, 8],
        [0, 256, 0, 256, 0, 256]
    )

    cv2.normalize(hist, hist)
    emb = hist.flatten()
    # The epsilon keeps the division finite for an all-zero histogram.
    emb = emb / (np.linalg.norm(emb) + 1e-6)

    return emb

In [22]:
# NOTE(review): this function is defined again in a later cell; the last
# definition executed is the one that is actually used.
def extract_embedding(frame, bbox):
    """Return a colour-histogram appearance embedding for a bounding box.

    The box is clamped to the frame before cropping. Without the clamp a
    negative coordinate is interpreted by NumPy slicing as an offset from the
    far edge, so the histogram would silently be taken from the wrong region.

    Args:
        frame: Image array indexed as ``frame[y, x]``; the histogram is taken
            over channels 0, 1 and 2.
        bbox: ``(x1, y1, x2, y2)`` box; each value is truncated to ``int``.

    Returns:
        1-D array of length 512 (8 x 8 x 8 bins), scaled to approximately unit
        L2 norm. All zeros when the clamped box has no area.
    """
    frame_h, frame_w = frame.shape[:2]
    x1, y1, x2, y2 = map(int, bbox)

    # Clamp every coordinate into [0, width] / [0, height].
    x1 = max(0, min(x1, frame_w))
    x2 = max(0, min(x2, frame_w))
    y1 = max(0, min(y1, frame_h))
    y2 = max(0, min(y2, frame_h))

    crop = frame[y1:y2, x1:x2]

    if crop.size == 0:
        return np.zeros(512)

    hist = cv2.calcHist(
        [crop], [0, 1, 2], None,
        [8, 8, 8],
        [0, 256, 0, 256, 0, 256]
    )

    cv2.normalize(hist, hist)
    emb = hist.flatten()
    # The epsilon keeps the division finite for an all-zero histogram.
    emb = emb / (np.linalg.norm(emb) + 1e-6)

    return emb

In [23]:
def get_center(bbox):
    """Return the ``(cx, cy)`` midpoint of an ``(x1, y1, x2, y2)`` box.

    Both coordinates are computed with floor division, so integer inputs give
    integer outputs.
    """
    left, top, right, bottom = bbox
    center_x = (left + right) // 2
    center_y = (top + bottom) // 2
    return (center_x, center_y)

In [24]:
def extract_embedding(frame, bbox):
    """Return a colour-histogram appearance embedding for a bounding box.

    The box is clamped to the frame before cropping. Without the clamp a
    negative coordinate is interpreted by NumPy slicing as an offset from the
    far edge, so the histogram would silently be taken from the wrong region.

    Args:
        frame: Image array indexed as ``frame[y, x]``; the histogram is taken
            over channels 0, 1 and 2.
        bbox: ``(x1, y1, x2, y2)`` box; each value is truncated to ``int``.

    Returns:
        1-D array of length 512 (8 x 8 x 8 bins), scaled to approximately unit
        L2 norm. All zeros when the clamped box has no area.
    """
    frame_h, frame_w = frame.shape[:2]
    x1, y1, x2, y2 = map(int, bbox)

    # Clamp every coordinate into [0, width] / [0, height].
    x1 = max(0, min(x1, frame_w))
    x2 = max(0, min(x2, frame_w))
    y1 = max(0, min(y1, frame_h))
    y2 = max(0, min(y2, frame_h))

    crop = frame[y1:y2, x1:x2]

    if crop.size == 0:
        return np.zeros(512)

    hist = cv2.calcHist(
        [crop], [0, 1, 2], None,
        [8, 8, 8],
        [0, 256, 0, 256, 0, 256]
    )

    cv2.normalize(hist, hist)
    emb = hist.flatten()
    # The epsilon keeps the division finite for an all-zero histogram.
    emb = emb / (np.linalg.norm(emb) + 1e-6)

    return emb


def get_center(bbox):
    """Return the ``(cx, cy)`` midpoint of an ``(x1, y1, x2, y2)`` box.

    Both coordinates are computed with floor division, so integer inputs give
    integer outputs.
    """
    left, top, right, bottom = bbox
    center_x = (left + right) // 2
    center_y = (top + bottom) // 2
    return (center_x, center_y)

In [25]:
class CameraProcessor:
    """Track one camera's video with BoT-SORT and keep a record per track ID.

    ``identity_db`` maps a tracker ID to a dict holding:
        "embedding":  appearance embedding, smoothed with an exponential
                      moving average,
        "trajectory": deque of the most recent box centres (at most TRAJ_LEN),
        "velocity":   difference between the two most recent centres.
    """

    def __init__(self, model_path):
        """Load the YOLO weights at ``model_path``; the database starts empty."""
        self.model = YOLO(model_path)
        self.identity_db = {}

    def process_video(self, video_path):
        """Run tracking over ``video_path`` and record every tracked box."""
        tracked_frames = self.model.track(
            source=video_path,
            tracker="botsort.yaml",
            persist=True,
            classes=[PLAYER_CLASS_ID],
            stream=True
        )

        for frame_result in tracked_frames:
            image = frame_result.orig_img
            detections = frame_result.boxes

            # Frames without any track IDs contribute nothing.
            if detections.id is None:
                continue

            for xyxy, raw_id in zip(detections.xyxy, detections.id):
                player_id = int(raw_id.item())
                left, top, right, bottom = map(int, xyxy.tolist())
                box = (left, top, right, bottom)

                midpoint = get_center(box)
                appearance = extract_embedding(image, box)
                self.update_identity(player_id, appearance, midpoint)

    def update_identity(self, track_id, embedding, center):
        """Create or refresh the record for ``track_id`` with a new observation.

        A new track stores ``embedding`` as-is; an existing track blends it
        into the stored embedding. The velocity is updated only when a
        previous centre exists, and ``center`` is then appended to the
        trajectory.
        """
        record = self.identity_db.get(track_id)

        if record is None:
            record = {
                "embedding": embedding,
                "trajectory": deque(maxlen=TRAJ_LEN),
                "velocity": np.zeros(2)
            }
            self.identity_db[track_id] = record
        else:
            # Exponential moving average of the appearance embedding.
            smoothed = EMA_ALPHA * record["embedding"] + (1 - EMA_ALPHA) * embedding
            record["embedding"] = smoothed

        history = record["trajectory"]

        if history:
            last_center = np.array(history[-1])
            record["velocity"] = np.array(center) - last_center

        history.append(center)

    def get_database(self):
        """Return the live ``identity_db`` dict (not a copy)."""
        return self.identity_db


Cross-Camera Matching

In [26]:
def compute_motion_distance(v1, v2):
    """Return a dissimilarity between two velocity vectors.

    The result is ``(1 - cosine similarity) + 0.01 * |speed1 - speed2|``.
    When either vector has zero length the direction is undefined and the
    fixed value ``1.0`` is returned.

    Each norm is computed once and the cosine is evaluated directly with
    NumPy. This function is called for every pair of tracks, so the
    per-call overhead of a pairwise-matrix routine is avoided.

    Args:
        v1: First velocity vector (1-D array-like).
        v2: Second velocity vector, same length as ``v1``.

    Returns:
        Non-negative float; smaller means more similar motion.
    """
    v1 = np.asarray(v1, dtype=float)
    v2 = np.asarray(v2, dtype=float)

    speed1 = np.linalg.norm(v1)
    speed2 = np.linalg.norm(v2)

    if speed1 == 0 or speed2 == 0:
        return 1.0

    direction_sim = np.dot(v1, v2) / (speed1 * speed2)
    speed_diff = abs(speed1 - speed2)

    return (1 - direction_sim) + 0.01 * speed_diff


def cross_camera_match(db_x, db_y):
    """Match Camera Y tracks to Camera X tracks with a one-to-one assignment.

    The cost of pairing two tracks is a weighted sum of their appearance
    distance (1 - cosine similarity of the embeddings) and their motion
    distance. The assignment that minimises the total cost is computed, and
    only pairs whose cost is below ``MATCH_THRESHOLD`` are kept.

    Args:
        db_x: Mapping of Camera X track ID to a record with "embedding" and
            "velocity".
        db_y: Same structure for Camera Y.

    Returns:
        Dict mapping Camera Y track ID to the matched Camera X track ID;
        empty when either database is empty.
    """
    x_ids = list(db_x.keys())
    y_ids = list(db_y.keys())

    if not (x_ids and y_ids):
        return {}

    costs = np.zeros((len(x_ids), len(y_ids)))

    for row, x_id in enumerate(x_ids):
        track_x = db_x[x_id]
        for col, y_id in enumerate(y_ids):
            track_y = db_y[y_id]

            similarity = cosine_similarity(
                [track_x["embedding"]], [track_y["embedding"]]
            )[0][0]
            appearance_dist = 1 - similarity

            motion_dist = compute_motion_distance(
                track_x["velocity"],
                track_y["velocity"]
            )

            costs[row, col] = (
                APPEARANCE_WEIGHT * appearance_dist +
                MOTION_WEIGHT * motion_dist
            )

    assigned_rows, assigned_cols = linear_sum_assignment(costs)

    # Keep only the assigned pairs that are cheap enough to be trusted.
    return {
        y_ids[col]: x_ids[row]
        for row, col in zip(assigned_rows, assigned_cols)
        if costs[row, col] < MATCH_THRESHOLD
    }

Main Loop

In [27]:
import cv2
import numpy as np
from ultralytics import YOLO

In [28]:
# Class index passed to model.track(classes=[...]).
# NOTE(review): this repeats the PLAYER_CLASS_ID assignment made in an earlier cell.
PLAYER_CLASS_ID = 2  # Replace with your player class ID in YOLO
# Path of the YOLO weights loaded by CameraProcessor and write_output_video.
# NOTE(review): absolute Google Drive path — presumably requires Drive to be
# mounted at /content/drive in Colab; confirm before running elsewhere.
MODEL_PATH = "/content/drive/MyDrive/best.pt"

In [29]:
class CameraProcessor:
    """Track players in one camera's video and accumulate per-track data.

    ``self.db`` maps each tracker ID to a dict with:
        "embedding":    running average of the track's colour histogram,
        "frames":       1-based indices of the frames where the track was seen,
        "bbox_history": the ``(x1, y1, x2, y2)`` box for each of those frames,
        "centers":      the box centre for each of those frames.
    """

    def __init__(self, model_path):
        """Load the YOLO weights at ``model_path``; the database starts empty."""
        self.model = YOLO(model_path)
        self.db = {}

    def get_database(self):
        """Return the live ``db`` dict (not a copy)."""
        return self.db

    def get_histogram(self, frame, bbox):
        """Return the flattened 8x8x8 colour histogram of ``bbox`` in ``frame``.

        The box is clamped to the frame first. Without the clamp a negative
        coordinate is interpreted by NumPy slicing as an offset from the far
        edge, so the histogram would be taken from the wrong region.

        Args:
            frame: Image array indexed as ``frame[y, x]`` with channels 0-2.
            bbox: ``(x1, y1, x2, y2)`` box; values are truncated to ``int``.

        Returns:
            1-D array of length 512; all zeros when the clamped box is empty.
        """
        frame_h, frame_w = frame.shape[:2]
        x1, y1, x2, y2 = map(int, bbox)

        x1 = max(0, min(x1, frame_w))
        x2 = max(0, min(x2, frame_w))
        y1 = max(0, min(y1, frame_h))
        y2 = max(0, min(y2, frame_h))

        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            return np.zeros((512,))
        hist = cv2.calcHist([crop], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
        cv2.normalize(hist, hist)
        return hist.flatten()

    def process_video(self, video_path):
        """Run BoT-SORT tracking over ``video_path`` and fill ``self.db``.

        Frames are read by ``model.track`` itself, so no separate
        ``cv2.VideoCapture`` is opened here.
        """
        frame_idx = 0

        results_stream = self.model.track(
            source=video_path,
            tracker="botsort.yaml",
            persist=True,
            classes=[PLAYER_CLASS_ID],
            stream=True
        )

        for result in results_stream:
            frame = result.orig_img
            boxes = result.boxes
            frame_idx += 1

            # Frames without any track IDs still advance the frame counter.
            if boxes.id is None:
                continue

            for box, track_id in zip(boxes.xyxy, boxes.id):
                track_id = int(track_id.item())
                x1, y1, x2, y2 = map(int, box.tolist())
                center = ((x1 + x2) // 2, (y1 + y2) // 2)

                hist = self.get_histogram(frame, (x1, y1, x2, y2))
                # Scale to unit length; an all-zero histogram is left as-is.
                hist_norm = np.linalg.norm(hist)
                if hist_norm > 0:
                    hist = hist / hist_norm

                if track_id not in self.db:
                    self.db[track_id] = {
                        "embedding": hist,
                        "frames": [],
                        "bbox_history": [],
                        "centers": []
                    }

                track = self.db[track_id]

                # Per-frame history.
                track["frames"].append(frame_idx)
                track["bbox_history"].append((x1, y1, x2, y2))
                track["centers"].append(center)

                # Moving average of the embedding (90% old, 10% new).
                track["embedding"] = 0.9 * track["embedding"] + 0.1 * hist


In [30]:
def _unit_embedding(vec):
    """Return ``vec`` scaled to unit L2 norm, or ``None`` if that is undefined."""
    vec = np.asarray(vec)
    norm = np.linalg.norm(vec)
    # `norm > 0` is also False for NaN, so invalid embeddings are rejected too.
    return vec / norm if norm > 0 else None


def cross_camera_match(db_x, db_y, threshold=0.7):
    """Map each Camera Y track to its most similar Camera X track.

    Every Y track is compared with every X track by cosine similarity of
    their embeddings and paired with the best X track when that similarity
    exceeds ``threshold``. The search is greedy per Y track, so several Y
    tracks may map to the same X track.

    Tracks whose embedding has zero norm (or is not finite) are skipped
    rather than divided by zero, which would produce NaN scores and
    runtime warnings.

    Args:
        db_x: Mapping of Camera X track ID to a record with an "embedding".
        db_y: Same structure for Camera Y.
        threshold: Minimum cosine similarity required for a match.

    Returns:
        Dict mapping Camera Y track ID to the matched Camera X track ID.
    """
    # Normalise every X embedding once instead of once per (x, y) pair.
    unit_x = {
        x_id: _unit_embedding(x_data["embedding"])
        for x_id, x_data in db_x.items()
    }

    matches = {}
    for y_id, y_data in db_y.items():
        emb_y = _unit_embedding(y_data["embedding"])
        if emb_y is None:
            continue

        best_score = float('inf')
        best_x_id = None
        for x_id, emb_x in unit_x.items():
            if emb_x is None:
                continue
            score = 1 - np.dot(emb_x, emb_y)  # 1 - cosine similarity

            if score < best_score:
                best_score = score
                best_x_id = x_id

        if best_score < (1 - threshold):
            matches[y_id] = best_x_id

    return matches

In [31]:

cam_x = CameraProcessor(MODEL_PATH)
cam_y = CameraProcessor(MODEL_PATH)

print("Processing Camera X...")
cam_x.process_video("/content/drive/MyDrive/broadcast.mp4")

print("Processing Camera Y...")
cam_y.process_video("/content/drive/MyDrive/tacticam.mp4")


print("\nMatching identities...")
matches = cross_camera_match(
    cam_x.get_database(),
    cam_y.get_database()
)

print("\nCross-Camera ID Mapping:")
for y_id, x_id in matches.items():
    print(f"Camera Y ID {y_id} → Camera X ID {x_id}")


# Assign one global ID per matched Camera X track. Several Camera Y tracks can
# be matched to the same X track; they must all share that track's global ID.
# Handing out a fresh ID per (y, x) pair would overwrite global_map_x[x_id]
# and leave the earlier Y tracks with a global ID that no X track carries.
global_map_x = {}
global_map_y = {}

next_global_id = 1
for y_id, x_id in matches.items():
    if x_id not in global_map_x:
        global_map_x[x_id] = next_global_id
        next_global_id += 1
    global_map_y[y_id] = global_map_x[x_id]


def write_output_video(video_path, output_path, global_map):
    """Re-run tracking on a video and write a copy annotated with global IDs.

    Every tracked box is drawn in green with the label ``GLOBAL ID: <n>``
    when its tracker ID is a key of ``global_map``, otherwise
    ``GLOBAL ID: ?``.

    NOTE(review): tracking is run again with a freshly loaded model, so this
    relies on the tracker assigning the same IDs as in the pass that produced
    ``global_map`` — confirm that this holds.

    Args:
        video_path: Path of the input video.
        output_path: Path of the annotated video to write (mp4v codec).
        global_map: Dict mapping tracker ID to global ID.
    """
    model = YOLO(MODEL_PATH)

    # The capture is needed only for the frame size and rate (the frames
    # themselves come from model.track), so release it straight away.
    cap = cv2.VideoCapture(video_path)
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
    finally:
        cap.release()

    out = cv2.VideoWriter(
        output_path,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps,
        (width, height)
    )

    # Release the writer even if tracking fails part-way through.
    try:
        results_stream = model.track(
            source=video_path,
            tracker="botsort.yaml",
            persist=True,
            classes=[PLAYER_CLASS_ID],
            stream=True
        )

        for result in results_stream:
            frame = result.orig_img
            boxes = result.boxes

            if boxes.id is not None:
                for box, track_id in zip(boxes.xyxy, boxes.id):
                    track_id = int(track_id.item())
                    x1, y1, x2, y2 = map(int, box.tolist())

                    if track_id in global_map:
                        gid = global_map[track_id]
                        label = f"GLOBAL ID: {gid}"
                    else:
                        label = "GLOBAL ID: ?"

                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    # Keep the label inside the frame for boxes touching the top edge.
                    label_y = max(y1 - 10, 20)
                    cv2.putText(frame, label,
                                (x1, label_y),
                                cv2.FONT_HERSHEY_SIMPLEX,
                                0.7,
                                (0, 255, 0),
                                2)

            out.write(frame)
    finally:
        out.release()

print("\nWriting output videos...")

# (input video, annotated output, tracker-ID -> global-ID map) per camera.
for source_video, annotated_video, camera_id_map in (
    ("/content/drive/MyDrive/broadcast.mp4",
     "/content/broadcast_with_global_ids.mp4",
     global_map_x),
    ("/content/drive/MyDrive/tacticam.mp4",
     "/content/tacticam_with_global_ids.mp4",
     global_map_y),
):
    write_output_video(source_video, annotated_video, camera_id_map)

print("\nDone. Videos saved.")



Processing Camera X...

video 1/1 (frame 1/132) /content/drive/MyDrive/broadcast.mp4: 384x640 3 players, 67.1ms
video 1/1 (frame 2/132) /content/drive/MyDrive/broadcast.mp4: 384x640 1 player, 43.7ms
video 1/1 (frame 3/132) /content/drive/MyDrive/broadcast.mp4: 384x640 5 players, 50.2ms
video 1/1 (frame 4/132) /content/drive/MyDrive/broadcast.mp4: 384x640 2 players, 42.7ms
video 1/1 (frame 5/132) /content/drive/MyDrive/broadcast.mp4: 384x640 4 players, 40.4ms
video 1/1 (frame 6/132) /content/drive/MyDrive/broadcast.mp4: 384x640 12 players, 40.5ms
video 1/1 (frame 7/132) /content/drive/MyDrive/broadcast.mp4: 384x640 11 players, 40.2ms
video 1/1 (frame 8/132) /content/drive/MyDrive/broadcast.mp4: 384x640 11 players, 44.9ms
video 1/1 (frame 9/132) /content/drive/MyDrive/broadcast.mp4: 384x640 11 players, 42.2ms
video 1/1 (frame 10/132) /content/drive/MyDrive/broadcast.mp4: 384x640 12 players, 39.4ms
video 1/1 (frame 11/132) /content/drive/MyDrive/broadcast.mp4: 384x640 11 players, 39.4ms
v

Evaluation

In [32]:

db_x = cam_x.get_database()
db_y = cam_y.get_database()

print("\n======================================")
print(" CROSS-CAMERA SYSTEM EVALUATION")
print("======================================\n")

# Cosine similarity between the embeddings of each matched pair.
similarities = []
for y_id, x_id in matches.items():
    if x_id in db_x and y_id in db_y:
        emb_x = db_x[x_id]["embedding"]
        emb_y = db_y[y_id]["embedding"]
        emb_x = emb_x / np.linalg.norm(emb_x)
        emb_y = emb_y / np.linalg.norm(emb_y)
        sim = np.dot(emb_x, emb_y)
        similarities.append(sim)

if similarities:
    print("Average Cosine Similarity:", round(np.mean(similarities),4))
    print("Min Similarity:", round(np.min(similarities),4))
    print("Max Similarity:", round(np.max(similarities),4))
    print("Std Dev:", round(np.std(similarities),4))
else:
    print("No matches found.")

# Matching coverage: share of Camera Y tracks that received a match.
total_tracks_y = len(db_y)
matched_tracks = len(matches)
unmatched_tracks = total_tracks_y - matched_tracks
# Avoid a ZeroDivisionError when Camera Y produced no tracks at all.
coverage_pct = (matched_tracks / total_tracks_y) * 100 if total_tracks_y else 0.0
print("\nMatching Coverage:")
print("Total Tracks in Camera Y:", total_tracks_y)
print("Matched Tracks:", matched_tracks)
print("Unmatched Tracks:", unmatched_tracks)
print("Coverage %:", round(coverage_pct,2))

#Average track length
def compute_avg_length(db):
    """Return ``(mean, min, max)`` of the track lengths in ``db``.

    A track's length is the number of entries in its "frames" list. An empty
    database returns ``(0.0, 0, 0)`` instead of raising from ``np.min`` /
    ``np.max`` on an empty sequence.
    """
    lengths = [len(track["frames"]) for track in db.values()]
    if not lengths:
        return 0.0, 0, 0
    return np.mean(lengths), np.min(lengths), np.max(lengths)

# Track-length statistics for both cameras, computed in one pass.
(avg_x, min_x, max_x), (avg_y, min_y, max_y) = (
    compute_avg_length(camera_db) for camera_db in (db_x, db_y)
)

print("\nTrack Length Statistics:")
print("Camera X → Avg:", round(avg_x,2), "Min:", min_x, "Max:", max_x)
print("Camera Y → Avg:", round(avg_y,2), "Min:", min_y, "Max:", max_y)

#Track stability (motion jitter)
def compute_stability(db):
    """Return ``(mean, std)`` of the frame-to-frame centre displacements.

    For every track with at least two recorded centres, the Euclidean
    distance between consecutive centres is collected; the statistics are
    taken over all tracks together.

    When no track has two centres the result is undefined and
    ``(nan, nan)`` is returned directly, without the runtime warnings that
    ``np.mean`` / ``np.std`` emit for an empty input.
    """
    step_lengths = []
    for track in db.values():
        centers = np.asarray(track["centers"], dtype=float)
        if len(centers) > 1:
            # Differences between consecutive rows, then one norm per step.
            steps = np.diff(centers, axis=0)
            step_lengths.append(np.linalg.norm(steps, axis=1))

    if not step_lengths:
        return float("nan"), float("nan")

    displacements = np.concatenate(step_lengths)
    return np.mean(displacements), np.std(displacements)

# Displacement statistics for both cameras.
(mean_x, std_x), (mean_y, std_y) = map(compute_stability, (db_x, db_y))

print("\nTrack Stability (Lower = Smoother):")
print("Camera X → Avg Motion:", round(mean_x,2), "Std:", round(std_x,2))
print("Camera Y → Avg Motion:", round(mean_y,2), "Std:", round(std_y,2))

print("\nEvaluation Complete.")


 CROSS-CAMERA SYSTEM EVALUATION

Average Cosine Similarity: 0.9132
Min Similarity: 0.7465
Max Similarity: 0.9916
Std Dev: 0.0708

Matching Coverage:
Total Tracks in Camera Y: 30
Matched Tracks: 27
Unmatched Tracks: 3
Coverage %: 90.0

Track Length Statistics:
Camera X → Avg: 51.8 Min: 1 Max: 120
Camera Y → Avg: 142.17 Min: 1 Max: 201

Track Stability (Lower = Smoother):
Camera X → Avg Motion: 6.18 Std: 7.62
Camera Y → Avg Motion: 2.42 Std: 2.06

Evaluation Complete.
