### Person Re_Idenfication

In [None]:
import cv2
import numpy as np
import pandas as pd
from collections import defaultdict
from IPython.display import clear_output, display, Image
from ultralytics import YOLO
from scipy.optimize import linear_sum_assignment
import torch
from torchvision import models, transforms
from sklearn.metrics.pairwise import cosine_similarity
from concurrent.futures import ThreadPoolExecutor

model = YOLO("yolov8n.pt")
names = model.model.names

class FeatureExtractor:
    def __init__(self):
        self.model = models.resnet50(pretrained=True).eval()
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
    
    def extract_features(self, image, bbox):
        x1, y1, x2, y2 = map(int, bbox)
        cropped_image = image[y1:y2, x1:x2]
        if cropped_image.size == 0:
            return torch.zeros((1, 2048)) 
        input_tensor = self.transform(cropped_image).unsqueeze(0)
        with torch.no_grad():
            features = self.model(input_tensor).flatten()
        return features

feature_extractor = FeatureExtractor()
track_history = defaultdict(lambda: [])
track_features = {}
next_track_id = 0
track_info = []

class KalmanFilter:
    def __init__(self, dt, u_x, u_y, std_acc, x_std_meas, y_std_meas):
        self.dt = dt
        self.u = np.matrix([[u_x], [u_y]])
        self.x = np.matrix([[0], [0], [0], [0]])
        self.A = np.matrix([[1, 0, self.dt, 0], [0, 1, 0, self.dt], [0, 0, 1, 0], [0, 0, 0, 1]])
        self.B = np.matrix([[(self.dt**2) / 2, 0], [0, (self.dt**2) / 2], [self.dt, 0], [0, self.dt]])
        self.H = np.matrix([[1, 0, 0, 0], [0, 1, 0, 0]])
        self.Q = np.matrix([[self.dt**4 / 4, 0, self.dt**3 / 2, 0],
                            [0, self.dt**4 / 4, 0, self.dt**3 / 2],
                            [self.dt**3 / 2, 0, self.dt**2, 0],
                            [0, self.dt**3 / 2, 0, self.dt**2]]) * std_acc**2
        self.R = np.matrix([[x_std_meas**2, 0], [0, y_std_meas**2]])
        self.P = np.eye(self.A.shape[1])

    def predict(self):
        self.x = np.dot(self.A, self.x) + np.dot(self.B, self.u)
        self.P = np.dot(np.dot(self.A, self.P), self.A.T) + self.Q
        return self.x[0:2]

    def update(self, z):
        S = np.dot(self.H, np.dot(self.P, self.H.T)) + self.R
        K = np.dot(np.dot(self.P, self.H.T), np.linalg.inv(S))
        self.x = self.x + np.dot(K, (z - np.dot(self.H, self.x)))
        self.P = self.P - np.dot(np.dot(K, self.H), self.P)

trackers = {}
video_path = '1.mp4'
cap = cv2.VideoCapture(video_path)
assert cap.isOpened(), "Error reading video file"

frame_count = 0
frame_skip = 1  # Process every 30th frame

MATCHING_THRESHOLD = 0.5

def extract_features_batch(images_and_boxes):
    features = []
    for image, bbox in images_and_boxes:
        feature = feature_extractor.extract_features(image, bbox)
        features.append(feature)
    return features

def add_tracking_info(track_id, bbox):
    x1, y1, x2, y2 = map(int, bbox)
    track_info.append({
        'track_id': track_id,
        'x1': x1,
        'y1': y1,
        'x2': x2,
        'y2': y2
    })

