In [None]:
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras import layers, models
from tensorflow.keras.layers import Layer, Input
from tensorflow.keras import backend as K
from sklearn.model_selection import train_test_split
import os
from scipy.io import loadmat
import numpy as np
import pandas as pd

# Constants

In [2]:
IMG_SIZE = 224  # Side length (pixels) of the square images fed to the network
IMG_DEEP = 3  # Number of colour channels per decoded image
BATCH_SIZE = 32  # Default batch size of the data loader
EPOCHS = 10  # Number of epochs passed to fit()
VALIDATION_SPLIT = 0.2  # NOTE(review): not referenced by any visible cell

BASE_IMAGE_ADDRESS = '102flowers'  # Directory that image file names are joined onto when loading

# Feature Extractor (ResNet50)

In [3]:
def build_feature_extractor():
    """Build the embedding network shared by both siamese branches.

    A headless ResNet50 is created without weights and then loaded from the
    local ``resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5`` file. The
    backbone is frozen except for its last three layers, and a
    GlobalAveragePooling -> Dense(512) -> Dense(256) head is stacked on top.

    Returns:
        A Keras ``Model`` mapping an ``(IMG_SIZE, IMG_SIZE, IMG_DEEP)`` image
        to a 256-dimensional feature vector.
    """
    backbone = ResNet50(
        weights=None,
        include_top=False,
        input_shape=(IMG_SIZE, IMG_SIZE, IMG_DEEP),
    )
    backbone.load_weights('resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5')

    # Freeze everything first, then re-enable training on the backbone's
    # last three layers only.
    backbone.trainable = False
    for tail_layer in backbone.layers[-3:]:
        tail_layer.trainable = True

    # Trainable embedding head on top of the backbone's feature map.
    pooled = GlobalAveragePooling2D()(backbone.output)
    hidden = Dense(512, activation='relu')(pooled)
    embedding = Dense(256, activation='relu')(hidden)

    return models.Model(inputs=backbone.input, outputs=embedding)

# Single module-level instance: SiameseNet applies this same model to both inputs.
feature_extractor = build_feature_extractor()

# Siamese Network

