**Setup**

For quicker model training

In [235]:
import tensorflow as tf
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)

Install dependencies

In [None]:
%pip install tensorflow==2.15.1 opencv-python matplotlib

In [237]:

import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

Import dependencies

In [238]:
#Import tensor flow dependencies - Functional API

from tensorflow.keras.models import Model 
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten, BatchNormalization
import tensorflow as tf


#Model: Lets you build and train a neural network.
#Layer: The base class for all Keras layers.
#Conv2D: Adds convolutional layers to extract image features.
#Dense: Fully connected layer for making decisions.
#MaxPooling2D: Reduces image size while keeping key features.
#Input: Defines the input shape of the model.
#Flatten: Turns multi-dimensional data into a flat vector.


Set GPU growth

In [239]:
#Avoiding out of memory errors for memory consumption
gpus=tf.config.experimental.list_physical_devices('GPU')

# Enable memory growth for each GPU, so TensorFlow uses memory as needed instead of pre-allocating all at once
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

Create folder structures

In [240]:
#Setting paths for directories

POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [None]:
import os
import shutil
import random

# Look for Bush photos in the negative directory
bush_images = [img for img in os.listdir(NEG_PATH) if img.startswith('George_W_Bush')]
print(f"Found {len(bush_images)} Bush images")

# Shuffle the images for random split
random.shuffle(bush_images)

# Split point - half for anchor, half for positive
split = len(bush_images) // 2

# Move (not copy) to anchor and positive directories
for i, img in enumerate(bush_images):
    source_path = os.path.join(NEG_PATH, img)
    if i < split:
        destination_path = os.path.join(ANC_PATH, img)
    else:
        destination_path = os.path.join(POS_PATH, img)
    # Move the file (removes it from negatives)
    shutil.move(source_path, destination_path)

print(f"Moved {split} images to anchor directory")
print(f"Moved {len(bush_images) - split} images to positive directory")

In [242]:
#Getting our imaage directories as datasets

anchor=tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(200)
positive=tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(200)
negative=tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(200)


In [300]:
# Modify how we create training pairs to ensure equal positive/negative examples
positive_samples = min(len(list(anchor)), len(list(positive)))
negative_samples = positive_samples
anchor = anchor.take(positive_samples)
positive = positive.take(positive_samples)
negative = negative.take(negative_samples)

PREPROCESS IMAGES

In [243]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    # Add basic image augmentation for better model generalization
    img = tf.image.random_flip_left_right(img)  # Random horizontal flip
    img = tf.image.random_brightness(img, 0.2)   # Slight brightness adjustment
    img = tf.image.random_contrast(img, lower=0.8, upper=1.2)
    img = tf.image.resize(img, (100, 100))
    # Handle potential numerical instabilities
    img = tf.cast(img, tf.float32) / 255.0
    return img

