In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torchvision import transforms, models
from PIL import Image
import cv2
import numpy as np
from google.colab import drive
from google.colab.patches import cv2_imshow
import time
from tqdm import tqdm

# Environment setup for Google Colab. The shell escapes below install packages into the
# running environment, which is why `ultralytics` and `torchreid` are imported only
# further down, after the installs, instead of with the other imports at the top.

# 1. Install Ultralytics for YOLOv8
# NOTE(review): the version is not pinned, so a re-run may pick up a newer release.
print("Installing Ultralytics...")
!pip install ultralytics -q

# 2. Install Torchreid
# NOTE(review): also unpinned.
print("Installing Torchreid...")
!pip install torchreid -q

# 3. Mount Google Drive
# The model checkpoint, gallery file and videos used later are all read from / written to
# paths under /content/drive.
print("Mounting Google Drive...")
drive.mount('/content/drive')

# Late imports: these packages only exist after the installs above have run.
from ultralytics import YOLO
import torchreid # For the appearance model
import torch.nn.functional as F # For normalization
print("All libraries imported successfully.")

Installing Ultralytics...
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m21.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
class BasicConv2d(nn.Module):
    """Convolution block: bias-free Conv2d, then BatchNorm2d, then in-place ReLU.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the convolution.
        kernel_size: Convolution kernel size.
        stride: Convolution stride (default 1).
        padding: Zero-padding added to each side of the input (default 0).
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        super().__init__()
        # The convolution carries no bias; the BatchNorm that follows has its own shift.
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride=stride,
            padding=padding,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply conv -> batch norm -> ReLU and return the activated feature map."""
        return self.relu(self.bn(self.conv(x)))

class GaitCNNLSTM(nn.Module):
    """
    Gait network: a per-frame CNN followed by a single-layer LSTM over the sequence.
    (Layer names and sizes must stay identical to the training notebook so that the
    saved checkpoint can be loaded.)

    Args:
        embedding_dim: Size of the embedding produced by ``fc1``.
        num_subjects: Number of output classes of the classifier head.
        lstm_hidden_dim: Hidden size of the LSTM.
    """

    def __init__(self, embedding_dim=256, num_subjects=74, lstm_hidden_dim=512):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.num_subjects = num_subjects
        self.lstm_hidden_dim = lstm_hidden_dim

        # --- CNN Backbone ---
        # Three pairs of conv blocks; the first two pairs are each followed by a 2x2 max-pool.
        self.conv1 = BasicConv2d(1, 32, 5, 1, 2)
        self.conv2 = BasicConv2d(32, 32, 3, 1, 1)
        self.maxpool1 = nn.MaxPool2d(2, 2)
        self.conv3 = BasicConv2d(32, 64, 3, 1, 1)
        self.conv4 = BasicConv2d(64, 64, 3, 1, 1)
        self.maxpool2 = nn.MaxPool2d(2, 2)
        self.conv5 = BasicConv2d(64, 128, 3, 1, 1)
        self.conv6 = BasicConv2d(128, 128, 3, 1, 1)
        # 64x64 input halved twice by the pools -> 16x16 spatial map with 128 channels.
        self.cnn_feature_size = 128 * 16 * 16

        # --- LSTM Layer ---
        self.lstm = nn.LSTM(
            input_size=self.cnn_feature_size,
            hidden_size=self.lstm_hidden_dim,
            num_layers=1,
            batch_first=True,
        )

        # --- Head ---
        self.fc1 = nn.Linear(self.lstm_hidden_dim, self.embedding_dim)
        self.classifier = nn.Linear(self.embedding_dim, self.num_subjects)

    def forward(self, x):
        """Run a batch of silhouette sequences through the network.

        Args:
            x: 5-D tensor whose first two dimensions are (batch, sequence length); the
                remaining elements are viewed as single-channel 64x64 frames.

        Returns:
            ``(logits, embedding)``: classifier scores and the ``fc1`` embedding computed
            from the LSTM's final hidden state.
        """
        n_batch, n_frames, _c, _h, _w = x.shape

        # Fold the time axis into the batch axis so the CNN sees individual frames.
        frames = x.view(n_batch * n_frames, 1, 64, 64)
        cnn_stages = (
            self.conv1, self.conv2, self.maxpool1,
            self.conv3, self.conv4, self.maxpool2,
            self.conv5, self.conv6,
        )
        for stage in cnn_stages:
            frames = stage(frames)

        # Flatten each frame's feature map, then restore the (batch, time, feature) layout.
        features = frames.view(n_batch * n_frames, -1)
        features = features.view(n_batch, n_frames, self.cnn_feature_size)

        # Only the final hidden state is used; with one LSTM layer its leading dim is 1.
        _, (h_n, _) = self.lstm(features)
        last_hidden = h_n.squeeze(0)

        embedding = self.fc1(last_hidden)
        logits = self.classifier(embedding)
        return logits, embedding

