In [2]:
from tensorflow.keras.datasets import mnist
import numpy as np
from resnet import ResNet18
import tensorflow.keras as keras
import tensorflow as tf
from resnet_fsl_aytin import *

# load dataset
(X_train, Y_train), (X_test, Y_test) = mnist.load_data()
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')

Y_train = Y_train.astype('float32')
Y_test = Y_test.astype('float32')

In [3]:
def create_triplets(x, y, num_triplets):
    triplets = []
    for _ in range(num_triplets):
        # Choose a random anchor index
        idx_a = np.random.randint(0, len(y))
        
        # Get positive and negative indices for this anchor 
        pos_idx = np.where(y == y[idx_a])[0]
        neg_idx = np.where(y != y[idx_a])[0]
        
        # Select a random positive and negative index
        idx_p = np.random.choice(pos_idx)
        idx_n = np.random.choice(neg_idx)

        triplet = (x[idx_a], x[idx_p], x[idx_n])
        triplets.append(triplet)
    
    return np.array(triplets)

triplets = create_triplets(X_train, Y_train, 10)

In [4]:
from resnet_fsl_aytin import TripletModel
from resnet_fsl_aytin import ResNet18

embedding = ResNet18()

# Create a model
model = TripletModel(embedding)

# Compile the model
model.compile(optimizer=keras.optimizers.Adam(0.001), loss=triplet_loss)

# Fit the model
model.fit(triplets.reshape(-1, 28, 28, 1), np.zeros((len(triplets), 1)), epochs=10, batch_size=1)


Epoch 1/10


ValueError: Exception encountered when calling MyConv2D.call().

