In [None]:
import os
import cv2
import random
import numpy as np
import matplotlib.pyplot as plt
import uuid

In [6]:
import tensorflow as tf
from tensorflow.keras.metrics import Precision, Recall # pyright: ignore[reportMissingImports]
from tensorflow.keras.models import Model # pyright: ignore[reportMissingImports]
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten # pyright: ignore[reportMissingImports]

# ***Create Folders***

In [7]:
# Setup Paths

POS_PATH = os.path.join("Data","Positive")
NEG_PATH =os.path.join("Data","Negative")
ANCHOR_PATH = os.path.join("Data", "Anchor")

In [81]:
# Make directories

# os.makedirs(POS_PATH)
# os.makedirs(NEG_PATH)
# os.makedirs(ANCHOR_PATH)

# ***Collecting the Positives & Anchors***

In [8]:
uuid.uuid1()

UUID('7ca007bc-d2fe-11f0-be52-a5eb1324763a')

In [9]:
# Connection to webcam

cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()

    # Resize the frame
    frame = frame[120:370, 200:450, :]

    # Collect Anchor images
    if cv2.waitKey(1) & 0xFF == ord('a'):
        imgname = os.path.join(ANCHOR_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)

    # Collect Positive images
    if cv2.waitKey(1) & 0xFF == ord('p'):
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)

    #show the frame
    cv2.imshow("Image Collection", frame)

    # Break condition
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release the webcam and close windows
cap.release()
cv2.destroyAllWindows()

### ***Data Augmentation***

In [10]:
def data_aug(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.01, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.75, upper=1, seed=(1,3))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(img)
    
    return data

In [11]:
frame.shape

(250, 250, 3)

In [12]:
for file_name in os.listdir(os.path.join(POS_PATH)):
    img_path = os.path.join(POS_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
        cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [13]:
for file_name in os.listdir(os.path.join(ANCHOR_PATH)):
    img_path = os.path.join(ANCHOR_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
        cv2.imwrite(os.path.join(ANCHOR_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

# ***Load & Preprocess Images***

### ***Get Image Directories***

In [14]:
anchor = tf.data.Dataset.list_files(ANCHOR_PATH+'/*.jpg').take(4000)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(4000)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(8000)

### ***Preprocessing***

In [15]:
def preprocess(file_path):
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # load the image
    img = tf.io.decode_jpeg(byte_img)
    # Resize the image to be 105x105x3
    img = tf.image.resize(img, (105,105))
    # Scale image to be between 0 and 1
    img = img / 255.0

    return img

### ***Create Labelled Dataset***

In [16]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [17]:
sample = data.as_numpy_iterator()

In [18]:
example = sample.next()

In [19]:
example

(b'Data\\Anchor\\bd8c1fcb-d2fe-11f0-bd1e-a5eb1324763a.jpg',
 b'Data\\Positive\\8bd148d1-d2fe-11f0-9fd9-a5eb1324763a.jpg',
 np.float32(1.0))

### ***Train & Test Data***

In [20]:
def preprocess_twin(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)

In [21]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=10000)

In [22]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(32)
train_data = train_data.prefetch(8)

In [23]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(32)
test_data = test_data.prefetch(8)

# ***Model Building***

### ***Build Embedding layer***

In [24]:
def make_embedding(): 
    inp = Input(shape=(105,105,3), name='input_image')

    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D((2,2), padding='same')(c1)

    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D((2,2), padding='same')(c2)

    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D((2,2), padding='same')(c3)

    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=d1, name='embedding')

In [25]:
embedding_model = make_embedding()
embedding_model.summary()

### ***Build Distance Layer***

In [26]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
    
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

### ***Siamese Model***

In [27]:
def make_siamese_model():
    
    # Anchor Image Input
    input_image = Input(name='input_img', shape=(105,105,3))
    # Validation Image Input
    validation_image = Input(name='validation_img', shape=(105,105,3))

    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = "distance"
    distances = siamese_layer(embedding_model(input_image), embedding_model(validation_image))

    # Classification Layer
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name="SiameseNetwork")

In [28]:
siamese_model = make_siamese_model()




In [29]:
siamese_model.summary()

# ***Training***

### ***Set The Loss & Optimizer***

In [30]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4) 

### ***Checkpoints***

In [31]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

### ***Training Step Function***

In [32]:
@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    # Return loss
    return loss

### ***Training Loop***

