In [9]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow_datasets as tfds
import tensorflow.keras as keras
import sklearn
from sklearn.decomposition import PCA 
from sklearn.neighbors import KNeighborsClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis as QDA
import cv2
from tensorflow.keras.layers import Input, Lambda, Conv2D,Conv2DTranspose, MaxPooling2D, BatchNormalization, Dense, Flatten, Activation, Dropout
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras import backend as K
from tensorflow.keras import layers
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
%matplotlib inline

In [10]:
(ds_train, ds_test), info = tfds.load('omniglot', split=['train', 'test'], with_info=True)

In [11]:
df_train = tfds.as_dataframe(ds_train, info)
df_test  = tfds.as_dataframe(ds_test, info)

2023-05-31 15:53:22.191874: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_4' with dtype int64 and shape [1]
	 [[{{node Placeholder/_4}}]]
2023-05-31 15:53:22.192197: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_4' with dtype int64 and shape [1]
	 [[{{node Placeholder/_4}}]]
2023-05-31 15:53:25.312340: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_3' with dtype int64 and shape [1]
	 [[{{node

In [12]:
def resize_images(images, size, to_grayscale=True):
    resized_images = []
    for img in images:
        resized_image = cv2.resize(img, (size, size))
        if to_grayscale:
            resized_image= cv2.cvtColor(resized_image, cv2.COLOR_BGR2GRAY)
        resized_images.append(resized_image)
    return np.array(resized_images).astype("float") / 255

def parse_omniglot_dataframe(df, img_size=56, reshape=False):
    images = resize_images(df['image'], img_size)
    if reshape:
        images = images.reshape(-1, img_size * img_size)
    else:
        images.reshape(-1, img_size, img_size)
    labels = df['label'].to_numpy()
    return (images, labels)

In [13]:
def cnn_encoder(w, h, encoding_size):
    return Sequential([
        Conv2D(32, (3, 3), input_shape=(w, h, 1), activation='relu', kernel_regularizer='l2'),
        BatchNormalization(),
        Activation('relu'),
        MaxPooling2D(pool_size=2, strides=(2, 2)),
        Dropout(0.25),

        Conv2D(64, (3, 3), kernel_regularizer='l2'),
        BatchNormalization(),
        Activation('relu'),
        MaxPooling2D(pool_size=2, strides=(2, 2)),
        Dropout(0.25),

        Flatten(),
        
        Dense(encoding_size),
    ])

In [14]:
class DistanceLayer(layers.Layer):
    """
    This layer is responsible for computing the distance between the anchor
    embedding and the positive embedding, and the anchor embedding and the
    negative embedding.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
        return (ap_distance, an_distance)

In [15]:
def siamese_net_for_triplet_loss(w, h, encoding_size):
    anchor_input = layers.Input(name="anchor", shape=(w, h, 1))
    positive_input = layers.Input(name="positive", shape=(w, h, 1))
    negative_input = layers.Input(name="negative", shape=(w, h, 1))

    encoder = cnn_encoder(w, h, encoding_size)

    distances = DistanceLayer()(
        encoder(anchor_input),
        encoder(positive_input),
        encoder(negative_input),
    )

    siamese_network = Model(
        inputs=[anchor_input, positive_input, negative_input], outputs=distances
    )

    return siamese_network, encoder

In [16]:
class SiameseModel(Model):
    """
    The Siamese Network model with a custom training and testing loops.

    Computes the triplet loss using the three embeddings produced by the
    Siamese Network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):
        # GradientTape is a context manager that records every operation that
        # you do inside. We are using it here to compute the loss so we can get
        # the gradients and apply them using the optimizer specified in
        # `compile()`.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        # Storing the gradients of the loss function with respect to the
        # weights/parameters.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        # Applying the gradients on the model using the specified optimizer
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        # Let's update and return the training loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result(), "pdistance": self.siamese_network(data)[0], "ndistance": self.siamese_network(data)[1]}

    def test_step(self, data):
        loss = self._compute_loss(data)

        # Let's update and return the loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        # The output of the network is a tuple containing the distances
        # between the anchor and the positive example, and the anchor and
        # the negative example.
        ap_distance, an_distance = self.siamese_network(data)

        # Computing the Triplet Loss by subtracting both distances and
        # making sure we don't get a negative value.
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        # We need to list our metrics here so the `reset_states()` can be
        # called automatically.
        return [self.loss_tracker]

In [17]:
def get_image_by_label(train_images, train_labels, label):
    return train_images[np.random.choice(np.where(train_labels == label)[0], 1, replace=False)[0]]

def get_triplets(train_images, train_labels, batch_size, w, h):
    triplets = [np.zeros((batch_size, w, h)) for _ in range(3)]
    labels = np.unique(train_labels)
    for i in range(batch_size):
        class1, class2 = np.random.choice(labels, 2, replace=False)
        assert(class1 != class2)
        triplets[0][i] = get_image_by_label(train_images, train_labels, class1)
        triplets[1][i] = get_image_by_label(train_images, train_labels, class1)
        triplets[2][i] = get_image_by_label(train_images, train_labels, class2)
    return triplets

In [18]:
def separate_fewshot(test_images, test_labels, n_shots):
    fewshot_pick = []
    val_pick = []
    for label in np.unique(test_labels):
        for i in np.random.choice(np.where(test_labels == label)[0], n_shots, False):
            fewshot_pick.append(i)
    temp = set(fewshot_pick)
    for i in range(len(test_labels)):
        if not i in temp:
            val_pick.append(i)
    fewshot_images = test_images[fewshot_pick]
    fewshot_labels = test_labels[fewshot_pick]
    val_images = test_images[val_pick]
    val_labels = test_labels[val_pick]
    return fewshot_images, fewshot_labels, val_images, val_labels

In [19]:
def train_fewshot(encoder, n_shots, fewshot_images, fewshot_labels):
    return KNeighborsClassifier(n_neighbors=min(n_shots, 5)).fit(encoder(fewshot_images), fewshot_labels)

def trains_fewshot(encoder, ns_shots, test_images, test_labels, verbose=True):
    accuracies = [None] * len(ns_shots)
    for i, n_shots in enumerate(ns_shots):
        if verbose: print(f'Learning {n_shots}-shot and predicting...')
        fewshot_images, fewshot_labels, val_images, val_labels = separate_fewshot(test_images, test_labels, n_shots)
        classifier = train_fewshot(encoder, n_shots, fewshot_images, fewshot_labels)
        pred = classifier.predict(encoder(val_images))
        accuracies[i] = np.sum(pred == val_labels) / val_labels.shape[0]
        if verbose:
            print(f'Accuracy for {n_shots}-shot: {accuracies[i]}')
    return accuracies

In [20]:
def test_SN_TL(df_train, df_test, margin=1, img_size=56, ns_shots=[1, 3, 5], n_iterations=3000, batch_size=10, encoding_size=32, verbose=True):
    train_images, train_labels = parse_omniglot_dataframe(df_train, img_size)
    test_images, test_labels = parse_omniglot_dataframe(df_test, img_size)

    if verbose: print("======= Siamese network with triplet loss method: Training and evaluating... =======")
    if verbose: print("Learning background...")

    siamese_network, encoder = siamese_net_for_triplet_loss(img_size, img_size, encoding_size)
    siamese_model = SiameseModel(siamese_network, margin=margin)
    siamese_model.compile(optimizer=optimizers.Adam())
    for _ in range(n_iterations):
        siamese_model.fit(get_triplets(train_images, train_labels, batch_size, img_size, img_size))

    accuracies = trains_fewshot(encoder, ns_shots, test_images, test_labels)
    
    if verbose: print("======= Siamese network with triplet loss method: Finished =======")

    return accuracies

In [21]:
img_size = 56
encoding_size = 32

train_images, train_labels = parse_omniglot_dataframe(df_train, img_size)
test_images, test_labels = parse_omniglot_dataframe(df_test, img_size)

In [14]:
siamese_network, encoder = siamese_net_for_triplet_loss(img_size, img_size, encoding_size)
siamese_model = SiameseModel(siamese_network, margin=5)
siamese_model.compile(optimizer=optimizers.Adam())

In [31]:
for _ in range(5000):
    siamese_model.fit(get_triplets(train_images, train_labels, 30, img_size, img_size))

accuracies = trains_fewshot(encoder, [1, 3, 5], test_images, test_labels)





































In [29]:
encoder.save('siamese_network_with_triplet_loss')





INFO:tensorflow:Assets written to: siamese_network_with_triplet_loss/assets


INFO:tensorflow:Assets written to: siamese_network_with_triplet_loss/assets


In [23]:
encoder = keras.saving.load_model('siamese_network_with_triplet_loss')





In [24]:
accuracies = trains_fewshot(encoder, [1, 3, 5], test_images, test_labels)

Learning 1-shot and predicting...


2023-05-31 15:54:20.676724: I tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:424] Loaded cuDNN version 8600
2023-05-31 15:54:21.428802: I tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:637] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.
2023-05-31 15:54:21.499753: W tensorflow/core/kernels/gpu_utils.cc:50] Failed to allocate memory for convolution redzone checking; skipping this check. This is benign and only means that we won't check cudnn for out-of-bounds reads and writes. This message will only be printed once.


Accuracy for 1-shot: 0.3565210446449964
Learning 3-shot and predicting...
Accuracy for 3-shot: 0.39864322056591983
Learning 5-shot and predicting...
Accuracy for 5-shot: 0.501062215477997


In [43]:
matches = total = 0
n_shots = 1
for alphabet in np.unique(df_test['alphabet']):
    ind_alphabet = np.where(df_test['alphabet'] == alphabet)[0]
    labels = test_labels[ind_alphabet]
    images = test_images[ind_alphabet]
    fewshot_images, fewshot_labels, val_images, val_labels = separate_fewshot(images, labels, n_shots)
    classifier = train_fewshot(encoder, n_shots, fewshot_images, fewshot_labels)
    pred = classifier.predict(encoder(val_images))
    matches += np.sum(pred == val_labels)
    total += val_labels.shape[0]
print(matches/total)

0.5059500039932913
