In [2]:
from google.colab import files
import os

# Upload video files manually
uploaded = files.upload()  # This will prompt file upload

# List uploaded video files
video_files = [f for f in uploaded.keys() if f.endswith(('.mp4', '.avi'))]
print("Uploaded videos:", video_files)

Saving Video 1.mp4 to Video 1.mp4
Saving Video 2.mp4 to Video 2.mp4
Saving Video 3.mp4 to Video 3.mp4
Saving Video 4.mp4 to Video 4.mp4
Uploaded videos: ['Video 1.mp4', 'Video 2.mp4', 'Video 3.mp4', 'Video 4.mp4']


In [3]:
import cv2

for video_name in video_files:
    cap = cv2.VideoCapture(video_name)
    if cap.isOpened():
        print(f"Processing: {video_name}")
    cap.release()


Processing: Video 1.mp4
Processing: Video 2.mp4
Processing: Video 3.mp4
Processing: Video 4.mp4


In [4]:
!pip install -q ultralytics deep_sort_realtime torch torchvision opencv-python-headless pandas gdown
!git clone https://github.com/KaiyangZhou/deep-person-reid.git
%cd deep-person-reid
!pip install -r requirements.txt


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.0 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━[0m [32m0.9/1.0 MB[0m [31m26.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m18.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.4/8.4 MB[0m [31m75.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m95.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m76.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m35.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [5]:
# In case of display issues (OPTIONAL, only if you face import errors)
!pip uninstall -y opencv-python-headless
!pip install opencv-python


Found existing installation: opencv-python-headless 4.11.0.86
Uninstalling opencv-python-headless-4.11.0.86:
  Successfully uninstalled opencv-python-headless-4.11.0.86


In [6]:
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
import cv2
import os
import pandas as pd
from tqdm import tqdm

# Load YOLOv8 person detector
model = YOLO('yolov8n.pt')  # Or yolov8m.pt if you want more accuracy

# Initialize Deep SORT tracker
tracker = DeepSort(max_age=30)


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt'...


100%|██████████| 6.25M/6.25M [00:00<00:00, 72.7MB/s]


In [9]:
import cv2
import os
import pandas as pd
import gc

# If you uploaded videos manually to Colab:
extract_path = "/content"

# Or if using Google Drive, use:
# extract_path = "/content/drive/MyDrive/YourFolder"

# List the video files
video_files = [f for f in os.listdir(extract_path) if f.endswith(('.mp4', '.avi'))]
print("Found videos:", video_files)

results_list = []

for video_name in video_files:
    video_path = os.path.join(extract_path, video_name)
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error: Unable to open video {video_name}")
        continue

    print(f"Processing video: {video_name}")
    frame_num = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame = cv2.resize(frame, (640, 360))

        # Model inference
        detections = model(frame)[0]

        # Person detections only
        person_dets = []
        for det in detections.boxes.data.tolist():
            x1, y1, x2, y2, score, cls = det
            if int(cls) == 0:  # Class 0 = person
                person_dets.append(([x1, y1, x2 - x1, y2 - y1], score, 'person'))

        # Update tracker
        tracks = tracker.update_tracks(person_dets, frame=frame)

        for track in tracks:
            if not track.is_confirmed():
                continue
            track_id = track.track_id
            l, t, w, h = track.to_ltrb()
            results_list.append({
                "id": track_id,
                "video": video_name,
                "frame": frame_num,
                "bbox_x": int(l),
                "bbox_y": int(t),
                "bbox_w": int(w - l),
                "bbox_h": int(h - t)
            })

        frame_num += 1

        del frame, detections, person_dets, tracks
        gc.collect()

    cap.release()
    print(f"Done with {video_name}")

# Save CSV in Colab's file system
csv_path = "/content/results_raw.csv"
df = pd.DataFrame(results_list)
df.to_csv(csv_path, index=False)
print(f"Tracking done. Saved raw CSV to {csv_path}")


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
0: 384x640 5 persons, 153.3ms
Speed: 2.3ms preprocess, 153.3ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 persons, 162.3ms
Speed: 4.1ms preprocess, 162.3ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 persons, 147.9ms
Speed: 3.1ms preprocess, 147.9ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 152.9ms
Speed: 2.2ms preprocess, 152.9ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 169.6ms
Speed: 2.6ms preprocess, 169.6ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 151.3ms
Speed: 3.3ms preprocess, 151.3ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 155.4ms
Speed: 2.5ms preprocess, 155.4ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 1 tv, 149.

In [None]:
from google.colab import filesss
files.download("/content/results_raw.csv")


In [10]:
!pip install -q facenet-pytorch

from facenet_pytorch import InceptionResnetV1
import torch
from torchvision import transforms
from PIL import Image
import numpy as np

# Load pretrained FaceNet model
device = 'cuda' if torch.cuda.is_available() else 'cpu'
facenet = InceptionResnetV1(pretrained='vggface2').eval().to(device)

# Preprocessing transform
face_preprocess = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5])
])


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m25.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m51.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.5/4.5 MB[0m [31m53.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m755.6/755.6 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m410.6/410.6 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.1/14.1 MB[0m [31m97.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.7/23.7 MB[0m [31m79.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

  0%|          | 0.00/107M [00:00<?, ?B/s]

In [11]:
from collections import defaultdict
from PIL import Image
import torch
import numpy as np
import os
import cv2
import pandas as pd
from tqdm import tqdm

# Ensure face_embeddings and face detection logic
face_embeddings = defaultdict(list)  # {id: [(video_name, embedding)]}
face_detector = model  # YOLOv8 used for face approximation

# CSV path for Colab
csv_path = "/content/results_raw.csv"
extract_path = "/content"  # or your Google Drive folder

def get_face_embedding(frame, bbox):
    x, y, w, h = bbox
    face_img = frame[int(y):int(y + h), int(x):int(x + w)]
    try:
        face = Image.fromarray(cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB))
        face = face_preprocess(face).unsqueeze(0).to(device)
        embedding = facenet(face).detach().cpu().numpy().flatten()
        return embedding
    except Exception as e:
        print("Embedding failed:", e)
        return None

# Load results CSV
df = pd.read_csv(csv_path)
grouped = df.groupby(['video', 'id'])

for (video_name, pid), group in tqdm(grouped):
    video_path = os.path.join(extract_path, video_name)
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Cannot open video: {video_name}")
        continue

    # Sample the middle frame for this person's track
    frame_no = int(group.iloc[len(group)//2]['frame'])
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
    ret, frame = cap.read()

    if not ret:
        print(f"Could not read frame {frame_no} in {video_name}")
        cap.release()
        continue

    bbox = group.iloc[len(group)//2][['bbox_x', 'bbox_y', 'bbox_w', 'bbox_h']].values
    emb = get_face_embedding(frame, bbox)

    if emb is not None:
        face_embeddings[pid].append((video_name, emb))

    cap.release()

print("Face embeddings extracted for all identities.")


  1%|▏         | 1/77 [00:00<00:13,  5.76it/s]

Embedding failed: OpenCV(4.11.0) /io/opencv/modules/imgproc/src/color.cpp:199: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'



 58%|█████▊    | 45/77 [00:15<00:05,  5.41it/s]

Embedding failed: OpenCV(4.11.0) /io/opencv/modules/imgproc/src/color.cpp:199: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'

Embedding failed: OpenCV(4.11.0) /io/opencv/modules/imgproc/src/color.cpp:199: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'

Embedding failed: OpenCV(4.11.0) /io/opencv/modules/imgproc/src/color.cpp:199: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'



 77%|███████▋  | 59/77 [00:20<00:05,  3.10it/s]

Embedding failed: OpenCV(4.11.0) /io/opencv/modules/imgproc/src/color.cpp:199: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'

Embedding failed: OpenCV(4.11.0) /io/opencv/modules/imgproc/src/color.cpp:199: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'



 78%|███████▊  | 60/77 [00:20<00:04,  3.54it/s]

Embedding failed: OpenCV(4.11.0) /io/opencv/modules/imgproc/src/color.cpp:199: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'



100%|██████████| 77/77 [00:28<00:00,  2.66it/s]

Face embeddings extracted for all identities.





In [14]:
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd

# Assuming face_embeddings is already available and looks like:
# face_embeddings = defaultdict(list)  # {id: [(video_name, embedding)]}

# Build embedding matrix
id_embeddings = []

for pid, embeddings in face_embeddings.items():
    for video_name, emb in embeddings:
        id_embeddings.append((pid, video_name, emb))

global_id_map = {}
next_global_id = 1

# Loop through all pairs of embeddings
for i, (pid1, vid1, emb1) in enumerate(id_embeddings):
    if pid1 in global_id_map:
        continue
    global_id_map[pid1] = next_global_id
    for j in range(i+1, len(id_embeddings)):
        pid2, vid2, emb2 = id_embeddings[j]
        if pid2 in global_id_map:
            continue
        # Compute cosine similarity
        sim = cosine_similarity([emb1], [emb2])[0][0]
        if sim > 0.8:  # Threshold can be tuned
            global_id_map[pid2] = next_global_id
    next_global_id += 1

# Convert global ID mapping to a DataFrame for easier inspection
df_global_ids = pd.DataFrame(list(global_id_map.items()), columns=['pid', 'global_id'])

# You can then inspect the mapping
df_global_ids.head()


Unnamed: 0,pid,global_id
0,33,1
1,47,1
2,86,1
3,92,1
4,128,1


In [15]:
import pandas as pd

# Load the CSV file containing results (make sure it's uploaded to Colab or mounted in Google Drive)
df = pd.read_csv("/content/results_raw.csv")  # Path to the raw CSV file

# Map the 'id' column to global_id using the previously created global_id_map
df['global_id'] = df['id'].map(global_id_map)

# Drop the old 'id' column and rename 'global_id' to 'id'
df = df.drop(columns=['id'])
df = df.rename(columns={'global_id': 'id'})

# Save the modified DataFrame as a new CSV file
df.to_csv("/content/results.csv", index=False)

print("Final CSV with global IDs saved as results.csv")


✅ Final CSV with global IDs saved as results.csv


In [46]:
from google.colab import files
uploaded = files.upload()  # This will allow you to upload files


Saving Video 1.mp4 to Video 1.mp4
Saving Video 2.mp4 to Video 2.mp4
Saving Video 3.mp4 to Video 3.mp4
Saving Video 4.mp4 to Video 4.mp4


In [47]:
import os
import cv2
import pandas as pd

# Set the path where you want to store the annotated videos
output_video_path = "/content/annotated_videos"  # Path to save annotated videos
os.makedirs(output_video_path, exist_ok=True)

# Load the final results CSV with global IDs
df = pd.read_csv("/content/results.csv")  # Make sure this is the correct path to your CSV file

# Path to the extracted videos (update this with the correct path where your videos are located)
extract_path = "/content"  # Update this if the video is located elsewhere

# Iterate through each unique video in the DataFrame and annotate
for video_name in df['video'].unique():
    video_path = os.path.join(extract_path, video_name)  # Get the full video file path
    cap = cv2.VideoCapture(video_path)

    # Check if the video file is opened successfully
    if not cap.isOpened():
        print(f"Failed to open video: {video_path}")
        continue

    # Get video properties (e.g., width, height, fps)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Create a VideoWriter to save the annotated video
    out = cv2.VideoWriter(os.path.join(output_video_path, f"annotated_{video_name}"),
                          cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    frame_num = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # Exit the loop if no more frames are available

        # Draw bounding boxes and IDs on the frame
        for _, row in df[(df['video'] == video_name) & (df['frame'] == frame_num)].iterrows():
            x, y, w, h = row['bbox_x'], row['bbox_y'], row['bbox_w'], row['bbox_h']
            track_id = row['id']
            # Draw a green rectangle and the track ID
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
            cv2.putText(frame, f'ID: {track_id}', (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Write the annotated frame to the output video file
        out.write(frame)
        frame_num += 1

    # Release the video objects
    cap.release()
    out.release()

    print(f"Annotated video saved for {video_name} in {output_video_path}")

print("All annotated videos have been generated and saved.")


✅ Annotated video saved for Video 4.mp4 in /content/annotated_videos
✅ Annotated video saved for Video 1.mp4 in /content/annotated_videos
✅ Annotated video saved for Video 3.mp4 in /content/annotated_videos
✅ Annotated video saved for Video 2.mp4 in /content/annotated_videos
✅ All annotated videos have been generated and saved.


In [48]:
print(os.listdir("/content/annotated_videos"))


['annotated_Video 1.mp4', 'annotated_Video 2.mp4', 'annotated_Video 3.mp4', 'annotated_Video 4.mp4']


In [51]:
import shutil

# Path to the folder containing annotated videos
output_video_path = "/content/annotated_videos"

# Create a ZIP archive of the annotated videos folder
shutil.make_archive("/content/annotated_videos", 'zip', output_video_path)

print("Annotated videos have been zipped successfully.")


✅ Annotated videos have been zipped successfully.


In [52]:
from google.colab import files

# Download the ZIP file
files.download("/content/annotated_videos.zip")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [49]:
from google.colab import files

# Path to your zip file
zip_file_path = '/content/final_deliverables.zip'

# Download the zip file
files.download(zip_file_path)
import os
import shutil

video_src = '/content/annotated_videos'
dst_path = os.path.join('/content/final_deliverables', 'annotated_videos')

# Check if the source directory exists
if os.path.exists(video_src):
    # If the destination directory doesn't exist, create it
    os.makedirs(dst_path, exist_ok=True)

    # Copy individual files from annotated_videos to the final destination
    for filename in os.listdir(video_src):
        src_file = os.path.join(video_src, filename)
        dst_file = os.path.join(dst_path, filename)
        if os.path.isfile(src_file):  # Only copy files (not directories)
            shutil.copy(src_file, dst_file)
    print("Annotated videos copied.")
else:
    print("No annotated videos to copy.")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ Annotated videos copied.


In [50]:
import shutil

# Create a zip of the final deliverables folder
final_deliverables_path = '/content/final_deliverables'
zip_file_path = '/content/final_deliverables.zip'

# Make sure the final deliverables folder exists
if os.path.exists(final_deliverables_path):
    shutil.make_archive(zip_file_path.replace('.zip', ''), 'zip', final_deliverables_path)
    print(f"Final deliverables zipped: {zip_file_path}")
else:
    print("Final deliverables folder not found.")


✅ Final deliverables zipped: /content/final_deliverables.zip