In [33]:
def train(data, EPOCHS):
    # Setup checkpoint directory
    checkpoint_dir = './training_checkpoints'
    os.makedirs(checkpoint_dir, exist_ok=True)
    
    # Early stopping variables
    best_loss = float('inf')
    patience = 3
    patience_counter = 0
    
    # Loop through epochs
    for epoch in range(1, EPOCHS + 1):
        print(f'\nEpoch {epoch}/{EPOCHS}')
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Creating metric objects 
        r = Recall()
        p = Precision()
        loss_metric = tf.keras.metrics.Mean()
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2], verbose=0)
            
            # Update metrics
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)
            loss_metric.update_state(loss)
            
            progbar.update(idx + 1)
        
        # Print epoch summary
        current_loss = loss_metric.result().numpy()
        recall = r.result().numpy()
        precision = p.result().numpy()
        print(f'Loss: {current_loss:.4f} | Recall: {recall:.4f} | Precision: {precision:.4f}')
        
        # Save best checkpoint
        if current_loss < best_loss:
            best_loss = current_loss
            patience_counter = 0
            checkpoint_path = os.path.join(checkpoint_dir, f'best_model_epoch_{epoch}.keras')
            siamese_model.save(checkpoint_path)
            print(f'Best model saved (Loss: {best_loss:.4f})')
        else:
            patience_counter += 1
            print(f'No improvement for {patience_counter}/{patience} epochs')
        
        # Early stopping check
        if patience_counter >= patience:
            print(f'\nEarly stopping triggered at epoch {epoch}')
            print(f'Best loss achieved: {best_loss:.4f}')
            break
        
        # Periodic checkpoint save (every 5 epochs)
        if epoch % 5 == 0:
            checkpoint_path = os.path.join(checkpoint_dir, f'checkpoint_epoch_{epoch}.keras')
            siamese_model.save(checkpoint_path)
            print(f'Checkpoint saved at epoch {epoch}')
    
    print(f'\n Training complete')


### ***Training The Model***

In [34]:
EPOCHS = 30
train(train_data, EPOCHS)


Epoch 1/30
Tensor("binary_crossentropy/div_no_nan:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/div_no_nan:0", shape=(), dtype=float32)
[1m175/175[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m327s[0m 2s/step
Loss: 0.0863 | Recall: 0.9799 | Precision: 0.9942
Best model saved (Loss: 0.0863)

Epoch 2/30
[1m175/175[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m619s[0m 4s/step
Loss: 0.0047 | Recall: 0.9996 | Precision: 0.9996
Best model saved (Loss: 0.0047)

Epoch 3/30
[1m175/175[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m336s[0m 2s/step
Loss: 0.0139 | Recall: 0.9986 | Precision: 0.9964
No improvement for 1/3 epochs

Epoch 4/30
[1m175/175[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m335s[0m 2s/step
Loss: 0.0018 | Recall: 1.0000 | Precision: 1.0000
Best model saved (Loss: 0.0018)

Epoch 5/30
[1m175/175[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m337s[0m 2s/step
Loss: 0.0015 | Recall: 1.0000 | Precision: 1.0000
Best model saved (Loss: 0.0015)
Checkpoint saved

KeyboardInterrupt: 

# ***Evaluation***

In [35]:
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

print(r.result().numpy(), p.result().numpy())

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 379ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 394ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 464ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 395ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 393ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 426ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 434ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 378ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 422ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 411ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 395ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 427ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 422ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m 

# ***Save Model***

In [36]:
siamese_model.save('siamese_modelv2.keras')

In [37]:
model = tf.keras.models.load_model(
    'siamese_modelv2.keras',
    custom_objects={'L1Dist': L1Dist}
)

In [38]:
model.summary()

# ***Real Time Test***

In [42]:
def verify(model, detection_threshold, verification_threshold):
    # Build results array
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
        
        # Make Predictions 
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
    
    # Detection Threshold: Metric above which a prediciton is considered positive 
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Verification Threshold: Proportion of positive predictions / total positive samples 
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images'))) 
    verified = verification > verification_threshold
    
    return results, verified

### ***OpenCV RT Verification***

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()

    # Resize the frame
    frame = frame[120:370, 200:450, :]

    # Collect Anchor images
    if cv2.waitKey(1) & 0xFF == ord('v'):
        imgname = os.path.join("Test","InputImages","InputImages.jpg")
        cv2.imwrite(imgname, frame)

        verified, verification = verify(model, 0.7 , 0.6)
        print(verified)

    #show the frame
    cv2.imshow("FACE ID", frame)

    # Break condition
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
# Release the webcam and close windows
cap.release()
cv2.destroyAllWindows()