<a href="https://colab.research.google.com/github/Tarik-mas/assessment-config-repo/blob/main/backend.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==================== INSTALLATIONS ====================

print("üì• Installing dependencies...")
!pip install -q python-doctr[torch]
!pip install -q pillow opencv-python

# ==================== IMPORTS ====================

from doctr.models import ocr_predictor
from doctr.io import DocumentFile
import cv2
import numpy as np
from PIL import Image
import json
import re
from datetime import datetime

# ==================== DOCTR MODEL SETUP ====================

print("\nüîÑ Loading DocTR OCR model...")

model = ocr_predictor(
    det_arch='db_resnet50',
    reco_arch='crnn_vgg16_bn',
    pretrained=True
)

print("‚úÖ DocTR model loaded successfully")

# ==================== FIELD EXTRACTION FUNCTIONS ====================

def extract_text_with_doctr(image_path):
    """
    Extract all text from image using DocTR
    Returns all detected text with positions and confidence scores
    """
    # Load document
    doc = DocumentFile.from_images(image_path)

    # Perform OCR
    result = model(doc)

    # Extract text and metadata
    all_text = []
    text_blocks = []

    # Iterate through pages
    for page in result.pages:
        page_text = []

        # Iterate through blocks
        for block in page.blocks:
            for line in block.lines:
                line_text = []
                line_words = []

                for word in line.words:
                    word_text = word.value
                    word_conf = word.confidence

                    line_text.append(word_text)
                    line_words.append({
                        'text': word_text,
                        'confidence': word_conf,
                        'geometry': word.geometry
                    })

                # Combine line text
                full_line = ' '.join(line_text)
                if full_line.strip():
                    page_text.append(full_line)
                    text_blocks.append({
                        'text': full_line,
                        'words': line_words,
                        'confidence': np.mean([w['confidence'] for w in line_words]) if line_words else 0
                    })

        all_text.extend(page_text)

    return all_text, text_blocks

def normalize_text(text):
    """Normalize text for matching"""
    if not text:
        return ""
    text = str(text).lower().strip()
    text = re.sub(r'[^\w\s/-]', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text

def find_field_value(field_keywords, text_blocks, all_text_combined):
    """
    Find field value using keywords and pattern matching

    Args:
        field_keywords: List of possible keywords for the field
        text_blocks: List of text blocks with metadata
        all_text_combined: All text as a single string

    Returns:
        Extracted value and confidence score
    """
    best_match = None
    best_confidence = 0

    # Try to find the field by keywords
    for i, block in enumerate(text_blocks):
        block_text_norm = normalize_text(block['text'])

        # Check if block contains any keyword
        for keyword in field_keywords:
            keyword_norm = normalize_text(keyword)

            if keyword_norm in block_text_norm:
                # Look for value in same block or next blocks
                # Try same block first (after keyword)
                parts = block['text'].split(keyword, 1)
                if len(parts) > 1 and parts[1].strip():
                    value = parts[1].strip()
                    if value and len(value) > 1:
                        best_match = value
                        best_confidence = block['confidence']
                        break

                # Try next block
                if i + 1 < len(text_blocks):
                    next_block = text_blocks[i + 1]
                    value = next_block['text'].strip()
                    if value and len(value) > 1:
                        if not best_match or next_block['confidence'] > best_confidence:
                            best_match = value
                            best_confidence = next_block['confidence']

    return best_match, best_confidence

def extract_date(text_blocks, all_text):
    """Extract dates in various formats (DD/MM/YYYY, DD-MM-YYYY, etc.)"""
    date_pattern = r'\b(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b'

    dates_found = []
    for block in text_blocks:
        matches = re.findall(date_pattern, block['text'])
        for match in matches:
            dates_found.append({
                'value': match,
                'confidence': block['confidence']
            })

    # Also search in combined text
    matches = re.findall(date_pattern, all_text)
    for match in matches:
        if match not in [d['value'] for d in dates_found]:
            dates_found.append({'value': match, 'confidence': 0.8})

    return dates_found

def extract_id_number(text_blocks, all_text):
    """Extract ID number (usually alphanumeric)"""
    # Pattern for ID numbers (adjust based on your ID card format)
    id_patterns = [
        r'\b[A-Z]{1,2}\d{6,10}\b',  # Letter(s) followed by digits
        r'\b\d{6,12}\b',             # Pure numeric ID
        r'\b[A-Z0-9]{8,15}\b'        # Alphanumeric mix
    ]

    ids_found = []
    for block in text_blocks:
        for pattern in id_patterns:
            matches = re.findall(pattern, block['text'])
            for match in matches:
                # Filter out dates that might match
                if not re.match(r'\d{1,2}[/-]\d{1,2}', match):
                    ids_found.append({
                        'value': match,
                        'confidence': block['confidence']
                    })

    return ids_found

def extract_id_card_fields(image_path, verbose=True):
    """
    Extract structured fields from ID card using DocTR

    Returns:
        Dictionary with extracted fields and confidence scores
    """
    if verbose:
        print(f"\n{'='*60}")
        print(f"üéØ ID CARD FIELD EXTRACTION WITH DOCTR")
        print(f"{'='*60}\n")

    # Step 1: Extract all text
    if verbose:
        print("üìÑ Step 1: Extracting text from ID card...")

    all_text, text_blocks = extract_text_with_doctr(image_path)
    all_text_combined = ' '.join(all_text)

    if verbose:
        print(f"‚úÖ Extracted {len(text_blocks)} text blocks")
        print(f"\nüìù Detected Text:")
        for i, block in enumerate(text_blocks[:]):  # Show first 15 blocks
            print(f"   {i+1}. '{block['text']}' (conf: {block['confidence']:.2f})")
        if len(text_blocks) > 15:
            print(f"   ... and {len(text_blocks) - 15} more")
        print()

    # Step 2: Extract specific fields
    if verbose:
        print(f"{'='*60}")
        print(f"üîç Step 2: Extracting specific fields...")
        print(f"{'='*60}\n")

    extracted_fields = {}

    # First Name
    first_name_keywords = ['first name', 'prenom', 'pr√©nom', 'given name', 'nom']
    first_name, fn_conf = find_field_value(first_name_keywords, text_blocks, all_text_combined)
    extracted_fields['first_name'] = {
        'value': first_name if first_name else '',
        'confidence': fn_conf
    }
    if verbose:
        print(f"üë§ First Name: {first_name} (confidence: {fn_conf:.2f})")

    # Last Name
    last_name_keywords = ['last name', 'nom', 'surname', 'family name']
    last_name, ln_conf = find_field_value(last_name_keywords, text_blocks, all_text_combined)
    extracted_fields['last_name'] = {
        'value': last_name if last_name else '',
        'confidence': ln_conf
    }
    if verbose:
        print(f"üë§ Last Name: {last_name} (confidence: {ln_conf:.2f})")

    # Date of Birth
    dob_keywords = ['date of birth', 'dob', 'n√© le', 'ne le', 'birth date', 'date naissance']
    dob, dob_conf = find_field_value(dob_keywords, text_blocks, all_text_combined)

    # If not found, try pattern matching
    if not dob:
        dates = extract_date(text_blocks, all_text_combined)
        if dates:
            dob = dates[0]['value']
            dob_conf = dates[0]['confidence']

    extracted_fields['date_of_birth'] = {
        'value': dob if dob else '',
        'confidence': dob_conf
    }
    if verbose:
        print(f"üìÖ Date of Birth: {dob} (confidence: {dob_conf:.2f})")

    # ID Number
    id_keywords = ['id number', 'card number', 'numero', 'num√©ro', 'cin', 'cni']
    id_num, id_conf = find_field_value(id_keywords, text_blocks, all_text_combined)

    # If not found, try pattern matching
    if not id_num:
        ids = extract_id_number(text_blocks, all_text_combined)
        if ids:
            id_num = ids[0]['value']
            id_conf = ids[0]['confidence']

    extracted_fields['id_number'] = {
        'value': id_num if id_num else '',
        'confidence': id_conf
    }
    if verbose:
        print(f"üÜî ID Number: {id_num} (confidence: {id_conf:.2f})")

    # Expiry Date
    expiry_keywords = ['expiry date', 'expiration', 'valid until', 'expire le', 'valide jusqu']
    expiry, exp_conf = find_field_value(expiry_keywords, text_blocks, all_text_combined)

    # If not found, try getting second date (if exists)
    if not expiry:
        dates = extract_date(text_blocks, all_text_combined)
        if len(dates) > 1:
            expiry = dates[1]['value']
            exp_conf = dates[1]['confidence']

    extracted_fields['expiry_date'] = {
        'value': expiry if expiry else '',
        'confidence': exp_conf
    }
    if verbose:
        print(f"üìÖ Expiry Date: {expiry} (confidence: {exp_conf:.2f})")

    # Place of Birth
    pob_keywords = ['place of birth', 'lieu de naissance', 'birthplace', 'n√© √†', 'ne a']
    pob, pob_conf = find_field_value(pob_keywords, text_blocks, all_text_combined)
    extracted_fields['place_of_birth'] = {
        'value': pob if pob else '',
        'confidence': pob_conf
    }
    if verbose:
        print(f"üìç Place of Birth: {pob} (confidence: {pob_conf:.2f})")

    # Summary
    if verbose:
        print(f"\n{'='*60}")
        print(f"üìä EXTRACTION SUMMARY")
        print(f"{'='*60}")
        filled_fields = sum(1 for f in extracted_fields.values() if f['value'])
        print(f"Fields extracted: {filled_fields}/{len(extracted_fields)}")
        avg_confidence = np.mean([f['confidence'] for f in extracted_fields.values() if f['confidence'] > 0])
        print(f"Average confidence: {avg_confidence:.2f}")
        print(f"{'='*60}\n")

    # Create simplified output
    simple_output = {k: v['value'] for k, v in extracted_fields.items()}

    return simple_output, extracted_fields, text_blocks

# ==================== MAIN USAGE ====================

print("\n‚úÖ System Ready!")
print("   DocTR OCR Model: Loaded")
print("   Ready to process ID cards")

# ==================== EXAMPLE USAGE ====================

"""
# Example: Process an ID card
image_path = "path/to/your/id_card.jpg"
fields, detailed_fields, text_blocks = extract_id_card_fields(image_path, verbose=True)

# Access extracted fields
print("\nüéØ Extracted Fields (Simple):")
print(json.dumps(fields, indent=2, ensure_ascii=False))

# Access with confidence scores
print("\nüìä Detailed Fields with Confidence:")
for field_name, field_data in detailed_fields.items():
    print(f"{field_name}: {field_data['value']} (confidence: {field_data['confidence']:.2f})")

# Save to JSON
with open('extracted_fields.json', 'w', encoding='utf-8') as f:
    json.dump(detailed_fields, f, indent=2, ensure_ascii=False)
print("\nüíæ Saved to 'extracted_fields.json'")
"""

üì• Installing dependencies...
[0m
üîÑ Loading DocTR OCR model...
‚úÖ DocTR model loaded successfully

‚úÖ System Ready!
   DocTR OCR Model: Loaded
   Ready to process ID cards


'\n# Example: Process an ID card\nimage_path = "path/to/your/id_card.jpg"\nfields, detailed_fields, text_blocks = extract_id_card_fields(image_path, verbose=True)\n\n# Access extracted fields\nprint("\nüéØ Extracted Fields (Simple):")\nprint(json.dumps(fields, indent=2, ensure_ascii=False))\n\n# Access with confidence scores\nprint("\nüìä Detailed Fields with Confidence:")\nfor field_name, field_data in detailed_fields.items():\n    print(f"{field_name}: {field_data[\'value\']} (confidence: {field_data[\'confidence\']:.2f})")\n\n# Save to JSON\nwith open(\'extracted_fields.json\', \'w\', encoding=\'utf-8\') as f:\n    json.dump(detailed_fields, f, indent=2, ensure_ascii=False)\nprint("\nüíæ Saved to \'extracted_fields.json\'")\n'

In [None]:
image_path = "cin1.jpg"
fields, detailed_fields, text_blocks = extract_id_card_fields(image_path, verbose=True)



üéØ ID CARD FIELD EXTRACTION WITH DOCTR

üìÑ Step 1: Extracting text from ID card...
‚úÖ Extracted 19 text blocks

üìù Detected Text:
   1. 'ROYAUME DU MAROC' (conf: 0.98)
   2. ' ao) aclos)' (conf: 0.47)
   3. 'CARTE NATIONALE D'IDENTITE' (conf: 0.90)
   4. 'il Aibgli Aaubl' (conf: 0.36)
   5. 'Ll' (conf: 0.30)
   6. 'EL ALAMI' (conf: 0.91)
   7. 'ZAINEB' (conf: 0.97)
   8. 'N√©ele' (conf: 0.89)
   9. '05/12/1983' (conf: 1.00)
   10. 'cuhabj' (conf: 0.18)
   11. 'aljliss Le' (conf: 0.66)
   12. '√† OUARZAZATE' (conf: 0.89)
   13. 'ibyu All yall' (conf: 0.48)
   14. 'Specien' (conf: 0.63)
   15. '-' (conf: 0.57)
   16. 'all uc' (conf: 0.76)
   17. 'Valable jusqu'au 22/07/2029 yle L' (conf: 0.85)
   18. 'No' (conf: 0.86)
   19. 'U1234567 d' (conf: 0.78)
   ... and 4 more

üîç Step 2: Extracting specific fields...

üë§ First Name: None (confidence: 0.00)
üë§ Last Name: None (confidence: 0.00)
üìÖ Date of Birth: 05/12/1983 (confidence: 1.00)
üÜî ID Number: NATIONALE (confidence: 

In [None]:
!pip install torch torchvision transformers Pillow numpy python-doctr[torch] python-dateutil accelerate

Collecting python-doctr[torch]
  Downloading python_doctr-1.0.0-py3-none-any.whl.metadata (32 kB)
[0mCollecting onnx<3.0.0,>=1.12.0 (from python-doctr[torch])
  Downloading onnx-1.19.1-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (7.0 kB)
Collecting pypdfium2<5.0.0,>=4.11.0 (from python-doctr[torch])
  Downloading pypdfium2-4.30.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (48 kB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m48.5/48.5 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pyclipper<2.0.0,>=1.2.0 (from python-doctr[torch])
  Downloading pyclipper-1.3.0.post6-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Collecting langdetect<2.0.0,>=1.0.9 (from python-doctr[torch])
  Downloading langdetect-1.0.9.tar.gz (981 kB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚î

In [None]:
# ==================== IMPORTS ====================

import torch
from PIL import Image, ImageEnhance
from transformers import AutoProcessor, AutoTokenizer, AutoModelForImageTextToText
from doctr.models import ocr_predictor
from doctr.io import DocumentFile
import numpy as np
import json
from datetime import datetime
import re
import os
from dateutil import parser


os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = '1'

# ==================== NANONETS OCR SETUP ====================

print("\nüîÑ Loading Nanonets OCR model...")

nanonets_model_path = "nanonets/Nanonets-OCR-s"

nanonets_model = AutoModelForImageTextToText.from_pretrained(
    nanonets_model_path,
    torch_dtype="auto",
    device_map="auto"
)
nanonets_model.eval()

nanonets_tokenizer = AutoTokenizer.from_pretrained(nanonets_model_path)
nanonets_processor = AutoProcessor.from_pretrained(nanonets_model_path)

print(f"‚úÖ Nanonets OCR model loaded")

# ==================== DOCTR OCR SETUP ====================

print("\nüîÑ Loading docTR OCR verification model...")

doctr_model = ocr_predictor(
    det_arch='db_resnet50',
    reco_arch='crnn_vgg16_bn',
    pretrained=True
)

print(f"‚úÖ docTR OCR verification model loaded")


üîÑ Loading Nanonets OCR model...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json: 0.00B [00:00, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/5.00G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/2.51G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/214 [00:00<?, ?B/s]

'(ReadTimeoutError("HTTPSConnectionPool(host='huggingface.co', port=443): Read timed out. (read timeout=10)"), '(Request ID: 7ba31161-c4ee-43d5-a685-de267f46ee0a)')' thrown while requesting HEAD https://huggingface.co/nanonets/Nanonets-OCR-s/resolve/main/custom_generate/generate.py
Retrying in 1s [Retry 1/5].


tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json:   0%|          | 0.00/11.4M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/605 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/613 [00:00<?, ?B/s]

chat_template.jinja: 0.00B [00:00, ?B/s]

preprocessor_config.json:   0%|          | 0.00/575 [00:00<?, ?B/s]

The image processor of type `Qwen2VLImageProcessor` is now loaded as a fast processor by default, even if the model checkpoint was saved with a slow processor. This is a breaking change and may produce slightly different outputs. To continue using the slow processor, instantiate this class with `use_fast=False`. Note that this behavior will be extended to all models in a future release.


video_preprocessor_config.json: 0.00B [00:00, ?B/s]

‚úÖ Nanonets OCR model loaded

üîÑ Loading docTR OCR verification model...
Downloading https://doctr-static.mindee.com/models?id=v0.7.0/db_resnet50-79bd7d70.pt&src=0 to /root/.cache/doctr/models/db_resnet50-79bd7d70.pt


  0%|          | 0/102021912 [00:00<?, ?it/s]

Downloading https://doctr-static.mindee.com/models?id=v0.12.0/crnn_vgg16_bn-0417f351.pt&src=0 to /root/.cache/doctr/models/crnn_vgg16_bn-0417f351.pt


  0%|          | 0/63303144 [00:00<?, ?it/s]

‚úÖ docTR OCR verification model loaded


In [None]:
# ==================== IMAGE PREPROCESSING ====================

def preprocess_image(image_path, save_path="preprocessed_id_card.jpg"):

    print(f"\n{'='*60}")
    print("üîß PREPROCESSING IMAGE")
    print(f"{'='*60}")

    # Load original image
    original = Image.open(image_path).convert("RGB")
    enhanced = original.copy()

    print(f"üìê Image size: {original.size[0]}x{original.size[1]} pixels")

    # Sharpness
    enhancer = ImageEnhance.Sharpness(enhanced)
    enhanced = enhancer.enhance(2.0)

    # Contrast
    enhancer = ImageEnhance.Contrast(enhanced)
    enhanced = enhancer.enhance(1.5)

    # Brightness
    enhancer = ImageEnhance.Brightness(enhanced)
    enhanced = enhancer.enhance(1.1)

    # Save preprocessed image
    enhanced.save(save_path)
    print(f"üíæ Saved to: {save_path}")
    print(f"{'='*60}\n")

    return save_path

In [None]:
# ==================== NANONETS OCR FUNCTION ====================

def extract_with_nanonets(image_path, max_new_tokens=512):
    """
    Extract ID card fields using Nanonets OCR
    Returns structured JSON with all fields
    """
    prompt = """
You are an OCR engine specialized in ID cards. Extract the following fields from the uploaded ID card image and return them in a structured JSON format:

{
    "first_name": "",
    "last_name": "",
    "date_of_birth": "",
    "id_number": "",
    "expiry_date": "",
    "place_of_birth": ""
}
place_of_birth is place_of_birth_√†
"""

    # Load and resize image
    image = Image.open(image_path).convert("RGB")
    image = image.resize((512, 512))

    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": [
            {"type": "image", "image": f"file://{image_path}"},
            {"type": "text", "text": prompt},
        ]},
    ]

    text = nanonets_processor.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )

    inputs = nanonets_processor(
        text=[text],
        images=[image],
        padding=True,
        return_tensors="pt"
    )
    inputs = inputs.to(nanonets_model.device)

    output_ids = nanonets_model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False
    )

    generated_ids = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(inputs.input_ids, output_ids)
    ]

    output_text = nanonets_processor.batch_decode(
        generated_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=True
    )[0]

    # Parse JSON from output
    try:
        # Extract JSON from output text
        json_start = output_text.find('{')
        json_end = output_text.rfind('}') + 1
        if json_start != -1 and json_end > json_start:
            json_str = output_text[json_start:json_end]
            extracted_data = json.loads(json_str)
        else:
            extracted_data = {}
    except:
        extracted_data = {}

    return extracted_data, output_text

# ==================== DOCTR VERIFIER ====================

class DocTRVerifier:
    """
    Verifies Nanonets OCR extraction using docTR
    """

    def __init__(self, model):
        self.model = model
        self.verification_threshold = 0.6

    def extract_text_from_image(self, image_path):
        """Extract text from image using docTR"""
        # Load document
        doc = DocumentFile.from_images(image_path)

        # Perform OCR
        result = self.model(doc)

        # Extract all text
        full_text = ""
        for page in result.pages:
            for block in page.blocks:
                for line in block.lines:
                    for word in line.words:
                        full_text += word.value + " "
                    full_text += "\n"

        return full_text.strip(), result

    def normalize_date(self, text):
        """Try to parse a date string and return normalized 'DD.MM.YYYY' format"""
        try:
            date = parser.parse(text, fuzzy=True, dayfirst=True)
            return date.strftime("%d.%m.%Y")
        except Exception:
            return None

    def verify_extraction(self, id_card_image_path, nanonets_json, verbose=True):
        """
        Verify Nanonets extraction with docTR
        Returns verified fields or marks failed ones
        """
        if verbose:
            print(f"\n{'='*60}")
            print(f"üîç DOCTR VERIFICATION")
            print(f"{'='*60}")

        # Get OCR text from docTR
        if verbose:
            print("üìÑ Extracting text for verification...")

        ocr_text, ocr_result = self.extract_text_from_image(id_card_image_path)

        if verbose:
            print(f"‚úÖ Text extracted ({len(ocr_text)} characters)")
            print(f"\nüìù Extracted text:")
            print("-" * 60)
            print(ocr_text)
            print("-" * 60)
            print(f"\n{'='*60}")
            print(f"üìä FIELD VERIFICATION")
            print(f"{'='*60}\n")

        # Verify each field
        verified_data = {}
        verification_report = {
            'timestamp': datetime.now().isoformat(),
            'image_path': id_card_image_path,
            'overall_valid': True,
            'total_fields': 0,
            'valid_fields': 0,
            'invalid_fields': 0,
            'fields': {}
        }

        ocr_lower = ocr_text.lower()

        for field_name, field_value in nanonets_json.items():
            # Skip empty fields
            if not field_value or field_value.strip() == "":
                verified_data[field_name] = "EMPTY"
                continue

            verification_report['total_fields'] += 1

            # Normalize values
            value_normalized = str(field_value).lower().strip()
            value_clean = re.sub(r'[^\w\s]', '', value_normalized)

            # Matching strategies
            exact_match = value_normalized in ocr_lower
            clean_match = value_clean in re.sub(r'[^\w\s]', '', ocr_lower)

            # Word-level matching
            value_words = value_clean.split()
            word_matches = sum(1 for word in value_words if len(word) > 2 and word in ocr_lower)
            word_match_ratio = word_matches / len(value_words) if value_words else 0

            # Calculate confidence
            if exact_match:
                confidence = 1.0
                match_type = 'exact'
            elif clean_match:
                confidence = 0.95
                match_type = 'clean'
            elif word_match_ratio >= 0.8:
                confidence = 0.85
                match_type = 'partial'
            elif word_match_ratio >= 0.6:
                confidence = 0.7
                match_type = 'weak'
            else:
                confidence = word_match_ratio * 0.5
                match_type = 'failed'

            is_valid = confidence >= self.verification_threshold

            # Handle date mismatch using date normalization (only for failed matches)
            # we can use if field_value in ocr_text.replace('.', ' ') instead of normalize_date method
            if not is_valid and field_name in ["date_of_birth", "expiry_date"]:
                normalized_date = self.normalize_date(field_value)
                if normalized_date:
                    # Compare normalized date with OCR text
                    if normalized_date in ocr_text:
                        field_value = normalized_date
                        is_valid = True
                        match_type = "date_normalized"
                        confidence = 0.95
                        if verbose:
                            print(f"üóìÔ∏è  {field_name} matched after normalization ‚Üí {normalized_date}")


            # Store result
            if is_valid:
                verified_data[field_name] = field_value
                verification_report['valid_fields'] += 1
                if verbose:
                    print(f"‚úÖ {field_name}: {field_value}")
                    print(f"   Confidence: {confidence:.1%} ({match_type})\n")
            else:
                verified_data[field_name] = "VERIFICATION_FAILED"
                verification_report['invalid_fields'] += 1
                verification_report['overall_valid'] = False
                if verbose:
                    print(f"‚ùå {field_name}: {field_value}")
                    print(f"   Confidence: {confidence:.1%} (NOT VERIFIED)\n")

            verification_report['fields'][field_name] = {
                'value': field_value,
                'verified_value': verified_data[field_name],
                'valid': is_valid,
                'confidence': round(confidence, 3),
                'match_type': match_type
            }

        if verbose:
            print(f"{'='*60}")
            print(f"üìä SUMMARY")
            print(f"{'='*60}")
            print(f"Status: {'‚úÖ VERIFIED' if verification_report['overall_valid'] else '‚ö†Ô∏è PARTIAL VERIFICATION'}")
            print(f"Valid: {verification_report['valid_fields']}/{verification_report['total_fields']}")
            print(f"Failed: {verification_report['invalid_fields']}/{verification_report['total_fields']}")
            print(f"{'='*60}\n")

        return verified_data, verification_report

# ==================== INITIALIZE SYSTEM ====================

verifier = DocTRVerifier(doctr_model)

print("\n‚úÖ System Ready!")
print(f"   Nanonets OCR: Loaded")
print(f"   docTR Verifier: Loaded")

# ==================== MAIN PIPELINE ====================

def process_id_card(image_path, use_preprocessing=True, verbose=True):
    """
    Complete pipeline: Extract with Nanonets + Verify with docTR

    Returns:
        verified_data: Dict with verified values or "VERIFICATION_FAILED" for invalid fields
        report: Detailed verification report
    """
    if verbose:
        print("\n" + "="*60)
        print("üéØ ID CARD PROCESSING PIPELINE")
        print("="*60)

    # Step 0: Preprocess image (NEW!)
    if use_preprocessing:
        if verbose:
            print("\nüîß STEP 0: Preprocessing image...")
        preprocessed_path = preprocess_image(image_path)
        processing_image = preprocessed_path
    else:
        if verbose:
            print("\n‚è≠Ô∏è  Skipping preprocessing (use_preprocessing=False)")
        processing_image = image_path

    # Step 1: Extract with Nanonets
    if verbose:
        print("\nüìù STEP 1: Extracting with Nanonets OCR...")

    extracted_data, raw_output = extract_with_nanonets(processing_image)

    if verbose:
        print(f"\nüìù Raw output:")
        print(raw_output)
        print(f"\n‚úÖ Extraction complete")
        print(f"\nüìã Extracted Fields:")
        print(json.dumps(extracted_data, indent=2, ensure_ascii=False))

    # Step 2: Verify with docTR
    if verbose:
        print(f"\nüîç STEP 2: Verifying with docTR...")

    verified_data, report = verifier.verify_extraction(processing_image, extracted_data, verbose)

    # Step 3: Final output
    if verbose:
        print("\n" + "="*60)
        print("üì¶ FINAL VERIFIED DATA")
        print("="*60)
        print(json.dumps(verified_data, indent=2, ensure_ascii=False))
        print("="*60)

    return verified_data, report

# ==================== USAGE EXAMPLE ====================

print("\n" + "="*60)
print("üí° USAGE EXAMPLE")
print("="*60)

# Upload ID card
print("\nüì§ Upload your ID card image:")
from google.colab import files
uploaded = files.upload()

if uploaded:
    id_card_path = list(uploaded.keys())[0]
    print(f" Loaded: {id_card_path}")

    # Process ID card
    verified_data, report = process_id_card(id_card_path, use_preprocessing=False, verbose=True)

    # Save results
    print("\nüíæ Saving results...")

    with open('verified_data.json', 'w', encoding='utf-8') as f:
        json.dump(verified_data, f, indent=2, ensure_ascii=False)

    with open('verification_report.json', 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, ensure_ascii=False)




‚úÖ System Ready!
   Nanonets OCR: Loaded
   docTR Verifier: Loaded

üí° USAGE EXAMPLE

üì§ Upload your ID card image:


Saving cinF.jpg to cinF (3).jpg
 Loaded: cinF (3).jpg

üéØ ID CARD PROCESSING PIPELINE

‚è≠Ô∏è  Skipping preprocessing (use_preprocessing=False)

üìù STEP 1: Extracting with Nanonets OCR...

üìù Raw output:
```json
{
    "first_name": "FADWA",
    "last_name": "AIT M'HAMED",
    "date_of_birth": "16.10.2002",
    "id_number": "EE843569",
    "expiry_date": "27.03.2031",
    "place_of_birth": "TINEJDAD ERRACHIDIA"
}
```

‚úÖ Extraction complete

üìã Extracted Fields:
{
  "first_name": "FADWA",
  "last_name": "AIT M'HAMED",
  "date_of_birth": "16.10.2002",
  "id_number": "EE843569",
  "expiry_date": "27.03.2031",
  "place_of_birth": "TINEJDAD ERRACHIDIA"
}

üîç STEP 2: Verifying with docTR...

üîç DOCTR VERIFICATION
üìÑ Extracting text for verification...
‚úÖ Text extracted (230 characters)

üìù Extracted text:
------------------------------------------------------------
ROYAUME DU MAROC 
CARTE 
aegpoo)) 
NATIONALE 
acloo 
D'IDENTITE 
IL Aibgli Aelbyi 
5aa 
FADWA 
AIT M'HAMED 
N√

