# Face Recognition Model

## Setup

In [69]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import shutil
from pathlib import Path
import re
import uuid
from tensorflow.keras.metrics import Precision, Recall
import shutil
from tensorflow.keras.mixed_precision import set_global_policy

In [70]:
# Model(inputs = [inputImg, veriImg], outputs = [1,0] )

In [71]:
set_global_policy('mixed_float16')

In [72]:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        tf.config.experimental.enable_tensor_float_32()
    except RuntimeError as e:
        print(e)


In [73]:
TRAIN = os.path.join('data', 'training')
TEST = os.path.join('data', 'test')
ARCH = os.path.join('data', 'archive')

## Preprocess

### Data gathering

In [74]:
def split_data():
    for d in [TRAIN, TEST]:
        Path(d).mkdir(parents=True, exist_ok=True)

    all_folders = [
        folder for folder in os.listdir(ARCH)
        if os.path.isdir(os.path.join(ARCH, folder))
    ]

    random.shuffle(all_folders)
    split_idx = int(len(all_folders) * 0.7)
    train_folders = all_folders[:split_idx]
    test_folders = all_folders[split_idx:]

    for folder_name in train_folders:
        src = os.path.join(ARCH, folder_name)
        dest = os.path.join(TRAIN, folder_name)
        shutil.copytree(src, dest)
        print(f"Copied to training: {folder_name}")

    for folder_name in test_folders:
        src = os.path.join(ARCH, folder_name)
        dest = os.path.join(TEST, folder_name)
        shutil.copytree(src, dest)
        print(f"Copied to testing: {folder_name}")

    print(f"\n✅ Done. {len(train_folders)} folders in training, {len(test_folders)} in test.")


In [75]:
# Correcting function because I messed up the top one and did it twice once with folder deleting and once without lol
def remove_duplicate_images(directory):
    if not os.path.exists(directory):
        print(f"Error: Directory '{directory}' does not exist.")
        return
    
    files = os.listdir(directory)
    files_to_delete = []
    pattern = re.compile(r'^(.+)_1(\.[^.]+)$')
    deleted_count = 0
    
    print(f"Scanning directory: {directory}")

    for file in files:
        match = pattern.match(file)
        if match:
            base_name = match.group(1)
            extension = match.group(2)
            original_file = f"{base_name}{extension}"
            if original_file in files:
                files_to_delete.append(file)

    for file in files_to_delete:
        try:
            file_path = os.path.join(directory, file)
            os.remove(file_path)
            print(f"Deleted: {file}")
            deleted_count += 1
        except Exception as e:
            print(f"Error deleting {file}: {str(e)}")
    
    print(f"\nTotal duplicate files deleted: {deleted_count}")


