# Setup

In [1]:
import numpy as np
from matplotlib import pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf
import os
import random
from tensorflow.keras.metrics import Precision, Recall

In [2]:
# Avoid OOM errors by setting GPU Memory Consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

In [3]:
gpus

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [4]:
# Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

# Collecting Images

In [5]:
#!wget http://vis-www.cs.umass.edu/lfw/lfw.tgz

--2022-09-11 01:26:31--  http://vis-www.cs.umass.edu/lfw/lfw.tgz
Resolving vis-www.cs.umass.edu (vis-www.cs.umass.edu)... 128.119.244.95
Connecting to vis-www.cs.umass.edu (vis-www.cs.umass.edu)|128.119.244.95|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 180566744 (172M) [application/x-gzip]
Saving to: ‘lfw.tgz’


2022-09-11 01:26:48 (10.7 MB/s) - ‘lfw.tgz’ saved [180566744/180566744]



In [6]:
#!tar -xf lfw.tgz

In [7]:
# Move Pictures to Negative examples if only one image of a person exists
for directory in os.listdir('lfw'):
    if len([file for file in os.listdir(os.path.join('lfw', directory))]) <= 1:
        for file in os.listdir(os.path.join('lfw', directory)):
            EX_PATH = os.path.join('lfw', directory, file)
            NEW_PATH = os.path.join(NEG_PATH, file)
            os.replace(EX_PATH, NEW_PATH)
        os.rmdir(os.path.join('lfw', directory))

In [8]:
# Split other pictures equally into Anchor and Positive
# discard picture if number of pictures is uneven
for directory in os.listdir('lfw')[:100]:
    num_files = len([file for file in os.listdir(os.path.join('lfw', directory))])
    if num_files % 2 != 0:
        os.remove(os.path.join('lfw', directory, os.listdir(os.path.join('lfw', directory))[0]))
    num_files = int(num_files / 2)
    for dest_path in [ANC_PATH, POS_PATH]:
        for file in os.listdir(os.path.join('lfw', directory))[:num_files]:
            EX_PATH = os.path.join('lfw', directory, file)
            NEW_PATH = os.path.join(dest_path, file)
            os.replace(EX_PATH, NEW_PATH)
    os.rmdir(os.path.join('lfw', directory))

# Data Augmentation

In [9]:
def data_aug(img):
    data = []
    for i in range(15):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(img)
    
    return data

In [10]:
import cv2
import uuid

In [11]:
for src_path in [ANC_PATH, POS_PATH]:    
    for file_name in os.listdir(os.path.join(src_path)):
        if file_name.split('.')[-1] != 'jpg':
            continue
        img_path = os.path.join(src_path, file_name)
        img = cv2.imread(img_path)
        augmented_images = data_aug(img) 

        for image in augmented_images:
            cv2.imwrite(os.path.join(src_path, f'{file_name.split(".")[0]}_{uuid.uuid1()}.jpg'), image.numpy())

# TensorFlow Data Pipeline

In [12]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg', shuffle=False).take(3000)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg', shuffle=False).take(3000)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(3000)

## Preprocessing

In [13]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    
    # Preprocessing steps - resizing the image to be 105x105x3
    img = tf.image.resize(img, (105,105))
    # Scale image to be between 0 and 1 
    img = img / 255.0

    return img

In [14]:
img = preprocess(negative.as_numpy_iterator().next())

## Create Labels

In [15]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [16]:
bla = data.as_numpy_iterator()
for _ in range(29):
    print(bla.next())

(b'data/anchor/Aaron_Peirsol_0001.jpg', b'data/positive/Aaron_Peirsol_0002.jpg', 1.0)
(b'data/anchor/Aaron_Peirsol_0001_35302ef8-3171-11ed-ab76-0242ac1c0002.jpg', b'data/positive/Aaron_Peirsol_0002_3c696cb6-3171-11ed-ab76-0242ac1c0002.jpg', 1.0)
(b'data/anchor/Aaron_Peirsol_0001_353066ca-3171-11ed-ab76-0242ac1c0002.jpg', b'data/positive/Aaron_Peirsol_0002_3c69a7bc-3171-11ed-ab76-0242ac1c0002.jpg', 1.0)
(b'data/anchor/Aaron_Peirsol_0001_35309e06-3171-11ed-ab76-0242ac1c0002.jpg', b'data/positive/Aaron_Peirsol_0002_3c69e0c4-3171-11ed-ab76-0242ac1c0002.jpg', 1.0)
(b'data/anchor/Aaron_Peirsol_0001_3530d556-3171-11ed-ab76-0242ac1c0002.jpg', b'data/positive/Aaron_Peirsol_0002_3c6a1a94-3171-11ed-ab76-0242ac1c0002.jpg', 1.0)
(b'data/anchor/Aaron_Peirsol_0001_35310bfc-3171-11ed-ab76-0242ac1c0002.jpg', b'data/positive/Aaron_Peirsol_0002_3c6a53b0-3171-11ed-ab76-0242ac1c0002.jpg', 1.0)
(b'data/anchor/Aaron_Peirsol_0001_353140cc-3171-11ed-ab76-0242ac1c0002.jpg', b'data/positive/Aaron_Peirsol_0002_3c

## Train/Test Partitions

In [17]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [18]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=len(data))