[1mInput 0 of layer "conv2d" is incompatible with the layer: expected min_ndim=4, found ndim=2. Full shape received: (28, 28)[0m

Arguments received by MyConv2D.call():
  • in_x=tf.Tensor(shape=(28, 28), dtype=float32)
  • training=True

In [5]:
from tensorflow.keras.applications import resnet
from tensorflow.keras import layers
from tensorflow import keras
import tensorflow as tf

def get_embedding_module(imageSize):
    # construct the input layer and pass the inputs through a
    # pre-processing layer
    inputs = keras.Input(imageSize + (3,))
    x = resnet.preprocess_input(inputs)
    
    # fetch the pre-trained resnet 50 model and freeze the weights
    baseCnn = resnet.ResNet50(weights="imagenet", include_top=False)
    baseCnn.trainable=False
    
    # pass the pre-processed inputs through the base cnn and get the
    # extracted features from the inputs
    extractedFeatures = baseCnn(x)
    # pass the extracted features through a number of trainable layers
    x = layers.GlobalAveragePooling2D()(extractedFeatures)
    x = layers.Dense(units=1024, activation="relu")(x)
    x = layers.Dropout(0.2)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(units=512, activation="relu")(x)
    x = layers.Dropout(0.2)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(units=256, activation="relu")(x)
    x = layers.Dropout(0.2)(x)
    outputs = layers.Dense(units=128)(x)
    # build the embedding model and return it
    embedding = keras.Model(inputs, outputs, name="embedding")
    return embedding


def get_siamese_network(imageSize, embeddingModel):
    # build the anchor, positive and negative input layer
    anchorInput = keras.Input(name="anchor", shape=imageSize + (3,))
    positiveInput = keras.Input(name="positive", shape=imageSize + (3,))
    negativeInput = keras.Input(name="negative", shape=imageSize + (3,))
    # embed the anchor, positive and negative images
    anchorEmbedding = embeddingModel(anchorInput)
    positiveEmbedding = embeddingModel(positiveInput)
    negativeEmbedding = embeddingModel(negativeInput)
    # build the siamese network and return it
    siamese_network = keras.Model(
        inputs=[anchorInput, positiveInput, negativeInput],
        outputs=[anchorEmbedding, positiveEmbedding, negativeEmbedding]
    )
    return siamese_network

class SiameseModel(keras.Model):
    def __init__(self, siameseNetwork, margin, lossTracker):
        super().__init__()
        self.siameseNetwork = siameseNetwork
        self.margin = margin
        self.lossTracker = lossTracker
    def _compute_distance(self, inputs):
        print(inputs)
        (anchor, positive, negative) = inputs

        # embed the images using the siamese network
        embeddings = self.siameseNetwork((anchor, positive, negative))
        anchorEmbedding = embeddings[0]
        positiveEmbedding = embeddings[1]
        negativeEmbedding = embeddings[2]

        # calculate the anchor to positive and negative distance
        apDistance = tf.reduce_sum(
            tf.square(anchorEmbedding - positiveEmbedding), axis=-1
        )
        anDistance = tf.reduce_sum(
            tf.square(anchorEmbedding - negativeEmbedding), axis=-1
        )
        
        # return the distances
        return (apDistance, anDistance)
    def _compute_loss(self, apDistance, anDistance):
        loss = apDistance - anDistance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss
    def call(self, inputs):
        # compute the distance between the anchor and positive,
        # negative images
        (apDistance, anDistance) = self._compute_distance(inputs)
        return (apDistance, anDistance)
    def train_step(self, inputs):
        with tf.GradientTape() as tape:
            # compute the distance between the anchor and positive,
            # negative images
            (apDistance, anDistance) = self._compute_distance(inputs)
            # calculate the loss of the siamese network
            loss = self._compute_loss(apDistance, anDistance)
        # compute the gradients and optimize the model
        gradients = tape.gradient(
            loss,
            self.siameseNetwork.trainable_variables)
        self.optimizer.apply_gradients(
            zip(gradients, self.siameseNetwork.trainable_variables)
        )
        # update the metrics and return the loss
        self.lossTracker.update_state(loss)
        return {"loss": self.lossTracker.result()}
    def test_step(self, inputs):
        # compute the distance between the anchor and positive,
        # negative images
        (apDistance, anDistance) = self._compute_distance(inputs)
        # calculate the loss of the siamese network
        loss = self._compute_loss(apDistance, anDistance)
        
        # update the metrics and return the loss
        self.lossTracker.update_state(loss)
        return {"loss": self.lossTracker.result()}
    @property
    def metrics(self):
        return [self.lossTracker]
    

# # initialize the image size and the margin
# imageSize = (28, 28)

# # get the embedding model
# embeddingModel = get_embedding_module(imageSize)

# # get the siamese network
# siameseNetwork = get_siamese_network(imageSize, embeddingModel)

# # initialize the margin and the loss tracker
# margin = 1
# lossTracker = tf.metrics.Mean(name="loss")

# # build the siamese model
# model = SiameseModel(siameseNetwork, margin, lossTracker)

# # compile the model
# model.compile(optimizer=keras.optimizers.Adam(0.001))

# # train the model
# for _ in range(10):
#     model.train_step(triplets[_])
# # model.train_step(triplets)


In [11]:

(x_train, y_train), (x_test, y_test) = mnist.load_data()
train_triplets = create_triplets(x_train, y_train, num_triplets=200)
test_triplets = create_triplets(x_test, y_test, num_triplets=10)

# Prepare the embedding model and siamese network
embedding_model = get_embedding_module((28, 28))
siamese_network = get_siamese_network((28, 28), embedding_model)

# Compile and train the SiameseModel
siamese_model = SiameseModel(siamese_network, margin=1, lossTracker=tf.metrics.Mean())
siamese_model.compile(optimizer=keras.optimizers.Adam())

# Prepare the data
anchor_train = train_triplets[:, 0]
positive_train = train_triplets[:, 1]
negative_train = train_triplets[:, 2]
train_data = (anchor_train, positive_train, negative_train)

anchor_test = test_triplets[:, 0]
positive_test = test_triplets[:, 1]
negative_test = test_triplets[:, 2]
test_data = (anchor_test, positive_test, negative_test)
print(len(train_data))

# send only 1 data at a time 
for _ in range(10):
    siamese_model.train_step(train_data[_])


# Train the model
history = siamese_model.fit(
    train_data,
    epochs=10,
    batch_size=32,
    validation_data=test_data
)


3
[[[0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  ...
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]]

 [[0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  ...
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]]

 [[0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  ...
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]]

 ...

 [[0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  ...
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]]

 [[0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  ...
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]]

 [[0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  ...
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]]]


ValueError: too many values to unpack (expected 3)