<a href="https://colab.research.google.com/github/SB24-28-48-ss/Face_Recogn_demo/blob/main/final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install deepface opencv-python-headless pandas scikit-learn pytz

import cv2
import sqlite3
import numpy as np
import pandas as pd
import os
from datetime import datetime, timedelta
from deepface import DeepFace
import pytz
from google.colab import drive

# Mount Google Drive at /content/drive so the photo and video paths used by
# the main section (both under /content/drive/MyDrive) can be read.
drive.mount('/content/drive')

# ----------------- DATABASE SETUP -----------------
def init_db():
    """Create the ``users`` and ``attendance`` tables in ``attendance.db``.

    Both statements use ``CREATE TABLE IF NOT EXISTS``, so calling this
    repeatedly is harmless. The database file is created in the current
    working directory if it does not exist yet.
    """
    conn = sqlite3.connect("attendance.db")
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS users
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      name TEXT UNIQUE,
                      embedding BLOB)''')
        c.execute('''CREATE TABLE IF NOT EXISTS attendance
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      user_id INTEGER,
                      timestamp TEXT,
                      FOREIGN KEY(user_id) REFERENCES users(id))''')
        conn.commit()
    finally:
        # Always release the database handle, even if a statement fails.
        conn.close()

# ----------------- ENROLLMENT -----------------
def enroll_all_from_folder(base_folder):
    """Enroll one Facenet embedding per person found under *base_folder*.

    Every sub-directory of *base_folder* is treated as one person whose name
    is the directory name; plain files directly inside *base_folder* are
    ignored. Rows are written with ``INSERT OR IGNORE`` and ``users.name`` is
    UNIQUE, so at most one embedding is ever kept per name. For that reason:

    * a person who is already in ``users`` is left untouched, and
    * enrollment of a person stops at the first image that yields an
      embedding, instead of computing embeddings that would be discarded.

    Directory listings are sorted so the image that ends up stored is
    deterministic rather than dependent on ``os.listdir`` ordering.

    Args:
        base_folder: Directory containing one sub-directory per person.
    """
    conn = sqlite3.connect("attendance.db")
    try:
        c = conn.cursor()

        for person_name in sorted(os.listdir(base_folder)):
            person_folder = os.path.join(base_folder, person_name)
            if not os.path.isdir(person_folder):
                continue

            print(f"[INFO] Enrolling {person_name}...")

            # An existing row would make every INSERT OR IGNORE below a
            # no-op, so skip the expensive embedding work entirely.
            c.execute("SELECT 1 FROM users WHERE name=?", (person_name,))
            if c.fetchone():
                continue

            for img_file in sorted(os.listdir(person_folder)):
                img_path = os.path.join(person_folder, img_file)
                try:
                    result = DeepFace.represent(img_path=img_path, model_name="Facenet")[0]["embedding"]
                    embedding = np.array(result, dtype=np.float32).tobytes()

                    c.execute("INSERT OR IGNORE INTO users (name, embedding) VALUES (?, ?)", (person_name, embedding))
                    conn.commit()
                except Exception as e:
                    # Best effort: an unreadable or face-less image must not
                    # abort enrollment; try the next file instead.
                    print(f"[WARNING] Skipping {img_file}: {e}")
                else:
                    # Embedding stored; further images could not be inserted.
                    break
    finally:
        conn.close()

# ----------------- ATTENDANCE -----------------
def mark_attendance(name, cooldown_minutes=8):
    """Record an attendance row for *name* unless one was recorded recently.

    The user is looked up by name in ``users``. If the user's most recent
    attendance row is newer than ``cooldown_minutes`` ago, nothing is written
    and a skip message is printed; otherwise a new row is inserted.

    Timestamps are stored as naive UTC in ``%Y-%m-%d %H:%M:%S.%f`` form.
    UTC is used because ``export_report`` localizes the stored values as UTC
    before converting them for display; storing local wall-clock time would
    make the report wrong on any machine whose local zone is not UTC.

    Args:
        name: Value of ``users.name`` for the recognized person.
        cooldown_minutes: Minimum gap, in minutes, between two attendance
            rows for the same user. Defaults to 8.
    """
    # Local import: the file header only imports datetime and timedelta.
    from datetime import timezone

    ts_format = "%Y-%m-%d %H:%M:%S.%f"
    conn = sqlite3.connect("attendance.db")
    try:
        c = conn.cursor()
        c.execute("SELECT id FROM users WHERE name=?", (name,))
        user = c.fetchone()

        if not user:
            print(f"[ATTENDANCE] User {name} not found in database, cannot mark attendance.")
            return

        user_id = user[0]
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        cutoff_time = now - timedelta(minutes=cooldown_minutes)

        # Check whether the user was already marked within the cooldown window.
        # The fixed-width timestamp format sorts chronologically as text.
        c.execute("SELECT timestamp FROM attendance WHERE user_id=? ORDER BY timestamp DESC LIMIT 1", (user_id,))
        last_record = c.fetchone()

        if last_record:
            last_time = datetime.strptime(last_record[0], ts_format)
            if last_time > cutoff_time:
                print(f"[ATTENDANCE] {name} already marked within last {cooldown_minutes} minutes, skipping.")
                return

        # Insert new attendance
        timestamp = now.strftime(ts_format)
        c.execute("INSERT INTO attendance (user_id, timestamp) VALUES (?, ?)", (user_id, timestamp))
        conn.commit()
        print(f"[ATTENDANCE] Marked attendance for {name} at {timestamp}")
    finally:
        # Single cleanup point for every return path and for errors.
        conn.close()

# ----------------- RECOGNITION -----------------
def _load_enrolled_embeddings():
    """Return ``[(name, float32 vector), ...]`` for every row in ``users``."""
    conn = sqlite3.connect("attendance.db")
    try:
        rows = conn.execute("SELECT name, embedding FROM users").fetchall()
    finally:
        conn.close()
    # Rows without a stored embedding cannot be compared against; skip them.
    return [
        (db_name, np.frombuffer(blob, dtype=np.float32))
        for db_name, blob in rows
        if blob is not None
    ]


def _nearest_enrolled(rep, enrolled):
    """Return ``(name, distance)`` of the enrolled vector closest to *rep*."""
    probe = np.array(rep)
    best_match, min_dist = None, float("inf")
    for db_name, db_embedding in enrolled:
        dist = np.linalg.norm(db_embedding - probe)
        if dist < min_dist:
            min_dist, best_match = dist, db_name
    return best_match, min_dist


def _mark_faces_in_frame(frame, enrolled, recognition_threshold):
    """Detect faces in *frame* and mark attendance for each confident match."""
    # NOTE(review): with enforce_detection=False DeepFace may return a region
    # even when no real face was found - confirm whether such regions should
    # be filtered out before matching.
    detections = DeepFace.extract_faces(frame, detector_backend="opencv", enforce_detection=False)
    if not detections:
        return

    for face in detections:
        area = face["facial_area"]
        x, y, w, h = area["x"], area["y"], area["w"], area["h"]
        if w <= 0 or h <= 0:
            continue

        face_img = frame[y:y+h, x:x+w]
        try:
            representations = DeepFace.represent(face_img, model_name="Facenet", enforce_detection=False)
            if not representations:
                continue
            rep = representations[0]["embedding"]
        except Exception as embed_e:
            # One bad crop must not stop the remaining faces in the frame.
            print(f"[WARNING] Could not generate embedding: {embed_e}")
            continue

        best_match, min_dist = _nearest_enrolled(rep, enrolled)
        if best_match is not None and min_dist < recognition_threshold:
            mark_attendance(best_match)


def recognize_from_video(video_path, max_frames=30, recognition_threshold=10.0):
    """Scan a video for enrolled faces and mark attendance for each match.

    Each processed frame is passed through DeepFace face extraction; every
    detected region is embedded with Facenet and compared (Euclidean
    distance) with the enrolled embeddings. The closest enrolled person is
    passed to ``mark_attendance`` when the distance is below
    ``recognition_threshold``.

    Enrolled embeddings are read from ``attendance.db`` once, before the
    frame loop, and the connection is closed immediately, so no read
    connection is held open while ``mark_attendance`` writes.

    Args:
        video_path: Path of the video file to open with OpenCV.
        max_frames: Number of frames to process from the start of the video;
            ``None`` processes the whole file. Defaults to 30.
        recognition_threshold: Upper bound on the embedding distance for a
            face to count as a match. Defaults to 10.0.

    Raises:
        sqlite3.Error: If the enrolled users cannot be read from the database.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"[ERROR] Could not open video file: {video_path}")
            return

        print(f"[INFO] Processing video: {video_path}")
        enrolled = _load_enrolled_embeddings()
        frame_count = 0

        while max_frames is None or frame_count < max_frames:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            try:
                _mark_faces_in_frame(frame, enrolled, recognition_threshold)
            except Exception as e:
                # Best effort per frame: report the failure and keep going.
                print(f"[ERROR] Frame {frame_count} failed: {e}")
    finally:
        # Release the capture on every exit path, including errors.
        cap.release()

    print("[INFO] Video processing finished.")

# ----------------- EXPORT REPORT -----------------
def export_report(tz_name="Asia/Kolkata"):
    """Write the attendance log to ``reports/attendance_report.csv`` and show it.

    Joins ``attendance`` with ``users`` to obtain ``name`` and ``timestamp``
    columns. Stored timestamps are parsed with ``%Y-%m-%d %H:%M:%S.%f``,
    interpreted as UTC, converted to ``tz_name`` and formatted without the
    fractional seconds. Values that fail to parse become ``None``.

    The resulting table is shown with IPython's ``display`` when IPython is
    importable and with ``print`` otherwise, so the function also works when
    the file is run as a plain script.

    Args:
        tz_name: IANA time-zone name used for the exported timestamps.
            Defaults to ``"Asia/Kolkata"``.
    """
    try:
        from IPython.display import display as show
    except ImportError:
        show = print

    conn = sqlite3.connect("attendance.db")
    try:
        df = pd.read_sql_query("SELECT u.name, a.timestamp FROM attendance a JOIN users u ON a.user_id=u.id", conn)
    finally:
        # Close the handle even if the query fails (e.g. missing tables).
        conn.close()

    def to_local(ts):
        if pd.isnull(ts):
            return None
        return ts.tz_localize("UTC").tz_convert(tz_name).strftime("%Y-%m-%d %H:%M:%S")

    parsed = pd.to_datetime(df["timestamp"], format="%Y-%m-%d %H:%M:%S.%f", errors="coerce")
    df["timestamp"] = parsed.apply(to_local)

    os.makedirs("reports", exist_ok=True)
    report_path = "reports/attendance_report.csv"
    df.to_csv(report_path, index=False)
    print(f"[INFO] Report exported to {report_path}")
    show(df)

# ----------------- MAIN -----------------
if __name__ == "__main__":
    # Pipeline: create tables -> enroll faces -> scan the video -> export CSV.
    init_db()

    photo_folder = "/content/drive/MyDrive/Photo"   # 📂 Your photo folder path
    enroll_all_from_folder(photo_folder)

    video_file_path = "/content/drive/MyDrive/Video/VID_20250912_002956901.mp4"   # 🎥 Your video path
    recognize_from_video(video_file_path)

    export_report()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
[INFO] Enrolling Virat Kohli ...
[INFO] Enrolling MS Dhoni...
[INFO] Enrolling Lionel Messi ...
[INFO] Enrolling Cristiano Ronaldo ...
[INFO] Enrolling SubhaSs...
[INFO] Processing video: /content/drive/MyDrive/Video/VID_20250912_002956901.mp4
[ATTENDANCE] Marked attendance for SubhaSs at 2025-09-12 18:45:40.541005
[ATTENDANCE] SubhaSs already marked within last 8 minutes, skipping.
[ATTENDANCE] SubhaSs already marked within last 8 minutes, skipping.
[ATTENDANCE] SubhaSs already marked within last 8 minutes, skipping.
[ATTENDANCE] SubhaSs already marked within last 8 minutes, skipping.
[ATTENDANCE] SubhaSs already marked within last 8 minutes, skipping.
[ATTENDANCE] SubhaSs already marked within last 8 minutes, skipping.
[ATTENDANCE] SubhaSs already marked within last 8 minutes, skipping.
[ATTENDANCE] SubhaSs already marked within last 8 minutes, skipping.
[A

Unnamed: 0,name,timestamp
0,SubhaSs,2025-09-13 00:04:35
1,SubhaSs,2025-09-13 00:15:40
