In [None]:
import math
import numpy as np
import h5py
import matplotlib.pyplot as plt
from matplotlib.pyplot import imread
import os
from PIL import Image
import pandas as pd
import tensorflow as tf
from tensorflow.python.framework import ops
import imageio.v3 as iio


from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tqdm import tqdm
from tensorflow.keras import applications
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, BatchNormalization, Dense
from tensorflow.keras.layers import Flatten, Conv2D, MaxPooling2D, Dropout, concatenate
#from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras.applications import resnet

In [None]:
# Root folder of the triplet dataset (machine-specific absolute path).
data_loc = "E:/yrq64132/Documents/MATLAB/DeepLearning/Recognition_Dataset"

# os.listdir() returns names in arbitrary order, and the loop below pairs the
# three folders purely by list position. Sorting each listing makes the
# pairing deterministic instead of dependent on filesystem enumeration order.
# NOTE(review): assumes the files of one triplet sort to the same position in
# all three folders (e.g. shared file names) - confirm against the dataset.
images_A = sorted(os.listdir(data_loc + "/Train/Image_A"))
images_P = sorted(os.listdir(data_loc + "/Train/Image_P"))
images_N = sorted(os.listdir(data_loc + "/Train/Image_N"))
#labels = os.listdir(data_loc + "/Train/Labels")

# Side length (pixels) used when the images are reshaped into tensors.
resolution = 200
image_shape = (resolution, resolution)

X_train_A_orig = list()
X_train_P_orig = list()
X_train_N_orig = list()
#Y_train_orig = list()

# The anchor folder defines how many triplets are loaded.
n_train_images = len(images_A)

for i in tqdm(range(n_train_images)):
    X_train_A_orig.append(iio.imread(data_loc + "/Train/Image_A/" + images_A[i]))
    X_train_P_orig.append(iio.imread(data_loc + "/Train/Image_P/" + images_P[i]))
    X_train_N_orig.append(iio.imread(data_loc + "/Train/Image_N/" + images_N[i]))
    #Y_train_orig.append(pd.read_csv(data_loc + "/Train/Labels/" + labels[i], header = None))

In [None]:
# Stack each list of loaded images into one float32 tensor of shape
# (n_train_images, resolution, resolution, 1), i.e. with a single channel axis.
train_tensor_shape = [n_train_images, resolution, resolution, 1]

X_train_A = np.asarray(X_train_A_orig).reshape(train_tensor_shape).astype(np.float32)
X_train_P = np.asarray(X_train_P_orig).reshape(train_tensor_shape).astype(np.float32)
X_train_N = np.asarray(X_train_N_orig).reshape(train_tensor_shape).astype(np.float32)

In [None]:
# Random rotation / shift / zoom generator shared by the train and test cells.
image_augmenter = ImageDataGenerator(
    rotation_range=6,
    width_shift_range=15.,
    height_shift_range=15.,
    zoom_range=[0.9, 1.1],
)


def _augment_train_set(images):
    """Return one augmented copy of `images`, order preserved.

    The batch size equals the number of training images and shuffling is off,
    so batch 0 of the iterator holds every image exactly once.
    """
    return image_augmenter.flow(images, shuffle=False, batch_size=n_train_images)[0]


# Each array is replaced by its augmented version, one after the other.
X_train_A = _augment_train_set(X_train_A)
X_train_P = _augment_train_set(X_train_P)
X_train_N = _augment_train_set(X_train_N)

In [None]:
# Load the test triplets. The name lists are rebound to the Test folders here.
# os.listdir() returns names in arbitrary order and the loop pairs the three
# folders by list position, so every listing is sorted to make the pairing
# deterministic.
# NOTE(review): assumes the files of one triplet sort to the same position in
# all three folders - confirm against the dataset.
images_A = sorted(os.listdir(data_loc + "/Test/Image_A"))
images_P = sorted(os.listdir(data_loc + "/Test/Image_P"))
images_N = sorted(os.listdir(data_loc + "/Test/Image_N"))
labels = sorted(os.listdir(data_loc + "/Test/Labels"))

X_test_A_orig = list()
X_test_P_orig = list()
X_test_N_orig = list()

# The anchor folder defines how many test triplets are loaded.
n_test_images = len(images_A)

#Y_test_orig = list()
for i in tqdm(range(n_test_images)):
    X_test_A_orig.append(iio.imread(data_loc + "/Test/Image_A/" + images_A[i]))
    X_test_P_orig.append(iio.imread(data_loc + "/Test/Image_P/" + images_P[i]))
    X_test_N_orig.append(iio.imread(data_loc + "/Test/Image_N/" + images_N[i]))
    #Y_test_orig.append(pd.read_csv(data_loc + "/Test/Labels/" + labels[i], header = None))