In [None]:
!pip install fastapi uvicorn nest-asyncio pyngrok pillow transformers


Collecting pyngrok
  Downloading pyngrok-7.4.0-py3-none-any.whl.metadata (8.1 kB)
Downloading pyngrok-7.4.0-py3-none-any.whl (25 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.4.0


In [None]:
!ngrok config 34KxvxbyBf4OkDpFCWN1JxkK0iP_E6PMQKpz5UFvuf3cW7vm

NAME:
  config - update or migrate ngrok's configuration file

USAGE:
  ngrok config [flags]

DESCRIPTION: 
  The config command gives a quick way to create or update ngrok's configuration
  file. Use 'add-authtoken' or 'add-api-key' to set the corresponding properties.

  Use 'check' to test a configuration file for validity. If you have an old
  configuration file, you can also use 'upgrade' to automatically migrate to the
  latest version.

COMMANDS:
  add-api-key                    save api key to configuration file
  add-authtoken                  save authtoken to configuration file
  add-connect-url                adds the connect URL (connect_url) to configuration file for custom agent ingress
  add-server-addr                alias of add-connect-url
  check                          check configuration file
  edit                           edit configuration file
  upgrade                        auto-upgrade configuration file

OPTIONS:
      --config strings   path to config f

In [None]:
import nest_asyncio
from fastapi import FastAPI, UploadFile, File
from pyngrok import ngrok
import torch
from PIL import Image
import uvicorn
import asyncio
import os
from fastapi.middleware.cors import CORSMiddleware

# Apply nest_asyncio for Colab
nest_asyncio.apply()

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

In [None]:
# FastAPI endpoint
@app.post("/ocr")
async def ocr_endpoint(image: UploadFile = File(...)):
    temp_path = f"/tmp/{image.filename}"
    with open(temp_path, "wb") as f:
        content = await image.read()
        f.write(content)

#     result= {
#     "first_name_latin": "TARlK",
#     "last_name_latin": "MASNAOUI",
#     "date_of_birth": "23.02.2002",
#     "id_number": "QA193251",
#     "expiry_date": "16.05.2029",
#     "place_of_birth_latin": "OUED ZEM KHOURIBGA"
# }

    try:
        result, report = process_id_card(temp_path,use_preprocessing=False, verbose=False)
        return result
    finally:
        # Clean up the temporary file
        if os.path.exists(temp_path):
            os.remove(temp_path)


# Expose Colab via ngrok
# Check if ngrok tunnel already exists and kill if necessary
try:
    tunnels = ngrok.get_tunnels()
    if tunnels:
        print("Killing existing ngrok tunnels...")
        ngrok.kill()
except:
    pass # Ignore errors if no tunnels exist

try:
    public_url = ngrok.connect(8000).public_url
except Exception as e:
    print(f"Error creating ngrok tunnel: {e}")
    public_url = None # Set to None if tunnel creation fails


if public_url:
    print(f"API URL: {public_url}")

    # Use uvicorn.Config and uvicorn.Server to run within the existing loop
    config = uvicorn.Config(app, host="0.0.0.0", port=8000)
    server = uvicorn.Server(config)

    # Run the server in the current loop
    asyncio.get_event_loop().run_until_complete(server.serve())
    print("Failed to create ngrok tunnel. Please check your ngrok setup and try again.")

Error creating ngrok tunnel: An error occurred while downloading ngrok from https://bin.equinox.io/c/bNyj1mQVY4c/ngrok-v3-stable-linux-amd64.zip: <urlopen error timed out>


In [None]:
!pip install -q insightface

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m62.0/62.0 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m18.2/18.2 MB[0m [31m71.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m16.5/16.5 MB[0m [31m72.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for insightface (pyproject.toml) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the follow

In [None]:
!pip install -q insightface onnxruntime-gpu opencv-python-headless

!pip install mediapipe

[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m17.4/17.4 MB[0m [31m40.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m46.0/46.0 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m86.8/86.8 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
Collecting numpy<2 (from mediapipe)
  Using cached numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
Using cached numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.0 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 2.2.6
    Uninstalling numpy-2.2.6:
      Successfully uninstall

In [None]:
# Initialize the face analysis app with Buffalo_L model
app = FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
app.prepare(ctx_id=0, det_size=(640, 640))

# Initialize MediaPipe Face Mesh for accurate eye tracking and 3D depth analysis
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

# ==================== LIVENESS DETECTION ====================

# MediaPipe eye landmark indices
LEFT_EYE_INDICES = [362, 385, 387, 263, 373, 380]
RIGHT_EYE_INDICES = [33, 160, 158, 133, 153, 144]

# 3D depth landmarks (nose tip, face center, chin)
DEPTH_LANDMARKS = {
    'nose_tip': 1,
    'nose_bridge': 6,
    'left_eye_outer': 33,
    'right_eye_outer': 263,
    'chin': 152,
    'forehead': 10
}

def calculate_eye_aspect_ratio(eye_landmarks):
    """Calculate Eye Aspect Ratio (EAR) using MediaPipe landmarks"""
    A = np.linalg.norm(eye_landmarks[1] - eye_landmarks[5])
    B = np.linalg.norm(eye_landmarks[2] - eye_landmarks[4])
    C = np.linalg.norm(eye_landmarks[0] - eye_landmarks[3])
    ear = (A + B) / (2.0 * C)
    return ear

def get_eye_landmarks(face_landmarks, indices, img_width, img_height):
    """Extract eye landmarks from MediaPipe face mesh"""
    landmarks = []
    for idx in indices:
        landmark = face_landmarks.landmark[idx]
        x = int(landmark.x * img_width)
        y = int(landmark.y * img_height)
        landmarks.append([x, y])
    return np.array(landmarks, dtype=np.float32)

def calculate_head_pose(face_landmarks, img_width, img_height):
    """Calculate head pose angles (yaw, pitch, roll) from MediaPipe landmarks"""
    # Key 3D landmarks for pose estimation
    nose_tip = face_landmarks.landmark[1]
    chin = face_landmarks.landmark[152]
    left_eye = face_landmarks.landmark[33]
    right_eye = face_landmarks.landmark[263]
    left_ear = face_landmarks.landmark[234]
    right_ear = face_landmarks.landmark[454]

    # Calculate yaw (left-right rotation)
    left_x = left_ear.x * img_width
    right_x = right_ear.x * img_width
    face_center_x = (left_eye.x + right_eye.x) * img_width / 2

    # Yaw angle: negative = left, positive = right
    face_width = abs(right_x - left_x)
    if face_width > 0:
        yaw = ((face_center_x - img_width/2) / (img_width/2)) * 45
    else:
        yaw = 0

    # Calculate pitch (up-down rotation)
    nose_y = nose_tip.y * img_height
    chin_y = chin.y * img_height
    eye_center_y = (left_eye.y + right_eye.y) * img_height / 2

    # Pitch angle: negative = down, positive = up
    pitch = ((nose_y - eye_center_y) / img_height) * 90

    # Calculate roll (tilt) using eye alignment
    eye_slope = (right_eye.y - left_eye.y) / (right_eye.x - left_eye.x + 1e-6)
    roll = np.degrees(np.arctan(eye_slope))

    return yaw, pitch, roll

def calculate_3d_depth_features(face_landmarks, img_width, img_height):
    """Calculate 3D depth features using MediaPipe's z-coordinate"""
    depth_features = {}

    for name, idx in DEPTH_LANDMARKS.items():
        landmark = face_landmarks.landmark[idx]
        depth_features[name] = {
            'x': landmark.x * img_width,
            'y': landmark.y * img_height,
            'z': landmark.z * img_width,
            'visibility': landmark.visibility if hasattr(landmark, 'visibility') else 1.0
        }

    nose_depth = depth_features['nose_tip']['z']
    chin_depth = depth_features['chin']['z']
    forehead_depth = depth_features['forehead']['z']

    depth_range = abs(nose_depth - forehead_depth) + abs(nose_depth - chin_depth)

    left_eye = np.array([depth_features['left_eye_outer'][k] for k in ['x', 'y', 'z']])
    right_eye = np.array([depth_features['right_eye_outer'][k] for k in ['x', 'y', 'z']])
    interocular_3d = np.linalg.norm(left_eye - right_eye)

    face_width = np.linalg.norm(left_eye[:2] - right_eye[:2])
    depth_to_width_ratio = depth_range / (face_width + 1e-6)

    return {
        'depth_range': depth_range,
        'interocular_3d': interocular_3d,
        'depth_to_width_ratio': depth_to_width_ratio,
        'nose_protrusion': abs(nose_depth - (forehead_depth + chin_depth) / 2),
        'all_depths': depth_features
    }

def is_3d_face(depth_features_list, threshold_ratio=0.015):
    """Determine if face is 3D (real person) or 2D (photo/screen)"""
    if len(depth_features_list) < 3:
        return False, 0.0, "Not enough frames for 3D analysis"

    avg_depth_range = np.mean([f['depth_range'] for f in depth_features_list])
    avg_depth_ratio = np.mean([f['depth_to_width_ratio'] for f in depth_features_list])
    avg_nose_protrusion = np.mean([f['nose_protrusion'] for f in depth_features_list])
    depth_variance = np.std([f['depth_range'] for f in depth_features_list])

    depth_score = 0.0
    reasons = []

    if avg_depth_ratio > threshold_ratio:
        depth_score += 0.4
        reasons.append(f"‚úì Good depth ratio: {avg_depth_ratio:.4f}")
    else:
        reasons.append(f"‚úó Low depth ratio: {avg_depth_ratio:.4f} (flat surface)")

    if avg_nose_protrusion > 5.0:
        depth_score += 0.3
        reasons.append(f"‚úì Nose protrudes: {avg_nose_protrusion:.2f}px")
    else:
        reasons.append(f"‚úó Flat nose: {avg_nose_protrusion:.2f}px")

    if 0.5 < depth_variance < 20.0:
        depth_score += 0.3
        reasons.append(f"‚úì Natural depth variance: {depth_variance:.2f}")
    else:
        reasons.append(f"‚úó Abnormal variance: {depth_variance:.2f}")

    is_3d = depth_score >= 0.5

    return is_3d, depth_score, reasons

class ActiveLivenessDetector:
    """Active liveness detection with user instructions"""

    def __init__(self):
        self.EAR_THRESHOLD = 0.21
        self.YAW_THRESHOLD = 15  # degrees for left/right turn
        self.challenges = ['turn_left', 'turn_right', 'blink']
        self.current_challenge_idx = 0
        self.challenge_completed = {
            'turn_left': False,
            'turn_right': False,
            'blink': False
        }
        self.ear_history = []
        self.yaw_history = []
        self.frame_count = 0
        self.challenge_start_frame = 0

    def get_current_challenge(self):
        """Get current challenge instruction"""
        if self.current_challenge_idx >= len(self.challenges):
            return None
        return self.challenges[self.current_challenge_idx]

    def get_instruction_text(self):
        """Get instruction text for current challenge"""
        challenge = self.get_current_challenge()
        if challenge == 'turn_left':
            return "üëà Turn your head LEFT"
        elif challenge == 'turn_right':
            return "üëâ Turn your head RIGHT"
        elif challenge == 'blink':
            return "üòä BLINK your eyes"
        return "‚úÖ All challenges complete!"

    def verify_challenge(self, face_landmarks, img_width, img_height):
        """Verify if user completed current challenge"""
        self.frame_count += 1
        challenge = self.get_current_challenge()

        if challenge is None:
            return True

        # Calculate head pose
        yaw, pitch, roll = calculate_head_pose(face_landmarks, img_width, img_height)
        self.yaw_history.append(yaw)

        # Calculate eye aspect ratio for blink detection
        left_eye = get_eye_landmarks(face_landmarks, LEFT_EYE_INDICES, img_width, img_height)
        right_eye = get_eye_landmarks(face_landmarks, RIGHT_EYE_INDICES, img_width, img_height)
        left_ear = calculate_eye_aspect_ratio(left_eye)
        right_ear = calculate_eye_aspect_ratio(right_eye)
        avg_ear = (left_ear + right_ear) / 2.0
        self.ear_history.append(avg_ear)

        # Keep only last 30 frames
        if len(self.yaw_history) > 30:
            self.yaw_history.pop(0)
        if len(self.ear_history) > 30:
            self.ear_history.pop(0)

        completed = False

        # Check if challenge is completed
        if challenge == 'turn_left':
            # User should turn head left (negative yaw)
            if len(self.yaw_history) >= 10:
                recent_yaw = self.yaw_history[-10:]
                if np.mean(recent_yaw) < -self.YAW_THRESHOLD:
                    completed = True

        elif challenge == 'turn_right':
            # User should turn head right (positive yaw)
            if len(self.yaw_history) >= 10:
                recent_yaw = self.yaw_history[-10:]
                if np.mean(recent_yaw) > self.YAW_THRESHOLD:
                    completed = True

        elif challenge == 'blink':
            # Detect blink: EAR drops below threshold then rises above
            if len(self.ear_history) >= 5:
                # Look for blink pattern in last 5 frames
                for i in range(len(self.ear_history) - 2):
                    if (self.ear_history[i] > self.EAR_THRESHOLD and
                        self.ear_history[i+1] < self.EAR_THRESHOLD and
                        self.ear_history[i+2] > self.EAR_THRESHOLD):
                        completed = True
                        break

        if completed and not self.challenge_completed[challenge]:
            self.challenge_completed[challenge] = True
            self.current_challenge_idx += 1
            self.challenge_start_frame = self.frame_count
            self.yaw_history.clear()
            self.ear_history.clear()
            return True

        return False

    def is_complete(self):
        """Check if all challenges are completed"""
        return all(self.challenge_completed.values())

    def get_progress(self):
        """Get progress percentage"""
        completed_count = sum(self.challenge_completed.values())
        return int((completed_count / len(self.challenges)) * 100)

# ==================== VIDEO CAPTURE WITH ACTIVE LIVENESS ====================

def capture_video_with_active_liveness(max_duration=20):
    """Capture video stream with active liveness challenges"""

    js_code = '''
    async function captureVideoWithActiveLiveness(maxDuration) {
        const video = document.createElement('video');
        const canvas = document.createElement('canvas');
        const ctx = canvas.getContext('2d');
        const statusDiv = document.createElement('div');
        const instructionDiv = document.createElement('div');
        const progressDiv = document.createElement('div');
        const timerDiv = document.createElement('div');
        const challengeDiv = document.createElement('div');

        // Setup UI
        const container = document.createElement('div');
        container.style.textAlign = 'center';
        container.style.fontFamily = 'Arial, sans-serif';
        container.style.backgroundColor = '#f5f5f5';
        container.style.padding = '20px';
        container.style.borderRadius = '15px';

        video.style.width = '640px';
        video.style.height = '480px';
        video.style.border = '4px solid #4CAF50';
        video.style.borderRadius = '10px';
        video.style.display = 'block';
        video.style.margin = '10px auto';
        video.style.transform = 'scaleX(-1)';  // Mirror effect
        video.style.boxShadow = '0 4px 10px rgba(0,0,0,0.3)';

        instructionDiv.style.fontSize = '36px';
        instructionDiv.style.fontWeight = 'bold';
        instructionDiv.style.color = '#fff';
        instructionDiv.style.padding = '25px';
        instructionDiv.style.backgroundColor = '#2196F3';
        instructionDiv.style.borderRadius = '10px';
        instructionDiv.style.margin = '10px auto';
        instructionDiv.style.width = '600px';
        instructionDiv.style.boxShadow = '0 4px 6px rgba(0,0,0,0.2)';
        instructionDiv.style.animation = 'pulse 1s infinite';
        instructionDiv.innerHTML = 'üé• GET READY!';

        // Add CSS animation
        const style = document.createElement('style');
        style.textContent = `
            @keyframes pulse {
                0%, 100% { transform: scale(1); }
                50% { transform: scale(1.05); }
            }
        `;
        document.head.appendChild(style);

        challengeDiv.style.fontSize = '20px';
        challengeDiv.style.color = '#333';
        challengeDiv.style.padding = '15px';
        challengeDiv.style.fontWeight = 'bold';
        challengeDiv.style.backgroundColor = '#fff';
        challengeDiv.style.borderRadius = '8px';
        challengeDiv.style.margin = '10px auto';
        challengeDiv.style.width = '600px';
        challengeDiv.style.boxShadow = '0 2px 4px rgba(0,0,0,0.1)';

        statusDiv.style.fontSize = '18px';
        statusDiv.style.color = '#666';
        statusDiv.style.padding = '10px';
        statusDiv.textContent = 'Initializing camera...';

        progressDiv.style.width = '640px';
        progressDiv.style.height = '40px';
        progressDiv.style.backgroundColor = '#E0E0E0';
        progressDiv.style.borderRadius = '20px';
        progressDiv.style.margin = '15px auto';
        progressDiv.style.overflow = 'hidden';
        progressDiv.style.boxShadow = 'inset 0 2px 4px rgba(0,0,0,0.1)';

        const progressBar = document.createElement('div');
        progressBar.style.height = '100%';
        progressBar.style.backgroundColor = '#4CAF50';
        progressBar.style.width = '0%';
        progressBar.style.transition = 'width 0.5s ease';
        progressBar.style.display = 'flex';
        progressBar.style.alignItems = 'center';
        progressBar.style.justifyContent = 'center';
        progressBar.style.color = 'white';
        progressBar.style.fontWeight = 'bold';
        progressDiv.appendChild(progressBar);

        timerDiv.style.fontSize = '28px';
        timerDiv.style.fontWeight = 'bold';
        timerDiv.style.color = '#FF5722';
        timerDiv.style.padding = '10px';
        timerDiv.style.backgroundColor = '#fff';
        timerDiv.style.borderRadius = '8px';
        timerDiv.style.display = 'inline-block';
        timerDiv.style.minWidth = '150px';

        container.appendChild(instructionDiv);
        container.appendChild(challengeDiv);
        container.appendChild(video);
        container.appendChild(progressDiv);
        container.appendChild(timerDiv);
        container.appendChild(statusDiv);
        document.body.appendChild(container);

        // Get camera stream
        const stream = await navigator.mediaDevices.getUserMedia({
            video: { width: 640, height: 480, facingMode: 'user' }
        });
        video.srcObject = stream;
        await video.play();

        canvas.width = video.videoWidth;
        canvas.height = video.videoHeight;

        let frames = [];
        let startTime = Date.now();
        let isComplete = false;

        // Challenge sequence
        const challenges = [
            { text: 'üëà TURN YOUR HEAD LEFT', duration: 3, bg: '#FF9800' },
            { text: 'üëâ TURN YOUR HEAD RIGHT', duration: 3, bg: '#9C27B0' },
            { text: 'üòä BLINK YOUR EYES', duration: 3, bg: '#E91E63' }
        ];

        let currentChallengeIndex = 0;

        // Wait 2 seconds before starting
        await new Promise(resolve => setTimeout(resolve, 2000));

        instructionDiv.innerHTML = challenges[0].text;
        instructionDiv.style.backgroundColor = challenges[0].bg;
        challengeDiv.innerHTML = 'üìã Challenge 1 of 3';
        statusDiv.textContent = 'Follow the instruction above!';

        let challengeStartTime = Date.now();

        const captureInterval = setInterval(() => {
            if (isComplete) {
                clearInterval(captureInterval);
                return;
            }

            ctx.save();
            ctx.scale(-1, 1);  // Mirror the canvas
            ctx.drawImage(video, -canvas.width, 0, canvas.width, canvas.height);
            ctx.restore();

            const frameData = canvas.toDataURL('image/jpeg', 0.85);
            frames.push(frameData);

            // Update timer
            const elapsed = Math.floor((Date.now() - startTime) / 1000);
            const remaining = maxDuration - elapsed;
            timerDiv.innerHTML = `‚è±Ô∏è Time: <span style="color: ${remaining < 5 ? '#F44336' : '#FF5722'}">${remaining}s</span>`;

            // Update challenge progress
            const challengeElapsed = (Date.now() - challengeStartTime) / 1000;
            const currentChallenge = challenges[currentChallengeIndex];

            if (challengeElapsed >= currentChallenge.duration && currentChallengeIndex < challenges.length - 1) {
                currentChallengeIndex++;
                challengeStartTime = Date.now();
                instructionDiv.innerHTML = challenges[currentChallengeIndex].text;
                instructionDiv.style.backgroundColor = challenges[currentChallengeIndex].bg;
                challengeDiv.innerHTML = `üìã Challenge ${currentChallengeIndex + 1} of 3`;
            }

            // Update progress bar
            const progress = (currentChallengeIndex / challenges.length) * 100 +
                           (challengeElapsed / currentChallenge.duration) * (100 / challenges.length);
            progressBar.style.width = Math.min(progress, 100) + '%';
            progressBar.textContent = Math.floor(progress) + '%';

            // Check timeout
            if (remaining <= 0) {
                isComplete = true;
                stream.getTracks().forEach(track => track.stop());
                instructionDiv.innerHTML = '‚è∞ CAPTURE COMPLETE!';
                instructionDiv.style.backgroundColor = '#4CAF50';
                challengeDiv.innerHTML = '‚úÖ Processing your video...';
                statusDiv.textContent = 'Analyzing liveness...';
                setTimeout(() => {
                    container.remove();
                }, 2000);
            }

        }, 100);  // Capture at ~10 FPS

        return new Promise((resolve) => {
            const checkComplete = setInterval(() => {
                if (isComplete) {
                    clearInterval(checkComplete);
                    clearInterval(captureInterval);
                    stream.getTracks().forEach(track => track.stop());
                    resolve(frames);
                }
            }, 100);
        });
    }
    '''

    display(Javascript(js_code))

    print("\nüé• ACTIVE LIVENESS DETECTION")
    print("=" * 60)
    print("üìã Instructions:")
    print("  1. Allow camera access when prompted")
    print("  2. Position your face in the center")
    print("  3. Follow the on-screen instructions:")
    print("     ‚Ä¢ Turn your head LEFT when asked")
    print("     ‚Ä¢ Turn your head RIGHT when asked")
    print("     ‚Ä¢ BLINK your eyes when asked")
    print("  4. Complete all challenges within the time limit")
    print("=" * 60)

    # Capture frames
    frames_data = eval_js(f'captureVideoWithActiveLiveness({max_duration})')

    print(f"\n‚úÖ Captured {len(frames_data)} frames")

    # Save frames
    saved_frames = []
    frames_dir = 'liveness_frames'
    if not os.path.exists(frames_dir):
        os.makedirs(frames_dir)

    for i, frame_data in enumerate(frames_data):
        binary = b64decode(frame_data.split(',')[1])
        frame_path = f'{frames_dir}/frame_{i:03d}.jpg'

        with open(frame_path, 'wb') as f:
            f.write(binary)

        saved_frames.append(frame_path)

    print(f"üìÅ Saved {len(saved_frames)} frames to {frames_dir}/")

    return saved_frames

# ==================== ACTIVE LIVENESS ANALYSIS ====================

def analyze_active_liveness(frame_paths):
    """Analyze captured frames with active challenge verification"""

    print("\nüîç Analyzing active liveness challenges...")

    detector = ActiveLivenessDetector()
    depth_features_list = []
    face_embeddings = []
    frame_quality_scores = []

    challenge_frames = {
        'turn_left': [],
        'turn_right': [],
        'blink': []
    }

    for i, frame_path in enumerate(frame_paths):
        img = cv2.imread(frame_path)
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        h, w = img.shape[:2]

        # Process with MediaPipe
        results = face_mesh.process(img_rgb)

        if results.multi_face_landmarks:
            face_landmarks = results.multi_face_landmarks[0]

            # Get current challenge before verification
            current_challenge = detector.get_current_challenge()

            # Verify challenge
            challenge_completed = detector.verify_challenge(face_landmarks, w, h)

            if challenge_completed:
                print(f"  ‚úÖ Challenge '{current_challenge}' completed at frame {i}")

            # Store frame for completed challenge
            if current_challenge and not detector.challenge_completed.get(current_challenge, True):
                challenge_frames[current_challenge].append(frame_path)

            # Calculate 3D depth features
            depth_features = calculate_3d_depth_features(face_landmarks, w, h)
            depth_features_list.append(depth_features)

            # Get embedding for face quality
            faces = app.get(img)
            if len(faces) > 0:
                face = faces[0]
                face_embeddings.append(face.embedding)

                bbox = face.bbox.astype(int)
                face_size = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])

                # Calculate frame quality
                yaw, pitch, roll = calculate_head_pose(face_landmarks, w, h)
                pose_score = 1.0 - (abs(pitch) + abs(roll)) / 60.0

                quality_score = (face_size / 10000) * 0.6 + pose_score * 0.4
                frame_quality_scores.append((i, quality_score, frame_path))

    # Select best frame (frontal pose with good quality)
    if len(frame_quality_scores) > 0:
        frame_quality_scores.sort(key=lambda x: x[1], reverse=True)
        best_frame_idx, best_quality, best_frame_path = frame_quality_scores[0]
    else:
        best_frame_path = frame_paths[len(frame_paths) // 2] if frame_paths else None

    # Analyze 3D depth
    is_3d, depth_score, depth_reasons = is_3d_face(depth_features_list)

    # Calculate final liveness score
    all_challenges_passed = detector.is_complete()
    challenges_passed = sum(detector.challenge_completed.values())

    challenge_score = challenges_passed / len(detector.challenges)
    final_score = (challenge_score * 0.6 + depth_score * 0.4)

    is_live = all_challenges_passed and is_3d

    print(f"\n{'='*60}")
    print(f"ACTIVE LIVENESS DETECTION RESULTS")
    print(f"{'='*60}")
    print(f"Challenge Verification:")
    print(f"  - Turn Left: {'‚úÖ PASS' if detector.challenge_completed['turn_left'] else '‚ùå FAIL'}")
    print(f"  - Turn Right: {'‚úÖ PASS' if detector.challenge_completed['turn_right'] else '‚ùå FAIL'}")
    print(f"  - Blink: {'‚úÖ PASS' if detector.challenge_completed['blink'] else '‚ùå FAIL'}")
    print(f"\n3D Depth Validation:")
    print(f"  - 3D Face Detection: {'‚úÖ PASS' if is_3d else '‚ùå FAIL'} (Score: {depth_score:.2f})")
    print(f"\nOverall:")
    print(f"  - Challenges Passed: {challenges_passed}/3")
    print(f"  - Final Liveness Score: {final_score:.2f}")
    print(f"  - Result: {'‚úÖ LIVE PERSON DETECTED' if is_live else '‚ùå LIVENESS CHECK FAILED'}")
    print(f"{'='*60}")

    return is_live, final_score, best_frame_path, {
        'challenges': detector.challenge_completed,
        'challenges_passed': challenges_passed,
        'is_3d': is_3d,
        'depth_score': depth_score,
        'all_passed': all_challenges_passed
    }

# ==================== FACE VERIFICATION FUNCTIONS ====================

def enhance_image_quality(img):
    """Apply image enhancement techniques"""
    img_float = img.astype(np.float32) / 255.0
    lab = cv2.cvtColor((img_float * 255).astype(np.uint8), cv2.COLOR_BGR2LAB)
    lab[:,:,0] = cv2.equalizeHist(lab[:,:,0])
    enhanced = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
    enhanced = cv2.GaussianBlur(enhanced, (3, 3), 0.5)
    return enhanced

def extract_face_embedding(img, enhance=True, padding_ratio=0.15):
    """Extract face embedding from image"""
    original_img = img.copy()
    display_img = img.copy()

    if enhance:
        img = enhance_image_quality(img)

    faces = app.get(img)
    if len(faces) == 0:
        faces = app.get(original_img)
        if len(faces) == 0:
            return None, None, None, None

    face = max(faces, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]))

    bbox = face.bbox.astype(int)
    h, w = original_img.shape[:2]

    face_w = bbox[2] - bbox[0]
    face_h = bbox[3] - bbox[1]
    pad_w = int(face_w * padding_ratio)
    pad_h = int(face_h * padding_ratio)

    padded_bbox = [
        max(0, bbox[0] - pad_w),
        max(0, bbox[1] - pad_h),
        min(w, bbox[2] + pad_w),
        min(h, bbox[3] + pad_h)
    ]

    cv2.rectangle(display_img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 0, 255), 2)
    cv2.rectangle(display_img, (padded_bbox[0], padded_bbox[1]), (padded_bbox[2], padded_bbox[3]), (0, 255, 0), 3)

    face_crop = original_img[padded_bbox[1]:padded_bbox[3], padded_bbox[0]:padded_bbox[2]]

    quality_info = {
        'face_area': (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]),
        'pose': face.pose,
        'bbox': bbox,
    }

    return face.embedding, display_img, face_crop, quality_info

def calculate_similarity(embedding1, embedding2):
    """Calculate similarity between two embeddings"""
    if embedding1 is None or embedding2 is None:
        return 0.0

    embedding1 = embedding1 / np.linalg.norm(embedding1)
    embedding2 = embedding2 / np.linalg.norm(embedding2)

    cosine_sim = np.dot(embedding1, embedding2)
    return cosine_sim

def verify_with_active_liveness(id_card_path, live_frame_path, liveness_passed, liveness_data):
    """Verify face with active liveness check results"""

    print("\nüîÑ Performing face verification...")

    if not liveness_passed:
        print(f"\n{'='*60}")
        print(f"VERIFICATION FAILED - LIVENESS CHECK NOT PASSED")
        print(f"{'='*60}")
        print(f"Challenge Results:")
        print(f"  - Turn Left: {'‚úÖ' if liveness_data.get('challenges', {}).get('turn_left') else '‚ùå'}")
        print(f"  - Turn Right: {'‚úÖ' if liveness_data.get('challenges', {}).get('turn_right') else '‚ùå'}")
        print(f"  - Blink: {'‚úÖ' if liveness_data.get('challenges', {}).get('blink') else '‚ùå'}")
        print(f"  - 3D Depth: {'‚úÖ' if liveness_data.get('is_3d') else '‚ùå'}")
        print(f"\nResult: ‚ùå SPOOFING ATTEMPT DETECTED")
        print(f"{'='*60}")
        return False, 0.0

    img1 = cv2.imread(id_card_path)
    img2 = cv2.imread(live_frame_path)

    embedding1, img1_box, crop1, q1 = extract_face_embedding(img1)
    embedding2, img2_box, crop2, q2 = extract_face_embedding(img2)

    if embedding1 is None or embedding2 is None:
        print("‚ùå Could not extract face embeddings")
        return False, 0.0

    similarity = calculate_similarity(embedding1, embedding2)
    threshold = 0.25
    is_match = similarity > threshold

    # Display results
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 3, 1)
    plt.imshow(cv2.cvtColor(img1_box, cv2.COLOR_BGR2RGB))
    plt.title("ID Card", fontsize=12)
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.imshow(cv2.cvtColor(img2_box, cv2.COLOR_BGR2RGB))
    plt.title("Live Capture (Best Frame)", fontsize=12)
    plt.axis('off')

    plt.subplot(1, 3, 3)
    if crop1 is not None and crop2 is not None:
        combined = np.hstack([
            cv2.resize(crop1, (150, 150)),
            cv2.resize(crop2, (150, 150))
        ])
        plt.imshow(cv2.cvtColor(combined, cv2.COLOR_BGR2RGB))
        plt.title("Face Comparison", fontsize=12)
    plt.axis('off')

    plt.tight_layout()
    plt.show()

    print(f"\n{'='*60}")
    print(f"FINAL VERIFICATION RESULTS")
    print(f"{'='*60}")
    print(f"Active Liveness Challenges:")
    print(f"  - Turn Left: ‚úÖ PASS")
    print(f"  - Turn Right: ‚úÖ PASS")
    print(f"  - Blink: ‚úÖ PASS")
    print(f"  - 3D Depth Validation: ‚úÖ PASS (Score: {liveness_data.get('depth_score', 0):.2f})")
    print(f"\nFace Matching:")
    print(f"  - Similarity Score: {similarity:.4f}")
    print(f"  - Threshold: {threshold:.4f}")
    print(f"  - Result: {'‚úÖ MATCH - Same Person' if is_match else '‚ùå NO MATCH - Different Person'}")
    print(f"  - Confidence: {similarity * 100:.1f}%")
    print(f"{'='*60}")

    # Save results
    if crop2 is not None:
        save_dir = "verification_results"
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)

        cv2.imwrite(f"{save_dir}/live_face.jpg", crop2)

        with open(f"{save_dir}/results.txt", 'w') as f:
            f.write(f"Face Verification with Active Liveness\n")
            f.write(f"======================================\n\n")
            f.write(f"Active Liveness Challenges:\n")
            f.write(f"  - Turn Left: PASS\n")
            f.write(f"  - Turn Right: PASS\n")
            f.write(f"  - Blink: PASS\n")
            f.write(f"  - 3D Depth: PASS (Score: {liveness_data.get('depth_score', 0):.2f})\n\n")
            f.write(f"Face Matching:\n")
            f.write(f"  - Similarity: {similarity:.4f}\n")
            f.write(f"  - Result: {'MATCH' if is_match else 'NO MATCH'}\n")
            f.write(f"  - Confidence: {similarity * 100:.1f}%\n")

        print(f"\nüìÅ Results saved: {save_dir}/live_face.jpg, {save_dir}/results.txt")

    return is_match, similarity

# ==================== MAIN EXECUTION ====================

print("\n" + "=" * 70)
print("üé• FACE VERIFICATION WITH ACTIVE LIVENESS DETECTION")
print("=" * 70)
print("\nThis system uses:")
print("  ‚úì Real-time video stream capture")
print("  ‚úì MediaPipe Face Mesh for precise landmark tracking")
print("  ‚úì Active liveness challenges (turn left, turn right, blink)")
print("  ‚úì 3D depth analysis for anti-spoofing")
print("  ‚úì Face recognition with InsightFace Buffalo_L")
print("\nüîí Security Features:")
print("  ‚úì Multi-challenge verification (prevents replay attacks)")
print("  ‚úì Head pose estimation (detects 2D photos)")
print("  ‚úì Eye blink detection (verifies live person)")
print("  ‚úì 3D depth analysis (distinguishes real face from screen/photo)")
print("  ‚úì Real-time instruction following (active participation required)")
print("=" * 70)

# Step 1: Upload ID card
print("\nüì§ STEP 1: Upload ID Card Photo")
print("-" * 70)
uploaded = files.upload()
id_card_path = list(uploaded.keys())[0]
print(f"‚úÖ Loaded: {id_card_path}")

# Step 2: Capture video with active liveness
print("\nüìπ STEP 2: Active Liveness Detection")
print("-" * 70)
print("‚ö†Ô∏è  Important: Follow all instructions carefully")
print("    The system will ask you to:")
print("    1. Turn your head LEFT")
print("    2. Turn your head RIGHT")
print("    3. BLINK your eyes")
print("\nüé¨ Starting camera in 3 seconds...")
time.sleep(1)
print("    2...")
time.sleep(1)
print("    1...")
time.sleep(1)
print("    GO! üé•\n")

frame_paths = capture_video_with_active_liveness(max_duration=15)

# Step 3: Analyze active liveness
print("\nüîç STEP 3: Analyzing Active Liveness Challenges")
print("-" * 70)
is_live, liveness_score, best_frame, liveness_data = analyze_active_liveness(frame_paths)

# Step 4: Verify if liveness passed
print("\n‚úÖ STEP 4: Face Verification")
print("-" * 70)
if is_live:
    is_match, similarity = verify_with_active_liveness(id_card_path, best_frame, is_live, liveness_data)

    if is_match:
        print("\nüéâ VERIFICATION SUCCESSFUL!")
        print("=" * 70)
        print("‚úÖ Identity Confirmed")
        print("‚úÖ Live Person Detected")
        print("‚úÖ All Security Checks Passed")
        print("=" * 70)
    else:
        print("\n‚ö†Ô∏è  VERIFICATION FAILED")
        print("=" * 70)
        print("‚úÖ Live Person Detected")
        print("‚ùå Face Does Not Match ID Card")
        print("=" * 70)
else:
    print(f"\n{'='*70}")
    print(f"VERIFICATION ABORTED - LIVENESS CHECK FAILED")
    print(f"{'='*70}")
    print(f"\nChallenge Results:")
    challenges = liveness_data.get('challenges', {})
    print(f"  - Turn Left: {'‚úÖ PASS' if challenges.get('turn_left') else '‚ùå FAIL'}")
    print(f"  - Turn Right: {'‚úÖ PASS' if challenges.get('turn_right') else '‚ùå FAIL'}")
    print(f"  - Blink: {'‚úÖ PASS' if challenges.get('blink') else '‚ùå FAIL'}")
    print(f"  - 3D Depth: {'‚úÖ PASS' if liveness_data.get('is_3d') else '‚ùå FAIL'}")
    print(f"\nPossible Issues:")

    if not challenges.get('turn_left'):
        print("  ‚ö†Ô∏è  Did not detect head turn to the LEFT")
    if not challenges.get('turn_right'):
        print("  ‚ö†Ô∏è  Did not detect head turn to the RIGHT")
    if not challenges.get('blink'):
        print("  ‚ö†Ô∏è  Did not detect eye BLINK")
    if not liveness_data.get('is_3d'):
        print("  ‚ö†Ô∏è  Possible photo or screen attack detected (no 3D depth)")

    print(f"\n{'='*70}")
    print("‚ùå SPOOFING ATTEMPT DETECTED")
    print("Please try again and follow all instructions carefully")
    print(f"{'='*70}")

print("\n" + "=" * 70)
print("üèÅ VERIFICATION PROCESS COMPLETE")
print("=" * 70)

# Cleanup frames
import shutil
if os.path.exists('liveness_frames'):
    shutil.rmtree('liveness_frames')
    print("üßπ Cleaned up temporary frames")

NameError: name 'FaceAnalysis' is not defined

In [None]:
# ==================== HEAD POSE ESTIMATION WITH solvePnP ====================

# 3D model points (generic human face model in cm)
MODEL_POINTS = np.array([
    (0.0, 0.0, 0.0),           # Nose tip
    (0.0, -3.3, -2.5),         # Chin
    (-2.3, 1.0, -1.4),         # Left eye left corner
    (2.3, 1.0, -1.4),          # Right eye right corner
    (-1.5, -1.5, -1.0),        # Left mouth corner
    (1.5, -1.5, -1.0)          # Right mouth corner
], dtype=np.float64)

# MediaPipe landmark indices for the corresponding 3D model points
POSE_LANDMARKS = [1, 152, 33, 263, 61, 291]

def calculate_head_pose_solvepnp(face_landmarks, img_width, img_height):
    """Calculate head pose angles using cv2.solvePnP"""
    # Extract 2D image points
    image_points = np.array([
        [face_landmarks.landmark[idx].x * img_width,
         face_landmarks.landmark[idx].y * img_height]
        for idx in POSE_LANDMARKS
    ], dtype=np.float64)

    # Camera matrix (assuming centered principal point)
    focal_length = img_width
    center = (img_width / 2, img_height / 2)
    camera_matrix = np.array([
        [focal_length, 0, center[0]],
        [0, focal_length, center[1]],
        [0, 0, 1]
    ], dtype=np.float64)

    # Assuming no lens distortion
    dist_coeffs = np.zeros((4, 1))

    # Solve PnP
    success, rotation_vec, translation_vec = cv2.solvePnP(
        MODEL_POINTS,
        image_points,
        camera_matrix,
        dist_coeffs,
        flags=cv2.SOLVEPNP_ITERATIVE
    )

    if not success:
        return 0, 0, 0

    # Convert rotation vector to rotation matrix
    rotation_mat, _ = cv2.Rodrigues(rotation_vec)

    # Calculate Euler angles
    pose_mat = cv2.hconcat((rotation_mat, translation_vec))
    _, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)

    pitch = euler_angles[0][0]
    yaw = euler_angles[1][0]
    roll = euler_angles[2][0]

    return yaw, pitch, roll

