In [1]:
from ultralytics import YOLO
import matplotlib.pyplot as plt
import numpy as np
import cv2
from tqdm import tqdm
import torch
import torchreid
# Pick the best available accelerator instead of assuming Apple's MPS backend,
# so the notebook also runs on CUDA or CPU-only machines. MPS is still
# preferred when present, which keeps the original behaviour on Apple Silicon.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Load pre-trained OSNet model
reid_model = torchreid.models.build_model(
    name='osnet_x1_0',
    num_classes=1000,
    pretrained=True)

# The re-ID network is only used for inference: move it to the selected
# device and switch it to evaluation mode.
reid_model = reid_model.to(device)
reid_model.eval()

# Detector called by detect_people().
model = YOLO("yolov8l.pt")



Successfully loaded imagenet pretrained weights from "/Users/bogdanmatache/.cache/torch/checkpoints/osnet_x1_0_imagenet.pth"


In [2]:
def detect_people(frame):
    """Run the global detector on one frame and collect its boxes.

    The detector is called with ``classes=[0]`` (presumably the "person"
    class of the loaded weights -- confirm) and ``verbose=False``. Only the
    first result returned by the detector is used.

    Args:
        frame: Image passed straight to the detector.

    Returns:
        dict with three lists:
            'bbox'       -- one ``[x1, y1, x2, y2]`` list of rounded ints per box,
            'frame'      -- the result's ``orig_img``, repeated once per box,
            'conf_score' -- one confidence value per box.
    """
    detection = model(frame, classes=[0], verbose=False)[0]
    boxes = detection.boxes

    scores = [score.squeeze().tolist() for score in boxes.conf]
    coords = [
        np.round(xyxy.squeeze().tolist()).astype(int).tolist()
        for xyxy in boxes.xyxy
    ]

    return {
        'bbox': coords,
        'frame': [detection.orig_img for _ in coords],
        'conf_score': scores,
    }

In [3]:
def cosine_similarity(a, b):
    """Return the cosine similarity of two arrays treated as flat vectors.

    Args:
        a: NumPy array; flattened before use.
        b: NumPy array with the same number of elements as ``a``; flattened
            before use.

    Returns:
        ``dot(a, b) / (||a|| * ||b||)``. If either vector has zero norm the
        similarity is undefined and ``0.0`` is returned instead of dividing
        by zero (which would yield NaN and a RuntimeWarning).
    """
    a = a.flatten()
    b = b.flatten()
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0:
        # A zero vector has no direction; report "no similarity".
        return 0.0
    return np.dot(a, b) / denom

def dot_product(a, b):
    """Return the dot product of two arrays after flattening each to 1-D.

    Args:
        a: Array-like object exposing ``.flatten()``.
        b: Array-like object exposing ``.flatten()`` with as many elements
            as ``a``.
    """
    flat_a, flat_b = a.flatten(), b.flatten()
    return np.dot(flat_a, flat_b)

# Gallery of known identities: integer ID -> feature array (NumPy) of the
# detection that first received that ID. Entries are never updated afterwards.
feature_db = {}
# Next unused identity number; incremented each time a new ID is created.
next_id = 0

def assign_id(features, threshold=0.65):
    """Match a feature tensor against the gallery or register a new identity.

    The query is compared with every stored feature using cosine_similarity().
    If the best score is strictly greater than ``threshold`` the matching ID
    is returned; otherwise the query is stored under a fresh ID.

    Args:
        features: torch tensor holding the features of one detection.
        threshold: Minimum (exclusive) cosine similarity for a match.
            Defaults to 0.65.

    Returns:
        int: The matched or newly created identity number.

    Side effects:
        May add an entry to the module-level ``feature_db`` and increment
        the module-level ``next_id``.
    """
    global next_id

    # Convert once up front: the conversion does not depend on the gallery
    # entry, so doing it inside the loop only repeats the device-to-host copy.
    query = features.detach().cpu().numpy()

    best_match = None
    best_score = -1
    for known_id, db_features in feature_db.items():
        score = cosine_similarity(query, db_features)
        if score > best_score:
            best_score = score
            best_match = known_id

    if best_score > threshold:
        return best_match

    # No sufficiently similar identity: register this detection as a new one.
    new_id = next_id
    feature_db[new_id] = query
    next_id += 1
    return new_id

