In [None]:
# This code is adapted from Nicholas Renotte's Siamese neural network tutorial.


In [None]:
import cv2
import os                
import random
import numpy as np
from matplotlib import pyplot as plt
import uuid

In [None]:

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer,Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [None]:
# Root folder of the 50-image input training dataset.
# Set the SIAMESE_DATA_DIR environment variable to run the notebook on another machine;
# when it is not set, the original local path is used unchanged.
path = os.environ.get(
    'SIAMESE_DATA_DIR',
    r"C:\Users\sachu\Desktop\My_Documents\Research_Project\Computer_Graphics\CAR_TEST\Siamese_Model\50_input_set",
)

In [None]:

# Turn on memory growth for every GPU that TensorFlow reports.
# With no GPU present the list is empty and the loop does nothing.
gpus= tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
# Sub-folders of `path` that hold the positive, negative and anchor images
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join(path, folder) for folder in ('positive', 'negative', 'anchor')
)

In [None]:
#Function for Data augmentation
def data_aug(img, num_augmented=9):
    """Return randomly perturbed copies of ``img``.

    Every copy is derived from the original ``img`` rather than from the previous
    copy, so contrast reduction and JPEG re-compression do not accumulate from one
    copy to the next.

    Args:
        img: Image accepted by the ``tf.image`` ops (the notebook passes the array
            returned by ``cv2.imread``).
        num_augmented: Number of augmented copies to create. Defaults to 9.

    Returns:
        list: ``num_augmented`` image tensors.
    """
    data = []
    for i in range(num_augmented):
        # Brightness/contrast keep deterministic seeds, but the seed changes with the
        # copy index; a constant seed would give every copy the same adjustment now
        # that each copy starts from the original image.
        aug = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1 + i, 2))
        aug = tf.image.stateless_random_contrast(aug, lower=0.6, upper=1, seed=(1 + i, 3))
        aug = tf.image.stateless_random_flip_left_right(aug, seed=(np.random.randint(100),np.random.randint(100)))
        aug = tf.image.stateless_random_jpeg_quality(aug, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        aug = tf.image.stateless_random_saturation(aug, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))

        data.append(aug)

    return data

In [None]:
# One-off augmentation of the negative images.
# Disabled by default: every run writes one new .jpg per augmented copy into NEG_PATH,
# so re-running the notebook would keep growing the dataset. Set the flag to True to run it.
RUN_NEGATIVE_AUGMENTATION = False

if RUN_NEGATIVE_AUGMENTATION:
    # os.listdir returns a list, so files written below are not picked up in this run.
    for file_name in os.listdir(os.path.join(NEG_PATH)):
        img_path = os.path.join(NEG_PATH, file_name)
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread returns None for files it cannot decode; skip those.
            continue
        augmented_images = data_aug(img)

        for image in augmented_images:
            cv2.imwrite(os.path.join(NEG_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [None]:
# Collect up to 500 JPEG paths per class.
# os.path.join builds the glob pattern with the platform's path separator and avoids
# the invalid '\*' escape sequence of a hand-written backslash.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(500)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(500)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(500)

In [None]:
def preprocess(file_path):
    """Load a JPEG file as a 100x100 three-channel tensor with values divided by 255.

    Args:
        file_path: Path of the JPEG file (string or string tensor).

    Returns:
        The resized image tensor after division by 255.
    """
    byte_img = tf.io.read_file(file_path)
    # Force three channels: without it a grayscale JPEG decodes to a single channel,
    # which does not match the (100, 100, 3) input of the network.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    img = tf.image.resize(img, (100,100))
    img = img / 255.0
    return img

In [None]:

# Anchor/positive pairs get label 1, anchor/negative pairs get label 0;
# the two labelled sets are then joined into one dataset.
positives = tf.data.Dataset.zip((
    anchor,
    positive,
    tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))),
))
negatives = tf.data.Dataset.zip((
    anchor,
    negative,
    tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))),
))
data = positives.concatenate(negatives)