In [244]:
positives=tf.data.Dataset.zip((anchor,positive,tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives=tf.data.Dataset.zip((anchor,negative,tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))

# Combine both positive and negative pairs into a single dataset with a mix of matching and non-matching pairs
raw_data = positives.concatenate(negatives) 

In [245]:
# Calculate split sizes
total_size = len(raw_data)
train_size = round(total_size * 0.7)
val_size = total_size - train_size

In [None]:
val_size

In [None]:
raw_data

#The first string is our file path to the specific image, second string is the path to either the positive or neggative image
#last value determines whether its +ve or -ve for verification

In [248]:
sample=raw_data.as_numpy_iterator()

In [249]:
eg=sample.next()

BUILD, TRAIN AND TEST PARTITION 

In [250]:
#Function to preprcess the input and validation images as twins
def preprocess_twins(input_img, validation_img, label):
    try:
        print("Input types:", type(input_img), type(validation_img), type(label))
        print("Input image values:", input_img)
        
        def process_single_image(img):
            # If the image is already a tensor, we need to handle it differently
            if isinstance(img, tf.Tensor):
                # If it's already a preprocessed image tensor
                if img.dtype == tf.float32:
                    return tf.image.resize(img,(100,100))
                # If it's a string tensor (filepath)
                elif img.dtype == tf.string:
                    img = tf.io.read_file(img)
                    img = tf.io.decode_jpeg(img, channels=3)
                    img = tf.cast(img, tf.float32) / 255.0
                    img = tf.image.resize(img, (100, 100))
                    return img
            return None
        
        processed_input = process_single_image(input_img)
        processed_validation = process_single_image(validation_img)
        
        return processed_input, processed_validation, label
    except Exception as e:
        print(f"Error processing images: {str(e)}")
        raise

In [None]:
# Check the first element of the dataset
sample = next(iter(raw_data))
print("Sample structure:", [type(x) for x in sample])
print("First element shape/type:", tf.shape(sample[0]), sample[0].dtype)

In [None]:
res = preprocess_twins(*eg)  # * collecting the eg values from the register

In [253]:
res[1]

In [None]:
res[2]

Build dataloader pipeline

In [255]:
def build_data_pipeline(data, batch_size=32, training=True):
    """
    Creates an optimized data pipeline that properly handles caching.
    The order of operations is crucial for efficient data processing.
    """
    # First preprocess the raw images
    data = data.map(preprocess_twins, num_parallel_calls=tf.data.AUTOTUNE)
    
    if training:
        # For training data, shuffle before batching
        data = data.shuffle(1000)
    
    # Batch the data
    data = data.batch(batch_size, drop_remainder=True)
    
    # Cache after batching to store complete batches
    data = data.cache()
    
    # Prefetch at the end for pipeline efficiency
    data = data.prefetch(tf.data.AUTOTUNE)
    
    return data


Training partition

In [256]:
# Verify the shapes
def verify_shapes(dataset, name):
    print(f"\nVerifying {name} shapes:")
    for batch in dataset.take(1):
        print(f"Input shape: {batch[0].shape}")
        print(f"Validation shape: {batch[1].shape}")
        print(f"Label shape: {batch[2].shape}")
        break



In [257]:
# Create and verify datasets with monitoring
def create_and_verify_datasets(raw_data, batch_size=32):
    # Calculate split sizes
    total_size = len(raw_data)
    train_size = round(total_size * 0.7)
    
    # Split raw data
    raw_train = raw_data.take(train_size)
    raw_val = raw_data.skip(train_size)
    
    # Create pipelines
    train_data = build_data_pipeline(raw_train, batch_size, training=True)
    val_data = build_data_pipeline(raw_val, batch_size, training=False)
    
    # Verify both datasets
    print("\nDataset Statistics:")
    print(f"Total examples: {total_size}")
    print(f"Training examples: {train_size}")
    print(f"Validation examples: {total_size - train_size}")
    
    return train_data, val_data



In [None]:
# Check both datasets
verify_shapes(train_data, "Training Data")
verify_shapes(val_data, "Validation Data")

In [None]:
# Use the improved creation function
train_data, val_data = create_and_verify_datasets(raw_data, BATCH_SIZE)

In [None]:
train_data

In [261]:
train_sample = train_data.as_numpy_iterator()

In [None]:
train_sample =train_sample.next()

In [None]:
len(train_sample[0])

MODEL ENGINEERING

In [264]:

#https://www.cs.cmu.edu/~rsalakhu/papers/oneshot1.pdf  Siememse networks paper

#Builds embedding layer

def make_embedding():
    inp=Input(shape=(100,100,3), name='input_image')

    #First block 
    c1 = Conv2D(64, (10,10), activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01))(inp)
    c1 = BatchNormalization()(c1)
    m1 = MaxPooling2D(64,(2,2),padding='same')(c1)
    m1 = tf.keras.layers.Dropout(0.3)(m1)

    #Second block
    c2 = Conv2D(128, (3,3), activation='relu')(m1)
    m2 = MaxPooling2D(64,(2,2),padding='same')(c2)
    m2 = tf.keras.layers.Dropout(0.3)(m2)

    #Third block 
    c3 = Conv2D(128, (7,7), activation='relu')(m2)
    m3 = MaxPooling2D(64,(2,2),padding='same')(c3)
    m3 = tf.keras.layers.Dropout(0.3)(m3)

    #Fouth block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)


    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [265]:
embedding = make_embedding()

In [None]:
embedding.summary()

Build distance layer

In [267]:
class L1Dist(Layer):

    #Init method for inheritance
    def __init__(self, **kwargs):
        super().__init__()

    #Des the similarity calculation 
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding) + 1e-6

In [268]:
l1 = L1Dist()

Make Siamese model

In [269]:
def make_siamese_model():
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(100,100,3))

    #Validation image in the network
    validation_image = Input(name='validation_img', shape=(100,100,3))

    #Combine siamese distance components
    siamise_layer = L1Dist()
    siamise_layer._name = 'distance'
    distances = siamise_layer(embedding(input_image), embedding(validation_image))

    #Classification layer
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')