# Stack into float32 tensors of shape (n_test_images, resolution, resolution, 1).
X_test_A = np.reshape(X_test_A_orig,[n_test_images, resolution, resolution, 1]).astype(np.float32)
X_test_P = np.reshape(X_test_P_orig,[n_test_images, resolution, resolution, 1]).astype(np.float32)
X_test_N = np.reshape(X_test_N_orig,[n_test_images, resolution, resolution, 1]).astype(np.float32)
#X_train_P = np.reshape(X_train_P_orig,[36000, 200, 200, 1]).astype(np.float32)

# Iterator over augmented positives, created from the un-augmented array;
# it is not consumed anywhere in this cell.
test_augmented_images = image_augmenter.flow(X_test_P,shuffle=False,batch_size=n_test_images)

# Batch 0 holds the whole set because batch_size == n_test_images and
# shuffling is off.
# NOTE(review): the test images receive the same random augmentation as the
# training images - confirm that evaluating on augmented data is intended.
X_test_A = image_augmenter.flow(X_test_A,shuffle=False,batch_size=n_test_images)[0]
X_test_P = image_augmenter.flow(X_test_P,shuffle=False,batch_size=n_test_images)[0]
X_test_N = image_augmenter.flow(X_test_N,shuffle=False,batch_size=n_test_images)[0]

In [None]:
# One tf.data source per triplet role, sliced along the first (image) axis.
anchor_train_dataset = tf.data.Dataset.from_tensor_slices(X_train_A)
positive_train_dataset = tf.data.Dataset.from_tensor_slices(X_train_P)
negative_train_dataset = tf.data.Dataset.from_tensor_slices(X_train_N)

# Zip into (anchor, positive, negative) triplets, batch by 64 (the last,
# possibly smaller batch is kept) and prefetch up to 8 batches.
# NOTE(review): there is no shuffle step, so batches are served in array
# order on every pass - confirm that is intended.
train_dataset = (
    tf.data.Dataset.zip(
        (anchor_train_dataset, positive_train_dataset, negative_train_dataset)
    )
    .batch(64, drop_remainder=False)
    .prefetch(8)
)

In [None]:
# One tf.data source per triplet role, sliced along the first (image) axis.
anchor_test_dataset = tf.data.Dataset.from_tensor_slices(X_test_A)
positive_test_dataset = tf.data.Dataset.from_tensor_slices(X_test_P)
negative_test_dataset = tf.data.Dataset.from_tensor_slices(X_test_N)

# Zip into (anchor, positive, negative) triplets, batch by 64 (the last,
# possibly smaller batch is kept) and prefetch up to 8 batches.
test_dataset = (
    tf.data.Dataset.zip(
        (anchor_test_dataset, positive_test_dataset, negative_test_dataset)
    )
    .batch(64, drop_remainder=False)
    .prefetch(8)
)

In [None]:
def visualize(anchor, positive, negative):
    """Draw the first three triplets of a batch on a 3x3 grid.

    Each row is one triplet; the columns are anchor, positive and negative,
    left to right. Axis ticks are hidden on every panel.

    Args:
        anchor: Indexable batch of anchor images.
        positive: Indexable batch of positive images.
        negative: Indexable batch of negative images.
    """
    figure = plt.figure(figsize=(9, 9))
    grid = figure.subplots(3, 3)

    for row in range(3):
        triplet = (anchor[row], positive[row], negative[row])
        for col, picture in enumerate(triplet):
            panel = grid[row, col]
            panel.imshow(picture)
            # Hide both axes so only the image itself is visible.
            panel.get_xaxis().set_visible(False)
            panel.get_yaxis().set_visible(False)


# Preview the first training batch: take(1) yields a single
# (anchor, positive, negative) tuple of NumPy arrays, which is unpacked
# straight into visualize().
visualize(*list(train_dataset.take(1).as_numpy_iterator())[0])

In [None]:
def conv_block(inputs, n_filters, k_size, stride, pad = "same",max_pool = False):
    """Apply Conv2D (ReLU) -> BatchNormalization -> optional 4x4 max pooling.

    Args:
        inputs: Tensor fed into the convolution.
        n_filters: Number of convolution filters.
        k_size: Convolution kernel size.
        stride: Convolution stride.
        pad: Padding mode of the convolution (default "same").
        max_pool: When true, append a MaxPooling2D layer with a (4, 4) pool
            and "same" padding.

    Returns:
        The output tensor of the last layer applied.
    """
    convolved = Conv2D(n_filters, k_size, stride, padding=pad, activation="relu")(inputs)
    # Normalise over the last (channel) axis.
    normalised = BatchNormalization(axis=-1)(convolved)
    if not max_pool:
        return normalised
    return MaxPooling2D(padding="same", pool_size=(4, 4))(normalised)
    