In [None]:
def preprocess_twin(input_img, validation_img,label):
    """Run ``preprocess`` on both image paths of a pair and pass the label through.

    Returns:
        tuple: (preprocessed input image, preprocessed validation image, label).
    """
    first_image = preprocess(input_img)
    second_image = preprocess(validation_img)
    return first_image, second_image, label

In [None]:
# Decode every pair, cache the decoded tensors, then shuffle.
data = data.map(preprocess_twin)
data = data.cache()
# reshuffle_each_iteration=False fixes one shuffled order for all passes. The train/test
# split below is built with take()/skip() on this dataset; with the default (True) the
# order changes on every pass, so samples move across the train/test boundary between epochs.
# If per-epoch shuffling of the training data is wanted, shuffle the training split itself.
data = data.shuffle(buffer_size=10000, reshuffle_each_iteration=False)

In [None]:
# Training split: first 70% of the pairs, in batches of 16, prefetching 8 batches
train_data = data.take(round(len(data)*.7)).batch(16).prefetch(8)

In [None]:
# Test split: skip the first 70% of the pairs, keep the next 30%,
# in batches of 16, prefetching 8 batches
test_data = (
    data.skip(round(len(data)*.7))
    .take(round(len(data)*.3))
    .batch(16)
    .prefetch(8)
)

