In [47]:
import tensorflow as tf
import numpy as np
import tensorflow_hub as hub
import os
import io
from keras.layers import GlobalAveragePooling2D,Dense,Dropout,Input,BatchNormalization,Lambda, Flatten, Dropout,MaxPooling2D 
from keras.models import Model
from keras.optimizers import Adam
import argparse
from tensorflow.keras import Sequential
from keras.utils import plot_model
from keras.layers import RandomRotation,RandomFlip
import keras
import  keras.callbacks as callback
import pandas as pd


In [48]:
InputPath = 'Images/'

In [49]:
def TrainDataset(datapath, state):
    files = os.listdir(datapath)
    file_ = []
    label = []
    if state:
        Files = [file for file in files]
    else:
        Files = [file for file in files]
    for file in Files:
        file_.append(file)
        label.append(file)  # Each file is its own class
    return np.array([file_, label]).T, np.unique(label)

In [50]:
TrainDataSet,TrainClasses=TrainDataset(InputPath,True)
TestDataSet,TestClasses=TrainDataset(InputPath,False)
np.random.shuffle(TrainDataSet)
TrainImagesName,TrainImagesLabel=TrainDataSet[:,0],TrainDataSet[:,1]
TrainClassesCount=len(TrainClasses)

In [51]:
print(f"TrainClassesCount={TrainClassesCount}")
print(f"TrainImagesName={len(TrainImagesName)}")

TrainClassesCount=4
TrainImagesName=4


In [52]:

TestImagesName,TestImagesLabel=TestDataSet[:,0],TestDataSet[:,1]
TestClassesCount=len(TestClasses)
print(f"TestClassesCount= {TestClassesCount}")
print(f"TestImagesName={len(TestImagesName)}")

TestClassesCount= 4
TestImagesName=4


In [53]:
mask=np.zeros(shape=(224,224,3))
mask[:,:,0]=200
mask[:,:,1]=100
mask[:,:,2]=200
mask=tf.cast(mask/255,tf.float32)
FliPer=RandomFlip(mode="horizontal")
Rotater=RandomRotation([-0.125,0.125])
def PreProcessInput(Image,num):
    if num ==0:
        Image=FliPer(Image)
    elif num==1:
        Image= 0.75*Image+0.25*mask
    if num<=2:
         return Rotater(Image)
    else:
         return Image