# Embedding network: maps one single-channel image of size `image_shape`
# to a 128-dimensional vector.
input_img = tf.keras.Input(shape = image_shape + (1,))

# Six groups of three 5x5 conv blocks. The filter count doubles from group to
# group (16 -> 512) and the third block of every group uses stride 2 to
# downsample. `max_pool` is left at its default (False) throughout.
conv1 = conv_block(input_img,16,5,1)
conv2 = conv_block(conv1,16,5,1)
conv3 = conv_block(conv2,16,5,2)
conv4 = conv_block(conv3,32,5,1)
conv5 = conv_block(conv4,32,5,1)
conv6 = conv_block(conv5,32,5,2)
conv7 = conv_block(conv6,64,5,1)
conv8 = conv_block(conv7,64,5,1)
conv9 = conv_block(conv8,64,5,2)
conv10 = conv_block(conv9,128,5,1)
conv11 = conv_block(conv10,128,5,1)
conv12 = conv_block(conv11,128,5,2)
conv13 = conv_block(conv12,256,5,1)
conv14 = conv_block(conv13,256,5,1)
conv15 = conv_block(conv14,256,5,2)
conv16 = conv_block(conv15,512,5,1)
conv17 = conv_block(conv16,512,5,1)
conv18 = conv_block(conv17,512,5,2)


# Flatten the final feature map for the dense head.
flat5 = Flatten()(conv18)

# Dense head with skip connections: the input of each dense layer is
# concatenated back onto its (dropout-regularised) output before the next one.
fc6 = Dense(1000,activation = "relu")(flat5)
drop6 = Dropout(0.5)(fc6)
merge6 = concatenate([drop6,flat5])
fc7 = Dense(256,activation = "relu")(merge6)
drop7 = Dropout(0.5)(fc7)
merge7 = concatenate([drop7,merge6])
# Final 128-unit layer has no activation: this is the embedding vector.
fc8 = Dense(128,activation = None)(merge7)

embedding = tf.keras.Model(inputs = input_img, outputs = fc8, name="Embedding")
embedding.summary()

In [None]:
# Render the embedding model's layer graph, annotated with tensor shapes.
tf.keras.utils.plot_model(embedding,show_shapes=True)