In [76]:
#cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    
    if not ret:
        print("Failed to grab frame")
        break

    frame = frame[120:120+250, 200:200+250, :]
    cv2.imshow('Image Collection', frame)
    key = cv2.waitKey(1) & 0xFF
    if key == ord('a'):
        img_name = os.path.join(ANC, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(img_name, frame)
        print(f"Saved anchor image: {img_name}")
    elif key == ord('p'):
        img_name = os.path.join(POS, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(img_name, frame)
        print(f"Saved positive image: {img_name}")
    elif key == ord('q'):
        print("Quitting...")
        break
    
cap.release()
cv2.destroyAllWindows()

In [77]:
def create_pairs_from_directory(directory):
    person_dirs = [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))]
    print(f"Found {len(person_dirs)} people in '{directory}'")

    anchor_paths = []
    positive_paths = []
    negative_paths = []

    total_skipped = 0
    for idx, person in enumerate(person_dirs):
        person_path = os.path.join(directory, person)
        person_images = [os.path.join(person_path, f) for f in os.listdir(person_path) 
                         if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

        if len(person_images) < 2:
            total_skipped += 1
            continue

        for i in range(len(person_images)):
            for j in range(i+1, len(person_images)):
                anchor_paths.append(person_images[i])
                positive_paths.append(person_images[j])

        other_people = [p for p in person_dirs if p != person]
        for anchor_img in person_images[:10]:
            sampled_others = random.sample(other_people, min(len(other_people), 10))  
            for other_person in sampled_others:
                other_person_path = os.path.join(directory, other_person)
                other_person_images = [os.path.join(other_person_path, f) 
                                       for f in os.listdir(other_person_path) 
                                       if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

                if not other_person_images:
                    continue

                for _ in range(min(5, len(other_person_images))):
                    negative_img = random.choice(other_person_images)
                    anchor_paths.append(anchor_img)
                    negative_paths.append(negative_img)

        if (idx + 1) % 100 == 0 or (idx + 1) == len(person_dirs):
            print(f"[{idx + 1}/{len(person_dirs)}] Processed '{person}': "
                  f"{len(person_images)} imgs, total positives: {len(positive_paths)}, "
                  f"negatives: {len(negative_paths)}")

    print(f"\n✅ Finished pair creation.")
    print(f"  - Skipped people with <2 images: {total_skipped}")
    print(f"  - Total positive pairs: {len(positive_paths)}")
    print(f"  - Total negative pairs: {len(negative_paths)}")

    positive_labels = tf.ones(len(positive_paths))
    negative_labels = tf.zeros(len(negative_paths))

    all_anchor_paths = anchor_paths + anchor_paths
    all_comparison_paths = positive_paths + negative_paths
    all_labels = tf.concat([positive_labels, negative_labels], axis=0)

    print(f"  - Final dataset size: {len(all_labels)} pairs (anchors: {len(all_anchor_paths)})")

    anchor_ds = tf.data.Dataset.from_tensor_slices(all_anchor_paths)
    comparison_ds = tf.data.Dataset.from_tensor_slices(all_comparison_paths)
    labels_ds = tf.data.Dataset.from_tensor_slices(all_labels)

    return anchor_ds, comparison_ds, labels_ds


In [78]:
def create_optimized_pairs_from_directory(directory, max_people=1500, max_pairs_per_person=200):
    person_dirs = [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))]

    if len(person_dirs) > max_people:
        person_dirs = np.random.choice(person_dirs, max_people, replace=False)
    
    print(f"Using {len(person_dirs)} people (limited from potentially more)")
    pos_anchor_paths = []
    pos_comparison_paths = []
    neg_anchor_paths = []
    neg_comparison_paths = []

    for idx, person in enumerate(person_dirs):
        person_path = os.path.join(directory, person)
        person_images = [os.path.join(person_path, f) for f in os.listdir(person_path) 
                         if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

        if len(person_images) < 2:
            continue

        pair_count = 0
        for i in range(len(person_images)):
            if pair_count >= max_pairs_per_person:
                break
            for j in range(i+1, len(person_images)):
                if pair_count >= max_pairs_per_person:
                    break
                pos_anchor_paths.append(person_images[i])
                pos_comparison_paths.append(person_images[j])
                pair_count += 1

    print(f"Created {len(pos_anchor_paths)} positive pairs")

    target_negative_pairs = len(pos_anchor_paths)
    
    for idx, person in enumerate(person_dirs):
        if len(neg_anchor_paths) >= target_negative_pairs:
            break
            
        person_path = os.path.join(directory, person)
        person_images = [os.path.join(person_path, f) for f in os.listdir(person_path) 
                         if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

        if len(person_images) < 1:
            continue

        other_people = [p for p in person_dirs if p != person]
        if not other_people:
            continue

        for anchor_img in person_images: 
            if len(neg_anchor_paths) >= target_negative_pairs:
                break

            sampled_others = np.random.choice(other_people, min(len(other_people), 20), replace=False)
            
            for other_person in sampled_others:
                if len(neg_anchor_paths) >= target_negative_pairs:
                    break
                    
                other_person_path = os.path.join(directory, other_person)
                other_person_images = [os.path.join(other_person_path, f) 
                                       for f in os.listdir(other_person_path) 
                                       if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

                if not other_person_images:
                    continue

                for _ in range(min(8, len(other_person_images))):  
                    if len(neg_anchor_paths) >= target_negative_pairs:
                        break
                    negative_img = np.random.choice(other_person_images)
                    neg_anchor_paths.append(anchor_img)
                    neg_comparison_paths.append(negative_img)

    print(f"Created {len(neg_anchor_paths)} negative pairs")
    print(f"✅ Final dataset: {len(pos_anchor_paths)} positive, {len(neg_anchor_paths)} negative pairs")
    print(f"✅ Total pairs: {len(pos_anchor_paths) + len(neg_anchor_paths)}")
    
    all_anchor_paths = pos_anchor_paths + neg_anchor_paths
    all_comparison_paths = pos_comparison_paths + neg_comparison_paths

    positive_labels = tf.ones(len(pos_anchor_paths))
    negative_labels = tf.zeros(len(neg_anchor_paths))
    all_labels = tf.concat([positive_labels, negative_labels], axis=0)

    return all_anchor_paths, all_comparison_paths, all_labels


### Preprocess

In [79]:
def preprocess(img_path):
    byte_img = tf.io.read_file(img_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img,(100,100))
    img = img/255
    return img

In [80]:
@tf.function
def optimized_preprocess(img_path):
    byte_img = tf.io.read_file(img_path)
    img = tf.io.decode_jpeg(byte_img, channels=3)
    img = tf.image.resize(img, (100, 100), method='bilinear')
    img = tf.cast(img, tf.float16) / 255.0 
    return img

In [81]:
def preproc_twin(in_img, valid_img, label):
    return(preprocess(in_img), preprocess(valid_img), label)

In [82]:
@tf.function
def optimized_preproc_twin(anchor_path, comparison_path, label):
    return (
        optimized_preprocess(anchor_path), 
        optimized_preprocess(comparison_path), 
        tf.cast(label, tf.float16)
    )

In [83]:
def prepare_datasets():
    anchor_ds, comparison_ds, labels_ds = create_pairs_from_directory(TRAIN)
    
    dataset = tf.data.Dataset.zip((anchor_ds, comparison_ds, labels_ds))
    dataset = dataset.map(preproc_twin)
    dataset = dataset.cache()
    dataset = dataset.shuffle(buffer_size=1024)
    
    dataset_size = tf.data.experimental.cardinality(dataset).numpy()
    train_size = int(dataset_size * 0.8)
    
    train_dataset = dataset.take(train_size)
    val_dataset = dataset.skip(train_size)
    
    train_dataset = train_dataset.batch(16)
    train_dataset = train_dataset.prefetch(8)
    
    val_dataset = val_dataset.batch(16)
    val_dataset = val_dataset.prefetch(8)
    
    return train_dataset, val_dataset

In [84]:
def prepare_optimized_datasets(train_dir, batch_size=32, prefetch_size=tf.data.AUTOTUNE):
    anchor_paths, comparison_paths, labels = create_optimized_pairs_from_directory(
        train_dir, max_people=1500, max_pairs_per_person=30
    )
    
    anchor_ds = tf.data.Dataset.from_tensor_slices(anchor_paths)
    comparison_ds = tf.data.Dataset.from_tensor_slices(comparison_paths)
    labels_ds = tf.data.Dataset.from_tensor_slices(labels)
    
    dataset = tf.data.Dataset.zip((anchor_ds, comparison_ds, labels_ds))

    dataset = dataset.map(
        optimized_preproc_twin, 
        num_parallel_calls=tf.data.AUTOTUNE,  
        deterministic=False  
    )

    dataset = dataset.cache()  
    dataset = dataset.shuffle(buffer_size=min(10000, len(anchor_paths)))
    dataset_size = len(anchor_paths)
    train_size = int(dataset_size * 0.8)
    
    train_dataset = dataset.take(train_size)
    val_dataset = dataset.skip(train_size)
    train_dataset = train_dataset.batch(batch_size, drop_remainder=True)
    train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
    
    val_dataset = val_dataset.batch(batch_size, drop_remainder=True)
    val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)
    
    return train_dataset, val_dataset


In [85]:
def create_dynamic_dataset_generator(directory, max_people=1500, max_pairs_per_person=50):
    person_dirs = [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))]
    person_images = {}
    
    # Pre-load all image paths
    for person in person_dirs:
        person_path = os.path.join(directory, person)
        images = [os.path.join(person_path, f) for f in os.listdir(person_path) 
                 if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
        if len(images) >= 2:
            person_images[person] = images
    
    people_list = list(person_images.keys())
    if len(people_list) > max_people:
        people_list = np.random.choice(people_list, max_people, replace=False).tolist()
        
    print(f"Loaded {len(people_list)} people for dynamic generation")
    
    def generate_epoch_dataset(epoch_num):
        # Set different random seed for each epoch
        np.random.seed(42 + epoch_num)
        random.seed(42 + epoch_num)
        
        pos_anchor_paths = []
        pos_comparison_paths = []
        neg_anchor_paths = []
        neg_comparison_paths = []
        
        # Generate positive pairs
        print(f"Generating positive pairs for epoch {epoch_num}...")
        for person in people_list:
            images = person_images[person]
            
            # Create positive pairs - sample different combinations each epoch
            num_pairs = min(max_pairs_per_person, len(images) * (len(images) - 1) // 2)
            
            pairs_created = 0
            attempts = 0
            max_attempts = num_pairs * 3  # Prevent infinite loops
            
            while pairs_created < num_pairs and attempts < max_attempts:
                i = random.randint(0, len(images) - 1)
                j = random.randint(0, len(images) - 1)
                if i != j:  # Different images
                    pos_anchor_paths.append(images[i])
                    pos_comparison_paths.append(images[j])
                    pairs_created += 1
                attempts += 1
        
        # Generate negative pairs
        print(f"Generating negative pairs for epoch {epoch_num}...")
        target_negative_pairs = len(pos_anchor_paths)  # Match positive pairs
        
        for person in people_list:
            if len(neg_anchor_paths) >= target_negative_pairs:
                break
                
            images = person_images[person]
            other_people = [p for p in people_list if p != person]
            
            if not other_people:
                continue
                
            # Sample anchor images for this person
            anchor_samples = min(10, len(images))
            selected_anchors = random.sample(images, anchor_samples)
            
            for anchor_img in selected_anchors:
                if len(neg_anchor_paths) >= target_negative_pairs:
                    break
                    
                # Pick random other people for negatives
                num_others = min(10, len(other_people))
                selected_others = random.sample(other_people, num_others)
                
                for other_person in selected_others:
                    if len(neg_anchor_paths) >= target_negative_pairs:
                        break
                        
                    other_images = person_images[other_person]
                    if other_images:
                        negative_img = random.choice(other_images)
                        neg_anchor_paths.append(anchor_img)
                        neg_comparison_paths.append(negative_img)
        
        # Balance the dataset
        min_size = min(len(pos_anchor_paths), len(neg_anchor_paths))
        
        # Randomly sample to balance
        pos_indices = random.sample(range(len(pos_anchor_paths)), min_size)
        neg_indices = random.sample(range(len(neg_anchor_paths)), min_size)
        
        final_anchors = [pos_anchor_paths[i] for i in pos_indices] + [neg_anchor_paths[i] for i in neg_indices]
        final_comparisons = [pos_comparison_paths[i] for i in pos_indices] + [neg_comparison_paths[i] for i in neg_indices]
        final_labels = [1.0] * min_size + [0.0] * min_size
        
        # Shuffle everything together
        combined = list(zip(final_anchors, final_comparisons, final_labels))
        random.shuffle(combined)
        final_anchors, final_comparisons, final_labels = zip(*combined)
        
        print(f"Epoch {epoch_num}: Generated {len(final_labels)} pairs ({min_size} pos, {min_size} neg)")
        
        return list(final_anchors), list(final_comparisons), list(final_labels)
    
    return generate_epoch_dataset


## Model

### Building

In [86]:
def embeding_make():
    in_ = Input(shape=(100,100,3), name="in img")

    c1 = Conv2D(64, (10,10), activation='relu')(in_)
    p1 = MaxPooling2D(64, (2,2), padding='same')(c1)

    c2 = Conv2D(128, (7,7), activation='relu')(p1)
    p2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    c3 = Conv2D(128, (4,4), activation='relu')(p2)
    p3 = MaxPooling2D(64, (2,2), padding='same')(c3)

    c4 = Conv2D(256, (4,4), activation='relu')(p3)
    f1 = Flatten()(c4)
    d1 = Dense(4096,activation='sigmoid')(f1)

    return Model(inputs=in_, outputs=d1, name='embedding')

In [87]:
def create_optimized_embedding():
    inputs = tf.keras.Input(shape=(100, 100, 3), name="input_img")

    x = tf.keras.layers.SeparableConv2D(64, (10, 10), activation='relu', padding='same')(inputs)
    x = tf.keras.layers.MaxPooling2D((2, 2))(x)
    x = tf.keras.layers.BatchNormalization()(x)
    
    x = tf.keras.layers.SeparableConv2D(128, (7, 7), activation='relu', padding='same')(x)
    x = tf.keras.layers.MaxPooling2D((2, 2))(x)
    x = tf.keras.layers.BatchNormalization()(x)
    
    x = tf.keras.layers.SeparableConv2D(128, (4, 4), activation='relu', padding='same')(x)
    x = tf.keras.layers.MaxPooling2D((2, 2))(x)
    x = tf.keras.layers.BatchNormalization()(x)
    
    x = tf.keras.layers.SeparableConv2D(256, (4, 4), activation='relu', padding='same')(x)
    x = tf.keras.layers.GlobalAveragePooling2D()(x)

    outputs = tf.keras.layers.Dense(512, activation='sigmoid', dtype='float32')(x) 
    
    return tf.keras.Model(inputs=inputs, outputs=outputs, name='optimized_embedding')

In [88]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()

    def call(self, in_embed, valid_embed):
        return tf.math.abs(in_embed - valid_embed)

In [89]:
def make_model(): #Simanese
    input_img = Input(name='input_img', shape=(100,100,3))
    validation_img = Input(name='validation_img', shape=(100,100,3))

    model_layer = L1Dist()
    model_layer.name = 'distance'
    distances = model_layer(embedding(input_img), embedding(validation_img))

    classifier = Dense(1,activation='sigmoid')(distances)

    return Model(inputs=[input_img, validation_img], outputs=classifier, name='SimaneseNetwork')

### Training

In [90]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4)

In [91]:
embedding = create_optimized_embedding()
siamese_model = make_model()

In [92]:
checkpoints = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoints, 'ckpt2')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [93]:
@tf.function
def t_step(batch, model, optimizer, loss_fn):
    with tf.GradientTape() as tape:  
        anchor_img, comparison_img, y_true = batch
        y_pred = model([anchor_img, comparison_img], training=True)
        loss = loss_fn(y_true, y_pred)

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