In [270]:
siamese_model = make_siamese_model()

In [None]:
siamese_model.summary()

TRAINING OUR MODEL

Setup loss function and optimizer

In [272]:
loss_function = tf.losses.BinaryCrossentropy()

initial_learning_rate = 1e-5
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=500,
    decay_rate=0.95
)


In [273]:
opt = tf.keras.optimizers.legacy.Adam(learning_rate=lr_schedule) #learning rate @ 0.0001 initially then gradually reduces

Make a ckeckpoint 

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [275]:
# Add before training
import shutil
if os.path.exists(checkpoint_dir):
    shutil.rmtree(checkpoint_dir)
os.makedirs(checkpoint_dir, exist_ok=True)

Build train step function

In [None]:
test_batch = train_data.as_numpy_iterator()

In [277]:
batch1 = test_batch.next()

In [None]:
batch1[2]

In [279]:
@tf.function
def train_step(batch):
    # Get anchor and positive/negative images
    X = batch[:2]
    # Get label
    Y = batch[2]
    
    # Record operations with gradient tape
    with tf.GradientTape() as tape:
        # Forward pass through the model
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = loss_function(Y, yhat)
    
    # Calculate gradients
    gradients = tape.gradient(loss, siamese_model.trainable_variables)
    # Clip gradients to prevent exploding gradients
    clipped_gradients = [tf.clip_by_value(g, -1.0, 1.0) for g in gradients]
    # Apply gradients to update model
    opt.apply_gradients(zip(clipped_gradients, siamese_model.trainable_variables))
    
    return loss

Build training loop

In [280]:
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS+1): 
        print(f'\nEpoch {epoch}/{EPOCHS}')
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Track losses for this epoch
        epoch_loss = tf.keras.metrics.Mean()
        
        for idx, batch in enumerate(data):
            # Get numerical loss value
            loss = train_step(batch)
            epoch_loss.update_state(loss)
            
            # Update progress bar with actual loss value
            progbar.update(
                idx+1, 
                values=[('loss', float(epoch_loss.result()))]
            )
            
        # Save checkpoints every 10 epochs
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

Train the model

In [281]:
EPOCHS = 100

In [None]:
# Check data shape
for batch in train_data.take(1):
    print("Input shape:", batch[0].shape)
    print("Validation shape:", batch[1].shape)
    print("Label shape:", batch[2].shape)
    break

# Check model input shape
print("\nModel input shape:")
for layer in siamese_model.layers:
    if isinstance(layer, tf.keras.layers.InputLayer):
        print(f"{layer.name}: {layer.input_shape}")

In [None]:
# Initialize model by running one batch through it
for batch in train_data.take(1):
    X = batch[:2]
    # Do a forward pass to initialize variables
    _ = siamese_model(X, training=False)

In [None]:
train(train_data, EPOCHS)


EVALUATE MODEL

In [293]:
def evaluate_model(model, test_data, threshold=0.5):
    """
    Enhanced evaluation with raw prediction analysis
    """
    test_loss = tf.metrics.Mean()
    raw_predictions = []
    raw_labels = []
    
    for batch in test_data:
        X = batch[:2]
        y_true = batch[2]
        y_pred = model(X, training=False)
        
        # Store raw values for analysis
        raw_predictions.extend(y_pred.numpy().flatten())
        raw_labels.extend(y_true.numpy().flatten())
        
        test_loss.update_state(loss_function(y_true, y_pred))
    
    # Convert to numpy arrays for easier analysis
    predictions = np.array(raw_predictions)
    labels = np.array(raw_labels)
    
    # Print prediction distribution
    print(f"\nPrediction Statistics:")
    print(f"Min prediction: {predictions.min():.4f}")
    print(f"Max prediction: {predictions.max():.4f}")
    print(f"Mean prediction: {predictions.mean():.4f}")
    
    # Calculate metrics using numpy
    y_pred_binary = (predictions > threshold).astype(float)
    
    tp = np.sum((labels == 1) & (y_pred_binary == 1))
    fp = np.sum((labels == 0) & (y_pred_binary == 1))
    tn = np.sum((labels == 0) & (y_pred_binary == 0))
    fn = np.sum((labels == 1) & (y_pred_binary == 0))
    
    accuracy = (tp + tn) / len(labels) if len(labels) > 0 else 0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    return {
        'loss': float(test_loss.result()),
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'true_positives': int(tp),
        'false_positives': int(fp),
        'true_negatives': int(tn),
        'false_negatives': int(fn),
        'threshold_used': threshold,
        'prediction_mean': float(predictions.mean()),
        'prediction_std': float(predictions.std())
    }

