In [None]:
# Mount Google Drive and install required packages
from google.colab import drive
drive.mount('/content/drive')

!pip install facenet-pytorch ultralytics opencv-python-headless

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Import required libraries
import os
import cv2
import numpy as np
import pandas as pd
from datetime import datetime
from facenet_pytorch import InceptionResnetV1, extract_face
from ultralytics import YOLO

Set up paths and directories

In [None]:
# Set up paths and directories
# Update BASE_DIR (and the names below) to match your Google Drive structure
BASE_DIR = '/content/drive/MyDrive/Attandance_tracking'

# Folder scanned for the reference .npy face embeddings.
EMBEDDING_FOLDER = f'{BASE_DIR}/embeddings'
# Folder where uploaded videos are stored and the annotated video is written.
VIDEO_FOLDER = f'{BASE_DIR}/uploaded_videos'
# CSV file that receives the attendance matches.
CSV_FILE = f'{BASE_DIR}/matches.csv'
# NOTE(review): not referenced by the code visible in this notebook.
OUTPUT_VIDEO_FILE = f'{BASE_DIR}/output_video.mp4'

def create_directories_if_not_exist():
    """Create the embedding and video folders (and any missing parents).

    Uses ``exist_ok=True`` so an already-existing directory is not an error and
    there is no window between checking for and creating the directory.

    Raises:
        FileExistsError: if one of the paths exists but is not a directory.
    """
    for folder in (EMBEDDING_FOLDER, VIDEO_FOLDER):
        os.makedirs(folder, exist_ok=True)

create_directories_if_not_exist()

In [None]:
# Initialize models
# NOTE(review): 'yolov8n.pt' is used as the detector whose boxes are cropped and
# embedded as faces; the captured log reports its detections as "persons" --
# confirm that person-sized boxes are the intended input for the face embedder.
model = YOLO('yolov8n.pt')

# Face embedding network with the 'vggface2' pretrained weights, in eval mode.
face_embedder = InceptionResnetV1(pretrained='vggface2')
face_embedder.eval()

In [None]:
# Function to calculate the cosine similarity between two embeddings
def cosine_similarity(emb1, emb2):
    """Return the cosine similarity of two embedding vectors.

    Args:
        emb1: First embedding (array-like accepted by ``np.dot``).
        emb2: Second embedding, compatible with ``emb1`` for ``np.dot``.

    Returns:
        ``dot(emb1, emb2) / (‖emb1‖ · ‖emb2‖)``. When either embedding has
        zero norm the similarity is undefined; ``0.0`` is returned instead of
        dividing by zero (which would yield NaN and a RuntimeWarning).
    """
    denominator = np.linalg.norm(emb1) * np.linalg.norm(emb2)
    if denominator == 0:
        return 0.0
    return np.dot(emb1, emb2) / denominator

# Load image embeddings from the first phase
# Each "<name>_<suffix>.npy" file is stored under "<name>" (the text before the
# first underscore); a later file with the same prefix replaces the earlier one.
# The same load is performed again in the following cell of this notebook.
image_embeddings = {}
for file in os.listdir(EMBEDDING_FOLDER):
    if not file.endswith(".npy"):
        continue
    stem, _extension = os.path.splitext(file)
    name = stem.split('_')[0]
    embedding_path = os.path.join(EMBEDDING_FOLDER, file)
    image_embeddings[name] = np.load(embedding_path)

# Function to delete existing CSV file
def delete_csv_file():
    """Remove the matches CSV at ``CSV_FILE`` if it is present; otherwise do nothing."""
    if not os.path.exists(CSV_FILE):
        return
    os.remove(CSV_FILE)

In [None]:
# Load image embeddings
# Keys are the part of the file name before the first underscore, so several
# files sharing a prefix collapse to a single entry (the last one listed wins).
npy_files = [entry for entry in os.listdir(EMBEDDING_FOLDER) if entry.endswith(".npy")]

image_embeddings = {}
for file in npy_files:
    name = os.path.splitext(file)[0].split('_')[0]
    image_embeddings[name] = np.load(os.path.join(EMBEDDING_FOLDER, file))

print(f"Loaded {len(image_embeddings)} embeddings.")

Loaded 3 embeddings.