In [94]:
@tf.function
def optimized_train_step(batch, model, optimizer, loss_fn):
    with tf.GradientTape() as tape:
        anchor_img, comparison_img, y_true = batch
        y_pred = model([anchor_img, comparison_img], training=True)
        
        y_pred = tf.cast(y_pred, tf.float32)
        y_true = tf.cast(y_true, tf.float32)
        
        loss = loss_fn(y_true, y_pred)
        scaled_loss = optimizer.get_scaled_loss(loss) if hasattr(optimizer, 'get_scaled_loss') else loss

    scaled_gradients = tape.gradient(scaled_loss, model.trainable_variables)
    if hasattr(optimizer, 'get_unscaled_gradients'):
        gradients = optimizer.get_unscaled_gradients(scaled_gradients)
    else:
        gradients = scaled_gradients
    gradients = [tf.clip_by_norm(grad, 1.0) if grad is not None else grad for grad in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    
    return loss

In [95]:
def train(data, epochs):
    for epoch in range(1, epochs+1):
        print('\n Epoch {}/{}'.format(epoch, epochs))
        progbar = tf.keras.utils.Progbar(len(data))  
        for idx, batch in enumerate(data): 
            t_step(batch)
            progbar.update(idx+1)
        
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix) 

In [96]:
def retrain_model(model, epochs=50):
    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy = tf.keras.metrics.BinaryAccuracy(name='train_accuracy')
    val_accuracy = tf.keras.metrics.BinaryAccuracy(name='val_accuracy')

    for epoch in range(epochs):
        print(f"\nEpoch {epoch+1}/{epochs}")
        
        train_loss.reset_state()
        train_accuracy.reset_state()
        
        progress_bar = tf.keras.utils.Progbar(len(train_dataset))
        for batch_idx, batch in enumerate(train_dataset):
            loss = t_step(batch, model, opt, binary_cross_loss)
            
            train_loss(loss)
            train_accuracy(batch[2], model([batch[0], batch[1]], training=False))
            progress_bar.update(batch_idx + 1)

        val_accuracy.reset_state()
        for batch in val_dataset:
            val_preds = model([batch[0], batch[1]], training=False)
            val_accuracy(batch[2], val_preds)

        print(f"Loss: {train_loss.result():.4f}, Accuracy: {train_accuracy.result():.4f}, Val Accuracy: {val_accuracy.result():.4f}")
        if (epoch + 1) % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)
    
    return model