class ActiveLivenessDetector:
    """Active liveness detection with head turn challenges only"""

    def __init__(self):
        self.YAW_THRESHOLD = 15  # degrees for left/right turn
        self.STABLE_FRAMES = 15  # frames needed to confirm movement
        self.HOLD_FRAMES = 10  # frames to hold position after detection before moving to next
        self.challenges = ['turn_left', 'turn_right']
        self.current_challenge_idx = 0
        self.challenge_completed = {
            'turn_left': False,
            'turn_right': False
        }
        self.yaw_history = []
        self.frame_count = 0
        self.challenge_start_frame = 0
        self.last_feedback = ""
        self.hold_counter = 0  # Counter for holding position
        self.challenge_detected_but_not_confirmed = False

    def get_current_challenge(self):
        """Get current challenge instruction"""
        if self.current_challenge_idx >= len(self.challenges):
            return None
        return self.challenges[self.current_challenge_idx]

    def get_instruction_text(self):
        """Get instruction text for current challenge"""
        challenge = self.get_current_challenge()
        if challenge == 'turn_left':
            return "üëà Turn your head LEFT"
        elif challenge == 'turn_right':
            return "üëâ Turn your head RIGHT"
        return "‚úÖ All challenges complete!"

    def verify_challenge(self, face_landmarks, img_width, img_height):
        """Verify if user completed current challenge using solvePnP"""
        self.frame_count += 1
        challenge = self.get_current_challenge()

        if challenge is None:
            return True, "All challenges completed!"

        # Calculate head pose using solvePnP
        yaw, pitch, roll = calculate_head_pose_solvepnp(face_landmarks, img_width, img_height)
        self.yaw_history.append(yaw)

        # Keep only last 30 frames
        if len(self.yaw_history) > 30:
            self.yaw_history.pop(0)

        completed = False
        feedback = ""

        # Check if we have enough frames for stable detection
        if len(self.yaw_history) >= self.STABLE_FRAMES:
            recent_yaw = self.yaw_history[-self.STABLE_FRAMES:]
            avg_yaw = np.mean(recent_yaw)

            # Check TURN LEFT challenge
            if challenge == 'turn_left':
                # Negative yaw = turning left (user's left, camera mirrored)
                if avg_yaw < -self.YAW_THRESHOLD:
                    if not self.challenge_detected_but_not_confirmed:
                        self.challenge_detected_but_not_confirmed = True
                        self.hold_counter = 0
                        feedback = f"‚úÖ LEFT turn detected! Hold position... ({self.HOLD_FRAMES} frames)"
                    else:
                        self.hold_counter += 1
                        remaining = self.HOLD_FRAMES - self.hold_counter
                        feedback = f"‚úÖ Hold LEFT position... ({remaining} frames remaining)"

                        if self.hold_counter >= self.HOLD_FRAMES:
                            completed = True
                            feedback = f"‚úÖ LEFT turn CONFIRMED! (Yaw: {avg_yaw:.1f}¬∞)"
                elif avg_yaw > self.YAW_THRESHOLD:
                    self.challenge_detected_but_not_confirmed = False
                    self.hold_counter = 0
                    feedback = f"‚ùå Wrong direction! Turn LEFT not right (Yaw: {avg_yaw:.1f}¬∞)"
                else:
                    self.challenge_detected_but_not_confirmed = False
                    self.hold_counter = 0
                    feedback = f"‚è≥ Turn more to the LEFT (Yaw: {avg_yaw:.1f}¬∞)"

            # Check TURN RIGHT challenge
            elif challenge == 'turn_right':
                # Positive yaw = turning right (user's right, camera mirrored)
                if avg_yaw > self.YAW_THRESHOLD:
                    if not self.challenge_detected_but_not_confirmed:
                        self.challenge_detected_but_not_confirmed = True
                        self.hold_counter = 0
                        feedback = f"‚úÖ RIGHT turn detected! Hold position... ({self.HOLD_FRAMES} frames)"
                    else:
                        self.hold_counter += 1
                        remaining = self.HOLD_FRAMES - self.hold_counter
                        feedback = f"‚úÖ Hold RIGHT position... ({remaining} frames remaining)"

                        if self.hold_counter >= self.HOLD_FRAMES:
                            completed = True
                            feedback = f"‚úÖ RIGHT turn CONFIRMED! (Yaw: {avg_yaw:.1f}¬∞)"
                elif avg_yaw < -self.YAW_THRESHOLD:
                    self.challenge_detected_but_not_confirmed = False
                    self.hold_counter = 0
                    feedback = f"‚ùå Wrong direction! Turn RIGHT not left (Yaw: {avg_yaw:.1f}¬∞)"
                else:
                    self.challenge_detected_but_not_confirmed = False
                    self.hold_counter = 0
                    feedback = f"‚è≥ Turn more to the RIGHT (Yaw: {avg_yaw:.1f}¬∞)"
        else:
            feedback = f"‚è≥ Detecting movement... ({len(self.yaw_history)}/{self.STABLE_FRAMES} frames)"

        # Only move to next challenge if current one is completed
        if completed and not self.challenge_completed[challenge]:
            self.challenge_completed[challenge] = True
            self.current_challenge_idx += 1
            self.challenge_start_frame = self.frame_count
            self.yaw_history.clear()  # Clear history for next challenge
            self.challenge_detected_but_not_confirmed = False
            self.hold_counter = 0
            self.last_feedback = feedback

            # Add a pause before next challenge
            time.sleep(0.5)
            return True, feedback

        self.last_feedback = feedback
        return False, feedback

    def is_complete(self):
        """Check if all challenges are completed"""
        return all(self.challenge_completed.values())

    def get_progress(self):
        """Get progress percentage"""
        completed_count = sum(self.challenge_completed.values())
        return int((completed_count / len(self.challenges)) * 100)

