In [None]:
# Cell 1: Import Dependencies & Setup
import glob
import hashlib
import json
import logging
import math
import os
import random
import secrets
import time

import cv2
import numpy as np
import psutil
import requests
from skimage.morphology import skeletonize
from sklearn.decomposition import PCA

# Configure the root logger once for the whole notebook: INFO and above,
# each record prefixed with its timestamp and level name.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Cell-completion marker so the notebook output shows this cell has run.
print("--- Dependencies Imported & Logging Configured ---")

In [None]:
# Cell 2: Define Helper Functions

# Path of the JSON file read by load_wallet() and written by save_wallet().
# It is relative, so it resolves against the current working directory.
WALLET_FILE = "wallet.json"

def log_system_resources():
    """Log the current CPU and RAM utilisation and hand both values back.

    Returns:
        tuple: ``(cpu, ram)`` where ``cpu`` comes from
        ``psutil.cpu_percent(interval=None)`` and ``ram`` from
        ``psutil.virtual_memory().percent``.
    """
    memory_snapshot = psutil.virtual_memory()
    cpu, ram = psutil.cpu_percent(interval=None), memory_snapshot.percent
    logging.info(f"Resource Usage -> CPU: {cpu}% | RAM: {ram}%")
    return cpu, ram

def generate_secure_salt():
    """
    Draws a random 250-bit non-negative integer from the ``secrets`` module.
    Returns: str (decimal digits of that integer; a string is used to ensure
    JSON compatibility).
    """
    salt_bits = 250
    salt_value = secrets.randbits(salt_bits)
    return str(salt_value)

def generate_strong_password():
    """Returns a random URL-safe text token derived from 16 random bytes.

    This notebook stores the result as each user's ``password`` factor.
    """
    entropy_bytes = 16
    return secrets.token_urlsafe(nbytes=entropy_bytes)

def save_wallet(wallet_data):
    """Persists the user wallet (Secrets + Salts + Passwords) to WALLET_FILE as JSON.

    The JSON is written to a sibling ``<WALLET_FILE>.tmp`` file first and then
    moved over WALLET_FILE with ``os.replace``. A failed or interrupted write
    therefore leaves the previous wallet intact instead of a truncated file
    (which load_wallet() would discard as corrupt, losing every credential).

    The temporary file is created with mode 0o600 (where the platform honours
    it) because the wallet contains plaintext secrets and passwords.

    Args:
        wallet_data: JSON-serialisable object, normally a dict keyed by username.

    Returns:
        None. Failures are logged rather than raised (best-effort persistence).
    """
    tmp_path = f"{WALLET_FILE}.tmp"
    try:
        fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            json.dump(wallet_data, f, indent=4)
        # Only swap the new file in once it has been written completely.
        os.replace(tmp_path, WALLET_FILE)
        logging.info(f"--- [Storage] Wallet saved to {WALLET_FILE} ---")
    except Exception as e:
        logging.error(f"--- [Storage] Failed to save wallet: {e} ---")
        # Best-effort cleanup of the partial temp file; it may not exist.
        try:
            os.remove(tmp_path)
        except OSError:
            pass

