<a href="https://colab.research.google.com/github/allll-dev/AI/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [None]:
#Mount Google Drive at /content/drive (only available inside Google Colab)
from google.colab import drive
drive.mount('/content/drive')

MessageError: ignored

In [None]:
#Import TensorFlow dependencies - functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer,Conv2D,Dense,MaxPooling2D,Input,Flatten
import tensorflow as tf


In [None]:
#Set GPU memory growth
#Avoid OOM errors by enabling memory growth on every GPU TensorFlow can see
gpus=tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device,True)

In [None]:
#Folder structure: data/positive, data/negative and data/anchor
POS_PATH,NEG_PATH,ANC_PATH=(
    os.path.join('data',folder) for folder in ('positive','negative','anchor')
)

In [14]:
#Number of GPU devices returned by the device listing above
len(gpus)

1

In [None]:
#Make directories
#exist_ok=True keeps this cell re-runnable once the folders already exist
for folder_path in (POS_PATH,NEG_PATH,ANC_PATH):
    os.makedirs(folder_path,exist_ok=True)

In [None]:
#Uncompress the Labelled Faces in the Wild (LFW) tar.gz archive; the next cell reads the extracted 'lfw' folder
!tar -xf lfw.tgz

In [None]:
#Move LFW images into data/negative
for directory in os.listdir('lfw'):
    sub_dir=os.path.join('lfw',directory)
    #Skip stray files at the top level: os.listdir would raise on a non-directory
    if not os.path.isdir(sub_dir):
        continue
    for file in os.listdir(sub_dir):
        EX_PATH=os.path.join(sub_dir,file)
        NEW_PATH=os.path.join(NEG_PATH,file)
        #os.replace moves the file and overwrites an existing file of the same name
        os.replace(EX_PATH,NEW_PATH)

In [None]:
#import uuid lib to generate unique image names
import uuid

In [None]:
#Example of a generated unique id; the capture cell uses uuid1 to name saved images
uuid.uuid1()