# ==================== ANALYSIS FUNCTION ====================

def analyze_active_liveness(frame_paths):
    """Analyze captured frames with active challenge verification"""

    print("\nüîç Analyzing active liveness challenges...")
    print("=" * 60)

    detector = ActiveLivenessDetector()
    face_embeddings = []
    frame_quality_scores = []
    challenge_verification_frames = {'turn_left': None, 'turn_right': None}

    last_printed_feedback = ""

    for i, frame_path in enumerate(frame_paths):
        img = cv2.imread(frame_path)
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        h, w = img.shape[:2]

        results = face_mesh.process(img_rgb)

        if results.multi_face_landmarks:
            face_landmarks = results.multi_face_landmarks[0]

            current_challenge = detector.get_current_challenge()
            challenge_completed, feedback = detector.verify_challenge(face_landmarks, w, h)

            # Print feedback only when it changes (reduce spam)
            if feedback != last_printed_feedback:
                print(f"Frame {i:3d}: {feedback}")
                last_printed_feedback = feedback

            if challenge_completed:
                print(f"\n{'='*60}")
                print(f"‚úÖ CHALLENGE COMPLETED: '{current_challenge}' at frame {i}")
                print(f"   {feedback}")
                challenge_verification_frames[current_challenge] = i
                print(f"{'='*60}\n")

                # Show next challenge
                next_challenge = detector.get_current_challenge()
                if next_challenge:
                    print(f"üìã NEXT CHALLENGE: {detector.get_instruction_text()}")
                    print(f"{'-'*60}\n")

            # Extract face for quality scoring
            faces = app.get(img)
            if len(faces) > 0:
                face = faces[0]
                face_embeddings.append(face.embedding)

                bbox = face.bbox.astype(int)
                face_size = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])

                yaw, pitch, roll = calculate_head_pose_solvepnp(face_landmarks, w, h)
                pose_score = 1.0 - (abs(pitch) + abs(roll)) / 60.0

                quality_score = (face_size / 10000) * 0.6 + pose_score * 0.4
                frame_quality_scores.append((i, quality_score, frame_path))

    # Select best frame
    if len(frame_quality_scores) > 0:
        frame_quality_scores.sort(key=lambda x: x[1], reverse=True)
        best_frame_idx, best_quality, best_frame_path = frame_quality_scores[0]
    else:
        best_frame_path = frame_paths[len(frame_paths) // 2] if frame_paths else None

    # Calculate results
    all_challenges_passed = detector.is_complete()
    challenges_passed = sum(detector.challenge_completed.values())

    final_score = challenges_passed / len(detector.challenges)
    is_live = all_challenges_passed

    print(f"\n{'='*60}")
    print(f"ACTIVE LIVENESS DETECTION RESULTS")
    print(f"{'='*60}")
    print(f"Challenge Verification:")
    print(f"  - Turn Left: {'‚úÖ PASS' if detector.challenge_completed['turn_left'] else '‚ùå FAIL'}")
    if challenge_verification_frames['turn_left']:
        print(f"    ‚îî‚îÄ Verified at frame {challenge_verification_frames['turn_left']}")
    print(f"  - Turn Right: {'‚úÖ PASS' if detector.challenge_completed['turn_right'] else '‚ùå FAIL'}")
    if challenge_verification_frames['turn_right']:
        print(f"    ‚îî‚îÄ Verified at frame {challenge_verification_frames['turn_right']}")
    print(f"\nOverall:")
    print(f"  - Challenges Passed: {challenges_passed}/2")
    print(f"  - Final Liveness Score: {final_score:.2f}")
    print(f"  - Result: {'‚úÖ LIVE PERSON DETECTED' if is_live else '‚ùå LIVENESS CHECK FAILED'}")
    print(f"{'='*60}")

    return is_live, final_score, best_frame_path, {
        'challenges': detector.challenge_completed,
        'challenges_passed': challenges_passed,
        'all_passed': all_challenges_passed,
        'verification_frames': challenge_verification_frames
    }
# ==================== MAIN EXECUTION ====================

print("\n" + "=" * 70)
print("üé• ACTIVE LIVENESS DETECTION WITH HEAD TURNS")
print("=" * 70)
print("\nThis system uses:")
print("  ‚úì cv2.solvePnP() for accurate 3D head pose estimation")
print("  ‚úì MediaPipe Face Mesh for facial landmark detection")
print("  ‚úì Active head turn challenges (left and right)")
print("=" * 70)

# Step 1: Capture video with active liveness
print("\nüìπ STEP 1: Active Liveness Detection")
print("-" * 70)
print("‚ö†Ô∏è  Important: Follow all instructions carefully")
print("    The system will ask you to:")
print("    1. Turn your head LEFT (and keep it there)")
print("    2. Turn your head RIGHT (and keep it there)")
print("\nüé¨ Starting camera in 3 seconds...")
time.sleep(1)
print("    2...")
time.sleep(1)
print("    1...")
time.sleep(1)
print("    GO! üé•\n")

frame_paths = capture_video_with_active_liveness(max_duration=20)

# Step 2: Analyze active liveness
print("\nüîç STEP 2: Analyzing Active Liveness Challenges")
print("-" * 70)
is_live, liveness_score, best_frame, liveness_data = analyze_active_liveness(frame_paths)

# Step 3: Final Results
print("\nüìä STEP 3: Final Results")
print("-" * 70)
if is_live:
    print("\nüéâ LIVENESS VERIFICATION SUCCESSFUL!")
    print("=" * 70)
    print("‚úÖ Live Person Detected")
    print("‚úÖ All Challenges Passed")
    print(f"‚úÖ Liveness Score: {liveness_score:.2f}")
    print("=" * 70)

    # Save best frame
    if best_frame:
        save_dir = "liveness_results"
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)

        import shutil
        shutil.copy(best_frame, f"{save_dir}/best_frame.jpg")

        with open(f"{save_dir}/results.txt", 'w') as f:
            f.write(f"Active Liveness Detection Results\n")
            f.write(f"==================================\n\n")
            f.write(f"Status: PASS\n")
            f.write(f"Liveness Score: {liveness_score:.2f}\n")
            f.write(f"Challenges Passed: {liveness_data['challenges_passed']}/2\n")
            f.write(f"\nChallenge Details:\n")
            f.write(f"  - Turn Left: PASS\n")
            f.write(f"  - Turn Right: PASS\n")

        print(f"\nüìÅ Results saved to: {save_dir}/")

        # Display best frame
        img = cv2.imread(best_frame)
        plt.figure(figsize=(8, 6))
        plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        plt.title("Best Quality Frame", fontsize=14)
        plt.axis('off')
        plt.tight_layout()
        plt.show()
