In [None]:
# key_generation/dimensionality_reduction/base_reducer.py

import numpy as np
from abc import ABC, abstractmethod

class DimensionalityReducer(ABC):
    """
    Base class for all dimensionality reduction techniques.

    Concrete reducers implement ``fit`` and ``transform`` and are expected to
    override ``save`` / ``load`` to persist their underlying model.
    """

    def __init__(self, target_dims=32, random_state=42):
        """
        Initialize the reducer

        Args:
            target_dims: Target dimensionality for reduction
            random_state: Random seed for reproducibility
        """
        self.target_dims = target_dims
        self.random_state = random_state
        # Underlying model object; None until a subclass assigns one.
        self.model = None
        # Guards transform()/save(); subclasses set this once fitting succeeds.
        self.is_fitted = False

    @abstractmethod
    def fit(self, embeddings):
        """
        Fit the dimensionality reduction model

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]
        """
        pass

    @abstractmethod
    def transform(self, embeddings):
        """
        Transform embeddings to lower dimension

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            reduced_embeddings: Array of reduced embeddings [n_samples, target_dims]
        """
        pass

    def fit_transform(self, embeddings):
        """
        Fit and transform in one step

        Calls ``fit`` and then ``transform`` on the same data.

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            reduced_embeddings: Array of reduced embeddings [n_samples, target_dims]
        """
        self.fit(embeddings)
        return self.transform(embeddings)

    def save(self, path):
        """
        Validate that the model may be saved

        The base implementation only checks the fitted flag; it writes
        nothing to ``path``. Subclasses override this to persist their model.

        Args:
            path: Path to save the model

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("Model is not fitted yet")

        # Persistence itself is implemented in subclasses.

    @classmethod
    def load(cls, path):
        """
        Load a fitted model

        Args:
            path: Path to load the model from

        Returns:
            reducer: Loaded dimensionality reducer

        Raises:
            NotImplementedError: Always, in the base class. Failing loudly
                avoids handing callers ``None`` when a subclass forgets to
                override this method.
        """
        raise NotImplementedError(
            f"{cls.__name__} does not implement load(); override it in the subclass"
        )

In [None]:
# key_generation/dimensionality_reduction/pca_reducer.py

import os
import numpy as np
from sklearn.decomposition import PCA
import joblib

from key_generation.dimensionality_reduction.base_reducer import DimensionalityReducer

class PCAReducer(DimensionalityReducer):
    """PCA-based dimensionality reduction"""

    def __init__(self, target_dims=32, random_state=42, whiten=True):
        """
        Initialize PCA reducer

        Args:
            target_dims: Target dimensionality for reduction
            random_state: Random seed for reproducibility
            whiten: Whether to whiten the data (decorrelate features)
        """
        super().__init__(target_dims=target_dims, random_state=random_state)
        self.whiten = whiten

    def fit(self, embeddings):
        """
        Fit PCA model to embeddings

        A new PCA model is created on every call, so refitting discards the
        previous one. The per-component and cumulative explained variance
        ratios are stored on the reducer.

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            self: The fitted reducer
        """
        # Initialize the PCA model
        self.model = PCA(
            n_components=self.target_dims,
            whiten=self.whiten,
            random_state=self.random_state
        )

        # Fit the model
        self.model.fit(embeddings)

        # Calculate and store explained variance
        self.explained_variance_ratio = self.model.explained_variance_ratio_
        self.cumulative_variance = np.cumsum(self.explained_variance_ratio)

        self.is_fitted = True

        return self

    def transform(self, embeddings):
        """
        Transform embeddings to lower dimension using PCA

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            reduced_embeddings: Array of reduced embeddings [n_samples, target_dims]

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("PCA model is not fitted yet")

        reduced_embeddings = self.model.transform(embeddings)
        return reduced_embeddings

    def save(self, path):
        """
        Save the fitted PCA model

        Only the underlying scikit-learn model is written; ``load`` rebuilds
        the reducer attributes from it.

        Args:
            path: Path to save the model

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("PCA model is not fitted yet")

        # os.path.dirname() is '' for a bare filename and os.makedirs('')
        # raises FileNotFoundError, so only create a directory when one is given.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        joblib.dump(self.model, path)

    @classmethod
    def load(cls, path):
        """
        Load a fitted PCA model

        NOTE(review): joblib files are pickle-based; only load files from a
        trusted source.

        Args:
            path: Path to load the model from

        Returns:
            reducer: Loaded PCA reducer
        """
        # Load model
        model = joblib.load(path)

        # Create reducer instance from the hyper-parameters stored on the model
        reducer = cls(
            target_dims=model.n_components,
            random_state=model.random_state,
            whiten=model.whiten
        )

        # Restore model state
        reducer.model = model
        reducer.explained_variance_ratio = model.explained_variance_ratio_
        reducer.cumulative_variance = np.cumsum(reducer.explained_variance_ratio)
        reducer.is_fitted = True

        return reducer

In [None]:
# key_generation/dimensionality_reduction/autoencoder_reducer.py

import os
import numpy as np
import joblib
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam

from key_generation.dimensionality_reduction.base_reducer import DimensionalityReducer

class AutoencoderReducer(DimensionalityReducer):
    """
    Autoencoder-based dimensionality reduction

    NOTE(review): ``random_state`` is stored but this class never applies it
    to weight initialisation or batch shuffling, so training runs are not
    seeded from it.
    """

    def __init__(self, target_dims=32, random_state=42, hidden_layer_sizes=None, epochs=50, batch_size=32, learning_rate=0.001):
        """
        Initialize Autoencoder reducer

        Args:
            target_dims: Target dimensionality for reduction
            random_state: Random seed for reproducibility
            hidden_layer_sizes: List of hidden layer sizes for the encoder
                (defaults to [128, 64]; the decoder mirrors it in reverse)
            epochs: Number of training epochs
            batch_size: Batch size for training
            learning_rate: Learning rate for the optimizer
        """
        super().__init__(target_dims=target_dims, random_state=random_state)
        self.hidden_layer_sizes = hidden_layer_sizes or [128, 64]
        self.epochs = epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.encoder = None
        self.autoencoder = None

    def _build_autoencoder(self, input_dim):
        """
        Build the autoencoder model

        Creates ``self.autoencoder`` (input -> reconstruction) and
        ``self.encoder`` (input -> bottleneck), which share layers, and
        compiles the autoencoder with Adam and an MSE loss.

        Args:
            input_dim: Dimensionality of the input data
        """
        # Input layer
        input_layer = Input(shape=(input_dim,))

        # Encoder
        x = input_layer
        for size in self.hidden_layer_sizes:
            x = Dense(size, activation='relu')(x)
        encoded = Dense(self.target_dims, activation='relu')(x)

        # Decoder
        x = encoded
        for size in reversed(self.hidden_layer_sizes):
            x = Dense(size, activation='relu')(x)
        # NOTE(review): sigmoid bounds reconstructions to (0, 1); this
        # presumably expects embeddings scaled into that range - confirm.
        decoded = Dense(input_dim, activation='sigmoid')(x)

        # Autoencoder and encoder models
        self.autoencoder = Model(input_layer, decoded)
        self.encoder = Model(input_layer, encoded)

        # Compile the autoencoder
        self.autoencoder.compile(optimizer=Adam(learning_rate=self.learning_rate), loss='mse')

    def fit(self, embeddings):
        """
        Fit the autoencoder model to embeddings

        The network is rebuilt from scratch on every call and trained to
        reconstruct its own input.

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            self: The fitted reducer
        """
        input_dim = embeddings.shape[1]
        self._build_autoencoder(input_dim)

        # Train the autoencoder
        self.autoencoder.fit(
            embeddings, embeddings,
            epochs=self.epochs,
            batch_size=self.batch_size,
            shuffle=True,
            verbose=1
        )
        self.is_fitted = True

        return self

    def transform(self, embeddings):
        """
        Transform embeddings to lower dimension using the encoder

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            reduced_embeddings: Array of reduced embeddings [n_samples, target_dims]

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("Autoencoder model is not fitted yet")

        reduced_embeddings = self.encoder.predict(embeddings)
        return reduced_embeddings

    def save(self, path):
        """
        Save the fitted autoencoder model

        Only the full autoencoder is written; ``load`` re-derives the encoder
        from it.

        Args:
            path: Path to save the model

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("Autoencoder model is not fitted yet")

        # os.path.dirname() is '' for a bare filename and os.makedirs('')
        # raises FileNotFoundError, so only create a directory when one is given.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        self.autoencoder.save(path)

    @classmethod
    def load(cls, path):
        """
        Load a fitted autoencoder model

        The returned reducer carries default ``hidden_layer_sizes``,
        ``epochs``, ``batch_size`` and ``learning_rate``; those training
        settings are not restored from the saved file.

        Args:
            path: Path to load the model from

        Returns:
            reducer: Loaded Autoencoder reducer
        """
        # Load the autoencoder model
        autoencoder = load_model(path)

        # Extract the encoder part. _build_autoencoder produces a symmetric
        # stack (input, h hidden, bottleneck, h hidden, output), so the middle
        # index is the bottleneck - assuming `layers` lists the input layer first.
        input_layer = autoencoder.input
        encoded_layer = autoencoder.layers[len(autoencoder.layers) // 2].output
        encoder = Model(input_layer, encoded_layer)

        # Create reducer instance
        reducer = cls(
            target_dims=encoded_layer.shape[-1],
            random_state=None  # Random state is not needed for a loaded model
        )
        reducer.autoencoder = autoencoder
        reducer.encoder = encoder
        reducer.is_fitted = True

        return reducer

In [None]:
# key_generation/dimensionality_reduction/random_projection_reducer.py

import os
import numpy as np
from sklearn.random_projection import GaussianRandomProjection
import joblib

from key_generation.dimensionality_reduction.base_reducer import DimensionalityReducer

class RandomProjectionReducer(DimensionalityReducer):
    """Random Projection-based dimensionality reduction"""

    def __init__(self, target_dims=32, random_state=42):
        """
        Initialize Random Projection reducer

        Args:
            target_dims: Target dimensionality for reduction
            random_state: Random seed for reproducibility
        """
        super().__init__(target_dims=target_dims, random_state=random_state)

    def fit(self, embeddings):
        """
        Fit Random Projection model to embeddings

        A new projection model is created on every call.

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            self: The fitted reducer
        """
        # Initialize the Random Projection model
        self.model = GaussianRandomProjection(
            n_components=self.target_dims,
            random_state=self.random_state
        )

        # fit() is still required so the model can set up its projection for
        # this input; nothing is learned from the data values themselves
        # (per the original author's note - see scikit-learn docs to confirm).
        self.model.fit(embeddings)
        self.is_fitted = True

        return self

    def transform(self, embeddings):
        """
        Transform embeddings to lower dimension using Random Projection

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            reduced_embeddings: Array of reduced embeddings [n_samples, target_dims]

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("Random Projection model is not fitted yet")

        reduced_embeddings = self.model.transform(embeddings)
        return reduced_embeddings

    def save(self, path):
        """
        Save the fitted Random Projection model

        Args:
            path: Path to save the model

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("Random Projection model is not fitted yet")

        # os.path.dirname() is '' for a bare filename and os.makedirs('')
        # raises FileNotFoundError, so only create a directory when one is given.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        joblib.dump(self.model, path)

    @classmethod
    def load(cls, path):
        """
        Load a fitted Random Projection model

        NOTE(review): joblib files are pickle-based; only load files from a
        trusted source.

        Args:
            path: Path to load the model from

        Returns:
            reducer: Loaded Random Projection reducer
        """
        # Load model
        model = joblib.load(path)

        # Create reducer instance from the hyper-parameters stored on the model
        reducer = cls(
            target_dims=model.n_components,
            random_state=model.random_state
        )

        # Restore model state
        reducer.model = model
        reducer.is_fitted = True

        return reducer

In [None]:
# key_generation/dimensionality_reduction/umap_reducer.py

import os
import numpy as np
import joblib
import umap

from key_generation.dimensionality_reduction.base_reducer import DimensionalityReducer

class UMAPReducer(DimensionalityReducer):
    """UMAP-based dimensionality reduction"""

    def __init__(self, target_dims=32, random_state=42, n_neighbors=15, min_dist=0.1):
        """
        Initialize UMAP reducer

        Args:
            target_dims: Target dimensionality for reduction
            random_state: Random seed for reproducibility
            n_neighbors: Number of neighbors for UMAP
            min_dist: Minimum distance between points in the reduced space
        """
        super().__init__(target_dims=target_dims, random_state=random_state)
        self.n_neighbors = n_neighbors
        self.min_dist = min_dist

    def fit(self, embeddings):
        """
        Fit UMAP model to embeddings

        A new UMAP model is created on every call.

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            self: The fitted reducer
        """
        # Initialize the UMAP model
        self.model = umap.UMAP(
            n_components=self.target_dims,
            random_state=self.random_state,
            n_neighbors=self.n_neighbors,
            min_dist=self.min_dist
        )

        # Fit the model
        self.model.fit(embeddings)
        self.is_fitted = True

        return self

    def transform(self, embeddings):
        """
        Transform embeddings to lower dimension using UMAP

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            reduced_embeddings: Array of reduced embeddings [n_samples, target_dims]

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("UMAP model is not fitted yet")

        reduced_embeddings = self.model.transform(embeddings)
        return reduced_embeddings

    def save(self, path):
        """
        Save the fitted UMAP model

        Args:
            path: Path to save the model

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("UMAP model is not fitted yet")

        # os.path.dirname() is '' for a bare filename and os.makedirs('')
        # raises FileNotFoundError, so only create a directory when one is given.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        joblib.dump(self.model, path)

    @classmethod
    def load(cls, path):
        """
        Load a fitted UMAP model

        NOTE(review): joblib files are pickle-based; only load files from a
        trusted source.

        Args:
            path: Path to load the model from

        Returns:
            reducer: Loaded UMAP reducer
        """
        # Load model
        model = joblib.load(path)

        # Create reducer instance from the hyper-parameters stored on the model
        reducer = cls(
            target_dims=model.n_components,
            random_state=model.random_state,
            n_neighbors=model.n_neighbors,
            min_dist=model.min_dist
        )

        # Restore model state
        reducer.model = model
        reducer.is_fitted = True

        return reducer

In [None]:
# key_generation/dimensionality_reduction/tsne_reducer.py

import os
import numpy as np
from sklearn.manifold import TSNE
import joblib

from key_generation.dimensionality_reduction.base_reducer import DimensionalityReducer

class TSNEReducer(DimensionalityReducer):
    """
    t-SNE based dimensionality reduction

    scikit-learn's TSNE has no out-of-sample ``transform``; it can only embed
    the data it is fitted on. ``transform`` here therefore re-runs the
    optimisation on whatever data it is given.
    """

    def __init__(self, target_dims=2, random_state=42, perplexity=30, n_iter=1000):
        """
        Initialize t-SNE reducer

        Args:
            target_dims: Target dimensionality for reduction
            random_state: Random seed for reproducibility
            perplexity: Perplexity parameter for t-SNE
            n_iter: Number of iterations for optimization
        """
        super().__init__(target_dims=target_dims, random_state=random_state)
        self.perplexity = perplexity
        self.n_iter = n_iter

    @staticmethod
    def _iteration_kwarg():
        """Return the TSNE keyword that sets the iteration budget.

        Newer scikit-learn releases renamed ``n_iter`` to ``max_iter``; the
        installed signature is inspected so either version works.
        """
        import inspect

        tsne_params = inspect.signature(TSNE.__init__).parameters
        return 'max_iter' if 'max_iter' in tsne_params else 'n_iter'

    def _build_model(self):
        """Create an unfitted TSNE model from this reducer's settings."""
        return TSNE(
            n_components=self.target_dims,
            perplexity=self.perplexity,
            random_state=self.random_state,
            **{self._iteration_kwarg(): self.n_iter}
        )

    def fit(self, embeddings):
        """
        Fit t-SNE model to embeddings

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            self: The fitted reducer
        """
        # Initialize the t-SNE model
        self.model = self._build_model()

        # Fit the model
        self.model.fit(embeddings)
        self.is_fitted = True

        return self

    def fit_transform(self, embeddings):
        """
        Fit and transform in one step

        Overrides the base implementation, which would run the full t-SNE
        optimisation twice (once in ``fit`` and again in ``transform``).

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            reduced_embeddings: Array of reduced embeddings [n_samples, target_dims]
        """
        self.model = self._build_model()
        reduced_embeddings = self.model.fit_transform(embeddings)
        self.is_fitted = True

        return reduced_embeddings

    def transform(self, embeddings):
        """
        Transform embeddings to lower dimension using t-SNE

        This refits the stored model on ``embeddings`` because TSNE cannot
        project unseen points into an existing embedding.

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            reduced_embeddings: Array of reduced embeddings [n_samples, target_dims]

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("t-SNE model is not fitted yet")

        reduced_embeddings = self.model.fit_transform(embeddings)
        return reduced_embeddings

    def save(self, path):
        """
        Save the fitted t-SNE model

        Args:
            path: Path to save the model

        Raises:
            ValueError: If the reducer has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("t-SNE model is not fitted yet")

        # os.path.dirname() is '' for a bare filename and os.makedirs('')
        # raises FileNotFoundError, so only create a directory when one is given.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        joblib.dump(self.model, path)

    @classmethod
    def load(cls, path):
        """
        Load a fitted t-SNE model

        NOTE(review): joblib files are pickle-based; only load files from a
        trusted source.

        Args:
            path: Path to load the model from

        Returns:
            reducer: Loaded t-SNE reducer
        """
        # Load model
        model = joblib.load(path)

        # The iteration budget lives under `max_iter` or `n_iter` depending on
        # the scikit-learn version that built the model; accept either and
        # fall back to the constructor default if neither holds an integer.
        n_iter = getattr(model, 'max_iter', None)
        if n_iter is None:
            n_iter = getattr(model, 'n_iter', None)
        iteration_args = {}
        if isinstance(n_iter, (int, np.integer)):
            iteration_args['n_iter'] = n_iter

        # Create reducer instance
        reducer = cls(
            target_dims=model.n_components,
            random_state=model.random_state,
            perplexity=model.perplexity,
            **iteration_args
        )

        # Restore model state
        reducer.model = model
        reducer.is_fitted = True

        return reducer

