In [228]:
import cv2
import os
import numpy as np
import matplotlib.pyplot as plt
import imageio.v3 as iio
import sklearn
from sklearn import preprocessing
from sklearn.decomposition import NMF
import random
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten, DepthwiseConv2D

In [229]:
#FUNCTIONS    
def preprocess(file_path):
    
    byte_img = tf.io.read_file(file_path)
    # Load in the image 
    img = tf.io.decode_jpeg(byte_img)
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (128,128))
    # Scale image to be between 0 and 1 
    # img = img.numpy()
    img = img / 255.0

    # H, W, C = img.shape                      # (100, 100, 3) 255.0     # non-negative
    # X = img.reshape(H, W * C)    
    #             # → (100, 300)   2-D, non-negative
    # nmf = NMF(n_components=60, init="nndsvda", max_iter=200)
    # Ww = nmf.fit_transform(X)                 # (100, 50)
    # Hmat = nmf.components_                   # (50, 300)
 
    # # low-rank reconstruction back to image
    # img_lowrank = Ww @ Hmat
    # img_lowrank = img_lowrank.reshape(H, W, C) 
    # img_back = tf.convert_to_tensor(img_lowrank, dtype=tf.float32)   # ► NumPy → Tensor

    # Return image
    return img

def flip(image):
    image

    flipped=[]
    for r in range(len(image[:])):
        row=[]
        for i in range(len(image[r])):
            row.append(image[-r][-i])
        flipped.append(row)
    return flipped

def reduce(image,amount):

    reduced=[]

    #we pick out only a select number of the pixels from the image.


    row_list=np.linspace(0,len(image[:]),int((len(image[:]))/ amount ))
    row_indexes=[]
    for i in range(len(row_list)):
        row_indexes.append(int(row_list[i]))


    column_list=np.linspace(0,len(image[0]),int((len(image[0]))/ amount ))
    column_indexes=[]
    for i in range(len(column_list)):
        column_indexes.append(int(column_list[i]))


    for r in range(len(image[:])):
    
        if r in row_indexes:
        
            row=[]
            for i in range(len(image[r])):
                if i in column_indexes:
                    row.append(image[r][i])

    
            reduced.append(row)
    return reduced


DATASET=os.listdir('DATASET/train')

SET=[]
for j in range(len(DATASET)):
    person=DATASET[j]
    folder=os.listdir('DATASET/train/' + person)

    image_list= []
    for i in range(len(folder)):
        image_list.append(preprocess('DATASET/train/' + person + '/' + folder[i]))
    SET.append(image_list)

In [230]:
# SET LOFT FOR GPU
#We want to avoid out-of-memory errors by creating a sealing to GPU usage

gpus = tf.config.experimental.list_physical_devices('GPU') #defining/naming my gpu in python
 
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True) #setting a loft on it so it doesn't go crazy with memory, leading to oom errors

for directory in os.list.dir('DATASET/train'):
    for file in os.listdir('DATASET/train', directory):
        EX_PATH=os.path.join('DATASET/train',directory, file)
        NEW_PATH=os.path.join(NEG_PATH,file)
        os.replace(EX_PATH,NEW_PATH)

In [231]:
def make_pairs(class_to_imgs, n_pos_per_class=100, n_neg_total=10000, seed=None):
    """
    Build two lists:
        pairs  – list of (path1, path2)
        labels – 1 for positive (same person), 0 for negative
    Args
        class_to_imgs : dict  {class_name: [path, path, ...]}
        n_pos_per_class : how many positive pairs to draw *for each* class
        n_neg_total     : how many negative pairs to draw overall
    """
    rng = random.Random(seed)
    classes = list(class_to_imgs.keys())

    posval, posanch = [], []
    negval, neganch = [], []

    # positives
    for cls, imgs in class_to_imgs.items():
        if len(imgs) < 2:
            continue
        for _ in range(n_pos_per_class):
            a, b = rng.sample(imgs, 2)
            posval.append(a)
            posanch.append(b)
            

    # negatives
    for _ in range(n_neg_total):
        c1, c2 = rng.sample(classes, 2)           # two different people
        negval.append(rng.choice(class_to_imgs[c1]))
        neganch.append(rng.choice(class_to_imgs[c2]))
        

    return tf.data.Dataset.from_tensor_slices(posval), tf.data.Dataset.from_tensor_slices(negval), tf.data.Dataset.from_tensor_slices(posanch), tf.data.Dataset.from_tensor_slices(neganch)

def collect_images(root_dir):
    """
    Return {class_name: [img_path, img_path, …]} for every sub‑folder in root_dir.
    Accepts JPG/JPEG/PNG files only.
    """
    data = {}
    exts = ('.jpg', '.jpeg', '.png')
    
    for cls in os.listdir(root_dir):
        cls_path = os.path.join(root_dir, cls)
        if not os.path.isdir(cls_path):
            continue
        
        imgs = [
            os.path.join(cls_path, f)
            for f in os.listdir(cls_path)
            if f.lower().endswith(exts)
        ]
        
        if imgs:                     # keep folders that have at least one image
            data[cls] = imgs
    
    return data

Gathering the paths of the pairs to feed the training loop

In [232]:
path_collection = collect_images('/Users/hn/Documents/DTU/V25/Imaging/Project/1/train')
posval, negval, posanch, neganch = make_pairs(path_collection, seed=123, n_neg_total=1, n_pos_per_class=1)

positives = tf.data.Dataset.zip((posval,neganch, tf.data.Dataset.from_tensor_slices(tf.ones(len(posval)))))
negative = tf.data.Dataset.zip((negval, neganch, tf.data.Dataset.from_tensor_slices(tf.ones(len(negval)))))
data = positives.concatenate(negative)



**Model Structure**

Embedding layer

In [233]:
def make_embedding(): 
    inp = Input(shape=(128,128,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='PReLU')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='PReLU')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='PReLU')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='PReLU')(m3)
    f1 = Flatten()(c4)
    #c_out = DepthwiseConv2D(kernel_size=(H, W), use_bias=False)(feature_map) We can test this here

    d1 = Dense(4096, activation='sigmoid')(f1)
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

embedding = make_embedding()


class L1Dist(Layer):
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
       
    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)


**Siamese layer**

In [234]:
def make_siamese_model(): 
    
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(128,128,3))
    
    # Validation image in the network 
    validation_image = Input(name='validation_img', shape=(128,128,3))
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # Classification layer 
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

siamese_model = make_siamese_model()
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         87520384    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [235]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

#checkpoint set
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)


In [236]:

@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    # Return loss
    return loss

In [237]:
from tensorflow.keras.metrics import Precision, Recall

In [238]:
#Training loop
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Creating a metric object 
        r = Recall()
        p = Precision()
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

In [239]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=10000)
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [241]:
train(train_data, 3)


 Epoch 1/3
0.6452886 1.0 1.0

 Epoch 2/3
0.5317935 1.0 1.0

 Epoch 3/3
0.27402255 1.0 1.0
