# Import Dependencies

In [1]:
#import standard dependencies
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as pyplot

In [5]:
#importing tensorflow dependencies
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf


# Create Folder Structures

In [9]:
#setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [10]:
#make the directoeriesq
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

# camera access to take positive and negative images


In [None]:
# cap  = cv2.VideoCapture(0)
# while cap.isOpened():
#     ret, frame = cap.read()  
#     frame = frame[:360, :513, :]
#     cv2.imshow('image', frame)   
    
#     if cv2.waitKey(1) & 0XFF == ord('a'):
#         img = os.path.join(anchor, '{}.jpg'.format(uuid.uuid1()))
#         cv2.imwrite(img, frame)
        
#     if cv2.waitKey(1) & 0XFF == ord('p'):
#         img = os.path.join(pospath, '{}.jpg'.format(uuid.uuid1()))
#         cv2.imwrite(img, frame)
        
#     if cv2.waitKey(1) & 0XFF == ord('q'):
#         break
# cap.release()
# cv2.destroyAllWindows()    

# Image Processing

In [None]:
# for negimg in os.listdir(negpath):
#     img = cv2.imread(os.path.join(negpath, negimg))
#     img = cv2.resize(img, None, fx=0.731, fy=0.731)
#     cv2.imwrite(os.path.join(negpath, negimg), img)

In [None]:
# img = cv2.imread(r'data\negative\000_left_ear.jpg')

In [None]:
# plt.imshow(img)

# To Process Image (Roatate 90 degree Clockwise)

In [None]:
# for img in os.listdir(anchor):
#     a = cv2.imread(os.path.join(anchor, img))
#     a = cv2.rotate(a, cv2.ROTATE_90_CLOCKWISE)
#     cv2.imwrite(os.path.join(anchor, img), a)

In [None]:
# for directories in os.listdir("lfw"):
#     for files in os.listdir(os.path.join('lfw',directories)):
#         ex = os.path.join("lfw", directories, files)
#         new = os.path.join(negpath, files)
#         os.replace(ex, new)

# Creating TensorFlow Dataset

In [None]:
def createdataset(path):
    return tf.data.Dataset.list_files(path+"\*.jpg").take(500) 

In [None]:
pos = createdataset(POS_PATH)
neg = createdataset(NEG_PATH)
anch = createdataset(ANC_PATH)

In [None]:
print(pos, neg, anch, sep='\n')

In [None]:
test = pos.as_numpy_iterator()
test.next()

In [None]:
tf.ones(len(anch))

# Zipping the Dataset (Anchor with Validation Image) & Concatenating

In [None]:
positives = tf.data.Dataset.zip((anch, pos, tf.data.Dataset.from_tensor_slices(tf.ones(len(anch)))))
negatives = tf.data.Dataset.zip((anch, neg, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anch)))))
rawdata = positives.concatenate(negatives)

In [None]:
print(positives, negatives, rawdata, sep='\n')

In [None]:
test = rawdata.as_numpy_iterator()
test.next()

# Image Preprocessing

In [None]:
def preprocess(path):
    byteimg = tf.io.read_file(path)
    img = tf.io.decode_jpeg(byteimg)
    img = tf.image.resize(img, (105, 105))
    img /= 255.0
    return img

In [None]:
def preprocess_twin(inputimg, validationimg, label):
    return (preprocess(inputimg), preprocess(validationimg), label)

# Suffling

In [None]:
data = rawdata.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [None]:
test = data.as_numpy_iterator()
test.next()

# Splitting Train & Test Data

In [None]:
len(data)

In [None]:
train_data = data.skip(round(len(data)*.7))

In [None]:
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [None]:
test_data = data.skip(round(len(data)*.7))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# Embedding Layer

In [None]:
def make_embedding(): 
    inp = Input(shape=(105,105,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
embedding = make_embedding()
embedding.summary()

# L1 Distance Layer

In [None]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
    def call(self, input_emb, validation_emb):
        return tf.math.abs(input_emb - validation_emb)

In [None]:
inputimg = Input(name='input_img', shape=(105, 105, 3))
validationimg = Input(name='val_img', shape=(105, 105, 3))
print(inputimg, validationimg, sep='\n')

In [None]:
siameselayer = L1Dist()

In [None]:
inputembedding = embedding(inputimg)
validationembedding = embedding(validationimg)
print(inputembedding, validationembedding, sep='\n')

In [None]:
distances = siameselayer(inputembedding, validationembedding)
distances

In [None]:
classifier = Dense(1, activation='sigmoid')(distances)
classifier

In [None]:
testnetwork = Model(inputs=[inputimg, validationimg], outputs=classifier, name='SiameseNetwork')
testnetwork


# Siamese Model

In [None]:
def make_siamese_model():
    inputimg = Input(name='inputimg', shape=(105, 105, 3))
    validationimg = Input(name='validationimg', shape=(105, 105, 3))
    siameselayer = L1Dist()
    siameselayer._name = 'distance'
    distances = siameselayer(embedding(inputimg), embedding(validationimg))
    classifier = Dense(1, activation='sigmoid')(distances)
    return Model(inputs=[inputimg, validationimg], outputs=classifier, name='SiameseNetwork')

In [None]:
siamese_model = make_siamese_model()
siamese_model.summary()

# Loss Function & Optimizer

In [None]:
bcl = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4)

# Checkpoints Creation

In [None]:
checkpointdir = '../checkpoints'
checkpointprefix = os.path.join(checkpointdir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

# Training the Model (Step Function)

In [None]:
@tf.function
def train_step(batch):
    with tf.GradientTape() as tape:
        X = batch[:2]
        Y = batch[2]
        y_pred = siamese_model(X, training=True)
        loss = bcl(Y, y_pred)
    print(loss)
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    return loss

# Loop for training the entire batch by sending to step function

In [None]:
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS+1):
        print("\n EPOCH : {}/{}".format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        for idx, batch in enumerate(data):
            train_step(batch)
            progbar.update(idx+1)
        if epoch%10==0:
            checkpoint.save(file_prefix=checkpointprefix)

# Training the model

In [None]:
EPOCHS = 50
print(train(train_data, EPOCHS))

In [None]:
siamese_model.save('siamese.h5')

# Run the Pretrained

In [None]:
model = tf.keras.models.load_model('siamese.h5', custom_objects={'L1Dist':L1Dist, 'BinaryCrossEntropy':tf.losses.BinaryCrossentropy})

In [None]:
from tensorflow.keras.metrics import Recall, Precision

In [None]:
testinput, testval, ytrue = test_data.as_numpy_iterator().next()

In [None]:
ypred = model.predict([testinput, testval])

In [None]:
[1 if prediction>0.5 else 0 for prediction in ypred]

In [None]:
ytrue

In [None]:
m = Recall()
m.update_state(ytrue, ypred)
m.result().numpy()

In [None]:
m = Precision()
m.update_state(ytrue, ypred)
m.result().numpy()

In [None]:
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

print(r.result().numpy(), p.result().numpy())

In [None]:
plt.figure(figsize=(10,8))

# Set first subplot
plt.subplot(1,2,1)
plt.imshow(testinput[2])

# Set second subplot
plt.subplot(1,2,2)
plt.imshow(testval[2])

# Renders cleanly
plt.show()