# 1. Dependencies and Folders

In [None]:
import cv2
import os
import uuid
import random
import pytest
import tensorflow as tf
import numpy as np
from matplotlib import pyplot as plt

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

In [None]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
import tensorflow as tf
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [None]:
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [None]:
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

In [None]:
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

# 2. Image Collection and Other Processes

In [None]:
def sharpen_image(image):
    kernel = np.array([[-1, -1, -1],
                       [-1, 9, -1],
                       [-1, -1, -1]])

    sharpened =cv2.filter2D(image, -1, kernel)
    return sharpened

cap = cv2.VideoCapture(0)

while cap.isOpened():
    ret, frame = cap.read()
    
    frame = frame[:1080,:1920, :]
    
    blurred_frame = cv2.GaussianBlur(frame, (5, 5), 0)
    
    more_blurred_frame = cv2.GaussianBlur(blurred_frame, (9, 9), 0)
    
    sharpened_frame = sharpen_image(more_blurred_frame)

    if cv2.waitKey(10) & 0Xff == ord('a'):
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame) 
            
    if cv2.waitKey(10) & 0Xff == ord('p'):
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame) 
    
    if cv2.waitKey(10) & 0Xff == ord('n'):
        imgname = os.path.join(NEG_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame) 
    
    cv2.imshow('Image Collection', frame)
    
    if cv2.waitKey(10) & 0Xff == ord('q'):
        break
    
cap.release()

cv2.destroyAllWindows()


* Optional: If image type is not '.jpg'

In [None]:
def rename_images(directory):
    for filename in os.listdir(directory):
        if filename.endswith('.jpg') or filename.endswith('.png'):
            old_path = os.path.join(directory, filename)
            new_filename = str(uuid.uuid1()) + os.path.splitext(filename)[1]
            new_path = os.path.join(directory, new_filename)
            os.rename(old_path, new_path)

rename_images(POS_PATH)

rename_images(ANC_PATH)


# 2.x Data Augmentation

In [None]:
def data_aug(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(img)
    
    return data

# 3. Data Setting and Image Loading

In [None]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\\*.jpg').take(2000)
positive = tf.data.Dataset.list_files(POS_PATH+'\\*.jpg').take(2000)
negative = tf.data.Dataset.list_files(NEG_PATH+'\\*.jpg').take(2000)

In [None]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (150, 300))
    img = img / 255.0
    return img

In [None]:
img = preprocess('data\\anchor\\21e9c4a2-0c86-11ef-82f2-010101010000.jpg')

In [None]:
img.numpy().max() 

In [None]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [None]:
samples = data.as_numpy_iterator()

In [None]:
example = samples.next()

In [None]:
example

In [None]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [None]:
res = preprocess_twin(*example)

In [None]:
plt.imshow(res[1])

In [None]:
res[2]

In [None]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [None]:
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(8)
train_data = train_data.prefetch(4)

In [None]:
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(8)
test_data = test_data.prefetch(4)

# 4. Siamese Network

In [None]:
inp = Input(shape=(150,300,3), name='input_image')

In [None]:
c1 = Conv2D(32, (7, 7), activation='relu')(inp)

In [None]:
m1 = MaxPooling2D(pool_size=(2,2))(c1)

In [None]:
f1 = Flatten()(m1)
d1 = Dense(128, activation='sigmoid')(f1)

In [None]:
mod = Model(inputs=[inp], outputs=[d1], name='embedding_simple')

In [None]:
mod.summary()

In [None]:
def make_embedding_simple():
    inp = Input(shape=(150, 300, 3), name='input_image')
    
    c1 = Conv2D(32, (7, 7), activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2, 2))(c1)
    f1 = Flatten()(m1)
    d1 = Dense(128, activation='sigmoid')(f1)
    
    return Model(inputs=[inp], outputs=[d1], name='embedding_simple')

In [None]:
embedding_simple = make_embedding_simple()

In [None]:
embedding_simple.summary()

# 4.2 Build Distance Layer

In [None]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
    
    def call(self, input_embedding, validation_embedding):

        input_embedding = tf.convert_to_tensor(input_embedding)
        validation_embedding = tf.convert_to_tensor(validation_embedding)
        
        distances = tf.math.abs(input_embedding - validation_embedding)
        return distances

In [None]:
l1 = L1Dist()

# 4.3 Make Siamese Model

In [None]:
input_image = Input(name='input_img', shape=(150,300,3))
validation_image =Input(name='validation_img', shape=(150,300,3))

In [None]:
inp_embedding = embedding_simple(input_image)
val_embedding = embedding_simple(validation_image)

In [None]:
siamese_layer = L1Dist()

In [None]:
distances = siamese_layer(inp_embedding, val_embedding)

In [None]:
classifier = Dense(1, activation='sigmoid')(distances)

In [None]:
classifier