In [54]:
@tf.function
def load_image(Anchor,Positive,Nagative,State):

    Anchor=tf.io.read_file(Anchor)
    Anchor=tf.image.decode_jpeg(Anchor)
    Anchor = tf.cast(Anchor, tf.float32)
    Anchor = tf.image.resize(Anchor, [224,224], method = tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    ranA=tf.random.uniform(shape=[1],minval=0,maxval=4,dtype=tf.int32)

    Positive=tf.io.read_file(Positive)
    Positive=tf.image.decode_jpeg(Positive)
    Positive = tf.cast(Positive, tf.float32)
    Positive = tf.image.resize(Positive, [224,224], method = tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    ranP=tf.random.uniform(shape=[1],minval=0,maxval=4,dtype=tf.int32)

    Negative=tf.io.read_file(Nagative)
    Negative=tf.image.decode_jpeg(Negative)
    Negative = tf.cast(Negative, tf.float32)
    Negative = tf.image.resize(Negative, [224,224], method = tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    ranN=tf.random.uniform(shape=[1],minval=0,maxval=4,dtype=tf.int32)
    if State:
        Anchor=PreProcessInput(Anchor/255,ranA)
        Positive=PreProcessInput(Positive/255,ranP)
        Negative=PreProcessInput(Negative/255,ranN)
    else:
        Anchor=Anchor/255
        Positive=Positive/255
        Negative=Negative/255

    return Anchor,Positive,Negative

In [55]:
def DatasetTripletsGenerator(State):
    if State:
        np.random.shuffle(TrainDataSet)
        ImagesName=TrainImagesName
        ImagesLabel=TrainImagesLabel
        ClassesCount=TrainClassesCount
        Classes=TrainClasses
    else:
        ImagesName=TestImagesName
        ImagesLabel=TestImagesLabel
        ClassesCount=TestClassesCount
        Classes=TestClasses
        
    TripletList=[]
    for i in range(ClassesCount):
        class_=Classes[i]
        files=list(ImagesName[ImagesLabel==class_])[:15]
        files_num=len(files)
        for index in range(files_num-1):
            for j in range(index+1,files_num):
                ancore=InputPath+class_+files[index]
                positive=InputPath+class_+files[j]
                neg_folder=class_
                while neg_folder== class_:
                    neg_folder=np.random.choice(Classes)
                negative=InputPath+neg_folder+np.random.choice(list(ImagesName[ImagesLabel==neg_folder]))
                yield ancore,positive,negative,State

In [77]:
TrainData=tf.data.Dataset.from_generator(DatasetTripletsGenerator,args=[True],output_types=(tf.string,tf.string,tf.string,tf.bool),output_shapes=((),(),(),()),name="DataLoaderPipeline")
TrainData=TrainData.map(load_image)
TrainData=TrainData.batch(1)
TrainData=TrainData.prefetch(buffer_size=1)
# data=data.cache()
TestData=tf.data.Dataset.from_generator(DatasetTripletsGenerator,args=[False],output_types=(tf.string,tf.string,tf.string,tf.bool),output_shapes=((),(),(),()),name="DataLoaderPipeline")
TestData=TestData.map(load_image).batch(1)

In [78]:
class DistanceLayer(tf.keras.layers.Layer):
    def __init__(self):
        super().__init__()
    def call(self,anchor,positive,negative):
        dis_ap=tf.reduce_sum(tf.square(anchor - positive), 1)  ## distance between anchor and positive
        dis_an=tf.reduce_sum(tf.square(anchor - negative), 1)   ## distance between anchor and negative
        return  dis_ap , dis_an

In [96]:
def GetEncoder():
    
    model = tf.keras.applications.MobileNetV3Small(
    alpha=1.0,
    minimalistic=False,
    include_top=False,
    weights='imagenet',
    input_tensor=None,
    pooling=None,
    dropout_rate=0.2,
    classifier_activation='softmax',
    include_preprocessing=True
                                                )   
   
    encode_model =Sequential([
        model,
        Dropout(0.2),
        Dense(512, activation='relu'),
        BatchNormalization(),
        Dense(256, activation="relu"),
        Lambda(lambda x: tf.math.l2_normalize(x, axis=1))
    ], name="Encoder")
    return encode_model

In [97]:

def SiameseNetwork(inputshape=(224,224,3)):
    An_input=Input(shape=inputshape)

    Po_input=Input(shape=inputshape)

    Ne_input=Input(shape=inputshape)

    encoder=GetEncoder()

    An_embeding=encoder(An_input)
    Po_embeding=encoder(Po_input)
    Ne_embeding=encoder(Ne_input)


    distanc=DistanceLayer()(An_embeding,Po_embeding,Ne_embeding) #return distance between (A and B) and (A and N)

    return Model(inputs=[An_input,Po_input,Ne_input],outputs=distanc)

In [98]:
siames_net=SiameseNetwork()
siames_net.summary()

  return MobileNetV3(


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v3/weights_mobilenet_v3_small_224_1.0_float_no_top_v2.h5
[1m4334752/4334752[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 0us/step


In [99]:
class SiamesModel(Model):
    def __init__(self,siames_net,DesiredDistance):
        super(SiamesModel, self).__init__()

        self.Model=siames_net
        self.DesiredDistance=DesiredDistance
        self.LossTracker=tf.keras.metrics.Mean(name="Loss")

        self.VALTracker=tf.keras.metrics.Mean(name="VAL")

        self.PmeanTracker=tf.keras.metrics.Mean(name="P_mean")

        self.PmaxTracker=tf.keras.metrics.Mean(name="P_max")

        self.PstdTracker=tf.keras.metrics.Mean(name="P_std")

        self.FARTracker=tf.keras.metrics.Mean(name="FAR")

        self.N_meanTracker=tf.keras.metrics.Mean(name="N_mean")

        self.NstdTracker=tf.keras.metrics.Mean(name="N_std")
        self.NminTracker=tf.keras.metrics.Mean(name="N_min")

    def call(self,data):
        return self.Model(data)

    def train_step(self,data):
        with tf.GradientTape() as Tape:
            AP_distanc,AN_distance=self.Model(data)
            loss=self.TripLoss(AP_distanc,AN_distance)
            gradients=Tape.gradient(loss,self.Model.trainable_weights)
            self.optimizer.apply_gradients(zip(gradients, self.Model.trainable_weights))
        self.DistanceEval(AP_distanc,AN_distance)
        self.LossTracker.update_state(loss)
        return {"VAL":self.VALTracker.result(),
                "P_mean":self.PmeanTracker.result(),
                "P_max":self.PmaxTracker.result(),
                "P_std":self.PstdTracker.result(),
                "FAR":self.FARTracker.result(),
                "N_mean":self.N_meanTracker.result(),
                "N_min":self.NminTracker.result(),
                "N_std":self.NstdTracker.result(),
                "Loss":self.LossTracker.result()}


    def test_step(self, data):
        AP_distanc,AN_distance=self.Model(data)
        loss=self.TripLoss(AP_distanc,AN_distance)
        self.LossTracker.update_state(loss)
        self.DistanceEval(AP_distanc,AN_distance)
        return {"VAL":self.VALTracker.result(),
                "P_mean":self.PmeanTracker.result(),
                "P_max":self.PmaxTracker.result(),
                "P_std":self.PstdTracker.result(),
                "FAR":self.FARTracker.result(),
                "N_mean":self.N_meanTracker.result(),
                "N_min":self.NminTracker.result(),
                "N_std":self.NstdTracker.result(),
                "Loss":self.LossTracker.result()}



    def TripLoss(self,ap_distance,an_distance):
        return 3*tf.reduce_mean(tf.maximum(ap_distance-0.2*self.DesiredDistance,0.0)+tf.maximum(self.DesiredDistance-an_distance, 0.0))


    @property
    def metrics(self):
        return [self.LossTracker,self.VALTracker,self.PmaxTracker,self.PmeanTracker,self.PstdTracker,self.FARTracker,self.N_meanTracker,self.NminTracker,self.NstdTracker]

    def DistanceEval(self,P_distance,N_distance):
        
        P_pred,N_pred=self.TDEvaluation(P_distance,N_distance)
        PCDCount=tf.cast(tf.reduce_sum(P_pred),dtype=tf.int32)
        
        VAL=PCDCount/tf.size(P_pred)
        self.VALTracker.update_state(VAL)

        NCDcount=tf.cast(tf.reduce_sum(N_pred),dtype=tf.int32)
        FAR=1-(NCDcount/tf.size(N_pred))
        self.FARTracker.update_state(FAR)
        P_mean=tf.reduce_mean(P_distance)
        self.PmeanTracker.update_state(P_mean)
        N_mean=tf.reduce_mean(N_distance)
        self.N_meanTracker.update_state(N_mean)
        P_std=tf.math.reduce_std(P_distance)
        self.PstdTracker.update_state(P_std)
        N_std=tf.math.reduce_std(N_distance)
        self.NstdTracker.update_state(N_std)
        P_max=tf.reduce_max(P_distance)
        self.PmaxTracker.update_state(P_max)
        N_min=tf.reduce_min(N_distance)
        self.NminTracker.update_state(N_min)

    def TDEvaluation(self,P_distance,N_distance):
        return tf.cast(P_distance<=self.DesiredDistance,dtype=tf.int8),tf.cast(N_distance>self.DesiredDistance,dtype=tf.int8)
DesiredDistance=1
Optimizer= Adam(learning_rate=1e-3)
Siamesmodel=SiamesModel(siames_net,DesiredDistance)
Siamesmodel.compile(optimizer=Optimizer,weighted_metrics=[])


In [100]:
def LR(epoch,lr):
    if epoch<5:
        return 5e-3
    elif epoch<8:
        return 3e-3
    elif epoch<35:
        return 1e-3
    elif epoch <37:
        return 5e-4
    else :
        return 1e-4

In [101]:
LRcallback=callback.LearningRateScheduler(LR)

In [102]:
history=Siamesmodel.fit(TrainData,validation_data=TestData,epochs=40, callbacks=[LRcallback])
TrainTracker=history.history

Epoch 1/40




ValueError: math domain error