class Annotator:
    def __init__(self, img, line_width=2):
        self.img = img
        self.line_width = line_width

    def box_label(self, bbox, color=(0, 255, 0), label=None):
        x1, y1, x2, y2 = map(int, bbox)
        cv2.rectangle(self.img, (x1, y1), (x2, y2), color, thickness=self.line_width)
        if label:
            (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            cv2.rectangle(self.img, (x1, y1 - h - 10), (x1 + w, y1), color, -1)
            cv2.putText(self.img, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

try:
    while cap.isOpened():
        success, frame = cap.read()
        if success:
            frame_count += 1
            if frame_count % frame_skip != 0:
                continue 

            results = model.track(frame, persist=True, verbose=False)

            if results[0].boxes.id is not None:
                clss = results[0].boxes.cls.cpu().tolist()
                boxes = results[0].boxes.xyxy.cpu()
                track_ids = results[0].boxes.id.int().cpu().tolist()

                annotator = Annotator(frame, line_width=2)

                detections = []
                for box, cls in zip(boxes, clss):
                    class_name = names[int(cls)]
                    if class_name == "person":
                        detections.append((frame, box))

                with ThreadPoolExecutor() as executor:
                    features_batch = list(executor.map(extract_features_batch, [detections]))

                detections = [(box, feature) for (frame, box), feature in zip(detections, features_batch[0])]

                # Re-identification and tracking update
                if len(track_features) > 0:
                    cost_matrix = np.zeros((len(track_features), len(detections)))
                    for i, (track_id, track_feature) in enumerate(track_features.items()):
                        for j, (box, detection_feature) in enumerate(detections):
                            cost_matrix[i, j] = 1 - cosine_similarity(track_feature.reshape(1, -1), detection_feature.reshape(1, -1)).item()

                    row_ind, col_ind = linear_sum_assignment(cost_matrix)

                    assigned_tracks = {}
                    for i, j in zip(row_ind, col_ind):
                        if cost_matrix[i, j] < MATCHING_THRESHOLD:
                            track_id = list(track_features.keys())[i]
                            box, features = detections[j]
                            trackers[track_id].update(np.array([[((box[0] + box[2]) / 2)], [(box[1] + box[3]) / 2]]))
                            assigned_tracks[track_id] = box, features
                            detections[j] = None
                            add_tracking_info(track_id, box)

                    detections = [d for d in detections if d is not None]

                    for track_id, (box, features) in assigned_tracks.items():
                        annotator.box_label(box, color=(255, 0, 0), label=f"person {track_id}")
                        track_features[track_id] = features
                        track = track_history[track_id]
                        track.append((int((box[0] + box[2]) / 2), int((box[1] + box[3]) / 2)))
                        if len(track) > 30:
                            track.pop(0)
                        points = np.array(track, dtype=np.int32).reshape((-1, 1, 2))
                        cv2.circle(frame, track[-1], 7, (255, 0, 0), -1)
                        cv2.polylines(frame, [points], isClosed=False, color=(255, 0, 0), thickness=2)
                        cv2.putText(frame, f"ID: {track_id}", (int(box[2]) + 10, int(box[1]) + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

                for box, features in detections:
                    new_track_id = next_track_id
                    next_track_id += 1
                    trackers[new_track_id] = KalmanFilter(dt=1, u_x=0, u_y=0, std_acc=1, x_std_meas=0.1, y_std_meas=0.1)
                    trackers[new_track_id].update(np.array([[((box[0] + box[2]) / 2)], [(box[1] + box[3]) / 2]]))
                    track_features[new_track_id] = features
                    track_history[new_track_id].append((int((box[0] + box[2]) / 2), int((box[1] + box[3]) / 2)))
                    annotator.box_label(box, color=(0, 255, 0), label=f"person {new_track_id}")
                    cv2.circle(frame, track_history[new_track_id][-1], 7, (0, 255, 0), -1)
                    cv2.putText(frame, f"ID: {new_track_id}", (int(box[2]) + 10, int(box[1]) + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
                    add_tracking_info(new_track_id, box)

            df = pd.DataFrame(track_info)
            df.to_csv('tracking_info.csv', index=False)
            display(Image(data=cv2.imencode('.jpg', frame)[1].tobytes()))
            clear_output(wait=True)
        else:
            break
finally:
    cap.release()