In [None]:
# key_generation/dimensionality_reduction/reducer_factory.py

from key_generation.dimensionality_reduction.pca_reducer import PCAReducer
from key_generation.dimensionality_reduction.tsne_reducer import TSNEReducer
from key_generation.dimensionality_reduction.umap_reducer import UMAPReducer
from key_generation.dimensionality_reduction.random_projection_reducer import RandomProjectionReducer
from key_generation.dimensionality_reduction.autoencoder_reducer import AutoencoderReducer

def get_reducer(method, target_dims=32, **kwargs):
    """
    Build an unfitted dimensionality reducer for the named method

    Args:
        method: Reduction method ('pca', 'tsne', 'umap', 'random_projection', 'autoencoder')
        target_dims: Target dimensionality for reduction
        **kwargs: Extra keyword arguments forwarded to the reducer's constructor

    Returns:
        reducer: Dimensionality reducer

    Raises:
        ValueError: If ``method`` is not one of the supported names
    """
    # Pick the class first, then construct it in a single place.
    if method == 'pca':
        reducer_cls = PCAReducer
    elif method == 'tsne':
        reducer_cls = TSNEReducer
    elif method == 'umap':
        reducer_cls = UMAPReducer
    elif method == 'random_projection':
        reducer_cls = RandomProjectionReducer
    elif method == 'autoencoder':
        reducer_cls = AutoencoderReducer
    else:
        raise ValueError(f"Unknown reduction method: {method}")

    return reducer_cls(target_dims=target_dims, **kwargs)

