In [4]:
from ultralytics import YOLO
import cv2
import os

# YOLOv8 detector loaded from the yolov8n.pt weights file.
model = YOLO("yolov8n.pt")

# One directory per input video for the frames written during detection.
for _frames_dir in ("frames_in", "frames_out"):
    os.makedirs(_frames_dir, exist_ok=True)


Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 3.4MB/s 1.8s


In [6]:
#Person Detection in each video:

def extract_person_detections(video_path, output_folder, conf_threshold=0.5,
                              draw_boxes=True):
    """Detect people in every frame of a video and save each frame as a JPEG.

    Args:
        video_path: Path of the video read through ``cv2.VideoCapture``.
        output_folder: Directory that receives ``frame_{idx:05d}.jpg`` files.
            It is created when missing.
        conf_threshold: Confidence threshold forwarded to the YOLO model.
        draw_boxes: When True (the default, matching the previous behaviour) a
            green rectangle is drawn on every kept detection before the frame
            is saved. ``extract_embeddings`` crops people out of these saved
            frames, so drawn rectangles end up inside the crops; pass False to
            save clean frames instead.

    Returns:
        dict mapping the 0-based frame index to a list of ``[x1, y1, x2, y2]``
        integer boxes (an empty list for frames without a class-0 detection).

    Raises:
        IOError: If the video cannot be opened or a frame cannot be written.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        # Previously an unreadable path silently produced an empty result.
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    # cv2.imwrite only returns False when the directory is missing, so make
    # sure it exists up front.
    os.makedirs(output_folder, exist_ok=True)

    detections = {}
    frame_idx = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            results = model(frame, conf=conf_threshold, verbose=False)
            boxes = []
            for r in results[0].boxes:
                # Class 0 is the class this notebook treats as "person".
                if int(r.cls) != 0:
                    continue
                x1, y1, x2, y2 = map(int, r.xyxy[0])
                boxes.append([x1, y1, x2, y2])
                if draw_boxes:
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            detections[frame_idx] = boxes

            frame_path = os.path.join(output_folder, f"frame_{frame_idx:05d}.jpg")
            if not cv2.imwrite(frame_path, frame):
                # Later stages skip missing frame files without complaint, so
                # fail here rather than lose data silently.
                raise IOError(f"Could not write frame image: {frame_path}")
            frame_idx += 1
    finally:
        # Release the capture even when detection or writing raises.
        cap.release()

    return detections

# Run person detection over the entry (In) and exit (Out) recordings.
in_detections = extract_person_detections(video_path="In.mp4", output_folder="frames_in")
out_detections = extract_person_detections(video_path="Out.mp4", output_folder="frames_out")


In [7]:
#person_Identification
import torch
import torch.nn as nn
import torchvision.transforms as T
import timm
from PIL import Image
import numpy as np

# Run on the GPU when PyTorch can see one, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"


# ResNet-50 used as an embedding extractor: the classification layer is
# replaced by an identity so the forward pass returns the features feeding it.
reid_model = timm.create_model("resnet50", pretrained=True)
reid_model.fc = nn.Identity()
reid_model.to(device)
reid_model.eval()

# Preprocessing applied to every person crop before it reaches the network.
_crop_pipeline = [
    T.Resize((256, 128)),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]),
]
transform = T.Compose(_crop_pipeline)



In [8]:
def extract_embeddings(frames_folder, detections_dict):
    """Compute one L2-normalised appearance embedding per detected box.

    For every frame listed in ``detections_dict`` the saved JPEG
    ``frame_{idx:05d}.jpg`` is loaded from ``frames_folder``, each box is
    cropped, preprocessed with the module-level ``transform`` and passed
    through ``reid_model``.

    Args:
        frames_folder: Directory containing the saved frame images.
        detections_dict: dict mapping frame index to a list of
            ``(x1, y1, x2, y2)`` boxes.

    Returns:
        dict mapping frame index to a list of 1-D numpy embeddings. Frames
        whose image file is missing are left out. Boxes with no area are
        skipped, so a frame's list can be shorter than its box list.
    """
    # The previous version opened a VideoCapture on a path derived from the
    # folder name only to read a frame count it never used; that is removed.
    embeddings = {}

    for frame_idx, boxes in detections_dict.items():
        frame_path = os.path.join(frames_folder, f"frame_{frame_idx:05d}.jpg")
        if not os.path.exists(frame_path):
            continue

        # convert() returns an independent in-memory image, so the file handle
        # can be closed as soon as the pixels are loaded.
        with Image.open(frame_path) as frame_file:
            frame = frame_file.convert("RGB")

        person_embeddings = []
        for (x1, y1, x2, y2) in boxes:
            if x2 <= x1 or y2 <= y1:
                # An empty crop cannot be resized into a network input.
                continue

            crop = frame.crop((x1, y1, x2, y2))
            img_tensor = transform(crop).unsqueeze(0).to(device)
            with torch.no_grad():
                emb = reid_model(img_tensor)
            emb = emb.squeeze().cpu().numpy()

            # Guard the normalisation: dividing by a zero norm would fill the
            # embedding with NaN and poison later similarity scores.
            norm = np.linalg.norm(emb)
            if norm > 0:
                emb = emb / norm
            person_embeddings.append(emb)

        embeddings[frame_idx] = person_embeddings

    return embeddings

# Embed every detected person in the saved frames of each video.
in_embeddings = extract_embeddings(frames_folder="frames_in", detections_dict=in_detections)
out_embeddings = extract_embeddings(frames_folder="frames_out", detections_dict=out_detections)


In [9]:
#Match entry and exit persons
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd

def match_people(in_embeds, out_embeds, threshold=0.6):
    """Match every entry embedding to its most similar exit embedding.

    Args:
        in_embeds: dict mapping entry-video frame index to a list of 1-D
            embedding vectors.
        out_embeds: dict with the same layout for the exit video.
        threshold: Minimum cosine similarity for a match to be reported.

    Returns:
        pandas.DataFrame with columns ``Person_ID`` (1-based, one per reported
        match), ``Entry_Timestamp_Video`` and ``Exit_Timestamp_Video``. Both
        "timestamp" columns hold the frame-index keys of the input dicts. The
        columns are present even when nothing matches.
    """
    columns = ["Person_ID", "Entry_Timestamp_Video", "Exit_Timestamp_Video"]

    # Flatten the exit embeddings once instead of recomputing similarities
    # frame by frame for every entry vector. out_frames[i] is the frame that
    # produced gallery row i; dict order is kept so ties resolve to the
    # earliest frame, as before.
    out_frames = []
    out_rows = []
    for out_frame, out_vectors in out_embeds.items():
        for out_vec in out_vectors:
            out_frames.append(out_frame)
            out_rows.append(np.asarray(out_vec).ravel())

    gallery = None
    if out_rows:
        gallery = np.vstack(out_rows)
        row_norms = np.linalg.norm(gallery, axis=1, keepdims=True)
        row_norms[row_norms == 0] = 1.0  # leave all-zero rows at similarity 0
        gallery = gallery / row_norms

    results = []
    person_id = 1

    for in_frame, in_vectors in in_embeds.items():
        for in_vec in in_vectors:
            best_score = -1
            best_out_frame = None

            if gallery is not None:
                query = np.asarray(in_vec).ravel()
                query_norm = np.linalg.norm(query)
                if query_norm > 0:
                    query = query / query_norm

                # Both sides are unit length, so the dot product is the
                # cosine similarity against every exit embedding at once.
                sims = gallery @ query
                best_idx = int(np.argmax(sims))
                if sims[best_idx] > best_score:
                    best_score = float(sims[best_idx])
                    best_out_frame = out_frames[best_idx]

            if best_score >= threshold:
                results.append({
                    "Person_ID": person_id,
                    "Entry_Timestamp_Video": in_frame,
                    "Exit_Timestamp_Video": best_out_frame,
                })
                person_id += 1

    return pd.DataFrame(results, columns=columns)

# Match entry detections against exit detections, persist the table, then
# leave it as the cell's last expression so the notebook renders it.
# Note: the two "Timestamp" columns hold frame indices, not seconds.
matches_df = match_people(in_embeds=in_embeddings, out_embeds=out_embeddings)
matches_df.to_csv(path_or_buf="results.csv", index=False)
matches_df


Unnamed: 0,Person_ID,Entry_Timestamp_Video,Exit_Timestamp_Video
0,1,6092,15379
1,2,6098,14813
2,3,6104,14517
3,4,6105,3721
4,5,6106,15258
...,...,...,...
2498,2499,20887,9964
2499,2500,20888,10510
2500,2501,20889,9953
2501,2502,20890,9924


In [12]:
import cv2
import pandas as pd
import os

def annotate_video(video_path, detections, id_assignments, output_path):
    """Draw bounding boxes and ID labels on a video and write the result.

    Args:
        video_path: Path of the video to read.
        detections: dict mapping frame index to a list of ``[x1, y1, x2, y2]``.
        id_assignments: dict mapping frame index to a list of person IDs in
            the same order as ``detections``; ``None`` entries are labelled
            "Unknown". Frames missing from the dict are treated as all-None.
        output_path: Destination of the annotated video ('mp4v' codec).

    Raises:
        IOError: If the input video or the output writer cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    out_vid = None
    try:
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes
        # the output play at a different speed than the source.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out_vid = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out_vid.isOpened():
            # Without this check a failed writer would drop every frame
            # silently and the success message below would still be printed.
            raise IOError(f"Could not open video writer: {output_path}")

        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            boxes = detections.get(frame_idx, [])
            ids = id_assignments.get(frame_idx, [None] * len(boxes))

            for (box, pid) in zip(boxes, ids):
                x1, y1, x2, y2 = box
                color = (0, 255, 0)
                # Compare against None explicitly so an ID of 0 is still
                # shown as an ID rather than "Unknown".
                label = f"ID {pid}" if pid is not None else "Unknown"
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.putText(frame, label, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            out_vid.write(frame)
            frame_idx += 1
    finally:
        # Release both handles even if reading or drawing raises.
        cap.release()
        if out_vid is not None:
            out_vid.release()

    print(f"Annotated video saved to:{output_path}")

#create simple ID assignment mapping using your matches_df
def generate_id_assignments(matches_df, detections, frame_column="Entry_Timestamp_Video"):
    """Build frame-wise person-ID lists from the matching results.

    Args:
        matches_df: DataFrame with a ``Person_ID`` column and the column named
            by ``frame_column``.
        detections: dict mapping frame index to a list of boxes.
        frame_column: Column of ``matches_df`` holding the frame index to
            label. Defaults to the entry column (the previous hard-coded
            behaviour); pass ``"Exit_Timestamp_Video"`` when ``detections``
            belongs to the exit video.

    Returns:
        dict mapping every frame index in ``detections`` to a list with one
        entry per box: the person ID, or ``None`` when the frame is not
        referenced by ``matches_df``.

    Limitation: ``matches_df`` does not record which box a match belongs to,
    so every box of a referenced frame receives the same ID, and when several
    rows reference one frame the last row wins.
    """
    id_assignments = {frame: [None] * len(boxes) for frame, boxes in detections.items()}

    if matches_df.empty:
        return id_assignments

    # Rows without a frame index or ID cannot be converted to int; skip them
    # instead of failing on int(NaN).
    usable = matches_df.dropna(subset=[frame_column, "Person_ID"])

    for frame_value, pid_value in zip(usable[frame_column], usable["Person_ID"]):
        slots = id_assignments.get(int(frame_value))
        if slots is None:
            continue
        pid = int(pid_value)
        for i in range(len(slots)):
            slots[i] = pid

    return id_assignments

#Run visualization for both videos
in_id_assignments = generate_id_assignments(matches_df, in_detections)

# generate_id_assignments looks frames up through the "Entry_Timestamp_Video"
# column, which holds In.mp4 frame indices. Out.mp4 boxes have to be keyed by
# the matched exit frame instead, so present the exit column under the name
# the helper reads (the original entry column is kept under another name).
out_matches_df = matches_df.rename(columns={
    "Entry_Timestamp_Video": "Matched_Entry_Frame",
    "Exit_Timestamp_Video": "Entry_Timestamp_Video",
})
out_id_assignments = generate_id_assignments(out_matches_df, out_detections)

annotate_video("In.mp4", in_detections, in_id_assignments, "In_annotated.mp4")
annotate_video("Out.mp4", out_detections, out_id_assignments, "Out_annotated.mp4")


Annotated video saved to:In_annotated.mp4
Annotated video saved to:Out_annotated.mp4


In [13]:
import cv2
import pandas as pd
import os
from tqdm import tqdm
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# YOLOv8 model used for person detection in the tracking pass.
model = YOLO("yolov8n.pt")

# DeepSORT tracker for smooth, continuous IDs. The settings are collected in
# one mapping so they can be read and tuned in a single place.
_deepsort_settings = {
    "max_age": 30,
    "n_init": 3,
    "nms_max_overlap": 1.0,
    "max_cosine_distance": 0.3,
    "embedder": "mobilenet",
    "half": True,
}
tracker = DeepSort(**_deepsort_settings)

def visualize_tracking(video_path, output_path, reset_tracker=True):
    """Run YOLO + DeepSORT on a full video, write an annotated copy and a CSV.

    Args:
        video_path: Path of the video to process.
        output_path: Destination of the annotated video ('mp4v' codec). The
            summary CSV is written next to it with the suffix ``_data.csv``.
        reset_tracker: When True (default) the module-level ``tracker`` is
            cleared before processing, so tracks and IDs from a previously
            processed video do not carry over into this one. Pass False to
            keep the tracker state across calls.

    Returns:
        pandas.DataFrame with one row per track ID and the columns
        ``Person_ID``, ``Entry_Timestamp_Video`` and ``Exit_Timestamp_Video``
        (first and last time, in seconds, at which the confirmed track was
        recorded).

    Raises:
        IOError: If the input video or the output writer cannot be opened.
        ValueError: If the video reports a non-positive frame rate, which
            would make the timestamps undefined.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    out_vid = None
    progress = None
    id_records = []
    try:
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 skews
        # both the written video and every timestamp derived from it.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            raise ValueError(f"Video reports an invalid frame rate ({fps}): {video_path}")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out_vid = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out_vid.isOpened():
            raise IOError(f"Could not open video writer: {output_path}")

        if reset_tracker:
            tracker.delete_all_tracks()

        # The reported frame count is only used to size the progress bar; the
        # loop itself runs until the reader is exhausted so an under-reported
        # count cannot cut the video short.
        progress = tqdm(total=total_frames if total_frames > 0 else None,
                        desc=f"Processing {os.path.basename(video_path)}")

        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # YOLO person detection
            results = model(frame, classes=[0], verbose=False)
            detections = []
            for box in results[0].boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf = float(box.conf[0])
                # Passed to the tracker as ([left, top, width, height], confidence, class).
                detections.append(([x1, y1, x2 - x1, y2 - y1], conf, 'person'))

            # Update DeepSORT tracker
            tracks = tracker.update_tracks(detections, frame=frame)

            timestamp = frame_idx / fps
            for track in tracks:
                if not track.is_confirmed():
                    continue
                track_id = track.track_id
                l, t, r, b = map(int, track.to_ltrb())

                # Draw bounding box + label
                cv2.rectangle(frame, (l, t), (r, b), (0, 255, 0), 2)
                cv2.putText(frame, f"ID {track_id}", (l, t - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                # Record timestamp info
                id_records.append([track_id, timestamp])

            out_vid.write(frame)
            frame_idx += 1
            progress.update(1)
    finally:
        # Release every handle even if detection or tracking raises.
        if progress is not None:
            progress.close()
        cap.release()
        if out_vid is not None:
            out_vid.release()

    # Save tracking summary (first and last appearance per ID)
    df = pd.DataFrame(id_records, columns=["Person_ID", "Timestamp"])
    summary = df.groupby("Person_ID").agg(
        Entry_Timestamp_Video=("Timestamp", "min"),
        Exit_Timestamp_Video=("Timestamp", "max")).reset_index()

    csv_path = os.path.splitext(output_path)[0] + "_data.csv"
    summary.to_csv(csv_path, index=False)

    print(f" Full video processed: {output_path}")
    print(f" Tracking summary saved: {csv_path}")
    return summary

In [14]:
# Track people through each recording; every call writes an annotated video
# and a per-ID summary CSV, and returns that summary.
in_summary = visualize_tracking(video_path="In.mp4", output_path="In_full_tracked.mp4")
out_summary = visualize_tracking(video_path="Out.mp4", output_path="Out_full_tracked.mp4")


Processing In.mp4:  50%|██████████████████████████▍                          | 21436/42877 [3:24:09<3:24:11,  1.75it/s]


 Full video processed: In_full_tracked.mp4
 Tracking summary saved: In_full_tracked_data.csv


Processing Out.mp4: 100%|██████████████████████████████████████████████████████| 26997/26997 [2:30:42<00:00,  2.99it/s]


 Full video processed: Out_full_tracked.mp4
 Tracking summary saved: Out_full_tracked_data.csv