In [None]:
class DistanceLayer(layers.Layer):
    """Squared Euclidean distances for a triplet of embeddings.

    Given anchor, positive and negative embeddings, the layer returns the
    squared distance anchor-positive and the squared distance
    anchor-negative, each summed over the last axis.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        """Return the tuple (anchor-positive distance, anchor-negative distance)."""

        def squared_distance_to(other):
            # Sum of squared element-wise differences along the last axis.
            return tf.reduce_sum(tf.square(anchor - other), axis=-1)

        return squared_distance_to(positive), squared_distance_to(negative)


# All three inputs are single-channel images of size `image_shape`.
triplet_input_shape = image_shape + (1,)

anchor_input = layers.Input(shape=triplet_input_shape, name="anchor")
positive_input = layers.Input(shape=triplet_input_shape, name="positive")
negative_input = layers.Input(shape=triplet_input_shape, name="negative")

# The same `embedding` model (hence the same weights) encodes every input;
# the distance layer then compares the anchor with the other two.
distance_layer = DistanceLayer()
distances = distance_layer(
    embedding(anchor_input),
    embedding(positive_input),
    embedding(negative_input),
)

siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input],
    outputs=distances,
)
siamese_network.summary()

In [None]:
# Render the three-input siamese network's layer graph, annotated with tensor shapes.
tf.keras.utils.plot_model(siamese_network,show_shapes=True)

In [None]:
class SiameseModel(Model):
    """The Siamese Network model with custom training and testing loops.

    Computes the triplet loss from the two distances produced by the wrapped
    Siamese Network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)

    Args:
        siamese_network: Model that maps an (anchor, positive, negative)
            triplet to the pair (anchor-positive distance,
            anchor-negative distance).
        margin: Margin added to the distance difference before clamping at 0.
    """

    def __init__(self, siamese_network, margin=100):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        # Running means reported during fit()/evaluate().
        self.loss_tracker = metrics.Mean(name="loss")
        self.ap_d_tracker = metrics.Mean(name="ap_d")
        self.an_d_tracker = metrics.Mean(name="an_d")

    def call(self, inputs):
        """Forward the triplet to the wrapped network and return its distances."""
        return self.siamese_network(inputs)

    def train_step(self, data):
        """Run one optimisation step on a batch of triplets.

        Returns:
            Dict with the running means of "loss", "ap_d" and "an_d".
        """
        # GradientTape records every operation executed inside the context so
        # the gradients of the loss can be computed afterwards.
        with tf.GradientTape() as tape:
            # training=True is required here: a custom train_step does not get
            # the training flag set for it, and the embedding network in this
            # notebook contains BatchNormalization and Dropout layers, which
            # would otherwise run in inference mode while being trained.
            loss, ap_d, an_d = self._compute_loss(data, training=True)

        # Gradients of the loss with respect to the network weights.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        # Apply them with the optimizer specified in `compile()`.
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        return self._update_trackers(loss, ap_d, an_d)

    def test_step(self, data):
        """Evaluate one batch of triplets without updating any weights.

        Returns:
            Dict with the running means of "loss", "ap_d" and "an_d", the same
            keys as `train_step`.
        """
        loss, ap_d, an_d = self._compute_loss(data, training=False)
        return self._update_trackers(loss, ap_d, an_d)

    def _update_trackers(self, loss, ap_d, an_d):
        """Feed one batch into the running means and return their current values."""
        self.loss_tracker.update_state(loss)
        self.ap_d_tracker.update_state(ap_d)
        self.an_d_tracker.update_state(an_d)
        return {
            "loss": self.loss_tracker.result(),
            "ap_d": self.ap_d_tracker.result(),
            "an_d": self.an_d_tracker.result(),
        }

    def _compute_loss(self, data, training=False):
        """Compute the triplet loss for a batch.

        Args:
            data: (anchor, positive, negative) batch passed to the network.
            training: Forwarded to the network; selects training or inference
                behaviour of its layers.

        Returns:
            Tuple (loss, anchor-positive distance, anchor-negative distance).
        """
        # The output of the network is a tuple containing the distances
        # between the anchor and the positive example, and the anchor and
        # the negative example.
        ap_distance, an_distance = self.siamese_network(data, training=training)

        # Triplet loss: distance difference plus margin, clamped so it never
        # becomes negative.
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss, ap_distance, an_distance

    @property
    def metrics(self):
        # The trackers are listed here so that Keras can reset them
        # automatically.
        return [self.loss_tracker, self.ap_d_tracker, self.an_d_tracker]

In [None]:
# Wrap the siamese network in the custom-loop model and attach the optimizer.
# No loss or metrics are passed to compile(): the triplet loss and the
# "loss" / "ap_d" / "an_d" running means are computed inside SiameseModel's
# own train_step / test_step. The previous `weighted_metrics=[siamese_model.metrics]`
# handed those same trackers to an argument meant for (y_true, y_pred)
# metrics, which the custom steps never use.
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(0.0001))

In [None]:
# First training run: 4 epochs over train_dataset, with test_dataset passed as
# validation data. The returned History object is plotted further down.
history = siamese_model.fit(train_dataset, epochs=4, validation_data = test_dataset)

In [None]:
# Embed one training batch and compare, for a single triplet, how similar the
# anchor is to its positive and to its negative example.
sample = next(iter(train_dataset))
anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
    embedding(anchor),
    embedding(positive),
    embedding(negative),
)

# Index of the triplet inside the batch that is inspected.
i = 1

# metrics.CosineSimilarity is a stateful running mean: calling the same
# instance twice would make the second result the average of both calls.
# A fresh instance is therefore created for each measurement.
cosine_similarity = metrics.CosineSimilarity()
positive_similarity = cosine_similarity(anchor_embedding[i], positive_embedding[i])
print("Positive similarity: %.10f" % positive_similarity.numpy())


cosine_similarity = metrics.CosineSimilarity()
negative_similarity = cosine_similarity(anchor_embedding[i], negative_embedding[i])
print("Negative similarity: %.10f" % negative_similarity.numpy())

In [None]:
# Training and validation loss per epoch for the first fit() run,
# drawn in that order so the legend entries line up.
for curve_key in ('loss', 'val_loss'):
    plt.plot(history.history[curve_key])

plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [None]:
# Second training run on the same model object: 10 more epochs, with the
# History of this run kept separately in `history2`.
history2 = siamese_model.fit(train_dataset, epochs=10, validation_data = test_dataset)

In [None]:
# Folder the trained encoder weights are written to (machine-specific path).
save_loc = "C:/Users/yrq64132/OneDrive - Science and Technology Facilities Council/Documents/Python Scripts/"

# save_weights() takes the full target path as its first argument; its second
# positional parameter is `overwrite`. Passing save_loc there left the file in
# the current working directory, so the folder is joined into the path instead.
embedding.save_weights(os.path.join(save_loc, 'encoder.h5'))