In [None]:
siamese_model_simple = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetworkSimple')

In [None]:
siamese_model_simple.summary()

In [None]:
def make_siamese_model_simple():
    input_image = Input(name='input_img', shape=(150, 300, 3))
    validation_image = Input(name='validation_img', shape=(150, 300, 3))

    embedding_simple = make_embedding_simple()

    inp_embedding = embedding_simple(input_image)
    val_embedding = embedding_simple(validation_image)

    if isinstance(inp_embedding, list):
        inp_embedding = inp_embedding[0]
    if isinstance(val_embedding, list):
        val_embedding = val_embedding[0]

    siamese_layer = L1Dist()

    distances = siamese_layer(inp_embedding, val_embedding)

    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetworkSimple')

In [None]:
siamese_model_simple = make_siamese_model_simple()

In [None]:
siamese_model_simple.summary()

## 5. Training

In [None]:
opt = tf.keras.optimizers.Adam(1e-4)

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model_simple=siamese_model_simple)

In [None]:
test_batch = train_data.as_numpy_iterator()

In [None]:
batch = test_batch.next()

In [None]:
batch_1= batch[:2]

In [None]:
input_image, validation_image = batch[:2]

print("Shape before any operation:", input_image.shape, validation_image.shape)

input_image = tf.reshape(input_image, (-1, 150, 300, 3))
validation_image = tf.reshape(validation_image, (-1, 150, 300, 3))

print("Shape after reshaping:", input_image.shape, validation_image.shape)

yhat = siamese_model_simple.predict([input_image, validation_image])

In [None]:
X = (batch_1[:0], batch[:1])

In [None]:
y = batch_1[1]

In [None]:
@tf.function
def train_step(batch):
    with tf.GradientTape() as tape:
        X = (tf.expand_dims(batch[0], axis=0), tf.expand_dims(batch[1], axis=0))
        y = tf.expand_dims(batch[2], axis=0)
        yhat = siamese_model_simple(X, training=True)
        
        if len(yhat.shape) == 0:
            yhat = tf.expand_dims(yhat, axis=0)

        loss = binary_cross_loss(y, yhat)
    
    grad = tape.gradient(loss, siamese_model_simple.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model_simple.trainable_variables))
    
    return loss

5.1 Training Loop

In [None]:
from tensorflow.keras.metrics import Precision, Recall

In [None]:
def train(data, start_epoch, EPOCHS):
    for epoch in range(start_epoch, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        r = Recall()
        p = Precision()
        
        for idx, batch in enumerate(data):
            loss = train_step(batch)
            
            input_image, validation_image = batch[0], batch[1]
            input_image = tf.reshape(input_image, [-1, 150, 300, 3])
            validation_image = tf.reshape(validation_image, [-1, 150, 300, 3])
            
            yhat = siamese_model_simple.predict([input_image, validation_image])
            
            labels = tf.expand_dims(batch[2], axis=-1)
            
            r.update_state(labels, yhat)
            p.update_state(labels, yhat)
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        
        if epoch % 5 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

In [None]:
EPOCHS = 25

start_epoch = 21

In [None]:
train(data, start_epoch, EPOCHS) 

## 6. Evaluate Model

In [None]:
from tensorflow.keras.metrics import Precision, Recall

In [None]:
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
y_hat = siamese_model_simple.predict([test_input, test_val])
y_hat

In [None]:
[1 if prediction > 0.5 else 0 for prediction in y_hat.flatten()]

In [None]:
y_true

6.2 Calculate Metrics

In [None]:
m = Precision()
m.update_state(y_true, y_hat)
m.result().numpy()

In [None]:
m = Recall()
m.update_state(y_true, y_hat)
m.result().numpy()

6.3 Visual Res

In [None]:
plt.figure(figsize=(10,8))

plt.subplot(1,2,1)
plt.imshow(test_input[2])

plt.subplot(1,2,2)
plt.imshow(test_val[2])

plt.show()

## 7. Save Model

In [None]:
siamese_model_simple.save('licenseIDmodel.keras')

In [None]:
model = tf.keras.models.load_model('licenseIDmodel.keras', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
model.predict([test_input, test_val])

In [None]:
model.summary()

## 8. Real Time Test

In [None]:
def verify(frame, model, detection_threshold, verification_threshold):
    results = []
    for image in os.listdir(os.path.join('application_data','verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))

        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection /  len(os.listdir(os.path.join('application_data', 'verification_images')))
    verified = verification > verification_threshold

    return results, verified

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[:1080,:1920, :]

    cv2.imshow('verification', frame)

    if cv2.waitKey(5) & 0xFF == ord('v'):
        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        results, verified = verify(frame, model, detection_threshold=0.9, verification_threshold=0.7)
        print(verified)

    if cv2.waitKey(5) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

In [None]:
np.sum(np.squeeze(results) > 0.9)

In [None]:
42/50