In [2]:
pip install --upgrade pip

Note: you may need to restart the kernel to use updated packages.


In [3]:
!pip install tensorflow opencv-python matplotlib



In [1]:
# Import standard dependencies
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [2]:
# Import tensorflow dependencies
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [6]:
# GPU Restriction Code, would need it if working on a GPU based device

# Avoid OOM errors by setting GPU Memory Consumption Growth

gpus = tf.config.experimental.list_physical_devices('GPU')

if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    for gpu in gpus:
        print(gpu)
else:
    print("No GPUs available.")


No GPUs available.


In [6]:
# Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')


In [8]:
# Make the Directories
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

FileExistsError: [Errno 17] File exists: 'data/positive'

In [None]:
# Download Dataset from http://vis-www.cs.umass.edu/lfw/lfw.tgz

In [None]:
# Move LFW Images to the following repository data/negative
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

In [9]:
# Import uuid library to generate unique image names
import uuid

In [None]:
 # Establish a connection to the webcam
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()

    # Cut down frame to 250*250px
    frame = frame[100:100+250,580:580+250,:]

    # Collect anchors
    if cv2.waitKey(1) & 0XFF == ord('a'):
        # Create unique file path
        imgName = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out anchor image
        cv2.imwrite(imgName, frame)

    
    # Collect positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        # Create unique file path
        imgName = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out positive image
        cv2.imwrite(imgName, frame) 
        
    # Show image back to screen
    cv2.imshow('Image Collection', frame)

    # Breaking
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
        
# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()

In [52]:
plt.imshow(frame)

NameError: name 'frame' is not defined

In [7]:

# use '\' if using in Windows
# '/' for MacOS 
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(50)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(50)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(50)

In [8]:
dir_test = anchor.as_numpy_iterator()

In [9]:
dir_test.next()

b'data/anchor/2f2d98b2-c449-11ee-920e-7e3f4eddb86b.jpg'

In [10]:
 def preprocess(file_path):
     byte_img = tf.io.read_file(file_path)
     img = tf.io.decode_jpeg(byte_img)
     img = tf.image.resize(img, (100,100))
     img = img / 255.0
     return img

In [54]:
img = preprocess('data/anchor/2e6c60de-c449-11ee-920e-7e3f4eddb86b.jpg')

In [56]:
img.numpy().max()

0.9115196

In [None]:
dataset.map(preprocessor)

In [None]:
(anchor, positive) => 1,1,1,1,1

tf.ones_like([1,1,1,4.35235, 2352.5])

In [14]:
positives = tf.data.Dataset.zip(anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))))
negatives = tf.data.Dataset.zip(anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))))
data = positives.concatenate(negatives)

In [19]:
sample = data.as_numpy_iterator()

In [22]:
example = sample.next()

In [21]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [23]:
res = preprocess_twin(*example)

In [58]:
res[2]

1.0

In [26]:
# Build Dataloader Pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [47]:
# Training Partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [48]:
train_samples = train_data.as_numpy_iterator()

In [49]:
train_sample = train_samples.next()

In [51]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)