else:
    print(f"\n{'='*70}")
    print(f"LIVENESS VERIFICATION FAILED")
    print(f"{'='*70}")
    print(f"\nChallenge Results:")
    challenges = liveness_data.get('challenges', {})
    print(f"  - Turn Left: {'‚úÖ PASS' if challenges.get('turn_left') else '‚ùå FAIL'}")
    print(f"  - Turn Right: {'‚úÖ PASS' if challenges.get('turn_right') else '‚ùå FAIL'}")
    print(f"\nLiveness Score: {liveness_score:.2f}")
    print(f"\nPossible Issues:")
    print(f"  - User may not have turned head far enough")
    print(f"  - User may have turned in wrong direction")
    print(f"  - Time limit may have expired")
    print(f"  - Poor lighting or camera angle")
    print(f"{'='*70}")


üé• ACTIVE LIVENESS DETECTION WITH HEAD TURNS

This system uses:
  ‚úì cv2.solvePnP() for accurate 3D head pose estimation
  ‚úì MediaPipe Face Mesh for facial landmark detection
  ‚úì Active head turn challenges (left and right)

üìπ STEP 1: Active Liveness Detection
----------------------------------------------------------------------
‚ö†Ô∏è  Important: Follow all instructions carefully
    The system will ask you to:
    1. Turn your head LEFT (and keep it there)
    2. Turn your head RIGHT (and keep it there)

üé¨ Starting camera in 3 seconds...


NameError: name 'time' is not defined

In [None]:
!pip install -q insightface onnxruntime opencv-python-headless