print("Model class (GaitCNNLSTM) defined.")

Model class (GaitCNNLSTM) defined.


In [None]:
# --- 1. SET UP DEVICE AND HYPERPARAMETERS ---
# Use the GPU when one is available; every model and tensor below is moved to `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# --- Path to your new gallery file ---
# Loaded later in this cell as a dict mapping each known person's name to an embedding.
gallery_save_path = "/content/drive/MyDrive/CAPSTONE/my_known_gallery.pth"

# --- Gait Model Hyperparameters ---
GAIT_EMBEDDING_DIM = 256  # passed to GaitCNNLSTM as embedding_dim
NUM_SUBJECTS = 74         # passed to GaitCNNLSTM as num_subjects (classifier size)
SEQ_LEN = 30              # frames buffered per track before a Re-ID decision is made

# --- ⚠️ CRITICAL ⚠️ ---
# You MUST tune this value manually.
# It is the maximum Euclidean distance (torch.cdist) at which a track is accepted as a
# known person. The video loop concatenates three *separately* L2-normalized vectors
# (gait, appearance, body ratio), so the fused query has norm up to sqrt(3), not 1.
# Assuming gallery embeddings were built the same way (TODO confirm against the
# enrollment notebook), distances range from 0 (identical) to about 2*sqrt(3) ~= 3.46,
# not 0..2.
REID_THRESHOLD = 1.5 # <--- currently 1.5 (1.0 was the suggested starting point); keep tuning
# ----------------------

# --- 2. Load GAIT Model (GaitCNNLSTM) ---
print("Loading trained GaitCNNLSTM model...")
gait_model = GaitCNNLSTM(GAIT_EMBEDDING_DIM, NUM_SUBJECTS).to(device)
gait_model_path = "/content/drive/MyDrive/CAPSTONE/my_gait_cnnlstm.pth"
# map_location remaps the stored tensors onto the active device. Without it, a checkpoint
# saved from a GPU session cannot be deserialized on a CPU-only runtime.
gait_state_dict = torch.load(gait_model_path, map_location=device)
gait_model.load_state_dict(gait_state_dict)
# Inference only: switch BatchNorm layers to their running statistics.
gait_model.eval()
print("GaitCNNLSTM model loaded successfully.")

# --- 3. Load APPEARANCE Model (Torchreid OSNet) ---
# NOTE(review): `pretrained=True` loads whatever weights Torchreid ships for this
# architecture; confirm they are the ones used when the gallery was enrolled.
print("Loading pre-trained Appearance (OSNet) model from Torchreid...")
appearance_model = torchreid.models.build_model(name='osnet_x1_0', num_classes=751, pretrained=True).to(device)
# The model is used in eval mode only, as a feature extractor for person crops.
appearance_model.eval()
print("Appearance (OSNet) model loaded successfully. (Output dim: 512)")