In [4]:
class EuclideanDistance(Layer):
    """Layer computing the Euclidean distance between two batches of vectors.

    Call it on a two-element list/tuple ``[x1, x2]``. The distance is reduced
    over the last axis, which is kept with size 1 (``keepdims=True``).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        """Return ``sqrt(max(sum((x1 - x2)^2), epsilon))`` along the last axis."""
        x1, x2 = inputs
        sum_square = K.sum(K.square(x1 - x2), axis=-1, keepdims=True)
        # The gradient of sqrt is infinite at 0, which produces NaN gradients
        # whenever the two embeddings are identical. Clamping the squared
        # distance to a tiny positive value keeps training numerically stable.
        return K.sqrt(K.maximum(sum_square, K.epsilon()))

In [5]:
def contrastive_loss(y_true, y_pred, margin=1.0):
    """Contrastive loss over predicted pair distances.

    Label convention (the one used by the data loader's pair labels and by
    ``contrastive_accuracy`` in this notebook): ``y_true == 1`` marks a pair
    of the same class, ``y_true == 0`` a pair of different classes.

    Args:
        y_true: Pair labels (1 = similar, 0 = dissimilar).
        y_pred: Predicted distances ``D`` between the two embeddings.
        margin: Distance beyond which dissimilar pairs stop being penalised.

    Returns:
        Scalar mean of ``y * D^2 + (1 - y) * max(margin - D, 0)^2``.
    """
    # Labels may arrive as integers; the arithmetic below needs a float dtype.
    y_true = K.cast(y_true, y_pred.dtype)
    squared_pred = K.square(y_pred)  # D^2: pulls similar pairs together
    margin_square = K.square(K.maximum(margin - y_pred, 0))  # pushes dissimilar pairs apart
    # Similar pairs (y=1) pay for distance, dissimilar pairs (y=0) pay for
    # being closer than the margin. The terms were previously swapped, which
    # trained the network in the opposite direction of the labels.
    return K.mean(y_true * squared_pred + (1 - y_true) * margin_square)

In [6]:
def triplet_loss(margin=0.2):
    """Create a triplet loss function with the given margin.

    The returned ``loss(y_true, y_pred)`` ignores ``y_true``. It splits
    ``y_pred`` into three equal parts along axis 1 (anchor, positive,
    negative) and returns the batch mean of
    ``max(d(anchor, positive) - d(anchor, negative) + margin, 0)``,
    where ``d`` is the squared Euclidean distance.
    """
    def loss(y_true, y_pred):
        anchor, positive, negative = tf.split(y_pred, num_or_size_splits=3, axis=1)

        def squared_gap(other):
            # Squared Euclidean distance between the anchor and `other`.
            return tf.reduce_sum(tf.square(anchor - other), axis=1)

        hinge = tf.maximum(squared_gap(positive) - squared_gap(negative) + margin, 0.0)
        return tf.reduce_mean(hinge)

    return loss

In [7]:
def contrastive_accuracy(y_true, y_pred, threshold=0.5):
    """Fraction of pairs whose thresholded distance agrees with the label.

    A predicted distance below ``threshold`` counts as label 1, anything else
    as label 0; the result is the mean agreement with ``y_true``.
    """
    predicted_label = K.cast(y_pred < threshold, y_true.dtype)
    agreement = K.equal(y_true, predicted_label)
    return K.mean(agreement)

In [8]:
def mean_distance(y_true, y_pred):
    """Metric: mean of the predicted values; ``y_true`` is ignored."""
    average = tf.reduce_mean(y_pred)
    return average

In [59]:
def SiameseNet():
    """Build and compile the two-branch siamese model.

    Both image inputs go through the module-level ``feature_extractor``, and
    the model outputs the ``EuclideanDistance`` between the two embeddings
    directly (there is no sigmoid head). It is compiled with Adam (learning
    rate 1e-4), ``contrastive_loss`` and the ``mean_distance`` /
    ``contrastive_accuracy`` metrics.

    Returns:
        The compiled Keras ``Model`` taking ``[input_a, input_b]``.
    """
    image_shape = (IMG_SIZE, IMG_SIZE, IMG_DEEP)
    twin_inputs = [Input(shape=image_shape) for _ in range(2)]

    # The same extractor instance is applied to each input.
    twin_features = [feature_extractor(branch) for branch in twin_inputs]
    distance = EuclideanDistance()(twin_features)

    siamese_model = models.Model(inputs=twin_inputs, outputs=distance)
    siamese_model.compile(
        loss=contrastive_loss,
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        metrics=[mean_distance, contrastive_accuracy],
    )
    return siamese_model


# Build the compiled siamese model and print its layer summary.
siamese_model = SiameseNet()
siamese_model.summary()

# Pair Generation and Training with the DataLoader

In [60]:
# os.listdir() returns names in arbitrary order, while the labels loaded from
# imagelabels.mat are matched to images purely by position. Sorting the
# zero-padded file names gives a deterministic order that lines up with the
# labels (assumes the .mat file follows file-number order, as in the dataset
# README - verify against your copy).
images = sorted(os.listdir(BASE_IMAGE_ADDRESS))
text_lables = pd.read_csv('Oxford-102_Flower_dataset_labels.csv')
matlabels = loadmat('imagelabels.mat')
labels = matlabels['labels'][0]

# Every image needs exactly one label; a stray file in the folder would
# silently shift the image/label alignment.
assert len(images) == len(labels), (
    f"{len(images)} files in {BASE_IMAGE_ADDRESS!r} but {len(labels)} labels"
)

In [64]:
class SiameseDataLoader(tf.keras.utils.Sequence):
    """Keras ``Sequence`` producing image pairs for siamese training.

    Every anchor image of a batch contributes two pairs:
    ``(anchor, image with the same label)`` labelled 1 and
    ``(anchor, image with a different label)`` labelled 0, so a batch holds up
    to ``2 * batch_size`` pairs.

    Args:
        image_paths: Image file names, resolved against ``BASE_IMAGE_ADDRESS``.
        labels: Class label of each entry of ``image_paths`` (same order).
        batch_size: Number of anchor images per batch.
        img_size: ``(height, width)`` every image is resized to.
        shuffle: Whether the anchor order is reshuffled after each epoch.

    Raises:
        ValueError: If ``image_paths`` and ``labels`` differ in length, or if
            fewer than two distinct labels exist (no negative pair possible).
    """

    def __init__(self, image_paths, labels, batch_size=BATCH_SIZE, img_size=(IMG_SIZE, IMG_SIZE), shuffle=True):
        super().__init__()
        self.data = image_paths
        # A numpy array is required for the boolean-mask lookups below.
        self.labels = np.asarray(labels)
        if len(self.data) != len(self.labels):
            raise ValueError(
                f"image_paths has {len(self.data)} entries but labels has {len(self.labels)}"
            )

        distinct_labels = np.unique(self.labels)
        # Number of distinct classes (previously this held the image count).
        self.num_classes = len(distinct_labels)
        if self.num_classes < 2:
            raise ValueError("At least two distinct labels are needed to build negative pairs")

        self.batch_size = batch_size
        self.img_size = tuple(img_size)
        self.indexes = np.arange(len(self.data))
        self.shuffle = shuffle
        # Precomputed label -> sample indices map; avoids scanning the full
        # label array for every sampled pair.
        self._indices_by_label = {
            label: np.flatnonzero(self.labels == label) for label in distinct_labels
        }
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch (the last one may be smaller)."""
        return int(np.ceil(len(self.data) / self.batch_size))

    def __getitem__(self, index):
        """Return batch ``index`` as ``((anchors, partners), pair_labels)``."""
        if index >= len(self):
            raise IndexError("Batch index out of range")

        batch_indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        return self._generate_batch(batch_indexes)

    def _generate_batch(self, batch_indices):
        """Build one positive and one negative pair per anchor index.

        An anchor whose images cannot be read or decoded is skipped as a
        whole, so positive and negative pairs always stay balanced.

        Returns:
            ``((anchors, partners), pair_labels)`` with float32 labels
            (1 = same class, 0 = different class).

        Raises:
            RuntimeError: If no pair at all could be loaded for the batch.
        """
        anchors = []
        partners = []
        pair_labels = []

        for idx in batch_indices:
            current_label = self.labels[idx]

            # Positive partner: prefer a different image of the same class,
            # since pairing the anchor with itself carries no training signal.
            same_class = self._indices_by_label[current_label]
            other_same_class = same_class[same_class != idx]
            pos_idx = np.random.choice(other_same_class if other_same_class.size else same_class)

            # Negative partner: any image of a different class.
            neg_label = self.generate_random_negative_label(current_label, self.num_classes)
            neg_idx = np.random.choice(self._indices_by_label[neg_label])

            try:
                image_anchor = np.asarray(self.load_image(self.data[idx]))
                pos_image = np.asarray(self.load_image(self.data[pos_idx]))
                neg_image = np.asarray(self.load_image(self.data[neg_idx]))
            except tf.errors.OpError as exp:
                # Missing/corrupt file: report it and skip this anchor only.
                # Programming errors are deliberately not caught here.
                print(f"Skipping anchor {self.data[idx]!r}: {exp}")
                continue

            anchors.extend([image_anchor, image_anchor])
            partners.extend([pos_image, neg_image])
            pair_labels.extend([1, 0])

        if not pair_labels:
            raise RuntimeError("No image pair could be loaded for this batch")

        return ((np.stack(anchors), np.stack(partners)), np.array(pair_labels, dtype=np.float32))

    def on_epoch_end(self):
        """Reshuffle the anchor order when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def load_image(self, image_path):
        """Read one image, resize it to ``img_size`` and scale it to [0, 1]."""
        image_path = os.path.join(BASE_IMAGE_ADDRESS, image_path)
        image = tf.io.read_file(image_path)
        # decode_image detects the format (the dataset ships JPEG files);
        # expand_animations=False guarantees a 3-D tensor that resize accepts.
        image = tf.io.decode_image(image, channels=IMG_DEEP, expand_animations=False)
        image = tf.image.resize(image, self.img_size)
        image = image / 255.0
        return image

    def generate_random_negative_label(self, current_index, upper_limit):
        """Pick a random label different from ``current_index``.

        Despite its name, ``current_index`` is a label value. ``upper_limit``
        is unused and only kept for backward compatibility. Labels are drawn
        from this loader's own labels, in proportion to their frequency.
        """
        return np.random.choice(self.labels[self.labels != current_index])

In [65]:

# Pair generator over the full image list; each batch yields positive and negative pairs.
dataloader = SiameseDataLoader(images, labels, batch_size=BATCH_SIZE)

In [None]:
# Sanity-check a single batch. Looping over the whole loader would decode
# every image of the dataset and dump the raw pixel arrays into the output.
pair, label = dataloader[0]
print(pair[0].shape, pair[1].shape)
print(label.shape)

In [None]:
# fit() is called without validation data, so "val_loss" is never logged and a
# checkpoint keyed on it would never be written. Track the training loss;
# switch back to "val_loss" once validation_data is passed to fit().
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath="best_model.h5",
    save_best_only=True,
    monitor="loss",
    mode="min",
    verbose=1
)

In [None]:
# The Sequence already yields complete batches (its own batch_size applies),
# so `batch_size` must not be passed to fit() for this input type.
siamese_model.fit(
    dataloader,
    epochs=EPOCHS,
    callbacks=[checkpoint_callback]
)