In [None]:
# Define the video processing function
def process_video(video_path, similarity_threshold=0.8):
    """Track detections in a video, label them by face embedding, and log matches.

    The detector/tracker (``model``) is run over ``video_path``; every detected
    box is cropped with ``extract_face``, embedded with ``face_embedder`` and
    compared against ``image_embeddings`` using ``cosine_similarity``. An
    annotated copy of the video is written to ``VIDEO_FOLDER/output_video.webm``
    and one row per recognised name is written to ``CSV_FILE``.

    Args:
        video_path: Path of the video file to process.
        similarity_threshold: Minimum cosine similarity for a detection to be
            labelled with a known name (default 0.8, the previous fixed value).

    Returns:
        Path of the annotated output video.

    Notes:
        * Any existing ``CSV_FILE`` is deleted before processing starts.
        * Entry/exit times are ``datetime.now()`` at the moment each frame is
          processed (wall-clock time), not positions within the video.
        * A known name is bound to at most one tracker id for the whole video.
    """
    delete_csv_file()

    # The capture is only needed to read the source geometry and frame rate,
    # so it is released as soon as those have been read.
    video_capture = cv2.VideoCapture(video_path)
    try:
        frame_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        source_fps = video_capture.get(cv2.CAP_PROP_FPS)
    finally:
        video_capture.release()

    # Write the output at the source frame rate so playback speed is preserved;
    # fall back to the former fixed 20 fps when the container reports none.
    fps = source_fps if source_fps and source_fps > 0 else 20.0

    fourcc = cv2.VideoWriter_fourcc(*'vp80')
    output_video_file = os.path.join(VIDEO_FOLDER, 'output_video.webm')
    out = cv2.VideoWriter(output_video_file, fourcc, fps, (frame_width, frame_height))

    matches = {}            # name -> {'entry_time': ..., 'exit_time': ...}
    assigned_names = set()  # Known names already bound to a tracker id
    face_trackers = {}      # tracker id -> {'name': ..., 'embedding': ...}

    try:
        # stream=True yields one result per frame instead of accumulating the
        # results of the whole video in memory.
        tracker_results = model.track(video_path, tracker='bytetrack.yaml', show=False, stream=True)

        for result in tracker_results:
            frame_rgb = cv2.cvtColor(result.orig_img, cv2.COLOR_BGR2RGB)
            boxes = result.boxes.xyxy.cpu().numpy()
            # The tracker may not provide ids for a frame.
            track_ids = result.boxes.id.cpu().numpy() if result.boxes.id is not None else None

            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            for i, (x1, y1, x2, y2) in enumerate(boxes):
                box = [int(x1), int(y1), int(x2), int(y2)]
                track_id = int(track_ids[i]) if track_ids is not None else None

                face = extract_face(frame_rgb, box)
                if face is None or not (face.size(1) > 0 and face.size(2) > 0):
                    continue

                video_embedding = face_embedder(face.unsqueeze(0)).detach().numpy().flatten()

                # Only detections with a real tracker id may reuse a cached
                # identity; untracked detections are re-evaluated every time so
                # they never share one cache entry keyed by None.
                cached = face_trackers.get(track_id) if track_id is not None else None

                if cached is not None:
                    best_match = cached['name']
                else:
                    best_match = None
                    best_similarity = 0

                    for name, image_embedding in image_embeddings.items():
                        if name in assigned_names:  # Only consider unassigned names
                            continue
                        similarity = cosine_similarity(image_embedding, video_embedding)
                        if similarity > best_similarity:
                            best_similarity = similarity
                            best_match = name

                    is_known = best_match is not None and best_similarity > similarity_threshold
                    if not is_known:
                        best_match = f"Unknown_{track_id}"

                    if track_id is not None:
                        if is_known:
                            assigned_names.add(best_match)
                        face_trackers[track_id] = {'name': best_match, 'embedding': video_embedding}

                # Colours are RGB here because the frame was converted above.
                if best_match.startswith("Unknown_"):
                    label_color = (255, 0, 0)  # Red for unknown faces
                else:
                    label_color = (0, 255, 0)  # Green for known faces
                    if best_match not in matches:
                        matches[best_match] = {'entry_time': current_time, 'exit_time': current_time}
                    else:
                        matches[best_match]['exit_time'] = current_time

                # Draw rectangle around the face
                cv2.rectangle(frame_rgb, (box[0], box[1]), (box[2], box[3]), label_color, 2)

                # Add the label above the bounding box
                cv2.putText(frame_rgb, best_match, (box[0], box[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, label_color, 2)

            # Write frame to output video (the writer expects BGR)
            out.write(cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR))
    finally:
        # Always finalise the output file, even if processing fails midway.
        out.release()

    # Save matches to CSV (only known faces)
    df = pd.DataFrame([(name, times['entry_time'], times['exit_time']) for name, times in matches.items()],
                      columns=['Name', 'Entry Time', 'Exit Time'])
    df.to_csv(CSV_FILE, index=False)

    return output_video_file

Upload a video file from your system. After processing, the annotated output video (`output_video.webm`) is saved in the `uploaded_videos` directory, and the attendance CSV (`matches.csv`) is saved in the `Attandance_tracking` directory and then downloaded to your machine.

In [None]:
# Upload video file from your system
# `files` is Colab's upload/download helper. It is imported here because the
# cells shown in this notebook only import `drive` from google.colab, so the
# name would otherwise be undefined on a fresh kernel.
from google.colab import files

uploaded = files.upload()

for filename, video_bytes in uploaded.items():
    # Persist the uploaded bytes to Drive so the tracker can read them by path.
    video_path = os.path.join(VIDEO_FOLDER, filename)
    with open(video_path, "wb") as f:
        f.write(video_bytes)

    output_video_file = process_video(video_path)

    # Download the processed CSV file
    files.download(CSV_FILE)

Saving 10002892175.mp4 to 10002892175 (4).mp4


errors for large sources or long-running streams and videos. See https://docs.ultralytics.com/modes/predict/ for help.

Example:
    results = model(source=..., stream=True)  # generator of Results objects
    for r in results:
        boxes = r.boxes  # Boxes object for bbox outputs
        masks = r.masks  # Masks object for segment masks outputs
        probs = r.probs  # Class probabilities for classification outputs

video 1/1 (frame 1/317) /content/drive/MyDrive/Attandance_tracking/uploaded_videos/10002892175 (4).mp4: 384x640 4 persons, 149.9ms
video 1/1 (frame 2/317) /content/drive/MyDrive/Attandance_tracking/uploaded_videos/10002892175 (4).mp4: 384x640 4 persons, 245.6ms
video 1/1 (frame 3/317) /content/drive/MyDrive/Attandance_tracking/uploaded_videos/10002892175 (4).mp4: 384x640 4 persons, 249.7ms
video 1/1 (frame 4/317) /content/drive/MyDrive/Attandance_tracking/uploaded_videos/10002892175 (4).mp4: 384x640 4 persons, 230.8ms
vi

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>