In [19]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(4)

In [20]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(4)

# Build Deep Learning Model

In [21]:
def make_embedding():
    inp = Input(shape=(105,105,3), name='input_image')

    conv1 = Conv2D(64, (10,10), activation='relu')(inp)
    mpool1 = MaxPooling2D(64, (2,2), padding='same')(conv1)

    conv2 = Conv2D(128, (7,7), activation='relu')(mpool1)
    mpool2 = MaxPooling2D(64, (2,2), padding='same')(conv2)

    conv3 = Conv2D(128, (4,4), activation='relu')(mpool2)
    mpool3 = MaxPooling2D(64, (2,2), padding='same')(conv3)

    conv4 = Conv2D(256, (4,4), activation='relu')(mpool3)
    flatten = Flatten()(conv4)
    fc1 = Dense(4096, activation='sigmoid')(flatten)
    
    return Model(inputs=[inp], outputs=[fc1], name='embedding')

In [22]:
embedding = make_embedding()

In [23]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 105, 105, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 96, 96, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 48, 48, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 42, 42, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 21, 21, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 18, 18, 128)       26

## Build L1-Distance Layer

In [24]:
class L1Dist(Layer):
    
    def __init__(self, **kwargs):
        super().__init__()
        
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [25]:
l1 = L1Dist()

## Build Siamese Model

In [26]:
def make_siamese_model(): 
    
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(105,105,3))
    
    # Validation image in the network 
    validation_image = Input(name='validation_img', shape=(105,105,3))
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # Classification layer 
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [27]:
siamese_model = make_siamese_model()
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

# Training

## Optimizer and Loss

In [28]:
optim = tf.keras.optimizers.Adam(0.0001)

In [29]:
bce_loss = tf.losses.BinaryCrossentropy()

In [30]:
@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = bce_loss(y, yhat)
    print(loss)
        
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    optim.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    
    return loss

In [31]:
from tensorflow.keras.metrics import Precision, Recall

In [32]:
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS+1):
        print(f'\n Epoch {epoch}/{EPOCHS}')
        progbar = tf.keras.utils.Progbar(len(data))
        
        r = Recall()
        p = Precision()
        
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        #print(loss.numpy())
        

In [33]:
train(train_data, 50)


 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
0.37013683 0.65457183 0.8667521

 Epoch 2/50
0.21318305 0.81160116 0.9019712

 Epoch 3/50
0.18969089 0.87291765 0.91608393

 Epoch 4/50
0.13963063 0.9026253 0.9287819

 Epoch 5/50
0.30000708 0.9276938 0.9272556

 Epoch 6/50
0.14820184 0.93762 0.93762

 Epoch 7/50
0.12285134 0.9431873 0.93956834

 Epoch 8/50
0.2381955 0.9561487 0.941784

 Epoch 9/50
0.32800925 0.95014244 0.95014244

 Epoch 10/50
0.12510845 0.9559513 0.9461967

 Epoch 11/50
0.021936757 0.96647507 0.951438

 Epoch 12/50
0.09648712 0.97077143 0.9565628

 Epoch 13/50
0.017644431 0.9649544 0.9621829

 Epoch 14/50
0.13292679 0.96953833 0.9599435

 Epoch 15/50
0.12079202 0.97079605 0.9617359

 Epoch 16/50
0.3344376 0.98343587 0.9701214

 Epoch 17/50
0.01177859 0.97815764 0.96713614

 Epoch 18/50
0.15809852 0.98079693 0.9756447

 Epoch 19/50
0.53689015 0.9844633 0

In [34]:
test_batch = train_data.as_numpy_iterator()

In [35]:
bla = test_batch.next()

In [36]:
out = siamese_model.predict(bla[:2])
out

array([[9.9937940e-01],
       [8.0694944e-01],
       [9.8504817e-01],
       [9.9599224e-01],
       [3.2039650e-02],
       [9.9945956e-01],
       [9.9129969e-01],
       [9.8297358e-01],
       [5.7694603e-08],
       [3.5545064e-13],
       [9.5312548e-01],
       [3.2268730e-13],
       [9.9999988e-01],
       [1.4862908e-12],
       [6.9120574e-08],
       [9.9997067e-01]], dtype=float32)

In [37]:
bce_loss(bla[2], out)

<tf.Tensor: shape=(), dtype=float32, numpy=4.658022>

# Test Model

In [38]:
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

print(r.result().numpy(), p.result().numpy())

0.9842873 0.9909605


# Save Model

In [39]:
# Save weights
siamese_model.save('siamesemodelv2.h5')

