In [1]:
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt

In [2]:
# import tensorflow dependencies
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [3]:
# Avoid OOM errors by seetting GPU Memory Consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU') # All GPUs
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [4]:
# Setup paths and Make directories
POS_PATH = os.path.join('data','positive')
NEG_PATH = os.path.join('data','negative')
ANC_PATH = os.path.join('data','anchor')
#os.makedirs(POS_PATH)
#os.makedirs(NEG_PATH)
#os.makedirs(ANC_PATH)

In [7]:
#!tar -xf lfw.tgz

In [None]:
# Move images to negative repo
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw',directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

In [5]:
import uuid

In [9]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()

    # to 250x 250
    frame = frame[120:120+250,200:200+250,:]

    # Collect anchors
    if cv2.waitKey(1) & 0XFF == ord('a'):
        # Create unique file path
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
        
    # Collect positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        # Create unique file path
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
        
    #show image on to the screen
    cv2.imshow('Camera', frame)
    
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

In [6]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(400)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(400)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(400)

In [7]:
dir_test = anchor.as_numpy_iterator()
dir_test.next()

b'data\\anchor\\1ec1e54b-6d48-11ef-9a54-089798a32039.jpg'

In [8]:
def preprogress(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (100, 100))
    img = img / 255.0
    return img

In [11]:
img = preprogress('data\\anchor\\1101d7af-6d48-11ef-93b9-089798a32039.jpg')

In [None]:
plt.imshow(img)

In [None]:
dataset.map(preprogress)

In [12]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [13]:
samples = data.as_numpy_iterator()

In [14]:
example = samples.next()

In [15]:
def preprogress_twin(input_img, validation_img, label):
    return(preprogress(input_img), preprogress(validation_img), label)

In [16]:
preprogress_twin(*example)

(<tf.Tensor: shape=(100, 100, 3), dtype=float32, numpy=
 array([[[0.3502451 , 0.34632352, 0.2757353 ],
         [0.3394608 , 0.33161765, 0.2747549 ],
         [0.32058823, 0.31862745, 0.27058825],
         ...,
         [0.22328432, 0.2625    , 0.26740196],
         [0.24558823, 0.28480393, 0.29264706],
         [0.26544118, 0.3007353 , 0.32034314]],
 
        [[0.35882354, 0.34411764, 0.28039217],
         [0.3389706 , 0.32647058, 0.26985294],
         [0.3156863 , 0.30759802, 0.25465685],
         ...,
         [0.22254902, 0.2615196 , 0.26789215],
         [0.24338235, 0.28161764, 0.29240197],
         [0.25906864, 0.29730392, 0.30808824]],
 
        [[0.35      , 0.32647058, 0.26568627],
         [0.3362745 , 0.3245098 , 0.25980392],
         [0.3247549 , 0.32083333, 0.24436274],
         ...,
         [0.22696078, 0.2637255 , 0.2884804 ],
         [0.24068627, 0.28186274, 0.3       ],
         [0.26078433, 0.3019608 , 0.31862745]],
 
        ...,
 
        [[0.85539216, 0.86715686

In [17]:
data = data.map(preprogress_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [18]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [19]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*3))
test_Data = test_data.batch(16)
test_Data = test_data.prefetch(8)

In [21]:
inp = Input(shape=(100,100,3), name='input_image')
c1 = Conv2D(64, (10,10), activation='relu')(inp)

In [22]:
c1

<KerasTensor: shape=(None, 91, 91, 64) dtype=float32 (created by layer 'conv2d')>

In [25]:
def make_embedding():
    inp = Input(shape=(100,100,3), name='input_image')
    
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [26]:
model = make_embedding()

In [None]:
model.summary()

In [30]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()

    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [31]:
L1 = L1Dist()

In [32]:
L1

<__main__.L1Dist at 0x19f06fef8e0>

In [33]:
L1(anchor_embedding, validation_embedding)

NameError: name 'anchor_embedding' is not defined