def load_reducer(method, path):
    """
    Restore a previously saved reducer of the named method

    Args:
        method: Reduction method
        path: Path to load the model from

    Returns:
        reducer: Loaded dimensionality reducer

    Raises:
        ValueError: If ``method`` is not one of the supported names
    """
    # Pick the class first, then delegate to its load() in a single place.
    if method == 'pca':
        reducer_cls = PCAReducer
    elif method == 'tsne':
        reducer_cls = TSNEReducer
    elif method == 'umap':
        reducer_cls = UMAPReducer
    elif method == 'random_projection':
        reducer_cls = RandomProjectionReducer
    elif method == 'autoencoder':
        reducer_cls = AutoencoderReducer
    else:
        raise ValueError(f"Unknown reduction method: {method}")

    return reducer_cls.load(path)

In [None]:
# key_generation/dimensionality_reduction/compare_reducers.py

import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import NearestNeighbors
from sklearn.manifold import trustworthiness
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import time
import pandas as pd

from key_generation.dimensionality_reduction.reducer_factory import get_reducer

def compare_reducers(embeddings, methods=None, target_dims=32, n_neighbors=5):
    """
    Compare different dimensionality reduction methods

    For every method the embeddings are reduced with a freshly built reducer
    and scored on:
      * neighbor preservation - mean share of each point's ``n_neighbors``
        nearest neighbors that are still nearest neighbors after reduction,
      * trustworthiness (scikit-learn),
      * silhouette score of a KMeans clustering of the reduced data,
      * time taken by ``fit_transform``.

    Args:
        embeddings: Array of face embeddings [n_samples, n_features]
        methods: List of reduction methods to compare
            (defaults to ['pca', 'umap', 'random_projection'])
        target_dims: Target dimensionality
        n_neighbors: Number of neighbors for evaluation

    Returns:
        results: DataFrame with comparison results
    """
    if methods is None:
        methods = ['pca', 'umap', 'random_projection']

    # Initialize result metrics
    results = {
        'Method': [],
        'Neighbor Preservation (%)': [],
        'Trustworthiness': [],
        'Silhouette Score': [],
        'Processing Time (ms)': []
    }

    n_samples = len(embeddings)

    # Precompute neighbors in original space. Calling kneighbors() without a
    # query makes scikit-learn exclude each point from its own neighbor list,
    # which stays correct when duplicate embeddings exist (dropping column 0
    # can remove the duplicate instead of the point itself).
    nn_original = NearestNeighbors(n_neighbors=n_neighbors)
    nn_original.fit(embeddings)
    _, indices_original = nn_original.kneighbors()

    # Compare each method
    for method in methods:
        print(f"Evaluating {method}...")

        # Get reducer
        reducer = get_reducer(method, target_dims=target_dims)

        # Measure time with a monotonic, high-resolution clock
        start_time = time.perf_counter()
        reduced = reducer.fit_transform(embeddings)
        processing_time = (time.perf_counter() - start_time) * 1000  # Convert to ms

        # Neighbors in the reduced space (self excluded, as above)
        nn_reduced = NearestNeighbors(n_neighbors=n_neighbors)
        nn_reduced.fit(reduced)
        _, indices_reduced = nn_reduced.kneighbors()

        # Calculate preservation rate: shared neighbors over all neighbor slots
        shared_neighbors = sum(
            len(np.intersect1d(before, after))
            for before, after in zip(indices_original, indices_reduced)
        )
        neighbor_preservation = 100 * shared_neighbors / (n_neighbors * n_samples)

        # Calculate trustworthiness
        trust = trustworthiness(embeddings, reduced, n_neighbors=n_neighbors)

        # Calculate silhouette score (clustering quality)
        # Use fewer clusters if samples are limited
        n_clusters = min(5, n_samples // 10)
        if n_clusters >= 2:  # Need at least 2 clusters
            kmeans = KMeans(n_clusters=n_clusters, random_state=42)
            clusters = kmeans.fit_predict(reduced)
            silhouette = silhouette_score(reduced, clusters)
        else:
            silhouette = float('nan')

        # Store results
        results['Method'].append(method)
        results['Neighbor Preservation (%)'].append(round(neighbor_preservation, 2))
        results['Trustworthiness'].append(round(trust, 4))
        results['Silhouette Score'].append(round(silhouette, 4))
        results['Processing Time (ms)'].append(round(processing_time, 2))

    # Convert to DataFrame
    results_df = pd.DataFrame(results)

    return results_df


In [None]:
# key_generation/similarity_component/base_lsh.py

import numpy as np
from abc import ABC, abstractmethod

class LocalitySensitiveHash(ABC):
    """
    Base class for all LSH implementations.

    Concrete hashers implement ``fit`` and ``hash`` and are expected to
    override ``save`` / ``load`` to persist their state.
    """

    def __init__(self, hash_bits=192, random_state=42):
        """
        Initialize the LSH hasher

        Args:
            hash_bits: Number of bits in the output hash
            random_state: Random seed for reproducibility
        """
        self.hash_bits = hash_bits
        self.random_state = random_state
        # Underlying model object; None until a subclass assigns one.
        self.model = None
        # Guards hash()/save(); subclasses set this once fitting succeeds.
        self.is_fitted = False

    @abstractmethod
    def fit(self, embeddings):
        """
        Fit the LSH model to the data

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]
        """
        pass

    @abstractmethod
    def hash(self, embeddings):
        """
        Generate hashes for the given embeddings

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            hashes: Binary array of shape [n_samples, hash_bits]
        """
        pass

    def save(self, path):
        """
        Validate that the model may be saved

        The base implementation only checks the fitted flag; it writes
        nothing to ``path``. Subclasses override this to persist their state.

        Args:
            path: Path to save the model

        Raises:
            ValueError: If the hasher has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("LSH model is not fitted yet")

        # Persistence itself is implemented in subclasses.

    @classmethod
    def load(cls, path):
        """
        Load a fitted LSH model

        Args:
            path: Path to load the model from

        Returns:
            lsh: Loaded LSH model

        Raises:
            NotImplementedError: Always, in the base class. Failing loudly
                avoids handing callers ``None`` when a subclass forgets to
                override this method.
        """
        raise NotImplementedError(
            f"{cls.__name__} does not implement load(); override it in the subclass"
        )

    def hamming_distance(self, hash1, hash2):
        """
        Calculate Hamming distance between two hashes

        Args:
            hash1: First hash (binary array)
            hash2: Second hash (binary array)

        Returns:
            distance: Hamming distance (number of different bits)
        """
        return np.sum(hash1 != hash2)

    def hamming_similarity(self, hash1, hash2):
        """
        Calculate Hamming similarity between two hashes

        The distance is normalised by ``self.hash_bits``, not by the length
        of the arrays passed in.

        Args:
            hash1: First hash (binary array)
            hash2: Second hash (binary array)

        Returns:
            similarity: Hamming similarity (0-1, where 1 is identical)
        """
        return 1 - (self.hamming_distance(hash1, hash2) / self.hash_bits)

In [None]:
# key_generation/similarity_component/simhash.py

import os
import numpy as np
import joblib

from key_generation.similarity_component.base_lsh import LocalitySensitiveHash

class SimHash(LocalitySensitiveHash):
    """SimHash implementation for similarity-preserving hashing"""

    def __init__(self, hash_bits=192, random_state=42):
        """
        Initialize SimHash

        Construction does not touch NumPy's global random state; the seed is
        applied to a private generator when ``fit`` draws the hyperplanes.

        Args:
            hash_bits: Number of bits in the output hash
            random_state: Random seed for reproducibility
        """
        super().__init__(hash_bits=hash_bits, random_state=random_state)

    def fit(self, embeddings):
        """
        Fit SimHash by generating random hyperplanes

        Only the feature count of ``embeddings`` is used. One unit-length
        hyperplane normal is drawn per output bit.

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            self: The fitted hasher
        """
        # Get embedding dimension
        self.input_dim = embeddings.shape[1]

        # Draw from a private generator seeded with random_state so the
        # hyperplanes depend only on (random_state, hash_bits, input_dim),
        # not on whatever else has consumed the global RNG or on how many
        # times fit() has been called. RandomState uses the same legacy
        # stream as np.random.seed()/np.random.randn(), so a seed yields the
        # hyperplanes it did when fit() directly followed construction.
        rng = np.random.RandomState(self.random_state)
        self.hyperplanes = rng.randn(self.hash_bits, self.input_dim)

        # Normalize hyperplanes
        self.hyperplanes = self.hyperplanes / np.linalg.norm(self.hyperplanes, axis=1, keepdims=True)

        self.is_fitted = True

        return self

    def hash(self, embeddings):
        """
        Generate binary hashes using SimHash

        Each bit is 1 when the embedding's projection onto the corresponding
        hyperplane normal is strictly positive, otherwise 0.

        Args:
            embeddings: Array of face embeddings [n_samples, n_features]

        Returns:
            hashes: Binary array of shape [n_samples, hash_bits]

        Raises:
            ValueError: If the hasher has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("SimHash model is not fitted yet")

        # Project embeddings onto hyperplanes
        projections = np.dot(embeddings, self.hyperplanes.T)

        # Convert to binary (0 or 1)
        binary_hash = (projections > 0).astype(np.int8)

        return binary_hash

    def save(self, path):
        """
        Save the fitted SimHash model

        Args:
            path: Path to save the model

        Raises:
            ValueError: If the hasher has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("SimHash model is not fitted yet")

        # os.path.dirname() is '' for a bare filename and os.makedirs('')
        # raises FileNotFoundError, so only create a directory when one is given.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        # Save model data
        model_data = {
            'hyperplanes': self.hyperplanes,
            'input_dim': self.input_dim,
            'hash_bits': self.hash_bits,
            'random_state': self.random_state
        }

        joblib.dump(model_data, path)

    @classmethod
    def load(cls, path):
        """
        Load a fitted SimHash model

        NOTE(review): joblib files are pickle-based; only load files from a
        trusted source.

        Args:
            path: Path to load the model from

        Returns:
            simhash: Loaded SimHash model
        """
        # Load model data
        model_data = joblib.load(path)

        # Create model instance
        simhash = cls(
            hash_bits=model_data['hash_bits'],
            random_state=model_data['random_state']
        )

        # Restore model state
        simhash.hyperplanes = model_data['hyperplanes']
        simhash.input_dim = model_data['input_dim']
        simhash.is_fitted = True

        return simhash

In [None]:
# key_generation/similarity_component/lsh_factory.py

from key_generation.similarity_component.simhash import SimHash
from key_generation.similarity_component.spherical_lsh import SphericalLSH

def get_lsh(method, hash_bits=192, **kwargs):
    """
    Build an unfitted LSH hasher for the named method

    Args:
        method: LSH method ('simhash', 'spherical')
        hash_bits: Number of bits in the output hash
        **kwargs: Extra keyword arguments forwarded to the hasher's constructor

    Returns:
        lsh: LSH hasher

    Raises:
        ValueError: If ``method`` is not one of the supported names
    """
    # Pick the class first, then construct it in a single place.
    if method == 'simhash':
        lsh_cls = SimHash
    elif method == 'spherical':
        lsh_cls = SphericalLSH
    else:
        raise ValueError(f"Unknown LSH method: {method}")

    return lsh_cls(hash_bits=hash_bits, **kwargs)

def load_lsh(method, path):
    """
    Restore a previously saved LSH hasher of the named method

    Args:
        method: LSH method
        path: Path to load the model from

    Returns:
        lsh: Loaded LSH hasher

    Raises:
        ValueError: If ``method`` is not one of the supported names
    """
    # Pick the class first, then delegate to its load() in a single place.
    if method == 'simhash':
        lsh_cls = SimHash
    elif method == 'spherical':
        lsh_cls = SphericalLSH
    else:
        raise ValueError(f"Unknown LSH method: {method}")

    return lsh_cls.load(path)

In [None]:
# key_generation/security_component/metadata_hasher.py

import os
import numpy as np
import hashlib
import joblib

class MetadataHasher:
    """
    Generate security component from facial metadata
    
    The hasher is fitted once on a list of metadata dictionaries to decide
    which weighted features are present often enough to be used. Afterwards
    any single metadata dictionary can be turned into a fixed-length bit
    array by hashing a canonical string of its normalized feature values
    with SHA-256.
    """
    
    def __init__(self, hash_bits=64, metadata_weights=None, random_state=42):
        """
        Initialize metadata hasher
        
        Args:
            hash_bits: Number of bits in the output hash
            metadata_weights: Dictionary of weights for different metadata fields;
                the defaults from ``_get_default_weights`` are used when None
            random_state: Random seed for reproducibility
        """
        self.hash_bits = hash_bits
        self.metadata_weights = metadata_weights
        self.random_state = random_state
        
        # Set default weights if not provided
        if self.metadata_weights is None:
            self.metadata_weights = self._get_default_weights()
        
        # NOTE(review): this reseeds NumPy's *global* RNG although nothing in
        # this class draws random numbers. Kept unchanged so code that relies
        # on the seeded global state keeps behaving as before.
        np.random.seed(self.random_state)
        
        # Both are populated by fit(); None means "not fitted yet"
        self.selected_features = None
        self.normalized_weights = None
    
    def _get_default_weights(self):
        """Get default weights for metadata fields"""
        return {
            # Geometric features (higher weight)
            'inter_ocular_ratio': 1.0,
            'eye_aspect_ratio': 1.0,
            'nose_width_ratio': 1.0,
            'lip_fullness_ratio': 1.0,
            'face_width_height_ratio': 1.0,
            'jaw_angle': 1.0,
            'face_symmetry': 0.5
            # Other features would be included in a full implementation
        }
    
    def fit(self, metadata_list):
        """
        Analyze metadata to determine which features to use
        
        A weighted feature is selected when its key appears in at least 90%
        of the samples. The weights of the selected features are rescaled so
        that they sum to 1.
        
        Args:
            metadata_list: List of facial metadata dictionaries
            
        Returns:
            self: The fitted hasher
            
        Raises:
            ValueError: If ``metadata_list`` is empty, or if the weights of
                the selected features sum to zero
        """
        if not metadata_list:
            raise ValueError("Need at least one metadata sample to fit")
        
        # Count in how many samples each weighted feature key appears
        feature_counts = {}
        for metadata in metadata_list:
            for key in metadata:
                if key in self.metadata_weights:
                    feature_counts[key] = feature_counts.get(key, 0) + 1
        
        # Select features that are available in at least 90% of samples
        threshold = 0.9 * len(metadata_list)
        self.selected_features = [
            feature for feature, count in feature_counts.items()
            if count >= threshold
        ]
        
        if not self.selected_features:
            # Without any feature every input hashes the same string, so the
            # security component carries no information. Behavior is kept
            # (no exception) but the condition is no longer silent.
            import warnings
            warnings.warn(
                "No metadata feature met the 90% availability threshold; "
                "all metadata hashes will be identical",
                RuntimeWarning,
                stacklevel=2,
            )
            self.normalized_weights = {}
            return self
        
        # Normalize weights
        total_weight = sum(self.metadata_weights[f] for f in self.selected_features)
        if total_weight == 0:
            # Previously surfaced as a bare ZeroDivisionError
            raise ValueError("Weights of the selected metadata features sum to zero")
        self.normalized_weights = {
            f: self.metadata_weights[f] / total_weight 
            for f in self.selected_features
        }
        
        return self
    
    def normalize_metadata(self, metadata):
        """
        Normalize metadata values to [0, 1] range
        
        Features whose name contains 'ratio' are halved and clamped to
        [0, 1]; features whose name contains 'angle' are wrapped modulo 360
        and divided by 360; all other features are clamped to [0, 1].
        A feature that is absent, or present with the value None, is
        replaced by the midpoint 0.5.
        
        Args:
            metadata: Dictionary of facial metadata
            
        Returns:
            normalized: Dictionary of normalized metadata, one entry per
                selected feature
            
        Raises:
            ValueError: If the hasher has not been fitted
        """
        if self.selected_features is None:
            raise ValueError("MetadataHasher needs to be fitted first")
        
        normalized = {}
        
        for feature in self.selected_features:
            value = metadata[feature] if feature in metadata else None
            
            if value is None:
                # Missing (or explicitly null) measurement: use the midpoint
                normalized[feature] = 0.5
            elif 'ratio' in feature:
                # Ratios are scaled by an assumed upper bound of 2.0
                normalized[feature] = min(1.0, max(0.0, value / 2.0))
            elif 'angle' in feature:
                # Angles are wrapped into one full turn, then scaled
                normalized[feature] = (value % 360) / 360.0
            else:
                # Default normalization (assume 0-1 already)
                normalized[feature] = min(1.0, max(0.0, value))
        
        return normalized
    
    def hash(self, metadata):
        """
        Generate a hash from metadata
        
        The normalized values are rendered as ``name:value`` strings with six
        decimals, each repeated according to its normalized weight, sorted,
        joined with ``|`` and hashed with SHA-256. The first ``hash_bits``
        bits of the digest are returned. When more than 256 bits are
        requested, additional SHA-256 digests of the same string with a block
        counter suffix are appended, so the first 256 bits are unchanged.
        
        Args:
            metadata: Dictionary of facial metadata
            
        Returns:
            binary_hash: Binary array of shape [hash_bits]
            
        Raises:
            ValueError: If the hasher has not been fitted
        """
        if self.selected_features is None:
            raise ValueError("MetadataHasher needs to be fitted first")
        
        # Normalize metadata
        normalized = self.normalize_metadata(metadata)
        
        # Create a weighted string representation
        feature_strings = []
        for feature in self.selected_features:
            # Weight the feature by repeating it proportionally to its weight
            repetitions = max(1, int(self.normalized_weights[feature] * 10))
            value_str = f"{feature}:{normalized[feature]:.6f}"
            feature_strings.extend([value_str] * repetitions)
        
        # Sort for determinism and join
        feature_strings.sort()
        payload = "|".join(feature_strings).encode('utf-8')
        
        # First block is the plain SHA-256 digest (keeps existing hashes stable)
        digest = hashlib.sha256(payload).digest()
        
        # One digest only covers 256 bits; extend with counter-suffixed blocks
        block = 1
        while len(digest) * 8 < self.hash_bits:
            digest += hashlib.sha256(payload + b"#%d" % block).digest()
            block += 1
        
        # Convert to binary array (take only the bits we need)
        binary_hash = np.unpackbits(np.frombuffer(digest, dtype=np.uint8))[:self.hash_bits]
        
        return binary_hash

In [None]:
# key_generation/face_key_generator.py

import os
import numpy as np
import time
import joblib

from key_generation.dimensionality_reduction.reducer_factory import get_reducer
from key_generation.similarity_component.lsh_factory import get_lsh
from key_generation.security_component.metadata_hasher import MetadataHasher

class FacialPublicKeyGenerator:
    """
    Generate facial public keys from face embeddings and metadata
    
    A key is the concatenation of a similarity component (dimensionality
    reduction followed by locality-sensitive hashing of the embedding) and a
    security component (hash of the facial metadata).
    
    NOTE(review): other code in this file calls ``save``/``load`` and
    ``compare_keys`` on this class, but none of them is defined in this
    class body - presumably they are attached elsewhere; verify.
    """
    
    def __init__(
        self,
        reduction_method='pca',
        reduction_dims=32,
        lsh_method='spherical',
        similarity_bits=192,
        security_bits=64,
        random_state=42
    ):
        """
        Initialize the key generator
        
        Args:
            reduction_method: Method for dimensionality reduction
            reduction_dims: Target dimensionality after reduction
            lsh_method: Method for locality-sensitive hashing
            similarity_bits: Number of bits for similarity component
            security_bits: Number of bits for security component
            random_state: Random seed for reproducibility
        """
        self.reduction_method = reduction_method
        self.reduction_dims = reduction_dims
        self.lsh_method = lsh_method
        self.similarity_bits = similarity_bits
        self.security_bits = security_bits
        self.random_state = random_state
        
        # Initialize components; all three share the same seed
        self.reducer = get_reducer(
            method=reduction_method,
            target_dims=reduction_dims,
            random_state=random_state
        )
        
        self.lsh = get_lsh(
            method=lsh_method,
            hash_bits=similarity_bits,
            random_state=random_state
        )
        
        self.metadata_hasher = MetadataHasher(
            hash_bits=security_bits,
            random_state=random_state
        )
        
        self.is_fitted = False
    
    def fit(self, embeddings, metadata_list):
        """
        Fit the key generator to training data
        
        The reducer is fitted on the raw embeddings, the LSH hasher on the
        reduced embeddings, and the metadata hasher on the metadata list.
        
        Args:
            embeddings: Array of face embeddings [n_samples, n_features]
            metadata_list: List of facial metadata dictionaries
            
        Returns:
            self: The fitted key generator
        """
        # Fit dimensionality reducer
        print(f"Fitting {self.reduction_method} reducer...")
        self.reducer.fit(embeddings)
        
        # The LSH stage is trained on the reduced space, not the raw one
        reduced_embeddings = self.reducer.transform(embeddings)
        
        # Fit LSH hasher
        print(f"Fitting {self.lsh_method} hasher...")
        self.lsh.fit(reduced_embeddings)
        
        # Fit metadata hasher
        print("Fitting metadata hasher...")
        self.metadata_hasher.fit(metadata_list)
        
        self.is_fitted = True
        
        return self
    
    def generate_key(self, embedding, metadata):
        """
        Generate a facial public key
        
        Args:
            embedding: Face embedding vector
            metadata: Dictionary of facial metadata
            
        Returns:
            key: Binary key array
            similarity_component: Similarity component of the key
            security_component: Security component of the key
            generation_time_ms: Key generation time in milliseconds
            
        Raises:
            ValueError: If the key generator has not been fitted
        """
        if not self.is_fitted:
            raise ValueError("Key generator needs to be fitted first")
        
        # perf_counter is monotonic and high-resolution, unlike the wall
        # clock, so the measured interval cannot be skewed by clock changes
        start_time = time.perf_counter()
        
        # The reducer works on 2-D input; promote a single vector to one row
        embedding = np.atleast_2d(embedding)
        
        # Reduce dimensionality
        reduced_embedding = self.reducer.transform(embedding)
        
        # Only the first row's hash is used as the similarity component
        similarity_component = self.lsh.hash(reduced_embedding)[0]
        
        # Generate security component
        security_component = self.metadata_hasher.hash(metadata)
        
        # Key layout: similarity bits first, security bits after
        key = np.concatenate([similarity_component, security_component])
        
        # Measure generation time
        generation_time_ms = (time.perf_counter() - start_time) * 1000
        
        return key, similarity_component, security_component, generation_time_ms
    
    def key_to_hex(self, key):
        """
        Convert binary key to hexadecimal string
        
        Bits are packed eight per byte, most significant bit first; a final
        partial byte is zero-padded on the right.
        
        Args:
            key: Binary key array
            
        Returns:
            hex_key: Hexadecimal string
        """
        # Pack bits into bytes and render them as lowercase hex
        return np.packbits(key).tobytes().hex()
    

In [None]:
def compare_keys(self, key1, key2):
    """
    Compare two keys and return similarity
    
    The first ``self.similarity_bits`` entries of each key are compared by
    normalized Hamming distance; the remaining entries (the security
    component) must match exactly. The overall score weights both parts by
    their share of the total bit count.
    
    Args:
        key1: First binary key (array or plain sequence of 0/1 values)
        key2: Second binary key (array or plain sequence of 0/1 values)
        
    Returns:
        similarity: Overall similarity (0-1)
        sim_component_similarity: Similarity component similarity (0-1)
        sec_component_match: Security component exact match (True/False)
        
    Raises:
        ValueError: If the similarity components of the two keys differ in
            length
    """
    # Keys read back from JSON are plain lists, and `list != list` yields a
    # single bool instead of an element-wise result; coerce to arrays first.
    key1 = np.asarray(key1)
    key2 = np.asarray(key2)
    
    # Extract components
    sim_component1 = key1[:self.similarity_bits]
    sec_component1 = key1[self.similarity_bits:]
    
    sim_component2 = key2[:self.similarity_bits]
    sec_component2 = key2[self.similarity_bits:]
    
    if sim_component1.shape != sim_component2.shape:
        raise ValueError(
            "Similarity components differ in length: "
            f"{sim_component1.shape} vs {sim_component2.shape}"
        )
    
    # Fraction of agreeing bits in the similarity component
    sim_component_similarity = 1 - (np.sum(sim_component1 != sim_component2) / self.similarity_bits)
    
    # Check if security components match
    sec_component_match = np.array_equal(sec_component1, sec_component2)
    
    # Each component contributes in proportion to its bit count
    total_bits = self.similarity_bits + self.security_bits
    sim_weight = self.similarity_bits / total_bits
    sec_weight = self.security_bits / total_bits
    
    security_score = 1.0 if sec_component_match else 0.0
    
    similarity = (sim_weight * sim_component_similarity) + (sec_weight * security_score)
    
    return similarity, sim_component_similarity, sec_component_match

def find_similar_keys(self, target_key, key_database, threshold=0.8):
    """
    Scan a key database for entries close to a target key
    
    Every stored key is scored against ``target_key`` with
    ``self.compare_keys``; entries whose overall similarity reaches the
    threshold are kept.
    
    Args:
        target_key: Key to search for
        key_database: Dictionary mapping IDs to keys
        threshold: Minimum overall similarity (0-1) for a match
        
    Returns:
        matches: List of (id, similarity) tuples at or above the threshold,
            ordered from most to least similar
    """
    def scored_entries():
        # Only the overall similarity is needed; the per-component results
        # returned by compare_keys are discarded.
        for entry_id, candidate in key_database.items():
            score, _sim_part, _sec_part = self.compare_keys(target_key, candidate)
            yield entry_id, score
    
    hits = [(entry_id, score) for entry_id, score in scored_entries() if score >= threshold]
    
    # Best matches first; ties keep database order
    return sorted(hits, key=lambda hit: hit[1], reverse=True)

def evaluate_key_quality(self, embeddings, metadata_list, same_person_pairs=None, diff_person_pairs=None):
    """
    Evaluate key quality with similarity metrics
    
    A key is generated for every (embedding, metadata) pair, then the overall
    similarity from ``self.compare_keys`` is collected for the same-person
    and different-person index pairs.
    
    When neither pair list is given, pairs are sampled at random: the first
    half of the samples is treated as "same person" and the second half as
    "different people" (a simplifying assumption of this evaluation), with up
    to 100 pairs drawn from each half. When only one list is given, the other
    is treated as empty.
    
    Args:
        embeddings: List of face embeddings
        metadata_list: List of facial metadata dictionaries
        same_person_pairs: List of (idx1, idx2) tuples for same person
        diff_person_pairs: List of (idx1, idx2) tuples for different people
        
    Returns:
        results: Dictionary with the keys 'same_person_mean',
            'same_person_std', 'diff_person_mean', 'diff_person_std' and
            'separation'; statistics of an empty pair list are NaN
        
    Raises:
        ValueError: If the key generator has not been fitted
    """
    if not self.is_fitted:
        raise ValueError("Key generator needs to be fitted first")
    
    # Generate keys for all faces
    keys = [
        self.generate_key(embedding, metadata)[0]
        for embedding, metadata in zip(embeddings, metadata_list)
    ]
    
    if same_person_pairs is None and diff_person_pairs is None:
        print("No specific pairs provided; evaluating random sample of all pairs")
        
        # A local generator seeded like the global one draws the same pairs
        # without clobbering NumPy's global random state.
        rng = np.random.RandomState(self.random_state)
        all_indices = np.arange(len(keys))
        half = len(all_indices) // 2
        
        def sample_pairs(indices, limit=100):
            # Cap at the number of distinct unordered pairs available
            n_distinct = len(indices) * (len(indices) - 1) // 2
            return [
                tuple(rng.choice(indices, 2, replace=False))
                for _ in range(min(limit, n_distinct))
            ]
        
        # For simplicity, assume first half are same person in different
        # photos and second half are different people
        same_person_pairs = sample_pairs(all_indices[:half])
        diff_person_pairs = sample_pairs(all_indices[half:])
    else:
        # Only one list supplied: evaluate it and treat the other as empty
        same_person_pairs = [] if same_person_pairs is None else same_person_pairs
        diff_person_pairs = [] if diff_person_pairs is None else diff_person_pairs
    
    def pair_similarities(pairs):
        return [self.compare_keys(keys[idx1], keys[idx2])[0] for idx1, idx2 in pairs]
    
    def mean_and_std(values):
        # np.mean/np.std of an empty list emit runtime warnings; report NaN
        if not values:
            return float('nan'), float('nan')
        return np.mean(values), np.std(values)
    
    same_mean, same_std = mean_and_std(pair_similarities(same_person_pairs))
    diff_mean, diff_std = mean_and_std(pair_similarities(diff_person_pairs))
    
    # Calculate metrics
    results = {
        'same_person_mean': same_mean,
        'same_person_std': same_std,
        'diff_person_mean': diff_mean,
        'diff_person_std': diff_std,
        'separation': same_mean - diff_mean
    }
    
    return results

In [None]:
# key_generation/visualization/key_visualizer.py

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.manifold import TSNE
from sklearn.metrics import roc_curve, auc

def plot_key_bits(key, similarity_bits=192, figsize=(10, 2)):
    """
    Draw a key as a single-row heatmap of its bits
    
    When the key is longer than ``similarity_bits``, a dashed red line marks
    the boundary between the two components and each side gets a caption.
    
    Args:
        key: Binary key array
        similarity_bits: Number of bits in similarity component
        figsize: Figure size
        
    Returns:
        fig: Matplotlib figure
    """
    fig, ax = plt.subplots(figsize=figsize)
    
    # The heatmap needs 2-D data, so the key becomes one row; the Blues
    # colormap is used for the bit values
    sns.heatmap(
        key.reshape(1, -1),
        cmap=plt.cm.Blues,
        cbar=False,
        ax=ax,
        xticklabels=False,
        yticklabels=False,
    )
    
    total_bits = len(key)
    if similarity_bits < total_bits:
        # Boundary between similarity and security components
        ax.axvline(x=similarity_bits, color='red', linestyle='--', linewidth=2)
        
        # Captions are centered over their respective component
        captions = (
            (similarity_bits // 2, "Similarity Component"),
            (similarity_bits + (total_bits - similarity_bits) // 2, "Security Component"),
        )
        for x_center, caption in captions:
            ax.text(x_center, -0.3, caption, ha='center', fontsize=10)
    
    ax.set_title("Facial Public Key Bit Pattern")
    
    return fig

def plot_key_similarities(same_person_similarities, diff_person_similarities, figsize=(8, 6)):
    """
    Plot histogram of key similarities for same person and different people
    
    Both distributions are drawn on one axis, together with a dashed line at
    the midpoint between the two means as a candidate decision threshold.
    
    Args:
        same_person_similarities: List of similarities between same person
        diff_person_similarities: List of similarities between different people
        figsize: Figure size
        
    Returns:
        fig: Matplotlib figure
    """
    fig, ax = plt.subplots(figsize=figsize)
    
    # Plot histograms
    sns.histplot(same_person_similarities, color='green', alpha=0.6, 
                 label='Same Person', kde=True, ax=ax)
    sns.histplot(diff_person_similarities, color='red', alpha=0.6, 
                 label='Different People', kde=True, ax=ax)
    
    # Add labels and title
    ax.set_xlabel('Key Similarity (0-1)')
    ax.set_ylabel('Frequency')
    ax.set_title('Distribution of Key Similarities')
    
    # Add vertical line at potential threshold
    threshold = (np.mean(same_person_similarities) + np.mean(diff_person_similarities)) / 2
    ax.axvline(x=threshold, color='blue', linestyle='--', linewidth=2,
              label=f'Potential Threshold: {threshold:.2f}')
    
    # The legend only lists artists that exist when it is built, so it must
    # be created after the threshold line for that label to show up.
    ax.legend()
    
    return fig

def plot_roc_curve(same_person_similarities, diff_person_similarities, figsize=(8, 6)):
    """
    Plot ROC curve for key matching
    
    Same-person similarities are the positive class and different-person
    similarities the negative class; the similarity value itself is the
    score handed to ``roc_curve``.
    
    Args:
        same_person_similarities: List of similarities between same person
        diff_person_similarities: List of similarities between different people
        figsize: Figure size
        
    Returns:
        fig: Matplotlib figure
        roc_auc: Area under the ROC curve
    """
    fig, ax = plt.subplots(figsize=figsize)
    
    # Labels: 1.0 for every same-person score, 0.0 for every different-person score
    n_positive = len(same_person_similarities)
    n_negative = len(diff_person_similarities)
    y_true = np.repeat([1.0, 0.0], [n_positive, n_negative])
    
    # Scores in the same order as the labels
    y_scores = np.hstack([same_person_similarities, diff_person_similarities])
    
    # The threshold array returned by roc_curve is not needed here
    fpr, tpr, _ = roc_curve(y_true, y_scores)
    roc_auc = auc(fpr, tpr)
    
    # Measured curve plus the diagonal of a random classifier
    ax.plot(fpr, tpr, color='darkorange', lw=2, 
            label=f'ROC curve (area = {roc_auc:.2f})')
    ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('Receiver Operating Characteristic (ROC) Curve')
    ax.legend(loc="lower right")
    
    return fig, roc_auc

In [None]:
import os
import numpy as np
import json
import time
import argparse
from datetime import datetime
import matplotlib.pyplot as plt

# Import Phase 1 components
from main import process_image  # From Phase 1

# Import Phase 2 components
from key_generation.face_key_generator import FacialPublicKeyGenerator
from key_generation.visualization.key_visualizer import plot_key_bits, plot_key_similarities

def train_key_generator(dataset_path, output_model_path, reduction_method='pca', lsh_method='spherical'):
    """
    Train the facial public key generator on a dataset
    
    Every ``*.json`` file in ``dataset_path`` that contains both an
    'embedding' and a 'metadata' entry contributes one training sample.
    Files that cannot be read or parsed are reported and skipped.
    
    Args:
        dataset_path: Path to dataset of Phase 1 results
        output_model_path: Path to save the trained model
        reduction_method: Method for dimensionality reduction
        lsh_method: Method for locality-sensitive hashing
        
    Returns:
        key_generator: Trained FacialPublicKeyGenerator
        
    Raises:
        ValueError: If fewer than 10 usable samples were found
    """
    print(f"Training key generator with {reduction_method} and {lsh_method}...")
    
    # Collect embeddings and metadata from dataset
    embeddings = []
    metadata_list = []
    
    # os.listdir order is filesystem-dependent; sorting keeps the training
    # sample order (and therefore the fitted model) reproducible.
    for filename in sorted(os.listdir(dataset_path)):
        if not filename.endswith('.json'):
            continue
        
        file_path = os.path.join(dataset_path, filename)
        
        try:
            # Load Phase 1 result (JSON text is UTF-8 regardless of locale)
            with open(file_path, 'r', encoding='utf-8') as f:
                result = json.load(f)
            
            # Check if valid result
            if 'embedding' in result and 'metadata' in result:
                embeddings.append(np.array(result['embedding']))
                metadata_list.append(result['metadata'])
        except Exception as e:
            # Best effort: one bad file must not abort the whole training run
            print(f"Error loading {filename}: {e}")
    
    print(f"Loaded {len(embeddings)} samples from dataset")
    
    if len(embeddings) < 10:
        raise ValueError("Too few samples in dataset (need at least 10)")
    
    # Convert to numpy array
    embeddings = np.array(embeddings)
    
    # Create and train key generator
    key_generator = FacialPublicKeyGenerator(
        reduction_method=reduction_method,
        lsh_method=lsh_method
    )
    
    # Fit the key generator
    key_generator.fit(embeddings, metadata_list)
    
    # A bare filename has an empty dirname, and os.makedirs('') raises;
    # only create the parent directory when there is one.
    model_dir = os.path.dirname(output_model_path)
    if model_dir:
        os.makedirs(model_dir, exist_ok=True)
    key_generator.save(output_model_path)
    
    print(f"Key generator trained and saved to {output_model_path}")
    
    return key_generator

def generate_key_from_image(image_path, key_generator, output_dir=None, visualize=False):
    """
    Run one image through Phase 1 and turn the result into a facial key
    
    The key is optionally written to ``<output_dir>/<image name>_key.json``.
    A bit-pattern plot is saved next to it when ``visualize`` is set; without
    an output directory nothing is written and ``visualize`` has no effect.
    
    Args:
        image_path: Path to input image
        key_generator: Trained FacialPublicKeyGenerator
        output_dir: Directory to save results
        visualize: Whether to visualize the key
        
    Returns:
        result: Dictionary with key generation results, or a dictionary with
            only an 'error' entry when Phase 1 reported one
    """
    print(f"Processing image: {image_path}")
    
    # Phase 1 yields the embedding and metadata, or an error description
    phase1 = process_image(image_path)
    if 'error' in phase1:
        return {'error': phase1['error']}
    
    face_embedding = np.array(phase1['embedding'])
    face_metadata = phase1['metadata']
    
    # Time the key generation call as seen from here (in milliseconds)
    t0 = time.time()
    key, sim_component, sec_component, _ = key_generator.generate_key(face_embedding, face_metadata)
    elapsed_ms = (time.time() - t0) * 1000
    
    # Hex form is for display; the binary form is kept as a plain list
    hex_key = key_generator.key_to_hex(key)
    
    result = {
        'input_image': image_path,
        'binary_key': key.tolist(),
        'hex_key': hex_key,
        'generation_time_ms': elapsed_ms,
        'timestamp': datetime.now().isoformat()
    }
    
    # Nothing to persist without an output directory
    if not output_dir:
        return result
    
    os.makedirs(output_dir, exist_ok=True)
    
    # Output files are named after the image, without its extension
    stem = os.path.splitext(os.path.basename(image_path))[0]
    
    with open(os.path.join(output_dir, f"{stem}_key.json"), 'w') as f:
        json.dump(result, f, indent=2)
    
    if visualize:
        fig = plot_key_bits(key, similarity_bits=key_generator.similarity_bits)
        fig.savefig(os.path.join(output_dir, f"{stem}_key_visualization.png"))
        # Close the figure so batch runs do not accumulate open figures
        plt.close(fig)
    
    return result

def main():
    """
    Command-line entry point for Phase 2
    
    Supports three modes: 'train' fits and saves a key generator from a
    dataset of Phase 1 results, 'generate' loads a saved generator and
    produces the key for one image, and 'batch' does the same for every
    PNG/JPEG image in a directory.
    """
    parser = argparse.ArgumentParser(description='Facial Public Key Generation (Phase 2)')
    
    # Mode selection
    parser.add_argument('mode', choices=['train', 'generate', 'batch'],
                        help='Operation mode: train model, generate single key, or batch process')
    
    # Common parameters
    parser.add_argument('--model', default='models/key_generator',
                        help='Path to model directory (for saving or loading)')
    
    # Training parameters
    parser.add_argument('--dataset', help='Path to training dataset (Phase 1 results)')
    parser.add_argument('--reduction', default='pca', 
                        choices=['pca', 'umap', 'random_projection'],
                        help='Dimensionality reduction method')
    parser.add_argument('--lsh', default='spherical', 
                        choices=['simhash', 'spherical'],
                        help='LSH method for similarity component')
    
    # Generation parameters
    parser.add_argument('--image', help='Path to input image (for single key generation)')
    parser.add_argument('--input-dir', help='Directory with input images (for batch processing)')
    parser.add_argument('--output-dir', default='results',
                        help='Directory to save results')
    parser.add_argument('--visualize', action='store_true',
                        help='Generate visualizations')
    
    args = parser.parse_args()
    
    # --- train: fit a new generator and save it -------------------------
    if args.mode == 'train':
        if not args.dataset:
            parser.error("Training mode requires --dataset")
        
        train_key_generator(
            dataset_path=args.dataset,
            output_model_path=args.model,
            reduction_method=args.reduction,
            lsh_method=args.lsh
        )
        return
    
    # --- generate: one image, result printed to the console -------------
    if args.mode == 'generate':
        if not args.image:
            parser.error("Generate mode requires --image")
        
        key_generator = FacialPublicKeyGenerator.load(args.model)
        
        outcome = generate_key_from_image(
            image_path=args.image,
            key_generator=key_generator,
            output_dir=args.output_dir,
            visualize=args.visualize
        )
        
        if 'error' in outcome:
            print(f"Error: {outcome['error']}")
        else:
            print(f"Generated key: {outcome['hex_key']}")
            print(f"Generation time: {outcome['generation_time_ms']:.2f} ms")
        return
    
    # --- batch: every image in a directory ------------------------------
    if args.mode != 'batch':
        return
    
    if not args.input_dir:
        parser.error("Batch mode requires --input-dir")
    
    key_generator = FacialPublicKeyGenerator.load(args.model)
    
    # Tally successes and failures for the final summary line
    processed = 0
    errors = 0
    
    for filename in os.listdir(args.input_dir):
        # Only files with a known image extension are considered
        if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            continue
        
        outcome = generate_key_from_image(
            image_path=os.path.join(args.input_dir, filename),
            key_generator=key_generator,
            output_dir=args.output_dir,
            visualize=args.visualize
        )
        
        if 'error' in outcome:
            print(f"Error processing {filename}: {outcome['error']}")
            errors += 1
        else:
            processed += 1
    
    print(f"Batch processing complete: {processed} images processed, {errors} errors")

if __name__ == "__main__":
    main()

In [None]:
# test_phase2.py

import unittest
import os
import numpy as np
import tempfile
import shutil

from key_generation.dimensionality_reduction.pca_reducer import PCAReducer
from key_generation.similarity_component.simhash import SimHash
from key_generation.security_component.metadata_hasher import MetadataHasher
from key_generation.face_key_generator import FacialPublicKeyGenerator

class TestPhase2(unittest.TestCase):
    """Tests for the phase-2 key generation components.

    Exercises PCAReducer, SimHash, MetadataHasher and
    FacialPublicKeyGenerator on synthetic random embeddings and metadata
    that are rebuilt in ``setUp`` before every test.
    """

    # Fixture sizes shared by all tests, so shape assertions stay in sync
    # with the generated data.
    #
    # N_SAMPLES is deliberately >= TARGET_DIMS.
    # NOTE(review): this assumes PCAReducer wraps a PCA that cannot produce
    # more components than min(n_samples, n_features) (scikit-learn's PCA
    # raises ValueError in that case). The previous fixture of 20 samples
    # could not satisfy a 32-dimensional target under that assumption --
    # confirm against the PCAReducer implementation.
    N_SAMPLES = 64
    EMBEDDING_DIM = 128
    TARGET_DIMS = 32
    SIMILARITY_BITS = 192
    SECURITY_BITS = 64

    def setUp(self):
        """Create a scratch directory and seeded synthetic sample data."""
        # Scratch directory for model files written by the save/load tests
        self.test_dir = tempfile.mkdtemp()

        # Seed the global NumPy RNG so the fixture is reproducible. The
        # global seed is kept on purpose: the components under test may
        # draw from the global RNG as well.
        np.random.seed(42)
        self.sample_embeddings = np.random.randn(
            self.N_SAMPLES, self.EMBEDDING_DIM
        )

        # One metadata dict per embedding; each value is a fixed base plus
        # a uniform random offset.
        self.sample_metadata = [
            {
                'inter_ocular_ratio': 0.43 + np.random.rand() * 0.1,
                'eye_aspect_ratio': 0.3 + np.random.rand() * 0.2,
                'nose_width_ratio': 0.25 + np.random.rand() * 0.1,
                'face_width_height_ratio': 0.7 + np.random.rand() * 0.2,
                'face_symmetry': 0.8 + np.random.rand() * 0.2
            }
            for _ in range(self.N_SAMPLES)
        ]

    def tearDown(self):
        """Remove the scratch directory created in ``setUp``."""
        shutil.rmtree(self.test_dir)

    def test_pca_reducer(self):
        """PCAReducer reduces to TARGET_DIMS and survives a save/load cycle."""
        reducer = PCAReducer(target_dims=self.TARGET_DIMS)
        reducer.fit(self.sample_embeddings)

        # Test dimensionality reduction
        reduced = reducer.transform(self.sample_embeddings)
        self.assertEqual(reduced.shape, (self.N_SAMPLES, self.TARGET_DIMS))

        # Test save and load
        save_path = os.path.join(self.test_dir, 'pca_model.pkl')
        reducer.save(save_path)

        loaded_reducer = PCAReducer.load(save_path)
        loaded_reduced = loaded_reducer.transform(self.sample_embeddings)

        # The reloaded model must reproduce the original projection
        np.testing.assert_allclose(reduced, loaded_reduced)

    def test_simhash(self):
        """SimHash emits 0/1 integer hashes and survives a save/load cycle."""
        hasher = SimHash(hash_bits=self.SIMILARITY_BITS)
        hasher.fit(self.sample_embeddings)

        # Test hashing
        hashes = hasher.hash(self.sample_embeddings)
        self.assertEqual(hashes.shape, (self.N_SAMPLES, self.SIMILARITY_BITS))

        # Hashes must be integer-typed and contain only 0 and 1
        self.assertTrue(np.issubdtype(hashes.dtype, np.integer))
        self.assertTrue(np.all((hashes == 0) | (hashes == 1)))

        # Test save and load
        save_path = os.path.join(self.test_dir, 'simhash_model.pkl')
        hasher.save(save_path)

        loaded_hasher = SimHash.load(save_path)
        loaded_hashes = loaded_hasher.hash(self.sample_embeddings)

        # The reloaded model must reproduce the original hashes exactly
        np.testing.assert_array_equal(hashes, loaded_hashes)

    def test_metadata_hasher(self):
        """MetadataHasher is deterministic and separates distinct inputs."""
        hasher = MetadataHasher(hash_bits=self.SECURITY_BITS)
        hasher.fit(self.sample_metadata)

        # Test hashing
        hash1 = hasher.hash(self.sample_metadata[0])
        self.assertEqual(hash1.shape, (self.SECURITY_BITS,))

        # Test determinism (same input should give same hash)
        hash2 = hasher.hash(self.sample_metadata[0])
        np.testing.assert_array_equal(hash1, hash2)

        # Different input should give different hash
        hash3 = hasher.hash(self.sample_metadata[1])
        self.assertFalse(np.array_equal(hash1, hash3))

    def test_face_key_generator(self):
        """The full generator yields stable keys of the expected size."""
        key_generator = FacialPublicKeyGenerator(
            reduction_method='pca',
            lsh_method='simhash',
            similarity_bits=self.SIMILARITY_BITS,
            security_bits=self.SECURITY_BITS
        )

        # Fit the generator
        key_generator.fit(self.sample_embeddings, self.sample_metadata)

        # Generate key
        key, sim_component, sec_component, _ = key_generator.generate_key(
            self.sample_embeddings[0], self.sample_metadata[0]
        )

        # The key is the similarity component plus the security component
        total_bits = self.SIMILARITY_BITS + self.SECURITY_BITS
        self.assertEqual(len(key), total_bits)
        self.assertEqual(len(sim_component), self.SIMILARITY_BITS)
        self.assertEqual(len(sec_component), self.SECURITY_BITS)

        # Test hex conversion: 4 bits per hex character
        hex_key = key_generator.key_to_hex(key)
        self.assertEqual(len(hex_key), total_bits // 4)

        # Test key comparison
        key2, _, _, _ = key_generator.generate_key(
            self.sample_embeddings[0], self.sample_metadata[0]
        )
        similarity, sim_component_similarity, sec_component_match = (
            key_generator.compare_keys(key, key2)
        )

        # Same input should give same key
        self.assertEqual(similarity, 1.0)
        self.assertEqual(sim_component_similarity, 1.0)
        self.assertTrue(sec_component_match)

        # Different faces should give different keys
        key3, _, _, _ = key_generator.generate_key(
            self.sample_embeddings[5], self.sample_metadata[5]
        )
        similarity3, _, _ = key_generator.compare_keys(key, key3)
        self.assertLess(similarity3, 1.0)

# Run the test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()