## Importing Libraries


In [1]:
import cv2
import numpy as np
import torch
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort


 ## Loading models

In [2]:
yolo = YOLO(r"C:\Users\nisha\OneDrive\Desktop\ML Projects\Kalkini_Project\models\yolov8n.pt")  # YOLOv8 for person detection
deepsort_tracker_cam1 = DeepSort(max_age=30)  # DeepSORT tracker for Camera 1
deepsort_tracker_cam2 = DeepSort(max_age=30)  # DeepSORT tracker for Camera 2

# Initialize ReID model with updated weights syntax
class ReIDModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.model = torch.hub.load('pytorch/vision', 'resnet50', weights='DEFAULT')
        self.model.eval()
    
    def forward(self, x):
        x = self.model.conv1(x)
        x = self.model.bn1(x)
        x = self.model.relu(x)
        x = self.model.maxpool(x)
        x = self.model.layer1(x)
        x = self.model.layer2(x)
        x = self.model.layer3(x)
        x = self.model.layer4(x)
        x = self.model.avgpool(x)
        return x.flatten(1)

reid_model = ReIDModel().eval()

Using cache found in C:\Users\nisha/.cache\torch\hub\pytorch_vision_main


## Open video streams

In [3]:
cap1 = cv2.VideoCapture(r"C:\Users\nisha\OneDrive\Desktop\ML Projects\Kalkini_Project\input_videos\test.mp4")  # Camera 1
cap2 = cv2.VideoCapture(r"C:\Users\nisha\OneDrive\Desktop\ML Projects\Kalkini_Project\input_videos\test.mp4")  # Camera 2

# Global storage for cross-camera ReID features
global_reid_features = []

## Functions Defining

In [4]:
def extract_reid_embedding(crop):
    """Extract ReID embedding from a cropped image."""
    crop = cv2.resize(crop, (128, 256))  # Resize to ReID input size
    crop = torch.from_numpy(crop).permute(2, 0, 1).float().div(255).unsqueeze(0)
    with torch.no_grad():
        embedding = reid_model(crop).squeeze().cpu().numpy()
    return embedding / np.linalg.norm(embedding)  # Normalize

def process_camera(cap, tracker, camera_id):
    """Process a single camera feed and return tracked persons with embeddings and frame."""
    ret, frame = cap.read()
    if not ret:
        return [], None  # Return None for frame if read fails
    
    # Detect people with YOLOv8
    results = yolo.predict(frame, classes=[0], verbose=False)  # Class 0 = person
    boxes = results[0].boxes.xyxy.cpu().numpy()
    confs = results[0].boxes.conf.cpu().numpy()
    
    # Prepare DeepSORT detections
    detections = []
    for box, conf in zip(boxes, confs):
        x1, y1, x2, y2 = map(int, box)
        detections.append(([x1, y1, x2, y2], conf, "person"))
    
    # Update DeepSORT tracker
    tracks = tracker.update_tracks(detections, frame=frame)
    
    # Extract ReID embeddings for tracked persons
    tracked_persons = []
    for track in tracks:
        if not track.is_confirmed():
            continue
        track_id = track.track_id
        ltrb = track.to_ltrb()
        x1, y1, x2, y2 = map(int, ltrb)
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            continue
        
        # Extract ReID embedding
        embedding = extract_reid_embedding(crop)
        tracked_persons.append((track_id, embedding, (x1, y1, x2, y2)))
    
    return tracked_persons, frame  # Return both tracked persons and frame

## Main LOOP

