In [None]:
import torch
from transformers import DPTFeatureExtractor, DPTForDepthEstimation

from modello_antispoofing import (
    AntiSpoofingModel,
    RGBEncoder,
    DepthEncoder
)

from insightface.app import FaceAnalysis
from ultralytics import YOLO

import os
from dotenv import load_dotenv

import cv2
import numpy as np
from collections import deque
from datetime import datetime
import time
from pathlib import Path
import pickle
from tqdm import tqdm
import requests
import pytz

import warnings
warnings.filterwarnings("ignore")

In [None]:
# Load variables from a .env file (python-dotenv) into the process environment.
load_dotenv()
# URL of the webhook used by create_presence; os.getenv returns None when
# the "webhook_url" variable is not defined.
webhook_url = os.getenv("webhook_url")

# Current date/time helper
def get_current_datetime(timezone='Europe/Rome',as_utc=False):
    """Return the current date and time as a ``YYYY-MM-DD HH:MM:SS`` string.

    Args:
        timezone: Name of the timezone (as understood by ``pytz.timezone``)
            in which "now" is taken.
        as_utc: When True the timestamp is converted to UTC before being
            formatted; otherwise it is left in ``timezone``.

    Returns:
        The formatted timestamp. The string carries no UTC offset, so the
        caller must know which of the two modes was requested.
    """
    moment = datetime.now(pytz.timezone(timezone))

    if as_utc:
        moment = moment.astimezone(pytz.utc)

    return moment.strftime('%Y-%m-%d %H:%M:%S')

# Webhook call that triggers the n8n workflow
def create_presence(name,recognition_score=0.0,timeout=10.0):
    """Register a check-in by POSTing it to the n8n webhook.

    Args:
        name: Identity label in the form ``"<name>-<employee_id>"``. The
            label is split on its LAST hyphen, so the name part may itself
            contain hyphens.
        recognition_score: Similarity score of the recognition; sent as a
            plain float.
        timeout: Seconds to wait for the webhook before giving up. Without
            a timeout ``requests`` may block indefinitely.

    Returns:
        The decoded JSON body of the webhook response on success, or an
        error message string (``"Collegamento a n8n fallito: ..."``) when
        anything goes wrong (malformed label, network error, HTTP error
        status, non-JSON response).
    """
    try:
        # rsplit with maxsplit=1: only the trailing "-<id>" is separated.
        name,employee_id = name.rsplit("-",1)

        datetime_utc = get_current_datetime(as_utc=True)

        data = {
            "name": name,
            "employee_id": employee_id,
            "check_in": datetime_utc,
            # NumPy scalars are not JSON serialisable; force a builtin float.
            "recognition_score": float(recognition_score)
        }

        response = requests.post(
            url=webhook_url,
            json=data,
            timeout=timeout
        )
        response.raise_for_status()
        print(f"Presenza registrata: {name} alle {datetime_utc}")
        return response.json()

    except Exception as e:
        # Best effort by design: the failure is reported as a string
        # instead of being raised.
        return f"Collegamento a n8n fallito: {e}"

In [None]:
# Shared InsightFace analysis object used by build_db and
# recognize_face_from_frame (both call app.get on an image).
app = FaceAnalysis(
    # Model pack name.
    name="buffalo_l",
    # Directory passed to InsightFace as its root folder.
    root='~/.insightface',
    # ONNX Runtime execution provider requested for inference.
    providers = ['OpenVINOExecutionProvider']
)

# Prepare the models before first use.
# NOTE(review): ctx_id presumably selects the device index and det_size the
# detector input resolution; det_thresh is the detection threshold. Confirm
# against the InsightFace documentation.
app.prepare(
    ctx_id=0,
    det_size=(640,640),
    det_thresh=0.6
)