In [None]:
def make_embedding():
    """Build the convolutional network that maps one image to an embedding vector.

    Four convolution blocks (the first three followed by max-pooling) feed a
    4096-unit sigmoid Dense layer.

    Returns:
        Model: Keras model named 'embedding' taking a (100, 100, 3) image.
    """
    inp = Input(shape=(100,100,3), name='input_image')
    c1= Conv2D(64, (10,10),activation='relu')(inp)
    # MaxPooling2D takes (pool_size, strides): the former `MaxPooling2D(64, (2,2))`
    # pooled over 64x64 windows. A 2x2 window with the same stride and padding keeps
    # every output shape unchanged.
    m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)
    c2= Conv2D(128, (7,7),activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)
    c3= Conv2D(128, (4,4),activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
# Build the embedding network once; make_siamese_model() reads this global.
embedding=make_embedding()

In [None]:
# Print the layer-by-layer summary of the embedding network
embedding.summary()

In [None]:
class L1Dist(Layer):
    """Layer returning the element-wise absolute difference of two embeddings."""
    def __init__(self,**kwargs):
        # Forward the keyword arguments (name, dtype, ...) to Layer; they were
        # previously accepted but silently dropped.
        super().__init__(**kwargs)
    def call(self, input_embedding, validation_embedding):
        """Return ``|input_embedding - validation_embedding|`` element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
# Exploratory cell: symbolic inputs for the two images of a pair.
# make_siamese_model() creates its own inputs with the same names and shapes.
input_image = Input(name='input_img', shape=(100,100,3))
validation_image = Input(name='validation_img', shape=(100,100,3))

In [None]:
# Exploratory cell: pass both inputs through the same embedding network
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

In [None]:
# Exploratory cell: stand-alone instance of the distance layer
siamese_layer = L1Dist()

In [None]:
# Exploratory cell: apply the distance layer to the two symbolic embeddings;
# the result is shown as the cell output.
siamese_layer(inp_embedding, val_embedding)

In [None]:
def make_siamese_model():
    """Assemble the two-input Siamese classifier around the global ``embedding`` model.

    Both inputs go through the same ``embedding`` network, the ``L1Dist`` layer
    compares the two embeddings, and a single sigmoid unit produces the output.

    Returns:
        Model: Keras model named 'SiameseNetwork' with inputs
        'input_img' and 'validation_img'.
    """
    first_input = Input(name='input_img', shape=(100,100,3))
    second_input = Input(name='validation_img', shape=(100,100,3))

    distance_layer = L1Dist()
    # The layer name is set through the private attribute, as in the original code.
    distance_layer._name = 'distance'

    first_embedding = embedding(first_input)
    second_embedding = embedding(second_input)
    distances = distance_layer(first_embedding, second_embedding)

    score = Dense(1,activation='sigmoid')(distances)
    return Model(inputs=[first_input, second_input], outputs=score, name='SiameseNetwork')

In [None]:
# Build the Siamese model; train_step() and train() read this global.
siamese_model = make_siamese_model()

In [None]:
# Print the layer-by-layer summary of the Siamese model
siamese_model.summary()

In [None]:
# Binary cross-entropy loss object used by train_step()
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
# Adam optimizer, constructed with 1e-3 as its first argument
opt=tf.keras.optimizers.Adam(1e-03)

In [None]:
# Checkpoint object tracking the optimizer and the model.
# train() saves it under checkpoint_prefix every 10th epoch.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir,'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [None]:
@tf.function
def train_step(batch):
    """Run one optimization step on a single batch.

    Args:
        batch: Sequence whose first two elements are the model inputs and whose
            third element holds the labels.

    Returns:
        The binary cross-entropy loss of the batch.
    """
    X = batch[:2]
    y = batch[2]
    # Record the forward pass so the loss can be differentiated
    with tf.GradientTape() as tape:
        yhat = siamese_model(X, training=True)
        loss = binary_cross_loss(y, yhat)
    # The former `print(loss)` was removed: inside tf.function a Python print runs only
    # while the function is traced and shows a symbolic tensor, not the loss value.
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    return loss
    

In [None]:
from tensorflow.keras.metrics import Precision, Recall

In [None]:
def train(data, EPOCHS):
    """Train the global ``siamese_model`` on ``data`` for ``EPOCHS`` epochs.

    For every epoch the function prints the loss of the last batch together with
    recall and precision accumulated over the epoch, and saves ``checkpoint``
    every 10th epoch.

    Args:
        data: Batched dataset supporting ``len()``; each batch holds the two image
            tensors followed by the labels.
        EPOCHS: Number of passes over ``data``.

    Returns:
        list: Loss of the last batch of each epoch.
    """
    losses=[]
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch{}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        # Fresh metric objects so the values describe this epoch only
        r = Recall()
        p = Precision()

        for idx, batch in enumerate(data):
            loss = train_step(batch)
            # Call the model directly instead of Model.predict(): predict() is meant for
            # large inputs, is slow when invoked once per batch and prints its own
            # progress bar on every call.
            yhat = siamese_model(batch[:2], training=False)
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        losses.append(loss.numpy())
        if epoch % 10 ==0:
            checkpoint.save(file_prefix=checkpoint_prefix)

    return losses

In [None]:
# Number of training epochs passed to train()
EPOCHS = 120

In [None]:
# Run the training; `lops` receives the list of per-epoch losses returned by train()
lops=train(train_data,EPOCHS)

In [None]:
# Plot the recorded loss against the epoch number
plt.figure(figsize=(10,10))
# Derive the x axis from the recorded losses, so the plot still works when the
# number of epochs is not 120.
epo = np.arange(1, len(lops) + 1)
plt.plot(epo,lops)
plt.xlabel('Epoch')
plt.ylabel('Training loss')
plt.show()


In [None]:

# Save the trained model to an HDF5 file in the working directory
siamese_model.save('siamesemodel_CARTEST_50_3.h5')

In [None]:
# Load a saved model for inference; L1Dist is passed as a custom object and the
# model is not compiled.
# NOTE(review): this loads '..._50_2.h5' while the cell above saves '..._50_3.h5' —
# confirm that evaluating the earlier file is intended.
model = tf.keras.models.load_model('siamesemodel_CARTEST_50_2.h5', custom_objects = {'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy},compile = False)

In [None]:
import random
import shutil

In [None]:
# Copy a random 10% sample of the positive images into the verification folder.
# NOTE(review): verify() reads 'application_data/verification_images' relative to the
# working directory — confirm that it is the same folder as `dest`.
source = 'C:\\Users\\sachu\\Desktop\\My_Documents\\Research_Project\\Computer_Graphics\\Siamese Attempt_on_Kitti_dataset\\data\\positive'
dest = 'C:\\Users\\sachu\\Desktop\\My_Documents\\Research_Project\\Computer_Graphics\\Siamese Attempt_on_Kitti_dataset\\application_data\\verification_images'
# Create the target folder if needed; with a missing folder shutil.copy would write
# every sample to one file named like the folder, each copy overwriting the previous one.
os.makedirs(dest, exist_ok=True)
# Keep regular files only: shutil.copy cannot copy sub-folders
files = [entry for entry in os.listdir(source) if os.path.isfile(os.path.join(source, entry))]
no_of_files = len(files) // 10

for file_name in random.sample(files, no_of_files):
    shutil.copy(os.path.join(source, file_name), dest)

In [None]:
def verify(model, detection_threshold, verification_threshold,
           input_dir=os.path.join('application_data','input_images'),
           verification_dir=os.path.join('application_data','verification_images')):
    """Compare every input image against all verification images.

    An input image is verified when the fraction of verification images whose
    prediction exceeds ``detection_threshold`` is greater than
    ``verification_threshold``.

    Args:
        model: Model whose ``predict`` takes the pair of image batches.
        detection_threshold: Prediction value above which a pair counts as a match.
        verification_threshold: Required fraction of matching pairs.
        input_dir: Folder with the images to check.
        verification_dir: Folder with the reference images.

    Returns:
        tuple: ``(ver, z)`` — the decision per input image and the matching file names,
        both in ``os.listdir`` order.

    Raises:
        ValueError: If ``verification_dir`` contains no files.
    """
    # The reference set is identical for every input image: list and decode it once
    verification_files = os.listdir(verification_dir)
    if not verification_files:
        raise ValueError('No verification images found in {}'.format(verification_dir))
    verification_imgs = [preprocess(os.path.join(verification_dir, image)) for image in verification_files]

    ver = []
    z =[]
    for img in os.listdir(input_dir):
        results = []
        print(img)
        z.append(img)

        # Decode the input image once instead of once per reference image
        input_img = preprocess(os.path.join(input_dir, img))
        for validation_img in verification_imgs:
            result = model.predict(list(np.expand_dims([input_img, validation_img],axis=1)))
            results.append(result)
        # Number of reference images the model considers a match
        detection = np.sum(np.array(results) > detection_threshold)
        verification = detection / len(verification_files)
        verified  = verification > verification_threshold
        ver.append(verified)
        print(results)
        print(verified)
    print (ver)
    return ver,z
    
    

In [None]:
# Run the verification. Vary the detection threshold (0.9) and the verification
# threshold (0.5) to find the optimal values.
veri,zi = verify(model,0.9, 0.5)


In [None]:
# Decisions returned by verify(), one per input image
print(veri)

In [None]:
# Input file names returned by verify(), in the same order as `veri`
print(zi)

In [None]:
# File names of the input images that were verified.
# Iterating over the results directly works for any number of input images.
for name, verified in zip(zi, veri):
    if verified:
        print(name)
    

In [None]:
# File names of the input images that were NOT verified.
# Iterating over the results directly works for any number of input images.
for name, verified in zip(zi, veri):
    if not verified:
        print(name)

In [None]:
# Ground truth as [file name, expected decision] pairs:
# img1.jpg .. img59.jpg -> False, img60.jpg .. img100.jpg -> True
a = [['img{}.jpg'.format(number), number >= 60] for number in range(1, 101)]
    

In [None]:
# Compare the decisions of verify() with the ground truth in `a`.
# A dictionary lookup replaces the nested scan over `a`, and the counts come from the
# actual results, so the cell works for any number of input images.
expected = {name: label for name, label in a}
# m = number of input images whose decision matches the ground truth
m = sum(
    1
    for name, verified in zip(zi, veri)
    if name in expected and expected[name] == verified
)
print(m)
# Share of evaluated images with a correct decision (0.0 when nothing was evaluated)
accuracy = m / len(zi) if len(zi) else 0.0
print(accuracy)