In [1]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [2]:
import tensorflow as tf
print(tf.config.list_physical_devices('GPU'))

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [3]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf



print("TF version:", tf.__version__)
print("GPUs detected:", tf.config.list_physical_devices('GPU'))

print("Built with CUDA:", tf.test.is_built_with_cuda())
print("Is GPU available:", tf.test.is_gpu_available(cuda_only=True))


TF version: 2.10.0
GPUs detected: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
Built with CUDA: True
Instructions for updating:
Use `tf.config.list_physical_devices('GPU')` instead.
Is GPU available: True


In [4]:
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

In [5]:
#Setup Paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [6]:
#Directory
os.makedirs(POS_PATH, exist_ok=True)#if file created already it passes
os.makedirs(NEG_PATH, exist_ok=True)
os.makedirs(ANC_PATH, exist_ok=True)

if not os.path.exists(POS_PATH):
    os.makedirs(POS_PATH)
if not os.path.exists(NEG_PATH):
    os.makedirs(NEG_PATH)
if not os.path.exists(ANC_PATH):
    os.makedirs(ANC_PATH)

In [7]:
#moves the lfw images to the following repository data/negative
for directory in os.listdir('lfw'): 
    dir_path = os.path.join('lfw', directory)
    for file in os.listdir(dir_path):
        EX_PATH = os.path.join(dir_path, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

In [8]:
#importing UUID to generate unique image names
import uuid

In [9]:
uuid.uuid1()

UUID('b480314f-5daa-11f0-a06e-f02f74181521')

In [10]:
#Capturing Images Via Webcam
cap = cv2.VideoCapture(0)

if not cap.isOpened():
    print("Cannot open webcam")

while cap.isOpened():
    ret, frame = cap.read()

    frame = frame[120:120+250, 200:200+250, :3] # cut down frame to 250x250 and slices to the center of camera
    
    #Collect Anchors
    if cv2.waitKey(1) & 0xFF == ord('a'):
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())) #creates a unique file path
        cv2.imwrite(imgname, frame) # writes out the unique image name
        
        
    #Collect Positives
    if cv2.waitKey(1) & 0xFF == ord('p'):
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
    
    if not ret:
        print("Failed to grab frame")
        break

    cv2.imshow('Image Collection (Hit q to exit)', frame)

    # Use 'q' to quit
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

In [11]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300) #takes 300 images from the anchor directory
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)

In [12]:
#preprocessing

def preprocess(file_path):
    byte_img = tf.io.read_file(file_path) #read in image from file path
    img = tf.io.decode_jpeg(byte_img) # load in image
    img = tf.image.resize(img, (100,100)) #resizing image to be 100x100x3
    img = img / 255.0 #scale image to be 0 and 1
    return img

In [13]:
#Creating a labelled dataset
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [14]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [15]:
#dataloader pipeline

data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size = 1024)

In [16]:
#training partitioner
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [17]:
#Testing Partition

test_data = data.skip(round(len(data)*.7)) #skips the training portion
test_data = test_data.take(round(len(data)*.3))#takes the final as test
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [18]:
def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096 , activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')


In [19]:
embedding = make_embedding()

In [20]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

In [21]:
#Siamese Distance Class

class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
        
    #Similarity Calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)



In [22]:
input_image = Input(name = 'input_img', shape = (100,100,3))
validation_image = Input(name = 'validation_img', shape = (100,100,3)) #Validation image


In [23]:
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

In [24]:
siamese_layer = L1Dist()

In [25]:
distances = siamese_layer(inp_embedding, val_embedding)

In [26]:
classifier = Dense(1, activation = 'sigmoid')(distances)

In [27]:
classifier

<KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'dense_1')>

In [28]:
def make_siamese_model():
    input_image = Input(name='input_img', shape=(100, 100, 3))# Anchor
    validation_image = Input(name='validation_img', shape=(100, 100, 3))  # Positive or Negative

    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'

    distances = siamese_layer(embedding(input_image), embedding(validation_image))

    # Classification layer
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')


In [29]:
siamese_model = make_siamese_model()

In [30]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [31]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [32]:
opt = tf.keras.optimizers.Adam(1e-4) #0.0001 my name so I had to use it..

In [33]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt = opt, siamese_model=siamese_model)

In [34]:
test_batch = train_data.as_numpy_iterator()

In [35]:
batch_1 = test_batch.next()

In [36]:
X = batch_1[:2]

In [37]:
y = batch_1[2]

In [38]:
y

array([1., 1., 0., 0., 1., 0., 0., 1., 1., 1., 1., 0., 0., 0., 1., 1.],
      dtype=float32)

In [39]:
@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    
    # Return loss
    return loss

In [40]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            train_step(batch)
            progbar.update(idx+1)
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

In [41]:
EPOCHS = 50

In [None]:
train(train_data, EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50
 4/27 [===>..........................] - ETA: 4s

In [None]:
from tensorflow.keras.metrics import Precision, Recall

In [None]:
#batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
y_hat = siamese_model.predict([test_input, test_val])
y_hat 

In [None]:
[1 if prediction > 0.5 else 0 for prediction in y_hat]

In [None]:
y_true

In [None]:
m = Recall()
m.update_state(y_true, y_hat)
m.result().numpy()

In [None]:
#Showing image comparison 
plt.figure(figsize=(10, 8))
plt.subplot(1,2,1)
plt.imshow(test_input[2])
plt.subplot(1,2,2)
plt.imshow(test_val[2])
plt.show

print('1.0 if POSITIVE // 0.0 IF NEGATIVE: ', y_true[2])


In [None]:
#Save Model

siamese_model.save('siamesemodel.h5')

In [None]:
#reload
model = tf.keras.models.load_model('siamesemodel.h5', 
                                  custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
#preds with reloaded model
model.predict([test_input, test_val])

In [None]:
model.summary()

In [None]:
#verification

def verify(model, detection_threshold, verification_threshold):
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_images', 'input_img.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))

        result = model.predict(list(np.expand_dims([input_img, validation_img], axis = 1)))
        results.append(result)
        
    #Metric to judge if a prediction is considered positive
    detection = np.sum(np.array(results) > detection_threshold)

    #Proportion of positive predictions / total positive samples
    verification = detection / len (os.listdir(os.path.join('application_data', 'verification_images')))
    verified = verification > verification_threshold

    return results, verified

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    #Makes it the same size as images have been taken in the past.
    frame = frame[120:120+250, 200:200+250, :3]

    cv2.imshow('Verification', frame)

    #Verification Trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):
        #Saving input img to input folder
        cv2.imwrite(os.path.join('application_data', 'input_images', 'input_img.jpg'), frame)

        #running verification
        results, verified = verify(model, 0.9, 0.7)
        print(verified)
    

    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
        
cap.release()
cv2.destroyAllWindows()

In [None]:
np.sum(np.squeeze(results) > 0.5)

In [None]:
results