In [294]:
def test_model_evaluation():
    """
    Tests the model evaluation function and displays results in a readable format.
    """
    # Load the model
    siamese_model = make_siamese_model()
    
    # Restore the latest checkpoint
    checkpoint = tf.train.Checkpoint(siamese_model=siamese_model)
    latest_checkpoint = tf.train.latest_checkpoint('./training_checkpoints')
    
    if latest_checkpoint:
        print("Loading model from checkpoint...")
        checkpoint.restore(latest_checkpoint)
        print("Model restored successfully!")
    else:
        print("No checkpoint found. Using untrained model.")
    
    # Create a fresh validation dataset for each threshold
    thresholds = [0.3, 0.5, 0.7]
    for threshold in thresholds:
        print(f"\nEvaluating model with threshold {threshold}:")
        # Recreate validation data for each threshold evaluation
        _, fresh_val_data = create_and_verify_datasets(raw_data, BATCH_SIZE)
        results = evaluate_model(siamese_model, fresh_val_data, threshold)
        
        # Display results in a readable format
        print("\nEvaluation Metrics:")
        print("-" * 50)
        print(f"Loss: {results['loss']:.4f}")
        print(f"Accuracy: {results['accuracy']:.4f} ({results['accuracy']*100:.1f}%)")
        print(f"Precision: {results['precision']:.4f} ({results['precision']*100:.1f}%)")
        print(f"Recall: {results['recall']:.4f} ({results['recall']*100:.1f}%)")
        print(f"F1 Score: {results['f1_score']:.4f}")
        
        print("\nConfusion Matrix:")
        print("-" * 50)
        print(f"True Positives: {results['true_positives']}")
        print(f"False Positives: {results['false_positives']}")
        print(f"True Negatives: {results['true_negatives']}")
        print(f"False Negatives: {results['false_negatives']}")

In [295]:
def inspect_model_predictions(model, test_data):
    """
    Detailed inspection of model predictions
    """
    # Get single batch
    for batch in test_data.take(1):
        X = batch[:2]
        y_true = batch[2]
        y_pred = model(X, training=False)
        
        print("Label distribution:", np.unique(y_true, return_counts=True))
        print("Prediction range:", np.min(y_pred), "-", np.max(y_pred))
        
        # Look at embeddings
        embedding_layer = model.get_layer('embedding')
        embeddings = embedding_layer(X[0])
        print("Embedding stats:", {
            'mean': np.mean(embeddings),
            'std': np.std(embeddings),
            'min': np.min(embeddings),
            'max': np.max(embeddings)
        })

In [296]:
def detailed_model_inspection(model, test_data):
    """
    Detailed model inspection with image pair analysis
    """
    for batch in test_data.take(1):
        X = batch[:2]
        y_true = batch[2]
        
        # Get embeddings for both images
        embedding_layer = model.get_layer('embedding')
        embeddings1 = embedding_layer(X[0])
        embeddings2 = embedding_layer(X[1])
        
        # Compare embeddings
        distances = tf.abs(embeddings1 - embeddings2)
        
        print("\nEmbedding Analysis:")
        print(f"First image embeddings range: {tf.reduce_min(embeddings1):.4f} to {tf.reduce_max(embeddings1):.4f}")
        print(f"Second image embeddings range: {tf.reduce_min(embeddings2):.4f} to {tf.reduce_max(embeddings2):.4f}")
        print(f"Distance range: {tf.reduce_min(distances):.4f} to {tf.reduce_max(distances):.4f}")
        print(f"Average distance: {tf.reduce_mean(distances):.4f}")
        
        # Get model predictions
        predictions = model([X[0], X[1]], training=False)
        
        # Print paired results
        for i in range(min(5, len(y_true))):  # Show first 5 pairs
            print(f"\nPair {i+1}:")
            print(f"True label: {y_true[i]}")
            print(f"Prediction: {predictions[i][0]:.4f}")
            print(f"Average embedding distance: {tf.reduce_mean(distances[i]):.4f}")



In [None]:
detailed_model_inspection(siamese_model, val_data)

In [None]:
inspect_model_predictions(siamese_model, val_data)

In [None]:
# Run the evaluation test
test_model_evaluation()