[?25l     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/439.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m [32m430.1/439.5 kB[0m [31m23.6 MB/s[0m eta [36m0:00:01[0m[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m439.5/439.5 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m62.0/62.0 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚î

In [None]:
!kill -9 $(lsof -t -i:8000)


kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... or kill -l [sigspec]


In [None]:
!pip install fastapi uvicorn nest-asyncio pyngrok pillow transformers

!ngrok authtoken 33jcr7UNwSYu9mWfljJaV82jLrp_xSotDnd4AJuQupiqMoRY

import nest_asyncio
from fastapi import FastAPI, UploadFile, File
from pyngrok import ngrok
import torch
from PIL import Image
import uvicorn
import asyncio
import os
from fastapi.middleware.cors import CORSMiddleware

# Apply nest_asyncio for Colab
nest_asyncio.apply()

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
!pip install mediapipe

Collecting numpy<2 (from mediapipe)
  Using cached numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
Using cached numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.0 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 2.2.6
    Uninstalling numpy-2.2.6:
      Successfully uninstalled numpy-2.2.6
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
pytensor 2.35.1 requires numpy>=2.0, but you have numpy 1.26.4 which is incompatible.
opencv-python 4.12.0.88 requires numpy<2.3.0,>=2; python_version >= "3.9", but you have numpy 1.26.4 which is incompatible.
ydf 0.13.0 requires protobuf<7.0.0,>=5.29.1, but you have protobuf 4.25.8 which is incompatible.
opencv-python-headless 4.12.0.88 requires numpy<2.3.0,>=2; python_version >= "3.9", but you h

In [None]:
from flask import Flask, request, jsonify

import cv2
import numpy as np
import mediapipe as mp
import base64
from io import BytesIO
from PIL import Image

NameError: name 'audio_classifier' is not defined

In [None]:
###good ver

In [None]:
# Install required packages (run this first in a separate cell)
# !pip install fastapi uvicorn pyngrok nest-asyncio opencv-python mediapipe pillow numpy

import cv2
import numpy as np
import mediapipe as mp
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import base64
from io import BytesIO
from PIL import Image
import nest_asyncio
from pyngrok import ngrok
import uvicorn
from threading import Thread

# Apply nest_asyncio to allow nested event loops in Colab
nest_asyncio.apply()

# Initialize MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False,
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

# Initialize FastAPI
app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 3D model points (generic human face model in cm)
MODEL_POINTS = np.array([
    (0.0, 0.0, 0.0),           # Nose tip
    (0.0, -3.3, -2.5),         # Chin
    (-2.3, 1.0, -1.4),         # Left eye left corner
    (2.3, 1.0, -1.4),          # Right eye right corner
    (-1.5, -1.5, -1.0),        # Left mouth corner
    (1.5, -1.5, -1.0)          # Right mouth corner
], dtype=np.float64)

POSE_LANDMARKS = [1, 152, 33, 263, 61, 291]

# Eye landmarks for blink detection
LEFT_EYE_LANDMARKS = [362, 385, 387, 263, 373, 380]
RIGHT_EYE_LANDMARKS = [33, 160, 158, 133, 153, 144]

def calculate_head_pose_solvepnp(face_landmarks, img_width, img_height):
    """Calculate head pose angles using cv2.solvePnP"""
    image_points = np.array([
        [face_landmarks.landmark[idx].x * img_width,
         face_landmarks.landmark[idx].y * img_height]
        for idx in POSE_LANDMARKS
    ], dtype=np.float64)

    focal_length = img_width
    center = (img_width / 2, img_height / 2)
    camera_matrix = np.array([
        [focal_length, 0, center[0]],
        [0, focal_length, center[1]],
        [0, 0, 1]
    ], dtype=np.float64)

    dist_coeffs = np.zeros((4, 1))

    success, rotation_vec, translation_vec = cv2.solvePnP(
        MODEL_POINTS,
        image_points,
        camera_matrix,
        dist_coeffs,
        flags=cv2.SOLVEPNP_ITERATIVE
    )

    if not success:
        return 0, 0, 0

    rotation_mat, _ = cv2.Rodrigues(rotation_vec)
    pose_mat = cv2.hconcat((rotation_mat, translation_vec))
    _, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)

    pitch = euler_angles[0][0]
    yaw = -euler_angles[1][0]  # to match user perspective
    roll = euler_angles[2][0]
    return yaw, pitch, roll

def calculate_eye_aspect_ratio(eye_landmarks, face_landmarks, img_width, img_height):
    """Calculate Eye Aspect Ratio (EAR) for blink detection"""
    # Get eye landmark coordinates
    points = []
    for idx in eye_landmarks:
        landmark = face_landmarks.landmark[idx]
        points.append([landmark.x * img_width, landmark.y * img_height])

    points = np.array(points)

    # Calculate vertical distances
    vertical_1 = np.linalg.norm(points[1] - points[5])
    vertical_2 = np.linalg.norm(points[2] - points[4])

    # Calculate horizontal distance
    horizontal = np.linalg.norm(points[0] - points[3])

    # Calculate EAR
    ear = (vertical_1 + vertical_2) / (2.0 * horizontal)
    return ear

def detect_blink(face_landmarks, img_width, img_height):
    """Detect if eyes are closed (blink)"""
    left_ear = calculate_eye_aspect_ratio(LEFT_EYE_LANDMARKS, face_landmarks, img_width, img_height)
    right_ear = calculate_eye_aspect_ratio(RIGHT_EYE_LANDMARKS, face_landmarks, img_width, img_height)

    # Average EAR
    avg_ear = (left_ear + right_ear) / 2.0

    # Threshold for closed eyes (lower value = more closed)
    EAR_THRESHOLD = 0.2

    return avg_ear < EAR_THRESHOLD, avg_ear

class ActiveLivenessDetector:
    def __init__(self):
        self.YAW_THRESHOLD = 20  # Degrees to require for head turn
        self.STABLE_FRAMES = 10
        self.HOLD_FRAMES = 8

        # Blink detection parameters
        self.BLINK_CONSECUTIVE_FRAMES = 2  # Frames with eyes closed
        self.BLINK_REQUIRED = 2  # Number of blinks needed

        self.challenges = ['blink', 'turn_left', 'turn_right']
        self.current_challenge_idx = 0
        self.challenge_completed = {
            'blink': False,
            'turn_left': False,
            'turn_right': False
        }

        self.yaw_history = []
        self.hold_counter = 0
        self.challenge_detected = False

        # Blink tracking
        self.blink_counter = 0
        self.eye_closed_frames = 0
        self.was_eye_open = True
        self.ear_history = []

    def get_current_challenge(self):
        if self.current_challenge_idx >= len(self.challenges):
            return None
        return self.challenges[self.current_challenge_idx]

    def reset(self):
        self.__init__()

    def verify_frame(self, face_landmarks, img_width, img_height):
        challenge = self.get_current_challenge()

        if challenge is None:
            return {
                'status': 'completed',
                'message': 'üéâ All challenges completed!',
                'current_challenge': None,
                'progress': 100,
                'yaw': 0,
                'ear': 0
            }

        # Calculate head pose
        yaw, pitch, roll = calculate_head_pose_solvepnp(face_landmarks, img_width, img_height)
        self.yaw_history.append(yaw)

        if len(self.yaw_history) > 30:
            self.yaw_history.pop(0)

        # Detect blink
        is_eye_closed, ear = detect_blink(face_landmarks, img_width, img_height)
        self.ear_history.append(ear)
        if len(self.ear_history) > 30:
            self.ear_history.pop(0)

        # Handle BLINK challenge
        if challenge == 'blink':
            if is_eye_closed:
                self.eye_closed_frames += 1
            else:
                # Eye just opened after being closed
                if self.eye_closed_frames >= self.BLINK_CONSECUTIVE_FRAMES and not self.was_eye_open:
                    self.blink_counter += 1
                    if self.blink_counter >= self.BLINK_REQUIRED:
                        self.challenge_completed['blink'] = True
                        self.current_challenge_idx += 1
                        next_challenge = self.get_current_challenge()

                        return {
                            'status': 'challenge_completed',
                            'message': f"‚úÖ BLINK COMPLETE! Now: {next_challenge.replace('_', ' ').upper() if next_challenge else 'DONE'}",
                            'current_challenge': next_challenge,
                            'progress': self.get_progress(),
                            'yaw': yaw,
                            'ear': ear,
                            'completed_challenge': 'blink'
                        }

                self.eye_closed_frames = 0

            self.was_eye_open = not is_eye_closed

            remaining = self.BLINK_REQUIRED - self.blink_counter
            message = f"üëÅÔ∏è Blink {remaining} more time(s) (EAR: {ear:.3f})"
            if is_eye_closed:
                message = f"üëÅÔ∏è Eyes closed... ({self.eye_closed_frames})"

            return {
                'status': 'in_progress',
                'message': message,
                'current_challenge': challenge,
                'progress': self.get_progress(),
                'yaw': yaw,
                'ear': ear,
                'blinks': self.blink_counter
            }

        # Handle HEAD TURN challenges
        if len(self.yaw_history) >= self.STABLE_FRAMES:
            recent_yaw = self.yaw_history[-self.STABLE_FRAMES:]
            avg_yaw = np.mean(recent_yaw)

            challenge_met = False
            message = ""

            if challenge == 'turn_left':
                if avg_yaw < -self.YAW_THRESHOLD:
                    challenge_met = True
                    if not self.challenge_detected:
                        self.challenge_detected = True
                        self.hold_counter = 0
                        message = f"‚úÖ LEFT detected! Hold... ({self.HOLD_FRAMES})"
                    else:
                        self.hold_counter += 1
                        remaining = self.HOLD_FRAMES - self.hold_counter
                        message = f"‚úÖ Hold LEFT... ({remaining} remaining)"
                elif avg_yaw > self.YAW_THRESHOLD:
                    self.challenge_detected = False
                    self.hold_counter = 0
                    message = f"‚ùå Wrong way! Turn LEFT (Yaw: {avg_yaw:.1f}¬∞)"
                else:
                    self.challenge_detected = False
                    self.hold_counter = 0
                    message = f"üëà Turn more LEFT (Yaw: {avg_yaw:.1f}¬∞)"

            elif challenge == 'turn_right':
                if avg_yaw > self.YAW_THRESHOLD:
                    challenge_met = True
                    if not self.challenge_detected:
                        self.challenge_detected = True
                        self.hold_counter = 0
                        message = f"‚úÖ RIGHT detected! Hold... ({self.HOLD_FRAMES})"
                    else:
                        self.hold_counter += 1
                        remaining = self.HOLD_FRAMES - self.hold_counter
                        message = f"‚úÖ Hold RIGHT... ({remaining} remaining)"
                elif avg_yaw < -self.YAW_THRESHOLD:
                    self.challenge_detected = False
                    self.hold_counter = 0
                    message = f"‚ùå Wrong way! Turn RIGHT (Yaw: {avg_yaw:.1f}¬∞)"
                else:
                    self.challenge_detected = False
                    self.hold_counter = 0
                    message = f"üëâ Turn more RIGHT (Yaw: {avg_yaw:.1f}¬∞)"

            if challenge_met and self.hold_counter >= self.HOLD_FRAMES:
                self.challenge_completed[challenge] = True
                self.current_challenge_idx += 1
                self.yaw_history.clear()
                self.challenge_detected = False
                self.hold_counter = 0

                next_challenge = self.get_current_challenge()
                if next_challenge:
                    message = f"‚úÖ {challenge.replace('_', ' ').upper()} COMPLETE! Now: {next_challenge.replace('_', ' ').upper()}"
                else:
                    message = "üéâ ALL CHALLENGES COMPLETE!"

                return {
                    'status': 'challenge_completed',
                    'message': message,
                    'current_challenge': next_challenge,
                    'progress': self.get_progress(),
                    'yaw': avg_yaw,
                    'ear': ear,
                    'completed_challenge': challenge
                }
        else:
            message = f"‚è≥ Detecting... ({len(self.yaw_history)}/{self.STABLE_FRAMES})"
            avg_yaw = np.mean(self.yaw_history) if self.yaw_history else 0

        return {
            'status': 'in_progress',
            'message': message,
            'current_challenge': challenge,
            'progress': self.get_progress(),
            'yaw': avg_yaw,
            'ear': ear
        }

    def get_progress(self):
        completed = sum(self.challenge_completed.values())
        return int((completed / len(self.challenges)) * 100)

# Global detector instance
detector = ActiveLivenessDetector()

# Define request body model
class VerifyFrameRequest(BaseModel):
    image: str

# API Endpoints
@app.post("/api/reset")
def reset_detector():
    global detector
    detector = ActiveLivenessDetector()
    return {
        "status": "success",
        "message": "Detector reset"
    }

@app.post("/api/verify-frame")
async def verify_frame(request_data: VerifyFrameRequest):
    try:
        image_data = request_data.image

        # Decode base64 image
        image_data = image_data.split(",")[1]
        image_bytes = base64.b64decode(image_data)
        image = Image.open(BytesIO(image_bytes))
        img = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        h, w = img.shape[:2]
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Process with MediaPipe
        results = face_mesh.process(img_rgb)

        if not results.multi_face_landmarks:
            return {
                "status": "no_face",
                "message": "‚ùå No face detected",
                "current_challenge": detector.get_current_challenge(),
                "progress": detector.get_progress(),
                "yaw": 0,
                "ear": 0
            }

        face_landmarks = results.multi_face_landmarks[0]
        result = detector.verify_frame(face_landmarks, w, h)
        return result

    except Exception as e:
        raise HTTPException(status_code=500, detail={
            "status": "error",
            "message": f"Error: {str(e)}",
            "current_challenge": detector.get_current_challenge(),
            "progress": 0,
            "yaw": 0,
            "ear": 0
        })

@app.get("/api/status")
def get_status():
    return {
        "current_challenge": detector.get_current_challenge(),
        "progress": detector.get_progress(),
        "challenges": detector.challenge_completed
    }

@app.get("/")
def root():
    return {
        "message": "Active Liveness Detection API (Blink + Head Turns)",
        "endpoints": {
            "POST /api/verify-frame": "Verify a frame with face detection",
            "POST /api/reset": "Reset the detector",
            "GET /api/status": "Get current status"
        },
        "challenges": ["blink", "turn_left", "turn_right"]
    }

# ngrok and Server Setup
# Kill existing ngrok tunnels
try:
    ngrok.kill()
except:
    pass

# Set your ngrok auth token (get it from https://dashboard.ngrok.com/get-started/your-authtoken)
# Uncomment and add your token:
# ngrok.set_auth_token("YOUR_NGROK_TOKEN_HERE")

# Create ngrok tunnel
port = 8000
public_url = ngrok.connect(port, bind_tls=True)
print(f"\n{'='*60}")
print(f"üöÄ FastAPI server is running!")
print(f"{'='*60}")
print(f"üì° Public URL: {public_url}")
print(f"{'='*60}\n")
print(f"Use this URL in your frontend to connect to the API")
print(f"Example endpoints:")
print(f"  - POST {public_url}/api/verify-frame")
print(f"  - POST {public_url}/api/reset")
print(f"  - GET  {public_url}/api/status")
print(f"  - GET  {public_url}/")
print(f"\n{'='*60}\n")

# Run uvicorn in a separate thread to avoid blocking
def run_server():
    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")

# Start server in background thread
thread = Thread(target=run_server, daemon=True)
thread.start()

print("‚úÖ Server started successfully! Keep this cell running.")
print("‚ö†Ô∏è  Stop the cell to shut down the server.")

# Keep the cell running
try:
    thread.join()
except KeyboardInterrupt:
    print("\nüõë Server stopped.")
    ngrok.kill()

NameError: name 'core' is not defined

In [None]:
# Install required packages (run this first in a separate cell)
# !pip install fastapi uvicorn pyngrok nest-asyncio opencv-python mediapipe pillow numpy insightface onnxruntime-gpu

import cv2
import numpy as np
import mediapipe as mp
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import base64
from io import BytesIO
from PIL import Image
import nest_asyncio
from pyngrok import ngrok
import uvicorn
from threading import Thread
from insightface.app import FaceAnalysis

# Apply nest_asyncio to allow nested event loops in Colab
nest_asyncio.apply()

# Initialize MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False,
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

# Initialize InsightFace for face verification
face_app = FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
face_app.prepare(ctx_id=0, det_size=(640, 640))

# Initialize FastAPI
app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 3D model points (generic human face model in cm)
MODEL_POINTS = np.array([
    (0.0, 0.0, 0.0),           # Nose tip
    (0.0, -3.3, -2.5),         # Chin
    (-2.3, 1.0, -1.4),         # Left eye left corner
    (2.3, 1.0, -1.4),          # Right eye right corner
    (-1.5, -1.5, -1.0),        # Left mouth corner
    (1.5, -1.5, -1.0)          # Right mouth corner
], dtype=np.float64)

POSE_LANDMARKS = [1, 152, 33, 263, 61, 291]

# Eye landmarks for blink detection
LEFT_EYE_LANDMARKS = [362, 385, 387, 263, 373, 380]
RIGHT_EYE_LANDMARKS = [33, 160, 158, 133, 153, 144]

def calculate_head_pose_solvepnp(face_landmarks, img_width, img_height):
    """Calculate head pose angles using cv2.solvePnP"""
    image_points = np.array([
        [face_landmarks.landmark[idx].x * img_width,
         face_landmarks.landmark[idx].y * img_height]
        for idx in POSE_LANDMARKS
    ], dtype=np.float64)

    focal_length = img_width
    center = (img_width / 2, img_height / 2)
    camera_matrix = np.array([
        [focal_length, 0, center[0]],
        [0, focal_length, center[1]],
        [0, 0, 1]
    ], dtype=np.float64)

    dist_coeffs = np.zeros((4, 1))

    success, rotation_vec, translation_vec = cv2.solvePnP(
        MODEL_POINTS,
        image_points,
        camera_matrix,
        dist_coeffs,
        flags=cv2.SOLVEPNP_ITERATIVE
    )

    if not success:
        return 0, 0, 0

    rotation_mat, _ = cv2.Rodrigues(rotation_vec)
    pose_mat = cv2.hconcat((rotation_mat, translation_vec))
    _, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)

    pitch = euler_angles[0][0]
    yaw = -euler_angles[1][0]
    roll = euler_angles[2][0]

        # Normalize pitch to -180 to 180 range
    # If pitch is > 90, it means the face is looking down (normalize it)
    if pitch > 90:
        pitch = pitch - 180
    elif pitch < -90:
        pitch = pitch + 180

    return yaw, pitch, roll

def calculate_eye_aspect_ratio(eye_landmarks, face_landmarks, img_width, img_height):
    """Calculate Eye Aspect Ratio (EAR) for blink detection"""
    points = []
    for idx in eye_landmarks:
        landmark = face_landmarks.landmark[idx]
        points.append([landmark.x * img_width, landmark.y * img_height])

    points = np.array(points)

    vertical_1 = np.linalg.norm(points[1] - points[5])
    vertical_2 = np.linalg.norm(points[2] - points[4])
    horizontal = np.linalg.norm(points[0] - points[3])

    ear = (vertical_1 + vertical_2) / (2.0 * horizontal)
    return ear

def detect_blink(face_landmarks, img_width, img_height):
    """Detect if eyes are closed (blink)"""
    left_ear = calculate_eye_aspect_ratio(LEFT_EYE_LANDMARKS, face_landmarks, img_width, img_height)
    right_ear = calculate_eye_aspect_ratio(RIGHT_EYE_LANDMARKS, face_landmarks, img_width, img_height)

    avg_ear = (left_ear + right_ear) / 2.0
    EAR_THRESHOLD = 0.2

    return avg_ear < EAR_THRESHOLD, avg_ear

def extract_face_embedding(img):
    """Extract face embedding using InsightFace"""
    try:
        faces = face_app.get(img)
        if len(faces) == 0:
            return None, "No face detected"

        # Return the embedding of the first (largest) face
        return faces[0].embedding, None
    except Exception as e:
        return None, f"Error extracting embedding: {str(e)}"

def compare_faces(embedding1, embedding2, threshold=0.4):
    """Compare two face embeddings using cosine similarity"""
    # Calculate cosine similarity
    similarity = np.dot(embedding1, embedding2) / (np.linalg.norm(embedding1) * np.linalg.norm(embedding2))

    # Convert to distance (lower is more similar)
    distance = 1 - similarity

    is_match = distance < threshold
    confidence = (1 - distance) * 100  # Convert to percentage

    return is_match, confidence, distance

class ActiveLivenessDetector:
    def __init__(self):
        self.YAW_THRESHOLD = 20
        self.PITCH_THRESHOLD = 20  # For frontal face detection
        self.STABLE_FRAMES = 10
        self.HOLD_FRAMES = 8
        self.FRONTAL_STABLE_FRAMES = 15  # More frames for frontal capture

        self.BLINK_CONSECUTIVE_FRAMES = 2
        self.BLINK_REQUIRED = 2

        self.challenges = ['blink', 'turn_left', 'turn_right', 'frontal_face']
        self.current_challenge_idx = 0
        self.challenge_completed = {
            'blink': False,
            'turn_left': False,
            'turn_right': False,
            'frontal_face': False
        }

        self.yaw_history = []
        self.pitch_history = []
        self.hold_counter = 0
        self.challenge_detected = False

        # Blink tracking
        self.blink_counter = 0
        self.eye_closed_frames = 0
        self.was_eye_open = True
        self.ear_history = []

        # Frontal face capture
        self.frontal_counter = 0
        self.captured_frontal_image = None
        self.liveness_embedding = None  # Embedding from frontal capture

    def get_current_challenge(self):
        if self.current_challenge_idx >= len(self.challenges):
            return None
        return self.challenges[self.current_challenge_idx]

    def reset(self):
        self.__init__()

    def verify_frame(self, face_landmarks, img_width, img_height, img):
        challenge = self.get_current_challenge()

        if challenge is None:
            return {
                'status': 'completed',
                'message': 'üéâ All challenges completed! Ready for ID verification.',
                'current_challenge': None,
                'progress': 100,
                'yaw': 0,
                'pitch': 0,
                'ear': 0,
                'has_frontal_image': self.captured_frontal_image is not None
            }

        # Calculate head pose
        yaw, pitch, roll = calculate_head_pose_solvepnp(face_landmarks, img_width, img_height)
        self.yaw_history.append(yaw)
        self.pitch_history.append(pitch)

        if len(self.yaw_history) > 30:
            self.yaw_history.pop(0)
        if len(self.pitch_history) > 30:
            self.pitch_history.pop(0)

        # Detect blink
        is_eye_closed, ear = detect_blink(face_landmarks, img_width, img_height)
        self.ear_history.append(ear)
        if len(self.ear_history) > 30:
            self.ear_history.pop(0)

        # Handle BLINK challenge
        if challenge == 'blink':
            if is_eye_closed:
                self.eye_closed_frames += 1
            else:
                if self.eye_closed_frames >= self.BLINK_CONSECUTIVE_FRAMES and not self.was_eye_open:
                    self.blink_counter += 1
                    if self.blink_counter >= self.BLINK_REQUIRED:
                        self.challenge_completed['blink'] = True
                        self.current_challenge_idx += 1
                        next_challenge = self.get_current_challenge()

                        return {
                            'status': 'challenge_completed',
                            'message': f"‚úÖ BLINK COMPLETE! Now: {next_challenge.replace('_', ' ').upper() if next_challenge else 'DONE'}",
                            'current_challenge': next_challenge,
                            'progress': self.get_progress(),
                            'yaw': yaw,
                            'pitch': pitch,
                            'ear': ear,
                            'completed_challenge': 'blink'
                        }

                self.eye_closed_frames = 0

            self.was_eye_open = not is_eye_closed

            remaining = self.BLINK_REQUIRED - self.blink_counter
            message = f"üëÅÔ∏è Blink {remaining} more time(s)"
            if is_eye_closed:
                message = f"üëÅÔ∏è Eyes closed... ({self.eye_closed_frames})"

            return {
                'status': 'in_progress',
                'message': message,
                'current_challenge': challenge,
                'progress': self.get_progress(),
                'yaw': yaw,
                'pitch': pitch,
                'ear': ear,
                'blinks': self.blink_counter
            }

        # Handle FRONTAL FACE challenge
        if challenge == 'frontal_face':
            if len(self.yaw_history) >= self.FRONTAL_STABLE_FRAMES and len(self.pitch_history) >= self.FRONTAL_STABLE_FRAMES:
                recent_yaw = self.yaw_history[-self.FRONTAL_STABLE_FRAMES:]
                recent_pitch = self.pitch_history[-self.FRONTAL_STABLE_FRAMES:]
                avg_yaw = np.mean(recent_yaw)
                avg_pitch = np.mean(recent_pitch)

                # Check if face is frontal (near 0 degrees for both yaw and pitch)
                is_frontal = abs(avg_yaw) < self.PITCH_THRESHOLD and abs(avg_pitch) < self.PITCH_THRESHOLD

                if is_frontal:
                    self.frontal_counter += 1
                    remaining = self.FRONTAL_STABLE_FRAMES - self.frontal_counter

                    if self.frontal_counter >= self.FRONTAL_STABLE_FRAMES:
                        # Capture the frontal image
                        self.captured_frontal_image = img.copy()

                        # Extract face embedding for later verification
                        embedding, error = extract_face_embedding(img)
                        if embedding is not None:
                            self.liveness_embedding = embedding
                            self.challenge_completed['frontal_face'] = True
                            self.current_challenge_idx += 1

                            return {
                                'status': 'challenge_completed',
                                'message': '‚úÖ FRONTAL FACE CAPTURED! Liveness verification complete.',
                                'current_challenge': None,
                                'progress': self.get_progress(),
                                'yaw': avg_yaw,
                                'pitch': avg_pitch,
                                'ear': ear,
                                'completed_challenge': 'frontal_face',
                                'has_frontal_image': True
                            }
                        else:
                            self.frontal_counter = 0
                            return {
                                'status': 'in_progress',
                                'message': f'‚ö†Ô∏è Face extraction failed. Please try again.',
                                'current_challenge': challenge,
                                'progress': self.get_progress(),
                                'yaw': avg_yaw,
                                'pitch': avg_pitch,
                                'ear': ear
                            }

                    message = f"üì∏ Hold steady... ({remaining} remaining)"
                else:
                    self.frontal_counter = 0
                    message = f"üì∑ Look straight at camera (Yaw: {avg_yaw:.1f}¬∞, Pitch: {avg_pitch:.1f}¬∞)"
            else:
                message = f"‚è≥ Stabilizing... ({len(self.yaw_history)}/{self.FRONTAL_STABLE_FRAMES})"
                avg_yaw = np.mean(self.yaw_history) if self.yaw_history else 0
                avg_pitch = np.mean(self.pitch_history) if self.pitch_history else 0

            return {
                'status': 'in_progress',
                'message': message,
                'current_challenge': challenge,
                'progress': self.get_progress(),
                'yaw': avg_yaw if len(self.yaw_history) > 0 else yaw,
                'pitch': avg_pitch if len(self.pitch_history) > 0 else pitch,
                'ear': ear
            }

        # Handle HEAD TURN challenges
        if len(self.yaw_history) >= self.STABLE_FRAMES:
            recent_yaw = self.yaw_history[-self.STABLE_FRAMES:]
            avg_yaw = np.mean(recent_yaw)

            challenge_met = False
            message = ""

            if challenge == 'turn_left':
                if avg_yaw < -self.YAW_THRESHOLD:
                    challenge_met = True
                    if not self.challenge_detected:
                        self.challenge_detected = True
                        self.hold_counter = 0
                        message = f"‚úÖ LEFT detected! Hold... ({self.HOLD_FRAMES})"
                    else:
                        self.hold_counter += 1
                        remaining = self.HOLD_FRAMES - self.hold_counter
                        message = f"‚úÖ Hold LEFT... ({remaining} remaining)"
                elif avg_yaw > self.YAW_THRESHOLD:
                    self.challenge_detected = False
                    self.hold_counter = 0
                    message = f"‚ùå Wrong way! Turn LEFT (Yaw: {avg_yaw:.1f}¬∞)"
                else:
                    self.challenge_detected = False
                    self.hold_counter = 0
                    message = f"üëà Turn more LEFT (Yaw: {avg_yaw:.1f}¬∞)"

            elif challenge == 'turn_right':
                if avg_yaw > self.YAW_THRESHOLD:
                    challenge_met = True
                    if not self.challenge_detected:
                        self.challenge_detected = True
                        self.hold_counter = 0
                        message = f"‚úÖ RIGHT detected! Hold... ({self.HOLD_FRAMES})"
                    else:
                        self.hold_counter += 1
                        remaining = self.HOLD_FRAMES - self.hold_counter
                        message = f"‚úÖ Hold RIGHT... ({remaining} remaining)"
                elif avg_yaw < -self.YAW_THRESHOLD:
                    self.challenge_detected = False
                    self.hold_counter = 0
                    message = f"‚ùå Wrong way! Turn RIGHT (Yaw: {avg_yaw:.1f}¬∞)"
                else:
                    self.challenge_detected = False
                    self.hold_counter = 0
                    message = f"üëâ Turn more RIGHT (Yaw: {avg_yaw:.1f}¬∞)"

            if challenge_met and self.hold_counter >= self.HOLD_FRAMES:
                self.challenge_completed[challenge] = True
                self.current_challenge_idx += 1
                self.yaw_history.clear()
                self.pitch_history.clear()
                self.challenge_detected = False
                self.hold_counter = 0

                next_challenge = self.get_current_challenge()
                if next_challenge:
                    message = f"‚úÖ {challenge.replace('_', ' ').upper()} COMPLETE! Now: {next_challenge.replace('_', ' ').upper()}"
                else:
                    message = "üéâ ALL CHALLENGES COMPLETE!"

                return {
                    'status': 'challenge_completed',
                    'message': message,
                    'current_challenge': next_challenge,
                    'progress': self.get_progress(),
                    'yaw': avg_yaw,
                    'pitch': pitch,
                    'ear': ear,
                    'completed_challenge': challenge
                }
        else:
            message = f"‚è≥ Detecting... ({len(self.yaw_history)}/{self.STABLE_FRAMES})"
            avg_yaw = np.mean(self.yaw_history) if self.yaw_history else 0

        return {
            'status': 'in_progress',
            'message': message,
            'current_challenge': challenge,
            'progress': self.get_progress(),
            'yaw': avg_yaw,
            'pitch': pitch,
            'ear': ear
        }

    def get_progress(self):
        completed = sum(self.challenge_completed.values())
        return int((completed / len(self.challenges)) * 100)

# Global detector instance
detector = ActiveLivenessDetector()

# Define request body models
class VerifyFrameRequest(BaseModel):
    image: str

class VerifyIDRequest(BaseModel):
    id_image: str
# FastAPI endpoint
@app.post("/api/ocr")
async def ocr_endpoint(image: UploadFile = File(...)):
    temp_path = f"/tmp/{image.filename}"
    with open(temp_path, "wb") as f:
        content = await image.read()
        f.write(content)

    result= {
    "first_name_latin": "TARlK",
    "last_name_latin": "MASNAOUI",
    "date_of_birth": "23.02.2002",
    "id_number": "QA193251",
    "expiry_date": "16.05.2029",
    "place_of_birth_latin": "OUED ZEM KHOURIBGA"
}

    try:
        return result
    finally:
        # Clean up the temporary file
        if os.path.exists(temp_path):
            os.remove(temp_path)
# API Endpoints
@app.post("/api/reset")
def reset_detector():
    global detector
    detector = ActiveLivenessDetector()
    return {
        "status": "success",
        "message": "Detector reset"
    }

@app.post("/api/verify-frame")
async def verify_frame(request_data: VerifyFrameRequest):
    try:
        image_data = request_data.image

        # Decode base64 image
        image_data = image_data.split(",")[1]
        image_bytes = base64.b64decode(image_data)
        image = Image.open(BytesIO(image_bytes))
        img = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        h, w = img.shape[:2]
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Process with MediaPipe
        results = face_mesh.process(img_rgb)

        if not results.multi_face_landmarks:
            return {
                "status": "no_face",
                "message": "‚ùå No face detected",
                "current_challenge": detector.get_current_challenge(),
                "progress": detector.get_progress(),
                "yaw": 0,
                "pitch": 0,
                "ear": 0
            }

        face_landmarks = results.multi_face_landmarks[0]
        result = detector.verify_frame(face_landmarks, w, h, img)
        return result

    except Exception as e:
        raise HTTPException(status_code=500, detail={
            "status": "error",
            "message": f"Error: {str(e)}",
            "current_challenge": detector.get_current_challenge(),
            "progress": 0,
            "yaw": 0,
            "pitch": 0,
            "ear": 0
        })

@app.post("/api/verify-id")
async def verify_id(request_data: VerifyIDRequest):
    """Compare ID card photo with captured frontal face"""
    try:
        if detector.liveness_embedding is None:
            return {
                "status": "error",
                "message": "‚ùå No liveness verification completed. Please complete liveness check first.",
                "match": False
            }

        # Decode ID image
        id_image_data = request_data.id_image.split(",")[1]
        id_image_bytes = base64.b64decode(id_image_data)
        id_image = Image.open(BytesIO(id_image_bytes))
        id_img = cv2.cvtColor(np.array(id_image), cv2.COLOR_RGB2BGR)

        # Extract embedding from ID photo
        id_embedding, error = extract_face_embedding(id_img)

        if id_embedding is None:
            return {
                "status": "error",
                "message": f"‚ùå Could not detect face in ID card: {error}",
                "match": False
            }

        # Compare embeddings
        is_match, confidence, distance = compare_faces(detector.liveness_embedding, id_embedding)

        return {
      "status": "success",
      "match": bool(is_match),          # ‚úÖ convert numpy.bool_ ‚Üí bool
      "confidence": float(confidence),  # ‚úÖ convert numpy.float_ ‚Üí float
      "distance": float(distance),
      "message": f"{'‚úÖ Match!' if bool(is_match) else '‚ùå No match'} (Confidence: {float(confidence):.1f}%)"
  }


    except Exception as e:
        raise HTTPException(status_code=500, detail={
            "status": "error",
            "message": f"Error during ID verification: {str(e)}",
            "match": False
        })

@app.get("/api/status")
def get_status():
    return {
        "current_challenge": detector.get_current_challenge(),
        "progress": detector.get_progress(),
        "challenges": detector.challenge_completed,
        "has_frontal_image": detector.captured_frontal_image is not None,
        "ready_for_id_verification": detector.liveness_embedding is not None
    }

@app.get("/api/frontal-image")
def get_frontal_image():
    """Get the captured frontal face image"""
    if detector.captured_frontal_image is None:
        raise HTTPException(status_code=404, detail="No frontal image captured yet")

    # Convert to base64
    _, buffer = cv2.imencode('.jpg', detector.captured_frontal_image)
    img_base64 = base64.b64encode(buffer).decode('utf-8')

    return {
        "status": "success",
        "image": f"data:image/jpeg;base64,{img_base64}"
    }

@app.get("/")
def root():
    return {
        "message": "Active Liveness Detection API with Face Verification",
        "endpoints": {
            "POST /api/verify-frame": "Verify a frame with face detection",
            "POST /api/verify-id": "Compare ID photo with captured frontal face",
            "POST /api/reset": "Reset the detector",
            "GET /api/status": "Get current status",
            "GET /api/frontal-image": "Get captured frontal face image"
        },
        "challenges": ["blink", "turn_left", "turn_right", "frontal_face"],
        "face_verification": "InsightFace Buffalo_L model"
    }

# ngrok and Server Setup
try:
    ngrok.kill()
except:
    pass

# Set your ngrok auth token
# ngrok.set_auth_token("YOUR_NGROK_TOKEN_HERE")

port = 8000
public_url = ngrok.connect(port, bind_tls=True)
print(f"\n{'='*60}")
print(f"üöÄ FastAPI server is running!")
print(f"{'='*60}")
print(f"üì° Public URL: {public_url}")
print(f"{'='*60}\n")
print(f"Use this URL in your frontend to connect to the API")
print(f"Example endpoints:")
print(f"  - POST {public_url}/api/verify-frame")
print(f"  - POST {public_url}/api/verify-id")
print(f"  - POST {public_url}/api/reset")
print(f"  - GET  {public_url}/api/status")
print(f"  - GET  {public_url}/api/frontal-image")
print(f"  - GET  {public_url}/")
print(f"\n{'='*60}\n")

def run_server():
    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")

thread = Thread(target=run_server, daemon=True)
thread.start()

print("‚úÖ Server started successfully! Keep this cell running.")
print("‚ö†Ô∏è  Stop the cell to shut down the server.")

try:
    thread.join()
except KeyboardInterrupt:
    print("\nüõë Server stopped.")
    ngrok.kill()

ModuleNotFoundError: No module named 'mediapipe'

In [None]:
!kill -9 $(lsof -t -i:8000)


kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... or kill -l [sigspec]


In [1]:
!pip install -q insightface onnxruntime deepface

[?25l     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/439.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m307.2/439.5 kB[0m [31m9.4 MB/s[0m eta [36m0:00:01[0m[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m439.5/439.5 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m17.4/17.4 MB[0m [31m69.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚

In [2]:
!pip install fastapi uvicorn nest-asyncio pyngrok pillow transformers

!ngrok authtoken 33jcr7UNwSYu9mWfljJaV82jLrp_xSotDnd4AJuQupiqMoRY


Collecting pyngrok
  Downloading pyngrok-7.5.0-py3-none-any.whl.metadata (8.1 kB)
Downloading pyngrok-7.5.0-py3-none-any.whl (24 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.5.0
Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
!kill -9 $(lsof -t -i:8000)


kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... or kill -l [sigspec]


In [None]:
import torch
from PIL import Image, ImageEnhance
from transformers import AutoProcessor, AutoTokenizer, AutoModelForImageTextToText
from doctr.models import ocr_predictor
from doctr.io import DocumentFile
import numpy as np
import json
from datetime import datetime
import re
import os
from dateutil import parser


os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = '1'

nanonets_model_path = "nanonets/Nanonets-OCR-s"

nanonets_model = AutoModelForImageTextToText.from_pretrained(
    nanonets_model_path,
    torch_dtype="auto",
    device_map="auto"
)
nanonets_model.eval()

nanonets_tokenizer = AutoTokenizer.from_pretrained(nanonets_model_path)
nanonets_processor = AutoProcessor.from_pretrained(nanonets_model_path)

doctr_model = ocr_predictor(
    det_arch='db_resnet50',
    reco_arch='crnn_vgg16_bn',
    pretrained=True
)

def preprocess_image(image_path, save_path="preprocessed_id_card.jpg"):

    original = Image.open(image_path).convert("RGB")
    enhanced = original.copy()

    enhancer = ImageEnhance.Sharpness(enhanced)
    enhanced = enhancer.enhance(2.0)

    enhancer = ImageEnhance.Contrast(enhanced)
    enhanced = enhancer.enhance(1.5)

    enhancer = ImageEnhance.Brightness(enhanced)
    enhanced = enhancer.enhance(1.1)

    enhanced.save(save_path)

    return save_path


def extract_with_nanonets(image_path, max_new_tokens=512):

    prompt = """
    You are an OCR engine specialized in ID cards. Extract the following fields from the uploaded ID card image and return them in a structured JSON format:

    {
        "first_name": "",
        "last_name": "",
        "date_of_birth": "",
        "id_number": "",
        "expiry_date": "",
        "place_of_birth": ""
    }
    place_of_birth is place_of_birth_√†
    """
    image = Image.open(image_path).convert("RGB")
    image = image.resize((512, 512))

    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": [
            {"type": "image", "image": f"file://{image_path}"},
            {"type": "text", "text": prompt},
        ]},
    ]

    text = nanonets_processor.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )

    inputs = nanonets_processor(
        text=[text],
        images=[image],
        padding=True,
        return_tensors="pt"
    )
    inputs = inputs.to(nanonets_model.device)

    output_ids = nanonets_model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False
    )

    generated_ids = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(inputs.input_ids, output_ids)
    ]

    output_text = nanonets_processor.batch_decode(
        generated_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=True
    )[0]

    try:
        json_start = output_text.find('{')
        json_end = output_text.rfind('}') + 1
        if json_start != -1 and json_end > json_start:
            json_str = output_text[json_start:json_end]
            extracted_data = json.loads(json_str)
        else:
            extracted_data = {}
    except:
        extracted_data = {}

    return extracted_data, output_text

class DocTRVerifier:

    def __init__(self, model):
        self.model = model
        self.verification_threshold = 0.6

    def extract_text_from_image(self, image_path):

        doc = DocumentFile.from_images(image_path)
        result = self.model(doc)

        full_text = ""
        for page in result.pages:
            for block in page.blocks:
                for line in block.lines:
                    for word in line.words:
                        full_text += word.value + " "
                    full_text += "\n"

        return full_text.strip(), result

    def normalize_date(self, text):
        try:
            date = parser.parse(text, fuzzy=True, dayfirst=True)
            return date.strftime("%d.%m.%Y")
        except Exception:
            return None

    def verify_extraction(self, id_card_image_path, nanonets_json):

        ocr_text, ocr_result = self.extract_text_from_image(id_card_image_path)

        verified_data = {}
        verification_report = {
            'timestamp': datetime.now().isoformat(),
            'image_path': id_card_image_path,
            'overall_valid': True,
            'total_fields': 0,
            'valid_fields': 0,
            'invalid_fields': 0,
            'fields': {}
        }

        ocr_lower = ocr_text.lower()

        for field_name, field_value in nanonets_json.items():

            if not field_value or field_value.strip() == "":
                verified_data[field_name] = "EMPTY"
                continue

            verification_report['total_fields'] += 1

            value_normalized = str(field_value).lower().strip()
            value_clean = re.sub(r'[^\w\s]', '', value_normalized)

            exact_match = value_normalized in ocr_lower
            clean_match = value_clean in re.sub(r'[^\w\s]', '', ocr_lower)

            value_words = value_clean.split()
            word_matches = sum(1 for word in value_words if len(word) > 2 and word in ocr_lower)
            word_match_ratio = word_matches / len(value_words) if value_words else 0

            if exact_match:
                confidence = 1.0
                match_type = 'exact'
            elif clean_match:
                confidence = 0.95
                match_type = 'clean'
            elif word_match_ratio >= 0.8:
                confidence = 0.85
                match_type = 'partial'
            elif word_match_ratio >= 0.6:
                confidence = 0.7
                match_type = 'weak'
            else:
                confidence = word_match_ratio * 0.5
                match_type = 'failed'

            is_valid = confidence >= self.verification_threshold

            if not is_valid and field_name in ["date_of_birth", "expiry_date"]:
                normalized_date = self.normalize_date(field_value)
                if normalized_date:
                    if normalized_date in ocr_text:
                        field_value = normalized_date
                        is_valid = True
                        match_type = "date_normalized"
                        confidence = 0.95

            if is_valid:
                verified_data[field_name] = field_value
                verification_report['valid_fields'] += 1
            else:
                verified_data[field_name] = "VERIFICATION_FAILED"
                verification_report['invalid_fields'] += 1
                verification_report['overall_valid'] = False

            verification_report['fields'][field_name] = {
                'value': field_value,
                'verified_value': verified_data[field_name],
                'valid': is_valid,
                'confidence': round(confidence, 3),
                'match_type': match_type
            }

        return verified_data, verification_report

verifier = DocTRVerifier(doctr_model)

def process_id_card(image_path, use_preprocessing=True):

    if use_preprocessing:
        preprocessed_path = preprocess_image(image_path)
        processing_image = preprocessed_path
    else:
        processing_image = image_path

    extracted_data, raw_output = extract_with_nanonets(processing_image)

    verified_data, report = verifier.verify_extraction(processing_image, extracted_data)

    return verified_data, report

In [3]:
import nest_asyncio
from fastapi import FastAPI, UploadFile, File
from pyngrok import ngrok
import torch
from PIL import Image
import uvicorn
import asyncio
import os
from fastapi.middleware.cors import CORSMiddleware

nest_asyncio.apply()

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

In [None]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
!mkdir -p /content/models

!cp /content/drive/MyDrive/antelopev2.zip /content/models/


In [5]:
!unzip -q /content/models/antelopev2.zip -d /content/models

In [None]:
import cv2
import numpy as np
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import base64
from io import BytesIO
from PIL import Image
import nest_asyncio
from pyngrok import ngrok
import uvicorn
from threading import Thread
from insightface.app import FaceAnalysis
from deepface import DeepFace
import os
import uuid
from typing import List

nest_asyncio.apply()

face_app = FaceAnalysis(name='/content/models/antelopev2', providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
face_app.prepare(ctx_id=0, det_size=(640, 640))

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

def extract_face_embedding(img):
    try:
        faces = face_app.get(img)
        if len(faces) == 0:
            return None, "No face detected"

        return faces[0].embedding, None
    except Exception as e:
        return None, f"Error extracting embedding: {str(e)}"

def compare_faces(embedding1, embedding2, threshold=0.4):

    similarity = np.dot(embedding1, embedding2) / (np.linalg.norm(embedding1) * np.linalg.norm(embedding2))

    distance = 1 - similarity
    is_match = similarity > threshold
    confidence = (1 - distance) * 100

    return is_match, confidence, distance

def check_anti_spoofing(img):
    try:

        temp_path = "/tmp/temp_face_check.jpg"
        cv2.imwrite(temp_path, img)

        faces = DeepFace.extract_faces(img_path=temp_path, align=True, detector_backend='mtcnn', anti_spoofing=True)
        if os.path.exists(temp_path):
            os.remove(temp_path)

        if len(faces) == 0:
            return False, 0.0, "No face detected"

        is_real = faces[0].get('is_real', False)
        antispoof_score = faces[0].get('antispoof_score', 0.0)

        return is_real, antispoof_score, None

    except Exception as e:
        return False, 0.0, f"Anti-spoofing error: {str(e)}"


class VerifyIDRequest(BaseModel):
    id_image: str
    frontal_image: str

class AntiSpoofRequest(BaseModel):
    image: str

class TemporalLivenessRequest(BaseModel):
    liveness_frames: List[str]

@app.post("/api/verify-id")
async def verify_id(request_data: VerifyIDRequest):
    try:
        id_image_data = request_data.id_image.split(",")[1] if "," in request_data.id_image else request_data.id_image
        id_image_bytes = base64.b64decode(id_image_data)
        id_image = Image.open(BytesIO(id_image_bytes))
        id_img = cv2.cvtColor(np.array(id_image), cv2.COLOR_RGB2BGR)

        frontal_image_data = request_data.frontal_image.split(",")[1] if "," in request_data.frontal_image else request_data.frontal_image
        frontal_image_bytes = base64.b64decode(frontal_image_data)
        frontal_image = Image.open(BytesIO(frontal_image_bytes))
        frontal_img = cv2.cvtColor(np.array(frontal_image), cv2.COLOR_RGB2BGR)

        id_embedding, id_error = extract_face_embedding(id_img)

        if id_embedding is None:
            return {
                "status": "error",
                "message": f"Could not detect face in ID card: {id_error}",
                "match": False,
                "confidence": 0,
            }

        frontal_embedding, frontal_error = extract_face_embedding(frontal_img)

        if frontal_embedding is None:
            return {
                "status": "error",
                "message": f"Could not detect face in frontal image: {frontal_error}",
                "match": False,
                "confidence": 0,
            }

        is_match, confidence, distance = compare_faces(frontal_embedding, id_embedding)

        combined_image = None
        try:
            id_faces = face_app.get(id_img)
            frontal_faces = face_app.get(frontal_img)

            if len(id_faces) > 0 and len(frontal_faces) > 0:

                id_bbox = id_faces[0].bbox.astype(int)
                crop1 = id_img[id_bbox[1]:id_bbox[3], id_bbox[0]:id_bbox[2]]

                frontal_bbox = frontal_faces[0].bbox.astype(int)
                crop2 = frontal_img[frontal_bbox[1]:frontal_bbox[3],
                                    frontal_bbox[0]:frontal_bbox[2]]

                combined = np.hstack([
                    cv2.resize(crop1, (150, 150)),
                    cv2.resize(crop2, (150, 150))
                ])

                _, buffer = cv2.imencode('.jpg', combined)
                combined_image = f"data:image/jpeg;base64,{base64.b64encode(buffer).decode('utf-8')}"

        except Exception as crop_error:
            print(f"Error creating comparison image: {crop_error}")

        return {
            "status": "success",
            "match": bool(is_match),
            "confidence": float(confidence),
            "message": f"{'Match!' if bool(is_match) else 'No match'} (Confidence: {float(confidence):.1f}%)",
            "comparison_image": combined_image
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail={
            "status": "error",
            "message": f"Error during ID verification: {str(e)}",
            "match": False,
            "confidence": 0,
        })

@app.post("/api/check-anti-spoofing")
async def check_anti_spoofing_endpoint(request_data: AntiSpoofRequest):

    try:
        image_data = request_data.image.split(",")[1] if "," in request_data.image else request_data.image
        image_bytes = base64.b64decode(image_data)
        image = Image.open(BytesIO(image_bytes))
        img = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        is_real, antispoof_score, error = check_anti_spoofing(img)

        if error:
            return {
                "status": "error",
                "message": error,
                "is_real": False,
                "antispoof_score": 0.0
            }

        return {
            "status": "success",
            "is_real": is_real,
            "antispoof_score": float(antispoof_score),
            "message": f"{'Real face detected' if is_real else 'Possible spoof detected'} (Score: {antispoof_score:.3f})"
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail={
            "status": "error",
            "message": f"Error during anti-spoofing check: {str(e)}",
            "is_real": False
        })

@app.post("/api/verify-temporal-liveness")
async def verify_liveness(request_data: TemporalLivenessRequest):
    """
    Verify liveness by analyzing temporal consistency across video frames.
    Expects base64 encoded frames captured during liveness session.
    """
    try:
        if len(request_data.liveness_frames) < 3:
            return {
                "status": "error",
                "message": "At least 3 frames required for liveness verification",
                "is_live": False
            }

        embeddings = []

        for idx, frame_b64 in enumerate(request_data.liveness_frames):

            frame_data = frame_b64.split(",")[1] if "," in frame_b64 else frame_b64
            frame_bytes = base64.b64decode(frame_data)
            frame_image = Image.open(BytesIO(frame_bytes))
            img = cv2.cvtColor(np.array(frame_image), cv2.COLOR_RGB2BGR)

            embedding, error = extract_face_embedding(img)
            if embedding is None:
                return {
                    "status": "error",
                    "message": f"No face detected in frame {idx}: {error}",
                    "is_live": False
                }

            embeddings.append(embedding)

        similarities = []
        for i in range(len(embeddings) - 1):
            emb1 = embeddings[i]
            emb2 = embeddings[i + 1]
            similarity = np.dot(emb1, emb2) / (np.linalg.norm(emb1) * np.linalg.norm(emb2))
            similarities.append({
                "frame_pair": f"{i}-{i+1}",
                "similarity": float(similarity)
            })
        print(similarities)
        avg_similarity = np.mean([s["similarity"] for s in similarities])
        std_similarity = np.std([s["similarity"] for s in similarities])


        is_live = bool(avg_similarity > 0.6 and std_similarity < 0.15)

        return {
            "status": "success",
            "is_live": is_live,
            "average_similarity": float(avg_similarity),
            "message": f"{'‚úì Live person detected' if is_live else '‚úó Possible spoof detected'} (Avg: {avg_similarity:.3f}, Std: {std_similarity:.3f})"
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail={
            "status": "error",
            "message": f"Error during liveness verification: {str(e)}",
            "is_live": False
        })
@app.post("/api/ocr")
async def ocr_endpoint(image: UploadFile = File(...)):
    unique_id = uuid.uuid4().hex
    temp_path = f"/tmp/{unique_id}_{image.filename}"

    with open(temp_path, "wb") as f:
        content = await image.read()
        f.write(content)

    result = {
        "firstName": "TARIK",
        "lastName": "MASNAOUI",
        "dateOfBirth": "2002-02-23",
        "documentNumber": "QA193251",
        "expiryDate": "2029-05-16",
        "placeOfBirth": "OUED ZEM KHOURIBGA"
    }

    try:
        return result
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)

try:
    ngrok.kill()
except:
    pass

port = 8000
public_url = ngrok.connect(port, bind_tls=True)

def run_server():
    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")

thread = Thread(target=run_server, daemon=True)
thread.start()

try:
    thread.join()
except KeyboardInterrupt:
    ngrok.kill()

25-11-25 13:07:07 - Directory /root/.deepface has been created
25-11-25 13:07:07 - Directory /root/.deepface/weights has been created
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /content/models/antelopev2/1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /content/models/antelopev2/2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /content/models/antelopev2/genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /content/models/antelopev2/glintr100.onnx recognition ['None', 3, 112, 112] 127.5 127.5
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /content/models/antelopev2/

INFO:     Started server process [274]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


INFO:     197.230.156.150:0 - "POST /api/ocr HTTP/1.1" 200 OK
INFO:     197.230.156.150:0 - "OPTIONS /api/check-anti-spoofing HTTP/1.1" 200 OK
25-11-25 13:08:38 - üîó 2.7_80x80_MiniFASNetV2.pth will be downloaded from https://github.com/minivision-ai/Silent-Face-Anti-Spoofing/raw/master/resources/anti_spoof_models/2.7_80x80_MiniFASNetV2.pth to /root/.deepface/weights/2.7_80x80_MiniFASNetV2.pth...


Downloading...
From: https://github.com/minivision-ai/Silent-Face-Anti-Spoofing/raw/master/resources/anti_spoof_models/2.7_80x80_MiniFASNetV2.pth
To: /root/.deepface/weights/2.7_80x80_MiniFASNetV2.pth
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1.85M/1.85M [00:00<00:00, 31.1MB/s]


25-11-25 13:08:39 - üîó 4_0_0_80x80_MiniFASNetV1SE.pth will be downloaded from https://github.com/minivision-ai/Silent-Face-Anti-Spoofing/raw/master/resources/anti_spoof_models/4_0_0_80x80_MiniFASNetV1SE.pth to /root/.deepface/weights/4_0_0_80x80_MiniFASNetV1SE.pth...


Downloading...
From: https://github.com/minivision-ai/Silent-Face-Anti-Spoofing/raw/master/resources/anti_spoof_models/4_0_0_80x80_MiniFASNetV1SE.pth
To: /root/.deepface/weights/4_0_0_80x80_MiniFASNetV1SE.pth
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1.86M/1.86M [00:00<00:00, 31.3MB/s]


INFO:     197.230.156.150:0 - "POST /api/check-anti-spoofing HTTP/1.1" 200 OK
INFO:     197.230.156.150:0 - "OPTIONS /api/verify-temporal-liveness HTTP/1.1" 200 OK
[{'frame_pair': '0-1', 'similarity': 0.8938692808151245}, {'frame_pair': '1-2', 'similarity': 0.980778694152832}, {'frame_pair': '2-3', 'similarity': 0.8848994374275208}, {'frame_pair': '3-4', 'similarity': 0.9744458198547363}, {'frame_pair': '4-5', 'similarity': 0.9201822280883789}, {'frame_pair': '5-6', 'similarity': 0.9905585050582886}, {'frame_pair': '6-7', 'similarity': 0.9865413904190063}]
INFO:     197.230.156.150:0 - "POST /api/verify-temporal-liveness HTTP/1.1" 200 OK
INFO:     197.230.156.150:0 - "OPTIONS /api/verify-id HTTP/1.1" 200 OK
INFO:     197.230.156.150:0 - "POST /api/verify-id HTTP/1.1" 200 OK
INFO:     197.230.156.150:0 - "POST /api/ocr HTTP/1.1" 200 OK
INFO:     197.230.156.150:0 - "POST /api/check-anti-spoofing HTTP/1.1" 200 OK
INFO:     197.230.156.150:0 - "POST /api/check-anti-spoofing HTTP/1.1" 200 

In [None]:
!git clone https://github.com/minivision-ai/Silent-Face-Anti-Spoofing.git
%cd Silent-Face-Anti-Spoofing
!pip install -r requirements.txt

Cloning into 'Silent-Face-Anti-Spoofing'...
remote: Enumerating objects: 376, done.[K
remote: Counting objects: 100% (116/116), done.[K
remote: Compressing objects: 100% (31/31), done.[K
remote: Total 376 (delta 92), reused 85 (delta 85), pack-reused 260 (from 2)[K
Receiving objects: 100% (376/376), 26.04 MiB | 46.05 MiB/s, done.
Resolving deltas: 100% (166/166), done.
/content/Silent-Face-Anti-Spoofing
Collecting easydict==1.9 (from -r requirements.txt (line 1))
  Downloading easydict-1.9.tar.gz (6.4 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting numpy==1.17.0 (from -r requirements.txt (line 2))
  Downloading numpy-1.17.0.zip (6.5 MB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m6.5/6.5 MB[0m [31m79.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting tqdm==4.31.1 (from -r requirements.txt (line 3))
  Downloading

In [None]:
import cv2
import matplotlib.pyplot as plt
from deepface import DeepFace

# Extract faces
temp_path = "sp.jpg"
faces = DeepFace.extract_faces(
    img_path=temp_path,
    detector_backend='retinaface',
    anti_spoofing=True
)

face = faces[0]
# Get the face array (already in RGB format)
face_img = face['face']

# Get anti-spoofing info
is_real = face.get('is_real', 'N/A')
antispoof_score = face.get('antispoof_score', 'N/A')

# Display using matplotlib
plt.figure(figsize=(6, 6))
plt.imshow(face_img)
plt.title(f"Real: {is_real} | Score: {antispoof_score:.3f}")
plt.axis('off')
plt.show()

# Print details
print(f"  Is Real: {is_real}")
print(f"  Anti-spoof Score: {antispoof_score}")
print(f"  Shape: {face_img.shape}")
print(f"  Confidence: {faces.get('confidence', 'N/A')}")

ValueError: Confirm that sp.jpg exists

In [None]:
from google.colab import files
import cv2
import os

uploaded = files.upload()
img_path = list(uploaded.keys())[0]

# Read image
img = cv2.imread(img_path)

# Resize to proper 4:3 ratio (width:height = 3:4)
target_height = 640
target_width = int(target_height * 3 / 4)  # 3:4 ratio (width:height)

# Resize image
img_resized = cv2.resize(img, (target_width, target_height))

# Save to correct directory
os.makedirs('./images/sample', exist_ok=True)
cv2.imwrite(f'./images/sample/{img_path}', img_resized)
# is_real = check_anti_spoofing(img)
# print(is_real)
import sys
sys.path.append(os.path.join(os.getcwd(), 'src'))
!python test.py --image_name "{img_path}"
if os.path.exists(img_path):
            os.remove(img_path)

Saving sp.jpg to sp.jpg
Image 'sp.jpg' is Fake Face. Score: 0.58.
Prediction cost 1.14 s


In [None]:
%cd Silent-Face-Anti-Spoofing/

/content/Silent-Face-Anti-Spoofing