# --- 4. Load TRACKING Model (YOLOv8-Seg) ---
# Provides boxes, segmentation masks and track ids for people in each frame.
print("Loading YOLOv8-Seg model for tracking...")
yolo_seg_model = YOLO('yolov8n-seg.pt')
yolo_seg_model.to(device)
print("YOLOv8-Seg model loaded successfully.")

# --- 5. Load POSE Model (YOLOv8-Pose) ---
# Run on each person crop to obtain the keypoints used for the body-ratio feature.
print("Loading YOLOv8-Pose model...")
pose_model = YOLO('yolov8n-pose.pt')
pose_model.to(device)
print("YOLOv8-Pose model loaded successfully.")

# --- 6. Define ALL preprocessing transforms ---
# Silhouette masks: resized to the 64x64 frames that GaitCNNLSTM.forward expects.
gait_transform = transforms.Compose([transforms.Resize((64, 64)), transforms.ToTensor()])

# RGB person crops for the appearance model: resized to 256x128 (height x width) and
# normalized per channel with the mean/std values below.
appearance_transform = transforms.Compose([
    transforms.Resize((256, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# --- 7. Load the Pre-Enrolled Gallery ---
# The gallery maps each known person's name to their enrolled (fused) embedding.
if os.path.exists(gallery_save_path):
    # map_location puts the stored embeddings on the active device, so a gallery saved
    # from a GPU session still loads on a CPU-only runtime.
    # NOTE: torch.load unpickles the file - only load gallery files you created yourself.
    gallery_of_known_people = torch.load(gallery_save_path, map_location=device)
    print(f"\n✅ Successfully loaded {len(gallery_of_known_people)} known people from gallery.")
    print(f"   Known IDs: {list(gallery_of_known_people.keys())}")
else:
    # With an empty gallery the pipeline still runs, but every track stays "unknown".
    gallery_of_known_people = {}
    print(f"\n❌ WARNING: Gallery file not found at {gallery_save_path}. Will not be able to identify anyone.")


print("\nAll models and transforms are ready.")

Using device: cuda
Loading trained GaitCNNLSTM model...
GaitCNNLSTM model loaded successfully.
Loading pre-trained Appearance (OSNet) model from Torchreid...
Successfully loaded imagenet pretrained weights from "/root/.cache/torch/checkpoints/osnet_x1_0_imagenet.pth"
** The following layers are discarded due to unmatched keys or layer size: ['classifier.weight', 'classifier.bias']
Appearance (OSNet) model loaded successfully. (Output dim: 512)
Loading YOLOv8-Seg model for tracking...
YOLOv8-Seg model loaded successfully.
Loading YOLOv8-Pose model...
YOLOv8-Pose model loaded successfully.

✅ Successfully loaded 3 known people from gallery.
   Known IDs: ['Naman', 'Nishant', 'Aadishesh']

All models and transforms are ready.


In [None]:
# --- 1. ⚠️ UPDATE YOUR VIDEO PATHS ---
video_path = '/content/drive/MyDrive/CAPSTONE/Custom_Dataset/house-1/image.mp4' # <-- YOUR CROWDED VIDEO
output_video_path = '/content/drive/MyDrive/CAPSTONE/mainfinal_finaltry.mp4'  # <-- OUTPUT
# ------------------------------------

# --- 2. Open input video and get properties ---
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail fast: continuing would leave `total_frames` and `writer` undefined and the
    # processing loop below would die with an unrelated NameError.
    cap.release()
    raise IOError(f"Error: Could not open video file {video_path}")

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# The output video keeps the input's frame size and frame rate.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
if not writer.isOpened():
    # An unopened writer accepts write() calls without saving anything, so stop here
    # instead of processing the whole video for nothing.
    cap.release()
    writer.release()
    raise IOError(f"Error: Could not open video writer for {output_video_path}")
print(f"Processing {total_frames} frames... Output will be saved to {output_video_path}")

# --- 3. Data structures for 3-WAY FUSED Re-ID ---
# Per-track feature buffers, keyed by tracker id. Each is filled frame by frame and
# cleared once SEQ_LEN frames have been collected for that track.
tracked_gait_sequences = {}     # {track_id: [silhouette tensors]}
tracked_appearance_crops = {}   # {track_id: [preprocessed RGB crop tensors]}
tracked_body_ratios = {}        # {track_id: [body-ratio tensors]} (only frames with usable pose)
# gallery_of_known_people is already loaded!
track_id_to_person_id = {} # Format: {track_id: (person_id, match_distance)}

# Helper function to get body ratios
def get_body_ratios(kpts):
    """Compute torso/height and leg/height ratios from one person's pose keypoints.

    Args:
        kpts: 2-D array of keypoints with at least 17 rows and 2 columns, where column 1
            is the y coordinate. Rows 0, 5/6, 11/12 and 15/16 are read as nose, shoulders,
            hips and ankles (COCO-style keypoint order - TODO confirm for the pose model).

    Returns:
        A float32 tensor ``[torso_height / total_height, leg_height / total_height]`` on
        ``device``, or ``None`` when the input does not have the expected shape or the
        measured body height / leg height is not larger than one pixel.
    """
    # Validate the shape explicitly instead of catching every exception, so genuine
    # programming errors are no longer silently turned into "no ratios".
    shape = getattr(kpts, "shape", ())
    if len(shape) != 2 or shape[0] < 17 or shape[1] < 2:
        return None

    nose_y = kpts[0, 1]
    shoulder_y = (kpts[5, 1] + kpts[6, 1]) / 2.0
    hip_y = (kpts[11, 1] + kpts[12, 1]) / 2.0
    ankle_y = (kpts[15, 1] + kpts[16, 1]) / 2.0

    # Image y grows downwards, so "lower body part minus higher body part" is positive
    # for an upright person.
    total_height = ankle_y - nose_y
    torso_height = hip_y - shoulder_y
    leg_height = ankle_y - hip_y

    # Reject collapsed or upside-down poses (also avoids dividing by ~0).
    if total_height > 1 and leg_height > 1:
        ratio_1 = torso_height / total_height
        ratio_2 = leg_height / total_height
        return torch.tensor([ratio_1, ratio_2], dtype=torch.float32).to(device)
    return None

# --- 4. Start Video Loop ---

def _build_fused_embedding(track_id):
    """Fuse one track's buffered gait, appearance and body-ratio features.

    Reads the per-track buffers and returns the concatenation of three separately
    L2-normalized row vectors (gait, mean appearance, mean body ratio), shape [1, D].
    Per the original author's note D = 256 + 512 + 2 = 770.
    """
    gait_sequence = torch.stack(tracked_gait_sequences[track_id], dim=0).unsqueeze(0)
    appearance_sequence = torch.stack(tracked_appearance_crops[track_id], dim=0)

    if len(tracked_body_ratios[track_id]) > 0:
        ratio_sequence = torch.stack(tracked_body_ratios[track_id], dim=0)
    else:
        # No usable pose in the whole window: fall back to an all-zero ratio feature.
        ratio_sequence = torch.zeros((SEQ_LEN, 2)).to(device)

    with torch.no_grad():
        _, gait_embedding = gait_model(gait_sequence)
        appearance_embeddings = appearance_model(appearance_sequence)
        appearance_embedding = torch.mean(appearance_embeddings, dim=0, keepdim=True)
        body_ratio_embedding = torch.mean(ratio_sequence, dim=0, keepdim=True)

        # --- NORMALIZE (Must match enrollment) ---
        gait_embedding = F.normalize(gait_embedding, p=2, dim=1)
        appearance_embedding = F.normalize(appearance_embedding, p=2, dim=1)
        body_ratio_embedding = F.normalize(body_ratio_embedding, p=2, dim=1)

        return torch.cat((gait_embedding, appearance_embedding, body_ratio_embedding), dim=1)


def _match_gallery(final_embedding):
    """Return ``(name, distance)`` of the closest gallery entry, or ``(None, inf)`` if none.

    A gallery embedding may be stored as [D], [1, D] or [N, D]; with several rows the
    smallest Euclidean distance to any row is used.
    """
    matched_id = None
    min_distance = float('inf')
    for person_name, known_embedding in gallery_of_known_people.items():
        known = known_embedding.to(device).reshape(-1, final_embedding.shape[1])
        distance = torch.cdist(final_embedding, known).min().item()
        if distance < min_distance:
            min_distance = distance
            matched_id = person_name # e.g., "Naman"
    return matched_id, min_distance


def _resolve_identity_conflicts(frame_draw_info):
    """Ensure each known name is shown on at most one box per frame (best match wins).

    When several tracks carry the same known name, the one with the smallest match
    distance keeps it; the others are relabelled "Track-<id>" for this frame only.
    Returns the list of entries to draw: unknown/relabelled entries first, winners last.
    """
    final_draw_list = []
    person_id_assignments = {}  # {known name: (best distance so far, its draw info)}

    for info in frame_draw_info:
        display_id = info['display_id']
        match_distance = info['match_distance']

        if display_id not in gallery_of_known_people:
            final_draw_list.append(info)
            continue

        if display_id not in person_id_assignments:
            person_id_assignments[display_id] = (match_distance, info)
            continue

        current_best_distance, _ = person_id_assignments[display_id]
        if match_distance < current_best_distance:
            # The newcomer is a better match: demote the previous holder of the name.
            _, old_info = person_id_assignments.pop(display_id)
            old_info['display_id'] = f"Track-{old_info['track_id']}" # Revert to track_id
            final_draw_list.append(old_info)
            person_id_assignments[display_id] = (match_distance, info)
        else:
            info['display_id'] = f"Track-{info['track_id']}" # Revert to track_id
            final_draw_list.append(info)

    for _best_distance, info in person_id_assignments.values():
        final_draw_list.append(info)
    return final_draw_list


# CAP_PROP_FRAME_COUNT comes from container metadata and can be missing or inaccurate, so
# it only sizes the progress bar; the loop itself runs until the decoder reports the end.
progress_bar = tqdm(total=total_frames if total_frames > 0 else None, desc="Processing video")
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        img_h, img_w = frame.shape[:2]

        # --- a. Run Tracker (YOLO-Seg) ---
        # classes=0 restricts detections to that single class id; persist=True keeps the
        # tracker state between calls so track ids are stable from frame to frame.
        results = yolo_seg_model.track(frame, persist=True, classes=0, verbose=False)

        frame_draw_info = []

        if results[0].masks is not None and results[0].boxes.id is not None:

            masks = results[0].masks.data
            boxes = results[0].boxes.data
            track_ids = results[0].boxes.id.int().cpu().tolist()

            for mask_tensor, box, track_id in zip(masks, boxes, track_ids):

                # Clamp the box to the frame and skip degenerate boxes: an empty crop
                # would crash the colour conversion and the pose model below.
                x1, y1, x2, y2 = [int(v) for v in box[:4].tolist()]
                x1, x2 = max(0, x1), min(img_w, x2)
                y1, y2 = max(0, y1), min(img_h, y2)
                if x2 <= x1 or y2 <= y1:
                    continue

                # --- b. Process GAIT (Silhouette) ---
                # NOTE(review): the mask is used exactly as returned by the tracker, without
                # cropping to the box - confirm this matches how silhouettes were prepared
                # for training and enrollment.
                mask_np = mask_tensor.cpu().numpy() * 255
                mask_pil = Image.fromarray(mask_np).convert('L')
                silhouette_tensor = gait_transform(mask_pil).to(device)

                # --- c. Process APPEARANCE (RGB Crop) ---
                crop_img = frame[y1:y2, x1:x2]
                crop_pil = Image.fromarray(cv2.cvtColor(crop_img, cv2.COLOR_BGR2RGB))
                appearance_tensor = appearance_transform(crop_pil).to(device)

                # --- d. Process POSE (Body Ratios) ---
                pose_results = pose_model(crop_img, verbose=False)
                body_ratio_tensor = None
                keypoints = pose_results[0].keypoints
                if keypoints is not None and len(keypoints.data) > 0:
                    # Only the first detected pose in the crop is used; keep (x, y) only.
                    kpts = keypoints.data[0].cpu().numpy()[:, :2]
                    body_ratio_tensor = get_body_ratios(kpts)

                # --- e. Store ALL THREE features ---
                if track_id not in tracked_gait_sequences:
                    tracked_gait_sequences[track_id] = []
                    tracked_appearance_crops[track_id] = []
                    tracked_body_ratios[track_id] = []

                tracked_gait_sequences[track_id].append(silhouette_tensor)
                tracked_appearance_crops[track_id].append(appearance_tensor)
                if body_ratio_tensor is not None:
                    tracked_body_ratios[track_id].append(body_ratio_tensor)

                # --- f. Check for Full Sequence & Run FUSED Re-ID ---
                if len(tracked_gait_sequences[track_id]) == SEQ_LEN:
                    final_embedding = _build_fused_embedding(track_id)
                    matched_id, min_distance = _match_gallery(final_embedding)

                    if matched_id is not None and min_distance < REID_THRESHOLD:
                        # This is a known person.
                        track_id_to_person_id[track_id] = (matched_id, min_distance)
                    else:
                        # This is an unknown person. DO NOT add them to the gallery.
                        track_id_to_person_id[track_id] = (f"Track-{track_id}", min_distance)

                    # Start a fresh, non-overlapping window for this track.
                    tracked_gait_sequences[track_id] = []
                    tracked_appearance_crops[track_id] = []
                    tracked_body_ratios[track_id] = []

                # --- g. Store Box Info for Drawing ---
                # Until a track has been through Re-ID once it is shown under its track id.
                display_id, match_distance = track_id_to_person_id.get(
                    track_id, (f"Track-{track_id}", float('inf'))
                )
                frame_draw_info.append({
                    'box': (x1, y1, x2, y2),
                    'display_id': display_id,
                    'track_id': track_id,
                    'match_distance': match_distance
                })

            # --- h. Conflict Resolution (Best-Match-Wins) ---
            final_draw_list = _resolve_identity_conflicts(frame_draw_info)

            # --- i. Draw all boxes at the end (using the final list) ---
            for info in final_draw_list:
                x1, y1, x2, y2 = info['box']
                display_id = info['display_id']

                is_known = display_id in gallery_of_known_people
                color = (0, 255, 0) if is_known else (0, 0, 255) # Green for known, Red for unknown
                label = f"{display_id}" # Will show "Naman" or "Track-123"

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                # Keep the label inside the frame for boxes that touch the top edge.
                cv2.putText(frame, label, (x1, max(y1 - 10, 12)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # --- j. Write the frame to the output video ---
        # Frames without any tracked person are written unchanged.
        writer.write(frame)
        progress_bar.update(1)

except KeyboardInterrupt:
    print("\nProcessing interrupted by user.")
finally:
    # --- 5. CRITICAL: Release all resources ---
    progress_bar.close()
    cap.release()
    writer.release()
    cv2.destroyAllWindows()
    print(f"\nVideo processing finished. Output saved to: {output_video_path}")

Processing 1770 frames... Output will be saved to /content/drive/MyDrive/CAPSTONE/mainfinal_finaltry.mp4


Processing video: 100%|██████████| 1770/1770 [02:11<00:00, 13.43it/s]


Video processing finished. Output saved to: /content/drive/MyDrive/CAPSTONE/mainfinal_finaltry.mp4