def load_wallet():
    """
    Loads the user wallet from WALLET_FILE.

    Returns: dict (empty if the file is missing, unreadable, not valid JSON,
    or valid JSON whose top-level value is not an object).
    """
    # EAFP: open directly instead of checking os.path.exists first, so there
    # is no window between the check and the open.
    try:
        with open(WALLET_FILE, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        logging.warning("--- [Storage] No wallet file found. Starting fresh. ---")
        return {}
    except Exception as e:
        logging.error(f"--- [Storage] Corrupt wallet file: {e}. Starting fresh. ---")
        return {}

    # Callers index the wallet by username, so anything but a dict is unusable.
    if not isinstance(data, dict):
        logging.error(
            f"--- [Storage] Corrupt wallet file: expected a JSON object, "
            f"got {type(data).__name__}. Starting fresh. ---"
        )
        return {}

    logging.info(f"--- [Storage] Wallet loaded ({len(data)} users) ---")
    return data

print("--- Helper Functions Defined Successfully ---")

In [None]:
# Cell 3: Biometric Feature Extraction Pipeline

# Circuit constraint: the template vector handed to the ZKP prover has a fixed
# length. normalize_and_pad() keeps at most MAX_MINUTIAE points (the first
# ones after sorting by (x, y)) and zero-pads the rest, giving
# 2 * MAX_MINUTIAE = 60 integer coordinates.
MAX_MINUTIAE = 30

def get_crossing_number(img_skeleton):
    """Locate minutiae on a skeleton image with the Crossing Number (CN) method.

    For every ridge pixel the 8 neighbours are visited in circular order and
    the number of value changes between consecutive neighbours is counted.
    CN is half that count: CN == 1 marks a ridge ending, CN == 3 a bifurcation.

    The computation is vectorised with NumPy instead of looping over every
    pixel in Python; results (values, order and element types) are the same
    as the per-pixel loop.

    Args:
        img_skeleton: 2-D array where ridge pixels are 255 and background is 0.

    Returns:
        tuple: ``(endings, bifurcations)``, two lists of ``(x, y)`` tuples of
        Python ints, each in row-major scan order (by y, then x).
    """
    img = img_skeleton // 255
    rows, cols = img.shape
    # Zero border so edge pixels have 8 neighbours; a signed integer dtype
    # keeps the differences below from wrapping around on uint8 input.
    padded = np.pad(img, 1, 'constant', constant_values=0).astype(np.int64)

    # (dy, dx) offsets into the padded image, walking the 8-neighbourhood in
    # circular order starting at the top-left neighbour.
    ring = [(0, 0), (0, 1), (0, 2), (1, 2), (2, 2), (2, 1), (2, 0), (1, 0)]
    neighbors = np.stack([padded[dy:dy + rows, dx:dx + cols] for dy, dx in ring])

    # Sum of |n[i] - n[(i + 1) % 8]| per pixel; CN is half of this, so
    # CN == 1 <=> transitions == 2 and CN == 3 <=> transitions == 6.
    transitions = np.abs(neighbors - np.roll(neighbors, -1, axis=0)).sum(axis=0)

    ridge = img == 1
    endings = [(int(x), int(y)) for y, x in np.argwhere(ridge & (transitions == 2))]
    bifurcations = [(int(x), int(y)) for y, x in np.argwhere(ridge & (transitions == 6))]
    return endings, bifurcations

def prune_minutiae(endings, bifurcations, img_shape, border_margin=15, dist_threshold=8):
    """Discard likely-false minutiae near the image border or packed too closely.

    Args:
        endings: list of ``(x, y)`` ridge-ending coordinates.
        bifurcations: list of ``(x, y)`` bifurcation coordinates.
        img_shape: ``(height, width)`` of the image the points come from.
        border_margin: points must lie strictly more than this many pixels
            away from every image edge.
        dist_threshold: if two surviving points (of any type) are closer than
            this Euclidean distance, BOTH are dropped.

    Returns:
        tuple: ``(clean_endings, clean_bifurcations)`` in their original order.
    """
    height, width = img_shape
    x_limit = width - border_margin
    y_limit = height - border_margin

    # Stage 1: border filter. Endings are listed first, then bifurcations.
    candidates = []
    for label, points in (('ending', endings), ('bifurcation', bifurcations)):
        for pt in points:
            px, py = pt
            if border_margin < px < x_limit and border_margin < py < y_limit:
                candidates.append((pt, label))

    # Stage 2: mark both members of every pair that sits too close together.
    rejected = set()
    for a, (pt_a, _) in enumerate(candidates):
        for b in range(a + 1, len(candidates)):
            pt_b = candidates[b][0]
            dx = pt_a[0] - pt_b[0]
            dy = pt_a[1] - pt_b[1]
            if math.sqrt(dx**2 + dy**2) < dist_threshold:
                rejected.update((a, b))

    # Stage 3: split the survivors back out by type.
    clean_endings = []
    clean_bifurcations = []
    for idx, (pt, label) in enumerate(candidates):
        if idx in rejected:
            continue
        if label == 'ending':
            clean_endings.append(pt)
        else:
            clean_bifurcations.append(pt)

    return clean_endings, clean_bifurcations

# [NEW] Normalization Function with CLIENT-SIDE PCA
def normalize_and_pad(features):
    """Turn a list of minutiae into the fixed-length, rotation-aligned vector.

    Steps:
      1. Stack the ``x``/``y`` values of ``features`` into an (n, 2) array.
      2. Fit a 2-component PCA and project the points onto it, which centres
         them and aligns the direction of greatest variance with the X axis.
      3. Resolve the sign ambiguity PCA leaves on each axis: if the mean of
         the cubed coordinates (third raw moment) along an axis is negative,
         that axis is flipped.
      4. Round each coordinate to the NEAREST integer. Plain ``int()`` would
         truncate toward zero, which makes the bin around 0 twice as wide as
         every other bin and maps more real points onto the (0, 0) value that
         is also used for padding.
      5. Sort points by (x, y) for a deterministic order.
      6. Keep the first MAX_MINUTIAE points, flatten to [x1, y1, x2, y2, ...]
         and pad with zeros up to 2 * MAX_MINUTIAE entries.

    Args:
        features: list of dicts each providing at least ``'x'`` and ``'y'``.

    Returns:
        list[int]: exactly ``2 * MAX_MINUTIAE`` Python ints. All zeros when
        fewer than two features are supplied (PCA needs at least two points).
    """
    vector_len = MAX_MINUTIAE * 2
    if not features or len(features) < 2:
        return [0] * vector_len

    coords = np.array([[f['x'], f['y']] for f in features])

    # PCA centres the data (subtracts the mean) and rotates it so the
    # direction of maximum variance becomes the new X axis.
    transformed = PCA(n_components=2).fit_transform(coords)

    # PCA fixes the axes but not their direction; use the sign of the third
    # moment along each axis as a tie-breaker.
    for axis in (0, 1):
        if np.mean(transformed[:, axis] ** 3) < 0:
            transformed[:, axis] = -transformed[:, axis]

    # .tolist() yields plain Python ints, which keeps the vector JSON-safe.
    points = np.rint(transformed).astype(int).tolist()

    # Lists of [x, y] compare lexicographically, i.e. by x and then y.
    points.sort()

    vector = [value for point in points[:MAX_MINUTIAE] for value in point]
    vector.extend([0] * (vector_len - len(vector)))
    return vector

def extract_full_template(image_path):
    """Extract raw minutiae features from a fingerprint image file.

    Pipeline: grayscale load -> Gaussian blur -> inverted adaptive threshold
    -> morphological opening followed by closing (3x3 kernel) -> skeletonize
    -> crossing-number minutiae detection -> pruning of border/clustered points.

    No coordinate normalization happens here; the returned points are in
    image pixel coordinates.

    Args:
        image_path: path of the image to read.

    Returns:
        list[dict] | None: one ``{'x', 'y', 'type'}`` dict per minutia
        (``type`` is ``'ending'`` or ``'bifurcation'``, endings first), or
        ``None`` if the image could not be read.
    """
    img = cv2.imread(image_path, 0)
    if img is None:
        return None

    # Preprocess
    img = cv2.GaussianBlur(img, (5, 5), 0)
    img = cv2.adaptiveThreshold(img, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2)
    kernel = np.ones((3, 3), np.uint8)
    # Open first, then close the OPENED image with the same 3x3 kernel. The
    # third argument of morphologyEx is the structuring element, so the
    # opened image must be the source here, not the kernel.
    opened = cv2.morphologyEx(img, cv2.MORPH_OPEN, kernel)
    img = cv2.morphologyEx(opened, cv2.MORPH_CLOSE, kernel)
    skeleton = (skeletonize(img / 255.0) * 255).astype(np.uint8)

    # Extract
    endings, bifurcations = get_crossing_number(skeleton)
    clean_endings, clean_bif = prune_minutiae(endings, bifurcations, skeleton.shape)

    # Combine (ridge angles are not part of the template)
    features = [{'x': p[0], 'y': p[1], 'type': 'ending'} for p in clean_endings]
    features.extend({'x': p[0], 'y': p[1], 'type': 'bifurcation'} for p in clean_bif)
    return features

def get_secret_from_template(raw_features):
    """
    Converts raw minutiae features into the secret template vector by
    delegating to normalize_and_pad().
    Output: List of ints [x1, y1, x2, y2...]
    """
    secret_vector = normalize_and_pad(raw_features)
    return secret_vector

print("--- Biometric Pipeline Ready (PCA Rotation Invariance Enabled) ---")

In [None]:
# Cell 4: Server Communication & Proof Generation

# Base URLs of the two remote services this client talks to:
#   GO_SERVICE_URL - prover service; generate_proof() POSTs to /prove.
#   SERVER_URL     - authentication server (/challenge, /enroll,
#                    /verifyCommitment, /reset).
# NOTE(review): both are plain-HTTP URLs on a hard-coded IP, and
# generate_proof() sends the secret vector and password in the request body.
# Confirm these only ever point at a test deployment.
GO_SERVICE_URL = "http://13.62.225.223:8080"
SERVER_URL = "http://13.62.225.223:5001"

# Duration in seconds of every /prove request, appended by generate_proof()
# and averaged at the end of run_test().
proof_latencies = []
# Circuit metrics ('constraints', 'pk_size', 'vk_size') copied from the first
# successful /prove response while this dict is still empty.
zkp_static_metrics = {}

def get_server_challenge():
    """Fetch a fresh challenge (nonce) from the authentication server.

    Issues ``GET {SERVER_URL}/challenge`` with a 5 second timeout and reads
    the ``challenge`` field of the JSON reply. A non-2xx HTTP status is
    treated as a failure, matching how generate_proof() handles its response.

    Returns:
        The ``challenge`` value from the response, or ``None`` if the request
        failed, returned an error status, or the reply lacked the field.
    """
    try:
        resp = requests.get(f'{SERVER_URL}/challenge', timeout=5)
        # Do not trust the body of an error response.
        resp.raise_for_status()
        return resp.json().get('challenge')
    except Exception as e:
        logging.error(f"Failed to fetch challenge: {e}")
        return None

def generate_proof(secret_vector, password, salt, candidate_vector=None, challenge="0"):
    """
    Asks the Go ZKP service (``POST {GO_SERVICE_URL}/prove``) for a proof.

    The request carries the secret vector, the candidate vector, the password,
    the salt and the session challenge (always sent as a string). When no
    candidate is given, the secret itself is used as the candidate.

    Side effects:
      * The request duration is appended to ``proof_latencies`` as soon as a
        response arrives, before its HTTP status is checked.
      * While ``zkp_static_metrics`` is empty, the circuit metrics reported in
        the response are copied into it.

    Returns:
        tuple: ``(proof, commitment)`` taken from the JSON response, or
        ``(None, None)`` on any error (which is logged).
    """
    url = f'{GO_SERVICE_URL}/prove'
    candidate = secret_vector if candidate_vector is None else candidate_vector

    request_body = {
        'secret': secret_vector,
        'candidate': candidate,
        'password': password,
        'salt': salt,
        'challenge': str(challenge),
    }
    json_headers = {'Content-Type': 'application/json'}

    try:
        started = time.perf_counter()
        resp = requests.post(url, json=request_body, headers=json_headers, timeout=15)
        proof_latencies.append(time.perf_counter() - started)

        resp.raise_for_status()
        data = resp.json()

        # Circuit metrics are static, so capturing them once is enough.
        if len(zkp_static_metrics) == 0:
            zkp_static_metrics.update({
                'constraints': data.get('nb_constraints', 0),
                'pk_size': data.get('pk_size_bytes', 0),
                'vk_size': data.get('vk_size_bytes', 0),
            })

        # Proof blob plus the commitment hash used for enrollment/verification.
        return data.get('proof'), data.get('commitment')
    except Exception as e:
        logging.error(f"Proof Gen Error: {e}")
        return None, None

def register_user_on_server(username, commitment, salt):
    """Enrolls a user by POSTing username, commitment and salt to ``/enroll``.

    Returns:
        bool: ``True`` only when the server answers with HTTP 201; ``False``
        for any other status or if the request raises (the error is logged).
    """
    url = f'{SERVER_URL}/enroll'
    enrollment = dict(username=username, commitment=commitment, salt=salt)
    json_headers = {'Content-Type': 'application/json'}
    try:
        resp = requests.post(url, json=enrollment, headers=json_headers, timeout=5)
    except Exception as e:
        logging.error(f"Enrollment Error: {e}")
        return False
    return resp.status_code == 201

def verify_login(username, secret_vector, password, salt, candidate_vector):
    """
    Runs one complete login attempt against the server.

    Flow: fetch a challenge -> generate a proof bound to that challenge ->
    POST username, proof and challenge to ``/verifyCommitment``.

    Returns:
        bool: ``True`` only if the server's JSON ``message`` equals
        ``"Login Successful"``. ``False`` if no challenge or proof could be
        obtained, the request failed, or the message differs.
    """
    nonce = get_server_challenge()
    if nonce:
        # Bind the proof to this specific challenge.
        bound_proof = generate_proof(secret_vector, password, salt, candidate_vector, nonce)[0]
    else:
        bound_proof = None
    if not bound_proof:
        return False

    url = f'{SERVER_URL}/verifyCommitment'
    login_request = {
        'username': username,
        'proof': bound_proof,
        'challenge': nonce,
    }
    json_headers = {'Content-Type': 'application/json'}

    try:
        reply = requests.post(url, json=login_request, headers=json_headers, timeout=15).json()
        verdict = reply.get('message')
    except Exception as e:
        logging.error(f"Verification Request Error: {e}")
        return False
    return verdict == "Login Successful"

print("--- Communication Layer Ready (Secure Binding Mode) ---")

In [None]:
# Cell 5: Automated Integration Test (Persistence, Security, Fuzzy, Replay)

# Directory that run_test() scans for "*.BMP" fingerprint images.
TEST_DATA_DIR = "./SOCOfing Data/"
# Number of images sampled (and users enrolled, if every enrollment succeeds).
NUM_SUBJECTS = 10
# Expected number of login attempts in run_test():
#   enrolled_users^2 (target x attempting user)
#   * 3 test types (noise / shift / rotation)
#   * 2 passwords (correct / wrong)
# i.e. NUM_SUBJECTS^2 * 3 * 2 when all subjects enroll.

def reset_server_db():
    """Ask the authentication server to wipe its state via ``POST /reset``.

    Best-effort: a failure is logged as a warning and never raised, so the
    test run continues. Unlike a bare ``except:``, ``except Exception`` lets
    KeyboardInterrupt / SystemExit propagate, and the reason is logged. A
    non-2xx reply is also reported instead of being taken for success.
    """
    try:
        resp = requests.post(f'{SERVER_URL}/reset', timeout=5)
    except Exception as e:
        logging.warning(f"Could not reset Server DB. ({e})")
        return
    if not resp.ok:
        logging.warning(f"Could not reset Server DB. (HTTP {resp.status_code})")

def apply_synthetic_noise(vector, noise_level=2, num_points=5):
    """
    TEST A: Local Noise (Distortion)
    Simulates sensor noise/skin elasticity by shifting random coordinates.

    Up to ``num_points`` distinct positions of the vector are picked at
    random. Each picked value that is non-zero is shifted by a random integer
    in ``[-noise_level, noise_level]``; zero values are left alone so zero
    padding stays intact. Vectors shorter than ``num_points`` are handled by
    sampling every position instead of raising ValueError.

    Args:
        vector: flat sequence of integer coordinates; it is not modified.
        noise_level: maximum absolute shift applied to a coordinate.
        num_points: maximum number of positions to perturb (default 5).

    Returns:
        list: a new list of the same length as ``vector``.
    """
    noisy_vector = list(vector)
    # random.sample raises if asked for more items than the population holds.
    sample_size = max(0, min(num_points, len(noisy_vector)))
    for idx in random.sample(range(len(noisy_vector)), sample_size):
        if noisy_vector[idx] != 0:
            noisy_vector[idx] += random.randint(-noise_level, noise_level)
    return noisy_vector

def apply_translation_shift(vector, shift_x=3, shift_y=3):
    """
    TEST B: Global Shift (Translation)
    Simulates placing the finger slightly off-center.

    Walks the flat vector as (x, y) pairs and adds ``shift_x`` / ``shift_y``
    to each pair. Pairs equal to (0, 0) are treated as padding and copied
    through unchanged.

    Returns:
        list: a new flat vector of the same length.
    """
    result = []
    for pos in range(0, len(vector), 2):
        x, y = vector[pos], vector[pos + 1]
        is_padding = x == 0 and y == 0
        result += [0, 0] if is_padding else [x + shift_x, y + shift_y]
    return result

def apply_rotation(vector, angle_deg=70):
    """
    TEST C: Rotation
    Simulates placing the finger at an angle.

    Every (x, y) pair of the flat vector is rotated about the origin (0, 0)
    by ``angle_deg`` degrees using the standard rotation matrix
        x' = x cos(t) - y sin(t)
        y' = x sin(t) + y cos(t)
    and the results are converted with ``int()``, i.e. truncated toward zero.
    Pairs equal to (0, 0) are treated as padding and left untouched.

    Returns:
        list: a new flat vector of the same length.
    """
    theta = math.radians(angle_deg)
    cos_val, sin_val = math.cos(theta), math.sin(theta)

    out = []
    for pos in range(0, len(vector), 2):
        x, y = vector[pos], vector[pos + 1]
        if x == 0 and y == 0:
            out += [0, 0]
        else:
            out += [int(x * cos_val - y * sin_val), int(x * sin_val + y * cos_val)]
    return out

# [NEW] Wrapper to Re-Normalize after simulated rotation
def simulate_client_scan(secret_vector, noise_func=None, shift_func=None, rotate_func=None):
    """
    Simulates a fresh client-side scan derived from an enrolled secret.

    1. Copies the secret vector.
    2. Applies the supplied distortions, always in the order
       rotation -> shift -> noise (each is optional).
    3. Rebuilds the feature dicts, skipping (0, 0) pairs as padding.
    4. Re-runs normalize_and_pad() on them, as the client would.

    Returns:
        The vector produced by normalize_and_pad().
    """
    distorted = list(secret_vector)
    for transform in (rotate_func, shift_func, noise_func):
        if transform:
            distorted = transform(distorted)

    # Back to the dict format the normalization pipeline expects.
    features = [
        {'x': distorted[i], 'y': distorted[i + 1]}
        for i in range(0, len(distorted), 2)
        if not (distorted[i] == 0 and distorted[i + 1] == 0)
    ]

    return normalize_and_pad(features)


def test_replay_attack(wallet):
    """
    Phase 3: attempts to reuse one (proof, challenge) pair to log in twice.

    Uses the first user in ``wallet``: obtains a challenge, generates a proof
    bound to it, logs in once (expects HTTP 200), then submits the identical
    payload again and expects HTTP 403. Progress and the verdict are printed.

    The test aborts with a message if the challenge or proof cannot be
    obtained, or if a request fails. Both POSTs carry a timeout so an
    unresponsive server cannot hang the run.

    Args:
        wallet: dict of username -> {'secret', 'password', 'salt'}.
    """
    print("\n--- [SECURITY CHECK] STARTING REPLAY ATTACK SIMULATION ---")
    if not wallet:
        return

    username = next(iter(wallet))
    creds = wallet[username]
    verify_url = f'{SERVER_URL}/verifyCommitment'

    # 1. Get a fresh nonce (Legitimate Step)
    nonce = get_server_challenge()
    print(f"1. Obtained Nonce: {nonce}")
    if not nonce:
        print("1. Could not obtain a nonce - Test Aborted")
        return

    # 2. Generate a valid proof bound to this nonce
    proof, _ = generate_proof(creds['secret'], creds['password'], creds['salt'], creds['secret'], nonce)
    if not proof:
        print("2. Proof generation failed - Test Aborted")
        return

    # 3. Legitimate Login
    payload = {'username': username, 'proof': proof, 'challenge': nonce}
    try:
        resp = requests.post(verify_url, json=payload, timeout=15)
    except requests.RequestException as e:
        print(f"2. Legitimate Login: FAILED ({e}) - Test Aborted")
        return
    if resp.status_code == 200:
        print("2. Legitimate Login: SUCCESS (Nonce Consumed)")
    else:
        print(f"2. Legitimate Login: FAILED ({resp.text}) - Test Aborted")
        return

    # 4. REPLAY ATTACK: send the very same payload a second time.
    print("3. Attempting Replay with used nonce/proof...")
    try:
        resp_replay = requests.post(verify_url, json=payload, timeout=15)
    except requests.RequestException as e:
        print(f"4. Replay Attack: request failed ({e}) - Test Aborted")
        return

    if resp_replay.status_code == 403:
        print("4. Replay Attack: BLOCKED (403 Forbidden) -> SUCCESS")
        print(">> SYSTEM STATUS: SECURE AGAINST REPLAY ATTACKS <<")
    else:
        print(f"4. Replay Attack: FAILED (Status {resp_replay.status_code}) -> VULNERABLE")
def run_test():
    """Run the end-to-end integration test and print a results report.

    Phases:
      1. Enrollment: samples NUM_SUBJECTS images, extracts a template for
         each, obtains a commitment from the prover (challenge "0") and
         registers the user. Successful users are stored in the wallet.
      2. Verification: every enrolled user attempts to log in to every
         enrolled account with three distorted scans (noise, shift, rotation)
         and with both the correct and a wrong password. Only "own account +
         correct password" attempts are expected to succeed.
      3. Replay: delegates to test_replay_attack().

    Side effects: resets the server DB, deletes and rewrites WALLET_FILE,
    clears ``proof_latencies`` / ``zkp_static_metrics`` and seeds ``random``
    with 42.
    """
    logging.info("=== STARTING FULL INTEGRATION TEST ===")
    reset_server_db()
    proof_latencies.clear()
    zkp_static_metrics.clear()

    if os.path.exists(WALLET_FILE):
        os.remove(WALLET_FILE)
        logging.info("[Setup] Deleted old wallet.json")

    # Load Data. glob returns paths in arbitrary order, so sort them first;
    # otherwise the seeded sample below is not reproducible across machines.
    images = sorted(glob.glob(os.path.join(TEST_DATA_DIR, "*.BMP")))
    if len(images) < NUM_SUBJECTS:
        logging.error("Insufficient test data found.")
        return

    random.seed(42)
    test_set = random.sample(images, NUM_SUBJECTS)
    enrolled_users = []

    # --- PHASE 1: ENROLLMENT ---
    logging.info("--- Phase 1: Enrollment ---")
    wallet = load_wallet()

    for i, img_path in enumerate(test_set):
        username = f"user_{i:02d}"

        features = extract_full_template(img_path)
        if not features:
            continue
        secret_vector = get_secret_from_template(features)

        salt = generate_secure_salt()
        password = generate_strong_password()

        # Static enrollment: the commitment comes from the prover, using
        # challenge "0" and the secret as its own candidate.
        _, commitment = generate_proof(secret_vector, password, salt, candidate_vector=None, challenge="0")
        if not commitment:
            # Never enroll a user without a real commitment.
            logging.error(f"[Enrollment] No commitment returned for {username}; skipping.")
            continue

        if register_user_on_server(username, commitment, salt):
            wallet[username] = {'secret': secret_vector, 'salt': salt, 'password': password}
            enrolled_users.append(username)

    save_wallet(wallet)

    # --- PHASE 2: FUZZY + SHIFT + ROTATION VERIFICATION ---
    logging.info("--- Phase 2: Biometric Verification ---")

    stats = {'TP': 0, 'TN': 0, 'FP': 0, 'FN': 0}
    total_tests = 0

    for target_user in enrolled_users:
        for attempt_user in enrolled_users:
            creds = wallet[attempt_user]

            # Simulated scans: physical distortion followed by re-normalization.
            test_cases = [
                ("NOISE", simulate_client_scan(creds['secret'], noise_func=lambda v: apply_synthetic_noise(v, 2))),
                ("SHIFT", simulate_client_scan(creds['secret'], shift_func=lambda v: apply_translation_shift(v, 3, 3))),
                ("ROTATION", simulate_client_scan(creds['secret'], rotate_func=lambda v: apply_rotation(v, 65))),
            ]

            for test_type, candidate in test_cases:
                for use_wrong_password in [False, True]:
                    total_tests += 1

                    if use_wrong_password:
                        attempt_password = "wrong_password_123"
                    else:
                        attempt_password = creds['password']

                    # verify_login fetches a fresh challenge and binds the proof to it.
                    success = verify_login(target_user, creds['secret'], attempt_password, creds['salt'], candidate)

                    is_legit_attempt = (target_user == attempt_user) and (not use_wrong_password)

                    if is_legit_attempt:
                        if success:
                            stats['TP'] += 1
                            logging.info(f"[SUCCESS] {target_user} logged in ({test_type}).")
                        else:
                            stats['FN'] += 1
                            logging.error(f"[FAILURE] {target_user} denied access ({test_type}).")
                    else:
                        if not success:
                            stats['TN'] += 1
                        else:
                            stats['FP'] += 1
                            logging.critical(f"[BREACH] {attempt_user} accessed {target_user} using {test_type}!")

    print("\n=== FINAL RESULTS ===")
    print(f"Total Tests: {total_tests}")
    expected_tp = len(enrolled_users) * 3  # (Noise + Shift + Rotation) per user
    print(f"TP (Valid): {stats['TP']}/{expected_tp}")
    print(f"FP (Breaches): {stats['FP']}")
    print(f"TN (Blocked): {stats['TN']}")
    print(f"FN (Incorrectly Blocked): {stats['FN']}")

    # FAR = accepted impostor attempts / all impostor attempts;
    # FRR = rejected legitimate attempts / all legitimate attempts.
    total_valid_attempts = stats['TP'] + stats['FN']
    total_impostor_attempts = stats['FP'] + stats['TN']

    far = (stats['FP'] / total_impostor_attempts * 100) if total_impostor_attempts > 0 else 0.0
    frr = (stats['FN'] / total_valid_attempts * 100) if total_valid_attempts > 0 else 0.0

    print(f"FAR (False Acceptance Rate): {far:.2f}%")
    print(f"FRR (False Rejection Rate): {frr:.2f}%")

    if stats['FP'] > 0:
        print(">> SYSTEM FAILURE: SECURITY BREACH DETECTED <<")
    elif stats['TP'] < expected_tp:
        print(">> SYSTEM FAILURE: VALID USERS BLOCKED <<")
    else:
        print(">> SYSTEM STATUS: ROBUST <<")

    # Average duration of all /prove requests made during this run.
    if proof_latencies:
        avg_latency = sum(proof_latencies) / len(proof_latencies)
        print(f"Average Proving Latency: {avg_latency:.4f}s")
    else:
        print("Average Proving Latency: N/A")

    # Circuit summary, from the metrics captured by generate_proof().
    print("\n====== ZKP CIRCUIT SUMMARY ======")
    print(f"Constraints:   {zkp_static_metrics.get('constraints', 'N/A')}")
    pk_size = zkp_static_metrics.get('pk_size', 0)
    print(f"Proving Key:   {pk_size/1024:.2f} KB" if pk_size else "Proving Key:   N/A")
    print(f"Verifying Key: {zkp_static_metrics.get('vk_size', 'N/A')} Bytes")
    print("=================================\n")

    # --- PHASE 3: REPLAY ATTACK TEST ---
    test_replay_attack(wallet)

# Run the full integration test when this module is executed as the main
# program.
if __name__ == '__main__':
    run_test()