In [97]:
def optimized_retrain_model(model, train_dataset, val_dataset, epochs=10):
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    if hasattr(tf.keras.mixed_precision, 'LossScaleOptimizer'):
        optimizer = tf.keras.mixed_precision.LossScaleOptimizer(optimizer)
   
    loss_fn = tf.keras.losses.BinaryCrossentropy()
    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy = tf.keras.metrics.BinaryAccuracy(name='train_accuracy')
    val_accuracy = tf.keras.metrics.BinaryAccuracy(name='val_accuracy')
    best_val_acc = 0
    patience = 3
    wait = 0
    best_weights_saved = False
   
    for epoch in range(epochs):
        print(f"\nEpoch {epoch+1}/{epochs}")
        train_loss.reset_state()
        train_accuracy.reset_state()
        num_batches = tf.data.experimental.cardinality(train_dataset).numpy()
        progress_bar = tf.keras.utils.Progbar(num_batches)
       
        for batch_idx, batch in enumerate(train_dataset):
            loss = optimized_train_step(batch, model, optimizer, loss_fn)
            train_loss(loss)
            preds = model([batch[0], batch[1]], training=False)
            train_accuracy(tf.cast(batch[2], tf.float32), tf.cast(preds, tf.float32))
           
            progress_bar.update(batch_idx + 1)
       
        val_accuracy.reset_state()
        for batch in val_dataset:
            val_preds = model([batch[0], batch[1]], training=False)
            val_accuracy(tf.cast(batch[2], tf.float32), tf.cast(val_preds, tf.float32))
       
        current_val_acc = val_accuracy.result()
        print(f"Loss: {train_loss.result():.4f}, "
              f"Accuracy: {train_accuracy.result():.4f}, "
              f"Val Accuracy: {current_val_acc:.4f}")
        
        if current_val_acc > best_val_acc:
            best_val_acc = current_val_acc
            wait = 0
            model.save_weights('best_model.weights.h5')
            best_weights_saved = True
            print(f"New best validation accuracy: {best_val_acc:.4f} - weights saved")
        else:
            wait += 1
            if wait >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

    if best_weights_saved:
        model.load_weights('best_model.weights.h5')
        print("Loaded best weights")
    else:
        print("No improvement found, keeping current weights")
    
    return model