In [None]:
while True:
    # Process Camera 1 and get its frame
    persons_cam1, frame1 = process_camera(cap1, deepsort_tracker_cam1, camera_id=1)
    # Process Camera 2 and get its frame
    persons_cam2, frame2 = process_camera(cap2, deepsort_tracker_cam2, camera_id=2)
    
    # Skip iteration if frames are None (e.g., camera disconnected)
    if frame1 is None or frame2 is None:
        continue
    
    # Check for cross-camera matches using ReID embeddings
    for p1_id, p1_embed, (x1, y1, x2, y2) in persons_cam1:
        for p2_id, p2_embed, _ in persons_cam2:
            similarity = np.dot(p1_embed, p2_embed)
            if similarity > 0.7:  # Threshold (adjust based on your use case)
                print(f"Person {p1_id} (Cam1) matches Person {p2_id} (Cam2)!")
                # Highlight bounding boxes in both cameras
                cv2.rectangle(frame1, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.rectangle(frame2, (x1, y1), (x2, y2), (0, 255, 0), 2)
    
    # Display frames
    cv2.imshow("Camera 1", frame1)
    cv2.imshow("Camera 2", frame2)
    
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap1.release()
cap2.release()
cv2.destroyAllWindows()

## Comments

## More Features to ADD

### 1. Search Person by ID and Highlight Movement


In [None]:
def highlight_person_trail(frame, track_history, person_id, color=(0, 0, 255)):
    """
    Highlights the movement trail of a specific person.

    :param frame: The current video frame.
    :param track_history: Dictionary storing movement history {id: [(x, y), ...]}
    :param person_id: The ID of the person to highlight.
    :param color: The color for the highlighted trail.
    """
    if person_id in track_history:
        path = track_history[person_id]
        
        # Draw trail
        for i in range(1, len(path)):
            cv2.line(frame, path[i - 1], path[i], color, 3)

        # Draw last known position
        if path:
            cv2.circle(frame, path[-1], 6, color, -1)


In [None]:
import matplotlib.pyplot as plt

def plot_person_movement(track_history, person_id):
    """
    Plots the movement history of a given person ID.

    :param track_history: Dictionary storing movement history {id: [(x, y), ...]}
    :param person_id: The ID of the person to visualize.
    """
    if person_id not in track_history:
        print(f"No movement data for ID {person_id}")
        return
    
    x_vals, y_vals = zip(*track_history[person_id])

    plt.figure(figsize=(8, 6))
    plt.plot(x_vals, y_vals, marker='o', linestyle='-', color='blue', label=f"ID {person_id} Movement")
    plt.scatter(x_vals[-1], y_vals[-1], color='red', label="Last Seen Position", zorder=3)
    
    plt.xlabel("X Coordinate")
    plt.ylabel("Y Coordinate")
    plt.title(f"Movement History of ID {person_id}")
    plt.legend()
    plt.grid(True)
    plt.show()


### 2. Implementing an Automatic Alert System for Restricted Areas

In [None]:
restricted_zones = [
    (100, 100, 400, 400),  # restricted area 1
    (500, 200, 700, 500)   # restricted area 2
]


In [None]:
import winsound

def check_restricted_area(frame, track_history, restricted_zones, alert_ids):
    """
    Checks if a person enters a restricted area and triggers an alert.

    :param frame: The current video frame.
    :param track_history: Dictionary storing movement history {id: [(x, y), ...]}
    :param restricted_zones: List of predefined restricted areas.
    :param alert_ids: Set of person IDs to be monitored.
    """
    for person_id, path in track_history.items():
        if not path:
            continue

        # Get the person's latest position (center of bounding box)
        x, y = path[-1]  

        # Check if inside any restricted zone
        for (x1, y1, x2, y2) in restricted_zones:
            if x1 <= x <= x2 and y1 <= y <= y2:
                print(f"ALERT! Person ID {person_id} entered a restricted area!")
                
                # Play a beep sound (Windows)
                winsound.Beep(1000, 500)  # Frequency: 1000 Hz, Duration: 500ms

                # Highlight the person in red
                cv2.circle(frame, (x, y), 8, (0, 0, 255), -1)
                cv2.putText(frame, f'ALERT ID {person_id}', (x, y - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

                # Stop checking once alert is triggered
                break


### 3. Facial Identification for Verifying Individuals from Database

In [None]:
import cv2

def extract_face(frame, bbox):
    """
    Extracts the face region from a person's bounding box.

    :param frame: The video frame.
    :param bbox: Tuple (x1, y1, x2, y2) representing the bounding box.
    :return: Cropped face image or None if extraction fails.
    """
    x1, y1, x2, y2 = bbox
    face_region = frame[y1:y2, x1:x2]

    if face_region.shape[0] > 0 and face_region.shape[1] > 0:
        return cv2.resize(face_region, (160, 160))  # Resize for FaceNet/DeepFace
    return None


In [None]:
from deepface import DeepFace
import numpy as np

face_db = {}  # Stores embeddings for known people

def identify_person(face_crop):
    """
    Identifies a person using facial recognition.

    :param face_crop: Cropped face image.
    :return: Person ID if recognized, None otherwise.
    """
    if face_crop is None:
        return None

    try:
        embedding = DeepFace.represent(face_crop, model_name="Facenet")[0]["embedding"]

        # Compare with stored embeddings
        for person_id, ref_embedding in face_db.items():
            distance = np.linalg.norm(np.array(embedding) - np.array(ref_embedding))
            if distance < 0.6:  # Threshold for FaceNet similarity
                return person_id

        return None
    except:
        return None


In [None]:
def register_face(person_id, face_crop):
    """
    Stores a new person's face embedding in the database.

    :param person_id: Unique ID assigned to the person.
    :param face_crop: Cropped face image.
    """
    try:
        embedding = DeepFace.represent(face_crop, model_name="Facenet")[0]["embedding"]
        face_db[person_id] = embedding
    except:
        pass
