In [7]:
from tensorflow.keras import layers, models, losses, optimizers, initializers
import tensorflow as tf
import numpy as np

In [25]:
def load_and_preprocess_data():
    # loading the data as train and test
    data = np.load('../../data/electron-photon-pairs.npz', allow_pickle=True)
    pairs_train = data["pairs_train"]
    labels_train = data["labels_train"]
    pairs_test = data["pairs_test"]
    labels_test = data["labels_test"]
    return pairs_train, labels_train, pairs_test, labels_test

def create_data_augmentation_layer():
    return tf.keras.Sequential([
        tf.keras.layers.RandomFlip("horizontal"),
        tf.keras.layers.RandomRotation(0.2),
        tf.keras.layers.RandomZoom(0.2),
        tf.keras.layers.RandomContrast(0.2),
    ])

# Base model from classical nb, adding qlayer in between
def create_base_model(input_shape):
    model = models.Sequential()
    model.add(layers.Input(shape=input_shape))
    model.add(layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer=initializers.HeNormal())) # Conv layer 1
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu',kernel_regularizer=tf.keras.regularizers.l2(0.01))) # Conv layer 2
    model.add(layers.Dropout((0.5)))
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation='leaky_relu',kernel_regularizer=tf.keras.regularizers.l2(0.01)))
    return model
    

class SiameseModel(tf.keras.Model):
    def __init__(self, input_shape):
        super(SiameseModel, self).__init__()
        self.base_model = create_base_model(input_shape)
        self.data_augmentation = create_data_augmentation_layer()

    def call(self, inputs):
        img1, img2 = inputs
        img1 = self.data_augmentation(img1)
        img2 = self.data_augmentation(img2)
        feat1 = self.base_model(img1)
        feat2 = self.base_model(img2)
        distance = layers.Lambda(lambda embeddings: tf.sqrt(tf.reduce_sum(tf.square(embeddings[0] - embeddings[1]), axis=-1)), output_shape=(1,)
                            )([feat1, feat2])
    
        model = models.Model([img1, img2], distance)
        return model

def info_nce_loss(temperature=0.1):
    def loss(features1,features2):
        batch_size = tf.shape(features1)[0]
        labels = tf.range(batch_size)
            
        features1_norm = tf.math.l2_normalize(features1, axis=1, epsilon=tf.cast(1e-12, tf.float32))
        features2_norm = tf.math.l2_normalize(features2, axis=1, epsilon=tf.cast(1e-12, tf.float32))
    
            
        logits = tf.matmul(features1_norm, features2_norm, transpose_b=True) / temperature
         
        loss = tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)
        return tf.reduce_mean(loss)
    return loss

In [26]:
# Load and preprocess your data
train_images, train_labels, test_images, test_labels = load_and_preprocess_data()

# Define the model
input_shape = train_images.shape[2:]
siamese_model = SiameseModel(input_shape)

siamese_model.compile(loss=info_nce_loss(), optimizer=optimizers.Adam(learning_rate=1e-3))

history = siamese_model.fit([train_images[:, 0], train_images[:, 1]], train_labels,
    validation_data=([test_images[:, 0], test_images[:, 1]], test_labels),
    epochs=10,
    batch_size=5000,
    # callbacks=[cp_callback]
)



# # Compile the model
# optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# @tf.function
# def train_step(images, labels):
#     with tf.GradientTape() as tape:
#         features1, features2 = siamese_model((images[:0], images[:1]), training=True)
#         loss = info_nce_loss(features1, features2)
    
#     gradients = tape.gradient(loss, siamese_model.trainable_variables)
#     optimizer.apply_gradients(zip(gradients, siamese_model.trainable_variables))
    
#     return loss

# # Training loop
# num_epochs = 10
# batch_size = 32

# for epoch in range(num_epochs):
#     print(f"Epoch {epoch+1}/{num_epochs}")
    
#     for i in range(0, len(train_images), batch_size):
#         batch_images = train_images[i:i+batch_size]
#         batch_labels = train_labels[i:i+batch_size]
        
#         loss = train_step(batch_images, batch_labels)
        
#         if i % 100 == 0:
#             print(f"  Step {i//batch_size}: loss = {loss:.4f}")

# After training, you can use the base_model for feature extraction or fine-tuning

Epoch 1/10


TypeError: in user code:

    File "C:\Users\Sanya Nanda\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\src\engine\training.py", line 1401, in train_function  *
        return step_function(self, iterator)
    File "C:\Users\Sanya Nanda\AppData\Local\Temp\ipykernel_138732\3549115620.py", line 52, in loss  *
        features1_norm = tf.math.l2_normalize(features1, axis=1, epsilon=tf.cast(1e-12, tf.float32))

    TypeError: Input 'y' of 'Maximum' Op has type float32 that does not match type int32 of argument 'x'.


$\mathcal{L}{\text{InfoNCE}} = -\log \frac{\exp(sim(f_i, f_j^+) / \tau)}{\sum{k=1}^N \exp(sim(f_i, f_k) / \tau)}$

$F(\rho_1, \rho_2) = |\langle \psi_1 | \psi_2 \rangle|^2$
where $|\psi_1\rangle$ and $|\psi_2\rangle$

$\mathcal{L}{\text{total}} = \alpha \mathcal{L}{\text{InfoNCE}} + (1 - \alpha) (1 - F(\rho_1, \rho_2))$