In [106]:
def create_dynamic_dataset_generator(directory, max_people=1500, max_pairs_per_person=50):
    """
    Creates a generator that produces different dataset each epoch
    """
    person_dirs = [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))]
    person_images = {}
    
    # Pre-load all image paths
    for person in person_dirs:
        person_path = os.path.join(directory, person)
        images = [os.path.join(person_path, f) for f in os.listdir(person_path) 
                 if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
        if len(images) >= 2:
            person_images[person] = images
    
    people_list = list(person_images.keys())
    if len(people_list) > max_people:
        people_list = np.random.choice(people_list, max_people, replace=False).tolist()
        
    print(f"Loaded {len(people_list)} people for dynamic generation")
    
    def generate_epoch_dataset(epoch_num):
        # Set different random seed for each epoch
        np.random.seed(42 + epoch_num)
        random.seed(42 + epoch_num)
        
        pos_anchor_paths = []
        pos_comparison_paths = []
        neg_anchor_paths = []
        neg_comparison_paths = []
        
        # Generate positive pairs
        print(f"Generating positive pairs for epoch {epoch_num}...")
        for person in people_list:
            images = person_images[person]
            
            # Create positive pairs - sample different combinations each epoch
            num_pairs = min(max_pairs_per_person, len(images) * (len(images) - 1) // 2)
            
            pairs_created = 0
            attempts = 0
            max_attempts = num_pairs * 3  # Prevent infinite loops
            
            while pairs_created < num_pairs and attempts < max_attempts:
                i = random.randint(0, len(images) - 1)
                j = random.randint(0, len(images) - 1)
                if i != j:  # Different images
                    pos_anchor_paths.append(images[i])
                    pos_comparison_paths.append(images[j])
                    pairs_created += 1
                attempts += 1
        
        # Generate negative pairs
        print(f"Generating negative pairs for epoch {epoch_num}...")
        target_negative_pairs = len(pos_anchor_paths)  # Match positive pairs
        
        for person in people_list:
            if len(neg_anchor_paths) >= target_negative_pairs:
                break
                
            images = person_images[person]
            other_people = [p for p in people_list if p != person]
            
            if not other_people:
                continue
                
            # Sample anchor images for this person
            anchor_samples = min(10, len(images))
            selected_anchors = random.sample(images, anchor_samples)
            
            for anchor_img in selected_anchors:
                if len(neg_anchor_paths) >= target_negative_pairs:
                    break
                    
                # Pick random other people for negatives
                num_others = min(10, len(other_people))
                selected_others = random.sample(other_people, num_others)
                
                for other_person in selected_others:
                    if len(neg_anchor_paths) >= target_negative_pairs:
                        break
                        
                    other_images = person_images[other_person]
                    if other_images:
                        negative_img = random.choice(other_images)
                        neg_anchor_paths.append(anchor_img)
                        neg_comparison_paths.append(negative_img)
        
        # Balance the dataset
        min_size = min(len(pos_anchor_paths), len(neg_anchor_paths))
        
        # Randomly sample to balance
        pos_indices = random.sample(range(len(pos_anchor_paths)), min_size)
        neg_indices = random.sample(range(len(neg_anchor_paths)), min_size)
        
        final_anchors = [pos_anchor_paths[i] for i in pos_indices] + [neg_anchor_paths[i] for i in neg_indices]
        final_comparisons = [pos_comparison_paths[i] for i in pos_indices] + [neg_comparison_paths[i] for i in neg_indices]
        final_labels = [1.0] * min_size + [0.0] * min_size
        
        # Shuffle everything together
        combined = list(zip(final_anchors, final_comparisons, final_labels))
        random.shuffle(combined)
        final_anchors, final_comparisons, final_labels = zip(*combined)
        
        print(f"Epoch {epoch_num}: Generated {len(final_labels)} pairs ({min_size} pos, {min_size} neg)")
        
        return list(final_anchors), list(final_comparisons), list(final_labels)
    
    return generate_epoch_dataset

def train_with_dynamic_datasets(model, train_directory, test_directory, epochs=20, batch_size=32):
    """
    Training loop with dynamic dataset generation + fixed validation
    """
    # Simple, robust training step function
    @tf.function
    def simple_train_step(batch, model, optimizer, loss_fn):
        anchor_img, comparison_img, y_true = batch
        
        with tf.GradientTape() as tape:
            y_pred = model([anchor_img, comparison_img], training=True)
            y_pred = tf.cast(y_pred, tf.float32)
            y_true = tf.cast(y_true, tf.float32)
            loss = loss_fn(y_true, y_pred)
        
        gradients = tape.gradient(loss, model.trainable_variables)
        # Clip gradients to prevent exploding gradients
        gradients = [tf.clip_by_norm(grad, 1.0) if grad is not None else grad for grad in gradients]
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        
        return loss
    
    # Dynamic training data generator
    train_generator = create_dynamic_dataset_generator(train_directory, max_people=1500, max_pairs_per_person=30)
    
    # Fixed validation dataset (created once, reused every epoch)
    print("Creating fixed validation dataset...")
    val_anchors, val_comparisons, val_labels = create_optimized_pairs_from_directory(
        test_directory, max_people=300, max_pairs_per_person=50
    )
    
    print(f"Validation dataset: {len(val_labels)} pairs")
    
    # Create validation TensorFlow dataset
    val_anchor_ds = tf.data.Dataset.from_tensor_slices(val_anchors)
    val_comparison_ds = tf.data.Dataset.from_tensor_slices(val_comparisons)
    val_labels_ds = tf.data.Dataset.from_tensor_slices(val_labels)
    
    val_dataset = tf.data.Dataset.zip((val_anchor_ds, val_comparison_ds, val_labels_ds))
    val_dataset = val_dataset.map(optimized_preproc_twin, num_parallel_calls=tf.data.AUTOTUNE)
    val_dataset = val_dataset.batch(batch_size, drop_remainder=True)
    val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)
    
    # Training setup
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    loss_fn = tf.keras.losses.BinaryCrossentropy()
    
    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy = tf.keras.metrics.BinaryAccuracy(name='train_accuracy')
    val_accuracy = tf.keras.metrics.BinaryAccuracy(name='val_accuracy')
    
    best_val_acc = 0
    patience = 5
    wait = 0
    
    for epoch in range(epochs):
        print(f"\n=== Epoch {epoch+1}/{epochs} ===")
        
        # Generate NEW training dataset for this epoch
        anchors, comparisons, labels = train_generator(epoch)
        print(f"Training dataset: {len(labels)} pairs")
        
        # Create TensorFlow dataset for this epoch
        anchor_ds = tf.data.Dataset.from_tensor_slices(anchors)
        comparison_ds = tf.data.Dataset.from_tensor_slices(comparisons)
        labels_ds = tf.data.Dataset.from_tensor_slices(labels)
        
        train_dataset = tf.data.Dataset.zip((anchor_ds, comparison_ds, labels_ds))
        train_dataset = train_dataset.map(optimized_preproc_twin, num_parallel_calls=tf.data.AUTOTUNE)
        train_dataset = train_dataset.batch(batch_size, drop_remainder=True)
        train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
        
        # Training phase
        train_loss.reset_state()
        train_accuracy.reset_state()
        
        num_batches = tf.data.experimental.cardinality(train_dataset).numpy()
        progress_bar = tf.keras.utils.Progbar(num_batches)
        
        for batch_idx, batch in enumerate(train_dataset):
            # Use the simple training step
            loss = simple_train_step(batch, model, optimizer, loss_fn)
            
            # Ensure loss is not None and is a proper tensor
            if loss is not None:
                train_loss(loss)
                
                preds = model([batch[0], batch[1]], training=False)
                train_accuracy(tf.cast(batch[2], tf.float32), tf.cast(preds, tf.float32))
            else:
                print(f"Warning: Loss is None at batch {batch_idx}")
            
            progress_bar.update(batch_idx + 1)
        
        # Validation phase (using FIXED validation set)
        val_accuracy.reset_state()
        for batch in val_dataset:
            val_preds = model([batch[0], batch[1]], training=False)
            val_accuracy(tf.cast(batch[2], tf.float32), tf.cast(val_preds, tf.float32))
        
        current_val_acc = val_accuracy.result()
        print(f"Loss: {train_loss.result():.4f}, "
              f"Train Acc: {train_accuracy.result():.4f}, "
              f"Val Acc: {current_val_acc:.4f}")
        
        # Early stopping and best model saving
        if current_val_acc > best_val_acc:
            best_val_acc = current_val_acc
            wait = 0
            model.save_weights('best_dynamic_model.weights.h5')
            print(f"🎯 New best validation accuracy: {best_val_acc:.4f}")
        else:
            wait += 1
            if wait >= patience:
                print(f"⏹️ Early stopping at epoch {epoch+1}")
                break
        
        # Save checkpoint periodically
        if (epoch + 1) % 5 == 0:
            model.save_weights(f'dynamic_model_epoch_{epoch+1}.weights.h5')
    
    # Load best weights
    if os.path.exists('best_dynamic_model.weights.h5'):
        model.load_weights('best_dynamic_model.weights.h5')
        print(f"✅ Training complete. Best validation accuracy: {best_val_acc:.4f}")
    
    return model

In [99]:
epochs = 10

In [100]:
#train(train_data, epochs)

In [101]:
#retrain_model(siamese_model, epochs)

In [102]:
train_dataset, val_dataset = prepare_optimized_datasets(
    TRAIN, 
    batch_size=64,
    prefetch_size=tf.data.AUTOTUNE
)

Using 1500 people (limited from potentially more)
Created 3152 positive pairs
Created 3152 negative pairs
✅ Final dataset: 3152 positive, 3152 negative pairs
✅ Total pairs: 6304


In [104]:
#optimized_model = optimized_retrain_model(
#    siamese_model, 
#    train_dataset, 
#    val_dataset, 
#    epochs=20
#)

In [None]:
trained_model = train_with_dynamic_datasets(
    model=siamese_model,
    train_directory=TRAIN,
    test_directory=TEST,
    epochs=20,
    batch_size=32
)

Loaded 1159 people for dynamic generation
Creating fixed validation dataset...
Using 300 people (limited from potentially more)
Created 771 positive pairs
Created 771 negative pairs
✅ Final dataset: 771 positive, 771 negative pairs
✅ Total pairs: 1542
Validation dataset: 1542 pairs

=== Epoch 1/20 ===
Generating positive pairs for epoch 0...
Generating negative pairs for epoch 0...
Epoch 0: Generated 17026 pairs (8513 pos, 8513 neg)
Training dataset: 17026 pairs
[1m532/532[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23354s[0m 44s/step
Loss: 0.6918, Train Acc: 0.5256, Val Acc: 0.5189
🎯 New best validation accuracy: 0.5189

=== Epoch 2/20 ===
Generating positive pairs for epoch 1...
Generating negative pairs for epoch 1...
Epoch 1: Generated 17038 pairs (8519 pos, 8519 neg)
Training dataset: 17038 pairs
[1m532/532[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21854s[0m 41s/step
Loss: 0.6856, Train Acc: 0.5403, Val Acc: 0.5684
🎯 New best validation accuracy: 0.5684

=== Epoch 3/20

In [None]:
trained_model.save('face_verification_v2.h5')

### Test / Evaluate model

In [None]:
test_in, test_val, y_true = test_data.as_numpy_iterator().next()
y_hat = siamese_model.predict([test_in, test_val])

In [None]:
[1 if prediction > 0.5 else 0 for prediction in y_hat]

In [None]:
y_true

In [None]:
m = Recall()
m.update_state(y_true, y_hat)
m.result().numpy()

In [None]:
m = Precision()
m.update_state(y_true, y_hat)
m.result().numpy()

In [None]:
plt.figure(figsize=(10,8))
plt.subplot(1,2,1)
plt.imshow(test_in[0])
plt.subplot(1,2,2)
plt.imshow(test_val[0])
plt.show()

In [None]:
siamese_model.save('face_verification.h5')

In [None]:
model = tf.keras.models.load_model('face_verification.h5', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
def test_on_two_images(img1_path, img2_path, model):
    def preprocess_single_image(img_path):
        byte_img = tf.io.read_file(img_path)
        img = tf.io.decode_jpeg(byte_img)
        img = tf.image.resize(img, (100, 100))
        img = img / 255.0
        return img

    img1 = preprocess_single_image(img1_path)
    img2 = preprocess_single_image(img2_path)

    # Add batch dimension
    img1 = tf.expand_dims(img1, axis=0)
    img2 = tf.expand_dims(img2, axis=0)

    # Predict similarity
    result = model.predict([img1, img2])
    print(f"Similarity score: {result[0][0]:.4f}")
    
    if result[0][0] > 0.5:
        print("✅ Match: Likely the same person")
    else:
        print("❌ No Match: Likely different people")

test_on_two_images('C:/Users/lokna/Projects/MyReactNativeApp/extensions/face_auth/data/positive/496e69d0-3499-11f0-a4ad-e8fb1c79b654.jpg', 'C:/Users/lokna/Projects/MyReactNativeApp/extensions/face_auth/data/positive/4a5df055-3499-11f0-ae89-e8fb1c79b654.jpg', model)