In [4]:
def preprocess(bbox, frame, size=(256, 256)):
    """Crop one detection out of a frame and convert it to a model input.

    Steps: crop -> BGR to RGB -> resize -> scale to [0, 1] -> per-channel
    normalisation -> channels-first -> add batch dimension -> move to
    ``device``.

    Args:
        bbox: ``[x1, y1, x2, y2]`` pixel coordinates of the box.
        frame: Image array indexed ``[row, column, channel]``; it is treated
            as BGR because it is converted with ``cv2.COLOR_BGR2RGB``.
        size: ``(width, height)`` handed to ``cv2.resize``. Defaults to
            ``(256, 256)``.
            NOTE(review): person re-ID models are often fed taller-than-wide
            crops -- confirm that a square input is intended here.

    Returns:
        Float tensor of shape ``(1, 3, height, width)`` on ``device``.

    Raises:
        ValueError: If the box has no area inside the frame. Previously this
            surfaced as an opaque OpenCV error on the empty crop.
    """
    frame_h, frame_w = frame.shape[:2]

    # Clamp to the frame. A negative start index would not fail in NumPy; it
    # would wrap around and silently produce the wrong crop.
    x1, y1 = max(0, bbox[0]), max(0, bbox[1])
    x2, y2 = min(frame_w, bbox[2]), min(frame_h, bbox[3])
    if x2 <= x1 or y2 <= y1:
        raise ValueError(f"Bounding box {bbox} has no area inside the frame")

    image = frame[y1:y2, x1:x2]
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, size)
    image = image / 255.0
    # Per-channel normalisation constants (these are the values commonly used
    # for ImageNet-pretrained models).
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    image = (image - mean) / std
    # HWC -> CHW, then add the batch dimension.
    image = image.transpose(2, 0, 1)
    image = torch.FloatTensor(image)
    image = image.unsqueeze(0).to(device)
    return image

In [5]:
def extract_features(model, image):
    """Call ``model`` on ``image`` with gradient tracking disabled.

    Args:
        model: Callable applied to ``image``.
        image: Input handed to ``model`` unchanged.

    Returns:
        Whatever ``model(image)`` returns, computed under ``torch.no_grad()``.
    """
    with torch.no_grad():
        return model(image)

In [6]:
# Detect people in every frame of the input video, assign each detection a
# re-identification ID, write an annotated copy of the video, and collect the
# per-frame results in `frame_data`.
cap = cv2.VideoCapture('scene_example.avi')
if not cap.isOpened():
    # Fail loudly: otherwise the loop below never runs and an empty output
    # video is produced without any error.
    raise IOError("Could not open input video 'scene_example.avi'")

num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Keep the frame rate exactly as reported; truncating it to an int would
# change the playback speed of sources with a fractional rate (e.g. 29.97).
frame_rate = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

out = cv2.VideoWriter('annotated_video_80.avi', cv2.VideoWriter_fourcc(*'XVID'), frame_rate, (frame_width, frame_height))
inference_results = []

# One entry per frame that contains at least one detection. Every value is a
# list: 'frame_nr' -> [frame number], and 'bbox' / 'id' / 'orig_img' hold one
# item per detection in that frame.
# NOTE: 'orig_img' keeps a reference to the full frame for every detection,
# so memory use grows with the length of the video.
frame_data = {'frame_nr': [], 'bbox': [], 'id': [], 'orig_img': []}