In [None]:
#Establish a connection to the webcam
cap=cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret,frame=cap.read()
        #Stop when no frame was delivered: frame is then None and slicing it would raise
        if not ret:
            break
        #Crop the frame to a 250x250 pixel region
        frame=frame[120:120+250,200:200+250,:]
        #Show image back to screen
        cv2.imshow('Image Collection',frame)
        #Read the keyboard once per frame: each waitKey call consumes the pending key,
        #so polling separately for every key would silently drop key presses
        key=cv2.waitKey(1) & 0xFF
        if key==ord('a'):
            #Collect anchors: create the unique file path and write out the image
            imgname=os.path.join(ANC_PATH,'{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname,frame)
        elif key==ord('p'):
            #Collect positives: create the unique file path and write out the image
            imgname=os.path.join(POS_PATH,'{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname,frame)
        elif key==ord('q'):
            #Breaking gracefully
            break
finally:
    #Release the webcam and close the image window even if the loop raised
    cap.release()
    cv2.destroyAllWindows()

In [None]:
#Take up to 300 image paths per class
#os.path.join keeps the glob pattern portable (a hard-coded backslash only works on Windows)
anchor=tf.data.Dataset.list_files(os.path.join(ANC_PATH,'*.jpg')).take(300)
positive=tf.data.Dataset.list_files(os.path.join(POS_PATH,'*.jpg')).take(300)
negative=tf.data.Dataset.list_files(os.path.join(NEG_PATH,'*.jpg')).take(300)

In [None]:
#Iterator over the anchor file paths, for manual inspection
dir_test=anchor.as_numpy_iterator()


In [None]:
def preprocess(file_path):
    """Load a JPEG from disk and turn it into a model-ready image tensor.

    Args:
        file_path: path of the JPEG file to read.

    Returns:
        Image tensor resized to 100x100 with 3 channels, scaled to [0, 1].
    """
    #Read image from file path
    byte_img=tf.io.read_file(file_path)
    #Force 3 channels so grayscale JPEGs still match the (100,100,3) model input
    img=tf.io.decode_jpeg(byte_img,channels=3)
    #Resize to the 100x100 input size expected by the embedding network
    img=tf.image.resize(img,(100,100))
    #Scale 8-bit pixel values to the [0, 1] range
    img=img/255.0
    return img

In [None]:
#Sanity check: map preprocess over the anchor file paths (the name `dataset` was never defined)
anchor.map(preprocess)

In [None]:
#create Labelled Dataset
#(anchor,positive)=>1,1,1,1,1
#(anchor,negative)=>0,0,0,0,0

In [None]:

#One label per anchor: 1.0 for (anchor, positive) pairs, 0.0 for (anchor, negative) pairs
pair_count=len(anchor)
positive_labels=tf.data.Dataset.from_tensor_slices(tf.ones(pair_count))
negative_labels=tf.data.Dataset.from_tensor_slices(tf.zeros(pair_count))
positives=tf.data.Dataset.zip((anchor,positive,positive_labels))
negatives=tf.data.Dataset.zip((anchor,negative,negative_labels))
#Positive pairs first, negative pairs appended after them
data=positives.concatenate(negatives)

In [None]:
#Iterator over the labelled (anchor path, comparison path, label) triples
samples=data.as_numpy_iterator()

In [None]:
#Pull a single labelled triple to try the preprocessing on
example=next(samples)

In [None]:
#Preprocess both images of a labelled pair
def preprocess_twin(input_img,validation_img,label):
    """Preprocess both image paths of a pair; the label is passed through unchanged.

    Returns:
        Tuple (preprocessed input image, preprocessed validation image, label).
    """
    input_tensor=preprocess(input_img)
    validation_tensor=preprocess(validation_img)
    return input_tensor,validation_tensor,label

In [None]:
#Run the pair preprocessing on the single example fetched above
res=preprocess_twin(*example)

In [None]:
#Build dataloader pipeline
data=data.map(preprocess_twin)
data=data.cache()
#Keep one fixed shuffle order: with the default reshuffle_each_iteration=True every pass
#reorders the pairs, so take()/skip() based train/test partitions would overlap
data=data.shuffle(buffer_size=1024,reshuffle_each_iteration=False)

In [None]:
  #Iterator over the mapped, cached and shuffled pipeline
  samples=data.as_numpy_iterator()

In [None]:
#Display one preprocessed (input image, validation image, label) element
next(samples)

In [None]:
#Training partition: first 70% of the pairs, batched by 16 with 8 batches prefetched
train_size=round(len(data)*.7)
train_data=data.take(train_size).batch(16).prefetch(8)


In [None]:
#Iterator over the batched training partition
train_samples=train_data.as_numpy_iterator()

In [None]:
#Fetch one training batch for inspection
train_sample=next(train_samples)

In [None]:
#Testing partition: skip the 70% used for training, keep the following 30%
test_offset=round(len(data)*.7)
test_size=round(len(data)*.3)
test_data=data.skip(test_offset).take(test_size).batch(16).prefetch(8)

In [None]:
#Build embedding network
def make_embedding():
    """Build the convolutional embedding model applied to each input image.

    Returns:
        Keras Model named 'embedding' that maps a (100,100,3) image to a
        4096-value vector produced by a sigmoid-activated dense layer.
    """
    inp=Input(shape=(100,100,3),name='input_image')

    #First block
    c1=Conv2D(64,(10,10),activation='relu')(inp)
    #2x2 max pooling with stride 2. Passing 64 as the first positional argument would
    #set pool_size=64 and pool over a 64x64 window instead.
    m1=MaxPooling2D(pool_size=(2,2),strides=(2,2),padding='same')(c1)

    #Second block
    c2=Conv2D(128,(7,7),activation='relu')(m1)
    m2=MaxPooling2D(pool_size=(2,2),strides=(2,2),padding='same')(c2)

    #Third block
    c3=Conv2D(128,(4,4),activation='relu')(m2)
    m3=MaxPooling2D(pool_size=(2,2),strides=(2,2),padding='same')(c3)

    #Final embedding block
    c4=Conv2D(256,(4,4),activation='relu')(m3)
    f1=Flatten()(c4)
    d1=Dense(4096,activation='sigmoid')(f1)
    return Model(inputs=[inp],outputs=[d1],name='embedding')

In [None]:
embedding=make_embedding()

In [None]:
embedding.summary()

In [None]:
#Build distance layer
class L1Dist(Layer):
    """Distance layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self,**kwargs):
        #Forward kwargs (e.g. name, dtype) so the standard Layer options are honoured
        #instead of being silently dropped
        super().__init__(**kwargs)

    def call(self,input_embedding,validation_embedding):
        """Return |input_embedding - validation_embedding| computed element-wise."""
        return tf.math.abs(input_embedding-validation_embedding)

In [None]:
#Make Siamese model
def make_siamese_model():
    """Assemble the two-input verification model around the module-level `embedding`.

    Returns:
        Keras Model named 'SiameseNetwork' taking [input image, validation image]
        and producing a single sigmoid output.
    """
    #The two (100,100,3) image inputs: the anchor and the image it is compared against
    input_image=Input(name='input_img',shape=(100,100,3))
    validation_image=Input(name='validation_image',shape=(100,100,3))

    #Distance layer, renamed to 'distance'
    siamese_layer=L1Dist()
    siamese_layer._name='distance'

    #Both inputs go through the same embedding model before being compared
    input_embedding=embedding(input_image)
    validation_embedding=embedding(validation_image)
    distances=siamese_layer(input_embedding,validation_embedding)

    #Classification layer: one sigmoid unit on top of the distances
    classifier=Dense(1,activation='sigmoid')(distances)
    return Model([input_image,validation_image],outputs=classifier,name='SiameseNetwork')

In [None]:
siamese_model=make_siamese_model()

In [None]:
#Training
#Set up the loss: binary cross-entropy between the pair labels and the model output
binary_cross_loss=tf.losses.BinaryCrossentropy()   

In [None]:
#Adam optimizer constructed with 1e-4 (0.0001) as its first argument
opt=tf.keras.optimizers.Adam(1e-4)#0.0001

In [None]:
#Establish checkpoints
#Files are written under ./training_checkpoints with the prefix 'ckpt'
checkpoint_dir='./training_checkpoints'
checkpoint_prefix=os.path.join(checkpoint_dir,'ckpt')
#Track both the optimizer and the model in the checkpoint
checkpoint=tf.train.Checkpoint(opt=opt,siamese_model=siamese_model)

In [None]:
 #Build train step function
@tf.function
def train_step(batch):
    """Run one optimization step of the Siamese model on a single batch.

    Args:
        batch: sequence whose first two items are the model inputs and whose
            third item holds the labels.

    Returns:
        The binary cross-entropy loss computed for this batch.
    """
    pair_inputs=batch[:2]
    labels=batch[2]

    #Record the forward pass so gradients can be derived from it
    with tf.GradientTape() as tape:
        predictions=siamese_model(pair_inputs,training=True)
        loss=binary_cross_loss(labels,predictions)

    #Gradients of the loss with respect to every trainable variable of the model
    variables=siamese_model.trainable_variables
    gradients=tape.gradient(loss,variables)

    #Let the optimizer apply the gradients to the model weights
    opt.apply_gradients(zip(gradients,variables))

    return loss

In [None]:
#Build training loop
def train(data,EPOCHS):
    """Train the Siamese model, saving a checkpoint every 10 epochs.

    Args:
        data: iterable of batches that also supports len() (used to size the progress bar).
        EPOCHS: number of passes to make over `data`.
    """
    #Loop through epochs
    for epoch in range(1,EPOCHS+1):
        print('\n EPOCH {}/{}'.format(epoch,EPOCHS))
        progbar=tf.keras.utils.Progbar(len(data))

        #Loop through each batch
        for idx,batch in enumerate(data):
            #Run train step here
            train_step(batch)
            progbar.update(idx+1)

        #Save checkpoints; the keyword is `file_prefix`, a misspelled keyword raises TypeError
        if epoch%10==0:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
#Number of training epochs; train() saves a checkpoint every 10th epoch
EPOCHS=50

In [None]:
#Train the model on the training partition for EPOCHS epochs
train(train_data,EPOCHS)