In [5]:
import cv2
import os
import sys
import numpy as np
import joblib
from insightface.app import FaceAnalysis
import firebase_admin
from firebase_admin import credentials, firestore
from datetime import datetime
import pytz

# === Attendance Threshold Setting ===
ATTENDANCE_THRESHOLD = 0.7  # 70% of session time required to be marked Present

# === Silent-Face-Anti-Spoofing Imports ===
sys.path.append('./Silent-Face-Anti-Spoofing')
from src.anti_spoof_predict import AntiSpoofPredict
from src.generate_patches import CropImage
from src.utility import parse_model_name

# === Firebase Setup ===
cred = credentials.Certificate("serviceAccountKey.json")
if not firebase_admin._apps:
    firebase_admin.initialize_app(cred)
db = firestore.client()

# === Face Recognition Setup ===
model_path = 'svm_face_recognition_model_v4.joblib'
data_dir = 'data/preprocessed'
classifier = joblib.load(model_path)
class_names = sorted(os.listdir(data_dir))

app = FaceAnalysis(name='antelopev2', providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
app.prepare(ctx_id=0, det_size=(640, 640))


# === Anti-Spoofing Check ===
def is_real_face(frame, model_dir="./Silent-Face-Anti-Spoofing/resources/anti_spoof_models", device_id=0):
    model_test = AntiSpoofPredict(device_id)
    image_cropper = CropImage()

    try:
        height, width, _ = frame.shape
        if height == 0 or width == 0:
            print("⚠️ Invalid face image dimensions.")
            return False, 0.0
    except Exception as e:
        print("⚠️ Error reading image shape:", e)
        return False, 0.0

    # keep aspect similar to model’s expected input
    if width / height != 3 / 4:
        new_width = int(height * 3 / 4)
        frame = cv2.resize(frame, (new_width, height))

    try:
        image_bbox = model_test.get_bbox(frame)
        if image_bbox[2] == 0 or image_bbox[3] == 0:
            print("⚠️ Invalid bounding box.")
            return False, 0.0
    except Exception as e:
        print("⚠️ Error getting bounding box:", e)
        return False, 0.0

    prediction = np.zeros((1, 3))
    for model_name in os.listdir(model_dir):
        h_input, w_input, model_type, scale = parse_model_name(model_name)
        param = {
            "org_img": frame,
            "bbox": image_bbox,
            "scale": scale,
            "out_w": w_input,
            "out_h": h_input,
            "crop": True if scale else False,
        }
        img = image_cropper.crop(**param)
        prediction += model_test.predict(img, os.path.join(model_dir, model_name))

    label = np.argmax(prediction)
    confidence = prediction[0][label] / 2
    return label == 1, confidence  # True if REAL (label 1)


# === Firebase Attendance Helpers ===
def get_active_session():
    tz = pytz.timezone("Africa/Mogadishu")
    now = datetime.now(tz)

    faculties = db.collection("faculties").stream()
    for fac in faculties:
        departments = db.collection("faculties").document(fac.id).collection("departments").stream()
        for dep in departments:
            classes = db.collection("faculties").document(fac.id).collection("departments").document(dep.id).collection("classes").stream()
            for cls in classes:
                courses = db.collection("faculties").document(fac.id).collection("departments").document(dep.id).collection("classes").document(cls.id).collection("courses").stream()
                for course in courses:
                    # NOTE: If Firestore complains about 2 range filters on different fields,
                    # you may need to fetch with one inequality and filter the other in memory.
                    sessions = db.collection("faculties").document(fac.id) \
                        .collection("departments").document(dep.id) \
                        .collection("classes").document(cls.id) \
                        .collection("courses").document(course.id) \
                        .collection("sessions") \
                        .where("start_time", "<=", now) \
                        .where("end_time", ">=", now).stream()

                    for session in sessions:
                        data = session.to_dict()
                        data.update({
                            "id": session.id,
                            "faculty_id": fac.id,
                            "department_id": dep.id,
                            "class_id": cls.id,
                            "course_id": course.id
                        })
                        print(f"📚 Found session: {data.get('subject')} in {cls.id} "
                              f"from {data.get('start_time')} to {data.get('end_time')}")
                        return data
    return None


def mark_attendance(student_name, session_meta):
    now = datetime.now(pytz.timezone("Africa/Mogadishu"))
    session_id = session_meta["id"]
    attendance_ref = db.collection("faculties").document(session_meta["faculty_id"]) \
        .collection("departments").document(session_meta["department_id"]) \
        .collection("classes").document(session_meta["class_id"]) \
        .collection("courses").document(session_meta["course_id"]) \
        .collection("sessions").document(session_id) \
        .collection("attendance")

    existing = attendance_ref.where("student_name", "==", student_name).limit(1).stream()
    for doc in existing:
        return doc.id

    doc_ref = attendance_ref.document()
    doc_ref.set({
        "student_name": student_name,
        "entry_time": now,
        "exit_time": None,
        "duration": None,
        "status": None
    })
    return doc_ref.id


def finalize_attendance(active_sessions, session_meta):
    tz = pytz.timezone("Africa/Mogadishu")
    attendance_ref = db.collection("faculties").document(session_meta["faculty_id"]) \
        .collection("departments").document(session_meta["department_id"]) \
        .collection("classes").document(session_meta["class_id"]) \
        .collection("courses").document(session_meta["course_id"]) \
        .collection("sessions").document(session_meta["id"]) \
        .collection("attendance")

    session_start = session_meta["start_time"].astimezone(tz)
    session_end = session_meta["end_time"].astimezone(tz)
    session_duration = (session_end - session_start).total_seconds() / 60
    required_minutes = session_duration * ATTENDANCE_THRESHOLD

    for name, data in active_sessions.items():
        doc_id = data["doc_id"]
        last_seen = data["last_seen"]
        doc_ref = attendance_ref.document(doc_id)
        entry_time = doc_ref.get().to_dict().get("entry_time")

        if entry_time and last_seen:
            duration = (last_seen - entry_time).total_seconds() / 60
            status = "Present" if duration >= required_minutes else "Absent"
            doc_ref.update({
                "exit_time": last_seen,
                "duration": duration,
                "status": status
            })
            print(f"📝 {name}: Duration = {duration:.1f} min → {status} "
                  f"(Required: {required_minutes:.1f} min)")


def mark_absent_students(session_meta):
    print("🔎 Marking absent students...")
    students_ref = db.collection("faculties").document(session_meta["faculty_id"]) \
        .collection("departments").document(session_meta["department_id"]) \
        .collection("classes").document(session_meta["class_id"]) \
        .collection("students")

    attendance_ref = db.collection("faculties").document(session_meta["faculty_id"]) \
        .collection("departments").document(session_meta["department_id"]) \
        .collection("classes").document(session_meta["class_id"]) \
        .collection("courses").document(session_meta["course_id"]) \
        .collection("sessions").document(session_meta["id"]) \
        .collection("attendance")

    all_students = [doc.to_dict()["full_name"] for doc in students_ref.stream()]
    present_students = [doc.to_dict()["student_name"] for doc in attendance_ref.stream()]

    for name in all_students:
        if name not in present_students:
            attendance_ref.add({
                "student_name": name,
                "entry_time": None,
                "exit_time": None,
                "duration": 0,
                "status": "Absent"
            })
            print(f"❌ {name} marked absent.")


# === MAIN ===
if __name__ == "__main__":
    active_session = get_active_session()

    if not active_session:
        print("❌ No active session found.")
    else:
        print(f"✅ Active session: {active_session['subject']}")
        active_sessions = {}
        tz = pytz.timezone("Africa/Mogadishu")
        session_end = active_session["end_time"].astimezone(tz)

        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            print("❌ Unable to open camera.")
        else:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("⚠️ Failed to read frame from camera.")
                    break

                now = datetime.now(tz)
                if now >= session_end:
                    print("⏰ Session has ended.")
                    break

                # InsightFace expects RGB
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                faces = app.get(frame_rgb)

                for face in faces:
                    x1, y1, x2, y2 = map(int, face['bbox'])
                    # Ensure bounding box is inside frame
                    x1 = max(x1, 0); y1 = max(y1, 0)
                    x2 = min(x2, frame.shape[1]); y2 = min(y2, frame.shape[0])
                    if x2 <= x1 or y2 <= y1:
                        continue

                    face_img = frame[y1:y2, x1:x2]
                    real, score = is_real_face(face_img)

                    # Defaults
                    label = "Unknown"
                    color = (0, 0, 255)  # Red for Unknown/Spoof
                    confidence = 0.0

                    if not real:
                        # 🚨 SPOOF DETECTED
                        label = "Spoof Attack"
                        color = (0, 0, 255)  # Red
                    else:
                        # Recognize
                        embedding = face['embedding'].reshape(1, -1)
                        if hasattr(classifier, "predict_proba"):
                            probs = classifier.predict_proba(embedding)[0]
                            best_idx = np.argmax(probs)
                            confidence = probs[best_idx]
                            if confidence > 0.3:
                                label = class_names[best_idx]
                                color = (0, 255, 0)  # Green
                        else:
                            decision = classifier.decision_function(embedding)[0]
                            best_idx = np.argmax(decision)
                            confidence = decision[best_idx]
                            if confidence > 0.3:
                                label = class_names[best_idx]
                                color = (0, 255, 0)  # Green

                        # Mark attendance only for REAL recognized faces
                        if label != "Unknown":
                            if label not in active_sessions:
                                doc_id = mark_attendance(label, active_session)
                                active_sessions[label] = {"doc_id": doc_id, "last_seen": now}
                                print(f"✅ {label} marked present at {now.time()}")
                            else:
                                active_sessions[label]["last_seen"] = now

                    # Draw rectangle and label (Name / Unknown / Spoof Attack)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    display_text = f"{label} ({confidence:.2f})" if label not in ["Unknown", "Spoof Attack"] else label
                    cv2.putText(frame, display_text, (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

                cv2.imshow("Attendance System", frame)
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    break

            cap.release()
            cv2.destroyAllWindows()

        # Finalize after video loop
        finalize_attendance(active_sessions, active_session)
        mark_absent_students(active_session)


Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\abdirahman/.insightface\models\antelopev2\1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\abdirahman/.insightface\models\antelopev2\2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\abdirahman/.insightface\models\antelopev2\genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\abdirahman/.insightface\models\antelopev2\glintr100.onnx recognition ['None', 3, 112, 112] 127.5 127.5
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\abdirahman/.insightface\models\antelopev2\scrfd_10g_bnkp

  .where("end_time", ">=", now).stream()


📚 Found session: Ethics in 542nAO1705nO3HBEGWyH from 2025-08-11 08:27:12.737000+00:00 to 2025-08-11 08:47:12.737000+00:00
✅ Active session: Ethics


  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  result = F.softmax(result).cpu().numpy()
  return query.where(field_path, op_string, value)


✅ abdirahman_muse_adan marked present at 11:28:20.810780


  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  result = F.softmax(result).cpu().numpy()
  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  result = F.softmax(result).cpu().numpy()
  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  result = F.softmax(result).cpu().numpy()
  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  result = F.softmax(result).cpu().numpy()
  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  result = F.softmax(result).cpu().numpy()
  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  result = F.softmax(result).cpu().numpy()
  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  result = F.softmax(result).cpu().numpy()
  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  result = F.softmax(result).cpu().numpy()
  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  result = F.softmax(result).cpu().numpy()
  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4
  r

📝 abdirahman_muse_adan: Duration = 2.0 min → Absent (Required: 14.0 min)
🔎 Marking absent students...
❌ mohamed_bashiir_ali marked absent.
❌ mohamed_kasim_hussein marked absent.
❌ hanad_ahmed_mohamed marked absent.
❌ abdulahi_abdirahman_isse marked absent.
❌ salah_subane marked absent.
❌ marwaan marked absent.
❌ abdimajiid_mohamed_dahir marked absent.
❌ ayup_mohamed_abshir marked absent.
❌ khalid marked absent.
❌ sakariye_mohamed_zubeyr marked absent.
❌ mohamed_kasim_hussein marked absent.
❌ zakariye_said_abdulahi marked absent.
❌ abdirahman_hassan_mohamud marked absent.