def build_db(train_dir,min_det_score=0.7,embedding_dim=512):
    """Build the face-embedding database from a directory of images.

    ``train_dir`` is expected to contain one sub-directory per person; the
    sub-directory name becomes the identity label. For every image the
    face with the highest detection score is kept and its L2-normalised
    embedding is stored.

    Args:
        train_dir: Path of the root directory (one folder per person).
        min_det_score: Images whose best face has a detection score below
            this value are rejected.
        embedding_dim: Expected length of each embedding vector. It must
            match the dimension checked at matching time (512 there).

    Returns:
        dict mapping person name to a float32 array of shape
        ``(n_valid_images, embedding_dim)``. People without any valid
        image are left out.
    """
    db = {}
    stats = {
        "total": 0,
        "valid": 0,
        "rejected": 0
    }

    # Only sub-directories represent people; stray files are ignored.
    # Sorted so the processing order is reproducible across runs.
    person_dirs = sorted(p for p in Path(train_dir).iterdir() if p.is_dir())

    for person_dir in person_dirs:
        embeddings = []
        person_name = person_dir.name

        # A Path object is not iterable: list its files explicitly. Passing
        # a list also lets tqdm show the total.
        image_paths = sorted(p for p in person_dir.iterdir() if p.is_file())

        # Compute one embedding per image of this person.
        for img_path in tqdm(image_paths,desc=person_name):
            stats["total"] += 1

            # Load the image; cv2.imread wants a string path and returns
            # None when the file cannot be decoded.
            img = cv2.imread(str(img_path))
            if img is None:
                stats["rejected"] += 1
                print(f"Immagine {img_path} scartata")
                continue

            # Detect faces and extract embeddings.
            faces = app.get(img)
            if len(faces) == 0:
                stats["rejected"] += 1
                print(f"Immagine {img_path} scartata")
                continue

            # Keep the most confident detection only.
            best_face = max(faces, key=lambda x: getattr(x, "det_score", 0.0))
            det_score = getattr(best_face, "det_score", 0.0)

            if det_score < min_det_score:
                stats["rejected"] += 1
                print(f"Immagine {img_path} scartata")
                continue

            raw_emb = getattr(best_face, "embedding", None)
            if raw_emb is None:
                stats["rejected"] += 1
                print(f"Immagine {img_path} scartata")
                continue

            emb = np.asarray(raw_emb,dtype=np.float32).ravel()
            # Epsilon avoids a division by zero on a degenerate embedding.
            emb = emb / (np.linalg.norm(emb) + 1e-8)

            if emb.size != embedding_dim:
                stats["rejected"] += 1
                print(f"Immagine {img_path} scartata")
                continue

            stats["valid"] += 1
            embeddings.append(emb)

        if embeddings:
            db[person_name] = np.stack(embeddings,axis=0)
            print(f"{person_name}: {len(embeddings)} immagini valide")
        else:
            print(f"Nessuna immagine valida per {person_name}")
        print("\n")

    print(f"\n Statistiche DB:")
    print(f"   Totale immagini: {stats['total']}")
    print(f"   Valide: {stats['valid']}")
    print(f"   Scartate: {stats['rejected']}")

    return db

# Build the embedding database from the "train" directory; `db` is the
# module-level dictionary read by db_match_from_embedding.
print("Costruzione db\n")
db = build_db("train",min_det_score=0.7)

# Persist the database to face_db.pkl.
# NOTE: pickle files must only be loaded back from trusted sources.
with open("face_db.pkl","wb") as f:
    pickle.dump(db,f)

In [None]:
def load_model(chekpoint_path,device):
    """Build the anti-spoofing model and load its weights from a checkpoint.

    Args:
        chekpoint_path: Path of the checkpoint file holding the state dict
            (parameter name kept as-is for backward compatibility).
        device: Device the weights are mapped to and the model is moved to.

    Returns:
        The model, on ``device`` and in evaluation mode, when loading
        succeeds. On failure the error is NOT raised: a message string
        (``"Errore nel caricamento del modello di anti-spoofing: ..."``) is
        returned instead, so callers should check the type of the result.
    """
    print("Caricamento modello per stima della prodonfità")
    model = AntiSpoofingModel(RGBEncoder(),DepthEncoder())
    try:
        # NOTE: torch.load unpickles the file; only load trusted checkpoints.
        state_dict = torch.load(chekpoint_path,map_location=device)
        load_result = model.load_state_dict(state_dict=state_dict,strict=False)

        # strict=False silently ignores mismatching keys: report them so a
        # wrong checkpoint does not go unnoticed.
        missing = list(getattr(load_result,"missing_keys",[]) or [])
        unexpected = list(getattr(load_result,"unexpected_keys",[]) or [])
        if missing:
            print(f"Attenzione: {len(missing)} parametri mancanti nel checkpoint")
        if unexpected:
            print(f"Attenzione: {len(unexpected)} parametri inattesi nel checkpoint")

        model.to(device)
        # Inference mode: disables dropout / batch-norm statistics updates.
        model.eval()
        print("Modello anti-spoofing caricato.")
        return model
    except Exception as e:
        return f"Errore nel caricamento del modello di anti-spoofing: {e}"