try:
    with tqdm(total=num_frames, desc="Person Detection") as pbar:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            id_list = []
            orig_img_list = []
            frame_nr_list = [int(cap.get(cv2.CAP_PROP_POS_FRAMES))]
            bboxes_list = []

            # Draw on a copy so that boxes and labels of earlier detections
            # never end up inside the crops of later detections.
            annotated = frame.copy()

            bboxes = detect_people(frame)
            for bbox, det_frame in zip(bboxes['bbox'], bboxes['frame']):
                if bbox[2] <= bbox[0] or bbox[3] <= bbox[1]:
                    # Rounding can collapse a very thin box to zero area;
                    # an empty crop cannot be resized, so skip it.
                    continue

                image = preprocess(bbox, det_frame)
                # extract_features() runs the model under torch.no_grad(),
                # so no autograd graph is built during inference.
                features = extract_features(reid_model, image)
                person_id = assign_id(features)

                id_list.append(person_id)
                orig_img_list.append(det_frame)
                bboxes_list.append(bbox)

                cv2.putText(annotated, f"ID: {person_id}", (bbox[0], bbox[1]-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)
                cv2.rectangle(annotated, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (255, 0, 0), 2)

            # Record the frame once. Appending inside the detection loop
            # stored the same lists once per detection, duplicating the frame.
            if id_list:
                frame_data['frame_nr'].append(frame_nr_list)
                frame_data['bbox'].append(bboxes_list)
                frame_data['id'].append(id_list)
                frame_data['orig_img'].append(orig_img_list)

            out.write(annotated)
            pbar.update(1)
finally:
    # Always release both handles; the writer must be released for the
    # output file to be finalised.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

Person Detection: 100%|██████████| 718/718 [03:25<00:00,  3.50it/s]


In [9]:
import pandas as pd

# Tabulate the collected results: one row per entry of `frame_data`.
frame_df = pd.DataFrame(frame_data)

# Leave 'orig_img' out of the CSV: it holds raw image arrays, and their text
# form is an abbreviated array repr that cannot be read back as pixel data.
# The column stays available on `frame_df` in memory.
frame_df.drop(columns=['orig_img'], errors='ignore').to_csv('frame_data_80.csv')

In [8]:
# Disabled visualisation: the loop below is wrapped in a bare string literal,
# so it is not executed; the cell only echoes the string as its output.
# When enabled it would show, for every entry of `frame_data`, the crop of
# each identified person with its ID and frame number in the title.
# NOTE(review): `ids.index(id)` returns the first position of `id`, so if the
# same ID occurs twice in one frame the first crop would be shown both times.
# NOTE(review): preprocess() converts frames from BGR to RGB before use, while
# this loop passes the crop to plt.imshow unconverted -- colours are
# presumably swapped; confirm before re-enabling.
"""for ids, bboxes, orig_img, frame_nr in zip(frame_data['id'], frame_data['bbox'], frame_data['orig_img'], frame_data['frame_nr']):
    for id in ids:
        print(f"ID: {id}")
        person_frame = orig_img[ids.index(id)]
        bbox_coord = bboxes[ids.index(id)]
        person_frame = person_frame[bbox_coord[1]:bbox_coord[3], bbox_coord[0]:bbox_coord[2]]
        plt.figure(figsize=(8, 6))
        plt.imshow(person_frame)
        plt.title(f"Person ID: {id} - Frame Number: {frame_nr}")
        plt.axis('off')
        plt.show()"""

'for ids, bboxes, orig_img, frame_nr in zip(frame_data[\'id\'], frame_data[\'bbox\'], frame_data[\'orig_img\'], frame_data[\'frame_nr\']):\n    for id in ids:\n        print(f"ID: {id}")\n        person_frame = orig_img[ids.index(id)]\n        bbox_coord = bboxes[ids.index(id)]\n        person_frame = person_frame[bbox_coord[1]:bbox_coord[3], bbox_coord[0]:bbox_coord[2]]\n        plt.figure(figsize=(8, 6))\n        plt.imshow(person_frame)\n        plt.title(f"Person ID: {id} - Frame Number: {frame_nr}")\n        plt.axis(\'off\')\n        plt.show()'