In [None]:
# %pip install opencv-python deepface mysql-connector-python numpy scipy ulid-py requests

In [None]:
import cv2
from deepface import DeepFace
import mysql.connector
import os
import time
from datetime import datetime
import socket
import numpy as np
import json
from scipy.spatial.distance import cosine
import ulid  # For ULID face IDs
import requests # added to send images to API
import time

In [None]:
def _get_frontal_face_cascade():
    """Return the frontal-face Haar cascade, constructing it only on first use."""
    classifier = getattr(_get_frontal_face_cascade, "_classifier", None)
    if classifier is None:
        cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        classifier = cv2.CascadeClassifier(cascade_path)
        # Remember the instance on the function object so later calls skip
        # re-reading and re-parsing the cascade XML.
        _get_frontal_face_cascade._classifier = classifier
    return classifier


def detect_faces_in_frame(frame, scaleFactor=1.1, minNeighbors=5, minSize=(60, 60)):
    """Detect frontal faces in a BGR frame using OpenCV's Haar cascade.

    Args:
        frame: Image passed to ``cv2.cvtColor(..., cv2.COLOR_BGR2GRAY)``.
        scaleFactor: Forwarded unchanged to ``detectMultiScale``.
        minNeighbors: Forwarded unchanged to ``detectMultiScale``.
        minSize: Forwarded unchanged to ``detectMultiScale``.

    Returns:
        Whatever ``detectMultiScale`` returns; callers in this file index each
        entry as ``(x, y, w, h)``.

    Note:
        The classifier is created once and shared between calls instead of
        being rebuilt for every frame.
        NOTE(review): if frames are processed from several threads, confirm
        that sharing a single classifier instance is acceptable.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    face_cascade = _get_frontal_face_cascade()
    return face_cascade.detectMultiScale(
        gray,
        scaleFactor=scaleFactor,
        minNeighbors=minNeighbors,
        minSize=minSize,
    )

def should_capture_frame(frame, min_face_area=2000):
    """Decide whether a frame contains a face large enough to capture.

    Args:
        frame: Image handed to ``detect_faces_in_frame``. ``None`` or an
            object whose ``size`` attribute is 0 (for example the result of a
            failed camera read) is treated as "no face" instead of being
            passed on to OpenCV.
        min_face_area: Minimum ``w * h`` of a detection for it to count.

    Returns:
        ``(should_capture, big_faces)``: ``should_capture`` is True only when
        at least one detection has ``w * h >= min_face_area``; ``big_faces``
        is the list of those detections (empty when there are none).
    """
    # A missing or empty frame cannot contain a face; skip detection entirely.
    if frame is None or getattr(frame, "size", None) == 0:
        return False, []

    faces = detect_faces_in_frame(frame)
    # Each detection is indexed as (x, y, w, h); keep the ones that are big enough.
    big_faces = [face for face in faces if face[2] * face[3] >= min_face_area]
    return bool(big_faces), big_faces

# Usage example (call this in your capture loop BEFORE saving/sending):
# ret, frame = cap.read()
# should_capture, faces = should_capture_frame(frame)
# if should_capture:
#     # crop first face (or iterate faces) and encode/send
#     x, y, w, h = faces[0]
#     face_img = frame[y:y+h, x:x+w]
#     ret, buf = cv2.imencode('.jpg', face_img)
#     if ret:
#         img_bytes = buf.tobytes()
#         ok, body = send_image_to_api_safe(img_bytes, pc_name=socket.gethostname())
#         # handle ok/body as needed
# else:
#     # no face -> skip saving/sending
#     pass

# ==== Configuration ====


In [None]:
# Timing settings. Units are not stated in this cell - presumably seconds;
# TODO confirm against the capture loop that consumes them.
CHECK_INTERVAL = 10
MAX_CAMERA_USE_TIME = 8
# Directory name for captured face images (a relative path).
CAPTURED_FACES_DIR = "captured_faces"
# Model name passed to DeepFace.represent() by get_face_embedding().
EMBEDDING_MODEL = "ArcFace"
# Default cutoff for match_face_id(): a cosine distance strictly below this
# value counts as a match.
MATCH_THRESHOLD = 0.45  # Adjust as needed
# Default upload endpoint; check_api_health() appends '/health' to this URL.
API_URL = "http://127.0.0.1:8000/upload-face"  # endpoint to send captured images
# Read from the API_KEY environment variable. NOTE(review): when API_KEY is
# unset, the placeholder string below is what gets sent in the X-API-Key header.
CLIENT_API_KEY = os.getenv('API_KEY', 'replace-me-with-a-secure-key')  # used for X-API-Key header

# ==== Ensure storage directory exists ====


In [None]:
def ensure_dir(directory):
    """Create ``directory`` (including missing parents) if it does not exist.

    Args:
        directory: Path of the directory to create.

    Raises:
        FileExistsError: If ``directory`` exists but is not a directory.
        OSError: Any other error raised by ``os.makedirs``.
    """
    # exist_ok=True removes the check-then-create race: another process may
    # create the directory between an os.path.exists() test and makedirs().
    os.makedirs(directory, exist_ok=True)

# ==== Get Embeddings DB ====

In [None]:
def get_embeddings_db(cursor):
    """Load every stored face embedding from ``captured_snapshots``.

    Args:
        cursor: Object providing ``execute(sql)`` and ``fetchall()``; each
            fetched row is unpacked as ``(face_id, embedding_json)``.

    Returns:
        List of ``(face_id, embedding)`` tuples where ``embedding`` is a
        non-empty 1-D ``float64`` NumPy array decoded from the JSON column.

    Rows whose embedding cannot be decoded, or that decode to something other
    than a non-empty 1-D vector, are skipped. A single summary line is printed
    when that happens so corrupted rows do not go unnoticed.
    """
    cursor.execute("SELECT face_id, embedding FROM captured_snapshots WHERE embedding IS NOT NULL")
    known = []
    skipped = 0
    for face_id, emb_json in cursor.fetchall():
        try:
            emb = np.array(json.loads(emb_json), dtype=np.float64)
        except (TypeError, ValueError, OverflowError):
            # Invalid JSON, non-numeric content, ragged lists, or a wrong column type.
            skipped += 1
            continue
        # A scalar or empty array parses fine but cannot be compared to a face
        # embedding later, so keep it out of the result.
        if emb.ndim != 1 or emb.size == 0:
            skipped += 1
            continue
        known.append((face_id, emb))
    if skipped:
        print(f"Skipped {skipped} stored embedding(s) that could not be used")
    return known

# ==== Get Face Embedding ====

In [None]:
def get_face_embedding(face_img):
    """Compute an embedding vector for ``face_img`` with DeepFace.

    Calls ``DeepFace.represent`` with ``model_name=EMBEDDING_MODEL`` and
    ``enforce_detection=False`` and converts the ``'embedding'`` entry of the
    first returned item to a ``float64`` NumPy array.

    Returns:
        The embedding array, or ``None`` when DeepFace returns a falsy or
        non-list result, or when any exception is raised (the exception is
        printed, not propagated).
    """
    try:
        representations = DeepFace.represent(
            face_img,
            model_name=EMBEDDING_MODEL,
            enforce_detection=False,
        )
        # Nothing usable came back: report "no embedding".
        if not representations or not isinstance(representations, list):
            return None
        first_vector = representations[0]['embedding']
        return np.array(first_vector, dtype=np.float64)
    except Exception as err:
        print(f"‚ùå Embedding error: {err}")
        return None

# ==== Match Face ID ====

In [None]:
def match_face_id(embedding, known, threshold=MATCH_THRESHOLD):
    """Return the ``face_id`` of the closest known embedding under ``threshold``.

    Args:
        embedding: Query embedding vector, or ``None``.
        known: Iterable of ``(face_id, embedding)`` pairs.
        threshold: A candidate matches only when its cosine distance to
            ``embedding`` is strictly below this value.

    Returns:
        The ``face_id`` with the smallest cosine distance below ``threshold``
        (the earliest entry wins ties), or ``None`` when nothing matches or
        ``embedding`` is ``None``.

    The whole list is scanned so the result does not depend on the order of
    ``known``; returning the first entry under the threshold could pick a
    weaker match over a closer one that appears later.
    """
    if embedding is None:
        return None

    query_shape = np.shape(embedding)
    best_id = None
    best_dist = threshold
    for face_id, known_emb in known:
        # Vectors of a different shape cannot be compared (cosine() would
        # raise), so skip them instead of aborting the whole lookup.
        if np.shape(known_emb) != query_shape:
            continue
        dist = cosine(embedding, known_emb)
        # A NaN distance fails this comparison and is ignored naturally.
        if dist < best_dist:
            best_id, best_dist = face_id, dist
    return best_id

# ==== Generate ULID Face ID ====

In [None]:
# ==== Generate ULID Face ID ====
def generate_ulid():
    """Create a new identifier with ``ulid.new()`` and return its string form."""
    fresh_id = ulid.new()
    return str(fresh_id)

# ==== Assign and Reuse Face IDs ====

In [None]:
# def get_or_create_face_id(pc_name):
#     face_id_file = f"{pc_name}_face_id.txt"
#     if os.path.exists(face_id_file):
#         with open(face_id_file, "r") as f:
#             return f.read().strip()
#     else:
#         import uuid
#         face_id = f"{pc_name}_{uuid.uuid4().hex[:8]}"
#         with open(face_id_file, "w") as f:
#             f.write(face_id)
#         return face_id

# ====  Save to Database ====

In [None]:
def save_snapshot_to_db(self, face_id, pc_name, image_path, timestamp, embedding):
    """Insert one row into ``captured_snapshots`` and commit.

    Args:
        self: Object exposing ``cursor`` (with ``execute``) and ``db`` (with
            ``commit``).
        face_id: Value for the ``face_id`` column.
        pc_name: Value for the ``pc_name`` column.
        image_path: Value for the ``image_path`` column.
        timestamp: A string in ``'%Y-%m-%d %H:%M:%S'`` format (parsed to a
            ``datetime``), or any other value, which is passed through as-is.
        embedding: Object with ``tolist()`` that is stored as JSON text, or
            ``None`` to store NULL.

    Raises:
        ValueError: If ``timestamp`` is a string in a different format.
    """
    # Serialise the embedding first so a bad embedding fails before any parsing.
    if embedding is None:
        embedding_json = None
    else:
        embedding_json = json.dumps(embedding.tolist())

    # Strings are converted to datetime objects; everything else is left alone.
    when = timestamp
    if isinstance(when, str):
        when = datetime.strptime(when, '%Y-%m-%d %H:%M:%S')

    insert_sql = """
        INSERT INTO captured_snapshots (face_id, pc_name, image_path, timestamp, embedding)
        VALUES (%s, %s, %s, %s, %s)
    """
    row = (face_id, pc_name, image_path, when, embedding_json)
    self.cursor.execute(insert_sql, row)
    self.db.commit()


# ==== Main Background Face Recognition Class ====


In [None]:
import requests

def send_image_to_api(img_bytes, api_url, pc_name, headers=None, timeout=10):
    """POST image bytes to ``api_url`` as multipart/form-data.

    The image is sent as form file ``file`` (filename ``face.jpg``, content
    type ``image/jpeg``) together with a ``pc_name`` form field. When
    ``headers`` is None, an ``X-API-Key`` header holding ``CLIENT_API_KEY`` is
    used. The original note describes this as a legacy helper used by
    ``save_snapshot`` (that caller is not visible in this part of the file).

    Returns:
        The decoded JSON body, or the raw response text when the body cannot
        be decoded as JSON.

    Raises:
        Whatever ``requests.post`` raises, and the error raised by
        ``raise_for_status()`` for an unsuccessful HTTP status.
    """
    request_headers = {'X-API-Key': CLIENT_API_KEY} if headers is None else headers
    response = requests.post(
        api_url,
        files={'file': ('face.jpg', img_bytes, 'image/jpeg')},
        data={'pc_name': pc_name},
        headers=request_headers,
        timeout=timeout,
    )
    response.raise_for_status()

    # Prefer structured JSON; fall back to plain text for non-JSON bodies.
    try:
        payload = response.json()
    except Exception:
        payload = response.text
    return payload

# ==== Start the service ====


In [None]:
# Start the background capture service when this cell/script runs as __main__.
# NOTE(review): BackgroundFaceCapture is not defined in the visible part of this
# notebook - confirm it exists before this cell runs on a fresh kernel.
# NOTE(review): a later cell repeats this same start-up sequence after calling
# check_api_health(), and send_image_to_api_safe() is only defined below this
# cell. This copy looks like a stale duplicate - confirm which one should stay.
if __name__ == "__main__":
    recognizer = None  # stays None if the constructor itself raises
    try:
        recognizer = BackgroundFaceCapture()
        recognizer.run()
    except KeyboardInterrupt:
        print("üõë Background service stopped manually.")
    finally:
        # Only touch recognizer.db when construction actually succeeded.
        if recognizer is not None:
            recognizer.db.close()

In [None]:
# Helper: send image bytes to the API endpoint (robust)
import requests
import time

def _utc_timestamp():
    """Return the current UTC time as a naive ISO-8601 string ending in 'Z'."""
    from datetime import timezone

    # datetime.utcnow() is deprecated; take an aware UTC time and drop tzinfo so
    # the text keeps the existing '<iso>Z' shape instead of '<iso>+00:00'.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'


def _append_api_log(entry, log_path='api_responses.log'):
    """Append ``entry`` as one JSON line to the log; print (never raise) on failure."""
    try:
        with open(log_path, 'a', encoding='utf-8') as f:
            f.write(json.dumps(entry, ensure_ascii=False) + '\n')
    except Exception as log_err:
        print(f"‚ö†Ô∏è Failed to write API log: {log_err}")


def send_image_to_api_safe(image_bytes, api_url=API_URL, pc_name=None, timeout=10, retries=2, backoff=1.0, headers=None):
    """Send a JPEG byte buffer to the API as multipart/form-data without raising on network errors.

    Args:
        image_bytes: Image content, sent as form file ``file`` with filename
            ``capture.jpg`` and content type ``image/jpeg``.
        api_url: Target URL.
        pc_name: Value of the ``pc_name`` form field; defaults to
            ``socket.gethostname()``.
        timeout: Per-request timeout passed to ``requests.post``.
        retries: Number of extra attempts after the first one. Negative
            values are treated as 0, so at least one attempt is always made.
        backoff: Base delay; the wait before retry ``k`` is ``backoff * k``.
        headers: Request headers; defaults to an ``X-API-Key`` header holding
            ``CLIENT_API_KEY``.

    Returns:
        ``(ok, body)``. After a response is received, ``ok`` is True only for
        HTTP 200 and ``body`` is the decoded JSON or, failing that, the
        response text. When every attempt fails with a
        ``requests.exceptions.RequestException``, returns
        ``(False, error_message)``.

    Every response, and the final network failure, is appended to
    ``api_responses.log`` with a timestamp. A failure to write the log is
    printed and otherwise ignored.
    """
    if pc_name is None:
        pc_name = socket.gethostname()
    if headers is None:
        headers = {'X-API-Key': CLIENT_API_KEY}

    files = {
        'file': ('capture.jpg', image_bytes, 'image/jpeg')
    }
    data = {'pc_name': pc_name}

    last_exception = None
    total_attempts = max(0, retries) + 1
    for attempt in range(total_attempts):
        try:
            resp = requests.post(api_url, files=files, data=data, headers=headers, timeout=timeout)
            status = resp.status_code
            try:
                body = resp.json()
            except Exception:
                body = resp.text

            # Log the response for auditing.
            _append_api_log({
                'timestamp': _utc_timestamp(),
                'api_url': api_url,
                'status_code': status,
                'body': body
            })

            return (status == 200, body)

        except requests.exceptions.RequestException as e:
            last_exception = e
            # Back off only when another attempt follows; sleeping after the
            # final failure would just delay the caller for nothing.
            if attempt + 1 < total_attempts:
                time.sleep(backoff * (attempt + 1))

    # All attempts failed: return a clear failure tuple instead of raising.
    err_msg = str(last_exception) if last_exception is not None else 'Unknown network error'
    _append_api_log({'timestamp': _utc_timestamp(), 'error': err_msg})
    return (False, err_msg)

In [None]:
# Small helper to check API health before starting background capture
def check_api_health(api_url=API_URL, timeout=2):
    """Report whether a GET to ``<api_url>/health`` answers with HTTP 200.

    Trailing slashes are stripped from ``api_url`` before ``/health`` is
    appended. Any exception (including network errors) yields ``False``.

    NOTE(review): the health URL is derived from the upload URL itself, so the
    default probes ``.../upload-face/health`` - confirm the server serves the
    health route at that path.
    """
    try:
        base_url = api_url.rstrip('/')
        response = requests.get(f"{base_url}/health", timeout=timeout)
        is_healthy = response.status_code == 200
    except Exception:
        is_healthy = False
    return is_healthy

# Use this check before starting the service; it prints clear status for the user.
# Probe the API once, report the result, then start the background service.
# NOTE(review): BackgroundFaceCapture is not defined in the visible part of this
# notebook - confirm it exists before this cell runs on a fresh kernel.
if __name__ == '__main__':
    # The health check is informational only: the service starts either way.
    available = check_api_health(API_URL)
    if available:
        print(f"‚úÖ API appears reachable at {API_URL} ‚Äî captured images will be uploaded.")
    else:
        # NOTE(review): this message promises local saving and upload retries;
        # that behaviour is not visible in this cell - confirm it in the
        # service implementation.
        print(f"‚ö†Ô∏è API not reachable at {API_URL}. Images will still be saved locally and logged; the notebook will keep retrying uploads.")

    recognizer = None  # stays None if the constructor itself raises
    try:
        recognizer = BackgroundFaceCapture()
        recognizer.run()
    except KeyboardInterrupt:
        print("üõë Background service stopped manually.")
    finally:
        # Only touch recognizer.db when construction actually succeeded.
        if recognizer is not None:
            recognizer.db.close()