## import dependency

In [3]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
import random

In [4]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer , Input, Dense,MaxPooling2D, Flatten,Conv2D
import tensorflow as tf

In [5]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu,True)    
    

In [6]:
POS_PATH=os.path.join('data','positive')
NEG_PATH=os.path.join('data','negative')
ANC_PATH=os.path.join('data','anchor')

## collecting negative data

In [None]:
import os

# Define the path to the directory where you want to move the files
NEG_PATH = 'data/negative'

# Ensure the NEG_PATH directory exists
if not os.path.exists(NEG_PATH):
    os.makedirs(NEG_PATH)

# Iterate over the directories and files in the 'lfw' directory
for directory in os.listdir('lfw'):
    dir_path = os.path.join('lfw', directory)
    if os.path.isdir(dir_path):  # Check if it is indeed a directory
        for file in os.listdir(dir_path):
            EX_PATH = os.path.join(dir_path, file)
            NEW_PATH = os.path.join(NEG_PATH, file)

            # Print paths to debug
            print(f'Moving {EX_PATH} to {NEW_PATH}')

            os.replace(EX_PATH, NEW_PATH)


## collecting positive and anchore data

In [8]:
import uuid

In [None]:
import cv2

# Initialize the video capture object with the correct device index
cap = cv2.VideoCapture(0)  # Try different indices if necessary (0, 1, 2, etc.)

if not cap.isOpened():
    print("Error: Could not open video device.")
else:
    print("Video device opened successfully.")
    while cap.isOpened():
        ret, frame = cap.read()
        frame=frame[120:120+250,200:200+250,:]
        ## collecting anchore
        if cv2.waitKey(1) & 0xFF == ord('a'):
            imgname=os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname,frame)
        ## collecting positive
        if cv2.waitKey(1) & 0xFF == ord('p'):
            imgname=os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname,frame)
        if not ret:
            print("Error: Failed to capture image.")
            break
        # Check if the frame is empty
        if frame is None or frame.size == 0:
            continue
        cv2.imshow('Image Collection', frame)
        # Break the loop on 'q' key press
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # Release the webcam and close all OpenCV windows
    cap.release()
    cv2.destroyAllWindows()


Video device opened successfully.


In [None]:
plt.imshow(frame[:,:,:])

In [None]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)

In [None]:
dir_test = anchor.as_numpy_iterator()

In [None]:
print(dir_test.next())

In [None]:
def preprocess(file_path):

    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Load in the image 
    img = tf.io.decode_jpeg(byte_img)

    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    # Scale image to be between 0 and 1 
    img = img / 255.0

    # Return image
    return img

In [None]:
img = preprocess('data\\anchor\\ac0a02c6-37bb-11ef-8c9a-fc349794977f.jpg')

In [None]:
img.numpy().max()

In [None]:
plt.imshow(img)

In [None]:
# (anchore , positive ) => 1,1,1,1,1
# (anchore , negative )=> 0,0,0,0,0

In [None]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [None]:
data

In [None]:
samples = data.as_numpy_iterator()
exampple = samples.next()

In [None]:
exampple

In [None]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [None]:
res = preprocess_twin(*exampple)


In [None]:
plt.imshow(res[1])

In [None]:
res[2]

## Build dataloader pipline

In [None]:
# Build dataloader pipeline
data = data.map(preprocess_twin)

In [None]:
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [None]:
data

In [None]:
samples=data.as_numpy_iterator()

In [None]:
len(samples.next())

In [None]:
samp=samples.next()

In [None]:
plt.imshow(samp[1])

In [None]:
plt.imshow(samp[0])

In [None]:
samp[2]

In [None]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [None]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# model engineering

In [None]:
# inp=Input(shape=(100,100,3),name='input_image')
# #first block
# c1=Conv2D(64,(10,10),activation='relu')(inp)
# m1=MaxPooling2D(64,(2,2),padding='same')(c1)
# # second block
# c2=Conv2D(128,(7,7),activation='relu')(m1)
# m2=MaxPooling2D(64,(2,2),padding='same')(c2)
# # third Block
# c3=Conv2D(128,(4,4),activation='relu')(m2)
# m3=MaxPooling2D(64,(2,2),padding='same')(c3)
# # final block
# c4=Conv2D(256,(4,4),activation='relu')(m3)
# f1=Flatten()(c4)
# d1=Dense(4096,activation='sigmoid')(f1)
# mod=Model(inputs=[inp],outputs=[d1],name='embedding')
# mod.summary()

In [None]:
def make_embedding():
    inp=Input(shape=(100,100,3),name='input_image')
    #first block
    c1=Conv2D(64,(10,10),activation='relu')(inp)
    m1=MaxPooling2D(64,(2,2),padding='same')(c1)
    # second block
    c2=Conv2D(128,(7,7),activation='relu')(m1)
    m2=MaxPooling2D(64,(2,2),padding='same')(c2)
    # third Block
    c3=Conv2D(128,(4,4),activation='relu')(m2)
    m3=MaxPooling2D(64,(2,2),padding='same')(c3)
    # final block
    c4=Conv2D(256,(4,4),activation='relu')(m3)
    f1=Flatten()(c4)
    d1=Dense(4096,activation='sigmoid')(f1)
        
    return Model(inputs=[inp],outputs=[d1],name='embedding')

In [None]:
embedding=make_embedding()
embedding.summary()

In [None]:
class L1Dist(Layer):
    def __init__(self,**kwargs):
        super().__init__()
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding-validation_embedding)

In [None]:
l1=L1Dist()

In [None]:
def make_siamese_model():
    input_image=Input(name='input_image',shape=(100,100,3))
    validation_input=Input(name='validation_input',shape=(100,100,3))
    siamese_layer=L1Dist()
    siamese_layer._name='distance'
    distance=siamese_layer(embedding(input_image),embedding(validation_input))
    classifier=Dense(1,activation='sigmoid')(distance)
    return Model(inputs=[input_image,validation_input],outputs=classifier,name='SiameseNetwork')
    

In [None]:
siamese_Model=make_siamese_model()
siamese_Model.summary()

# Training

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_Model)

In [None]:
test_batch = train_data.as_numpy_iterator()
batch_1 = test_batch.next()
X = batch_1[:2]
y = batch_1[2]
y

In [None]:
@tf.function
def train_step(batch):

    # Record all of our operations 
    with tf.GradientTape() as tape:
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]

        # Forward pass
        yhat = siamese_Model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)

    # Calculate gradients
    grad = tape.gradient(loss, siamese_Model.trainable_variables)

    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_Model.trainable_variables))

    # Return loss
    return loss

In [None]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            train_step(batch)
            progbar.update(idx+1)

        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
EPOCHS = 50
train(train_data, EPOCHS)

In [None]:
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())


In [5]:
tf.config.experimental.list_physical_devices('GPU')

[]

In [7]:
import tensorflow.python.client

def get_available_gpus():
    local_device_protos = device_lib.list_local_devices()
    return [x.name for x in local_device_protos if x.device_type == 'GPU']

print(get_available_gpus())



[]