In [None]:
def db_match_from_embedding(query_embedding,top_k=3,database=None,embedding_dim=512):
    """Find the identity in the database most similar to a query embedding.

    Similarity is the dot product between the query and the stored
    embeddings (cosine similarity when both sides are L2-normalised). For a
    person with several stored embeddings the score is the mean of the
    ``top_k`` highest similarities.

    Args:
        query_embedding: Embedding of the face to identify; flattened to 1-D.
        top_k: Number of best similarities averaged per person.
        database: Mapping ``name -> embedding(s)``, each value being a 1-D
            vector or a 2-D ``(n, embedding_dim)`` array. Defaults to the
            module-level ``db``.
        embedding_dim: Expected embedding length; entries with a different
            length are skipped with a message.

    Returns:
        ``(best_name, best_score)``; ``("Sconosciuto", -1.0)`` when no entry
        could be compared.
    """
    if database is None:
        database = db

    query = np.asarray(query_embedding,dtype=np.float32).ravel()

    best_name = "Sconosciuto"
    best_score = -1.0

    for name,emb in database.items():
        arr = np.asarray(emb,dtype=np.float32)

        if arr.ndim == 0:
            print(f"{name} ha un embedding vuoto")
            continue

        if arr.ndim == 1:
            if arr.shape[0] != embedding_dim:
                print(f"{name} ha un embedding di dimensione errata: {arr.shape[0]} anziché {embedding_dim}")
                continue
            similarity = float(np.dot(query,arr))

        elif arr.ndim == 2:
            if arr.shape[1] != embedding_dim:
                print(f"{name} ha un embedding di dimensione errata: {arr.shape[1]} anziché {embedding_dim}")
                continue
            # Matrix-vector product: (N, D) @ (D,) -> (N,)
            similarities = arr @ query
            # Builtin min: np.min would interpret the second argument as an axis.
            k = min(int(top_k),len(similarities))
            if k > 0:
                top_indices = np.argpartition(similarities,-k)[-k:]
                similarity = float(np.mean(similarities[top_indices]))
            else:
                similarity = -1.0

        else:
            # Higher-rank arrays are not supported; skipping avoids reusing
            # the similarity computed for a previous entry.
            print(f"{name} ha un embedding di forma non supportata: {arr.shape}")
            continue

        if similarity > best_score:
            best_score = similarity
            best_name = name

    return best_name, best_score



def recognize_face_from_frame(frame_bgr,threshold=0.8,min_det_score=0.7):
    """Detect every face in a frame and match each one against the database.

    Args:
        frame_bgr: Image passed to ``app.get`` (BGR, as the name suggests).
        threshold: Minimum similarity for a match to be accepted; below it
            the face is labelled ``"Sconosciuto"``.
        min_det_score: Faces with a detection score below this value are
            ignored.

    Returns:
        A list with one ``(bbox, label, name, score)`` tuple per accepted
        face, where ``bbox`` is a tuple of ints and ``label`` is a display
        string with the similarity. If detection itself fails, an error
        message string is returned instead of a list.
    """
    results = []

    try:
        faces = app.get(frame_bgr)
    except Exception as e:
        return f"Errore nella detection del frame: {e}"

    for face in faces:
        det_score = getattr(face,"det_score",0.0)
        if det_score < min_det_score:
            continue

        raw_emb = getattr(face,"embedding",None)
        if raw_emb is None:
            # No recognition embedding available for this detection.
            continue

        emb = np.asarray(raw_emb,dtype=np.float32).ravel()
        # L2-normalise so the dot product in the matcher is a cosine similarity.
        emb = emb / (np.linalg.norm(emb) + 1e-8)

        matched_name, score = db_match_from_embedding(emb)

        name = matched_name if score >= threshold else "Sconosciuto"
        label = f"{name} (sim={score:.2f})"

        # Appended inside the loop so that every accepted face is reported.
        bbox = tuple(map(int,face.bbox))
        results.append((bbox,label,name,score))

    return results
