In [1]:
import tensorflow as tf
import tensorflow_datasets as tfds

# Load CIFAR-10 dataset as NumPy arrays
(x_train, y_train), (x_test, y_test) = tfds.as_numpy(tfds.load(
    'cifar10',
    split=['train', 'test'],
    batch_size=-1,
    as_supervised=True,
))

# Normalize the images
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0


In [2]:
from tensorflow.keras import layers, Model
from tensorflow.keras.applications import ResNet50

def get_augmenter():
    return tf.keras.Sequential([
        layers.RandomFlip('horizontal'),
        layers.RandomRotation(0.2),
        layers.RandomZoom(0.3),
        layers.RandomContrast(0.1),
    ])

def generate_positive_pairs(images):
    augmenter = get_augmenter()
    augmented_images_1 = augmenter(images)
    augmented_images_2 = augmenter(images)
    return augmented_images_1, augmented_images_2

# Create a TensorFlow dataset
train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(256)

# Apply augmentations to generate positive pairs
train_dataset = train_dataset.map(lambda x: generate_positive_pairs(x), 
                                  num_parallel_calls=tf.data.experimental.AUTOTUNE)
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)


In [3]:
def create_simclr_model(input_shape=(32, 32, 3)):
    # Input layers for the two augmented images
    inputs_1 = layers.Input(shape=input_shape)
    inputs_2 = layers.Input(shape=input_shape)

    # Shared base model with ResNet50
    base_model = ResNet50(include_top=False, weights='imagenet', input_shape=input_shape)
    base_model.trainable = False  # Freeze the base model

    # Pass both inputs through the same base model
    features_1 = base_model(inputs_1)
    features_2 = base_model(inputs_2)

    # Global average pooling
    pooled_features_1 = layers.GlobalAveragePooling2D()(features_1)
    pooled_features_2 = layers.GlobalAveragePooling2D()(features_2)

    # Projection head
    projection_1 = layers.Dense(128, activation='relu')(pooled_features_1)
    projection_1 = layers.Dense(128)(projection_1)
    
    projection_2 = layers.Dense(128, activation='relu')(pooled_features_2)
    projection_2 = layers.Dense(128)(projection_2)

    # Final SimCLR model
    simclr_model = Model(inputs=[inputs_1, inputs_2], outputs=[projection_1, projection_2])

    return simclr_model

# Create the SimCLR model
simclr_model = create_simclr_model()
simclr_model.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 32, 32, 3)]  0           []                               
                                                                                                  
 input_2 (InputLayer)           [(None, 32, 32, 3)]  0           []                               
                                                                                                  
 resnet50 (Functional)          (None, 1, 1, 2048)   23587712    ['input_1[0][0]',                
                                                                  'input_2[0][0]']                
                                                                                                  
 global_average_pooling2d (Glob  (None, 2048)        0           ['resnet50[0][0]']           

In [4]:
def contrastive_loss(projection_1, projection_2, temperature=0.1):
    # Normalize the projections
    projection_1 = tf.math.l2_normalize(projection_1, axis=1)
    projection_2 = tf.math.l2_normalize(projection_2, axis=1)
    
    # Concatenate both projections
    projections = tf.concat([projection_1, projection_2], axis=0)
    
    # Compute the similarity matrix
    similarity_matrix = tf.matmul(projections, projections, transpose_b=True)
    
    # Create the labels for contrastive loss
    batch_size = tf.shape(projection_1)[0]
    labels = tf.concat([tf.range(batch_size), tf.range(batch_size)], axis=0)
    
    # Temperature scaling
    logits = similarity_matrix / temperature
    
    # Compute contrastive loss
    contrastive_labels = tf.one_hot(labels, depth=2*batch_size)
    loss = tf.nn.softmax_cross_entropy_with_logits(contrastive_labels, logits)
    return tf.reduce_mean(loss)

# Custom training step to use the contrastive loss
@tf.function
def train_step(images_1, images_2):
    with tf.GradientTape() as tape:
        projection_1, projection_2 = simclr_model([images_1, images_2], training=True)
        loss = contrastive_loss(projection_1, projection_2)
    
    gradients = tape.gradient(loss, simclr_model.trainable_variables)
    simclr_model.optimizer.apply_gradients(zip(gradients, simclr_model.trainable_variables))
    return loss


In [5]:
# Set up optimizer
simclr_model.optimizer = tf.keras.optimizers.Adam()

# Training loop
for epoch in range(3):  # Adjust the number of epochs as needed
    for images_1, images_2 in train_dataset:
        loss = train_step(images_1, images_2)
    print(f"Epoch {epoch + 1}, Loss: {loss.numpy()}")


Epoch 1, Loss: 4.764829158782959
Epoch 2, Loss: 3.6147899627685547
Epoch 3, Loss: 3.283618211746216


In [6]:
# Function to create the classification model
def create_classification_model(simclr_model, num_classes=10):
    # Take the base model from SimCLR (up to the projection layer)
    base_input_1 = simclr_model.get_layer(index=0).input
    base_input_2 = simclr_model.get_layer(index=1).input
    
    # Use only one of the branches since they are identical
    base_output = simclr_model.get_layer(index=-2).output  # Output before the last projection layer
    
    # Create a new model using the base and add a classification head
    x = layers.Dense(256, activation='relu')(base_output)
    x = layers.Dropout(0.5)(x)
    classification_output = layers.Dense(num_classes, activation='softmax')(x)

    # Build the new model
    classification_model = Model(inputs=base_input_1, outputs=classification_output)

    return classification_model

# Create the classification model
classification_model = create_classification_model(simclr_model)
classification_model.summary()


Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 32, 32, 3)]       0         
                                                                 
 resnet50 (Functional)       (None, 1, 1, 2048)        23587712  
                                                                 
 global_average_pooling2d (G  (None, 2048)             0         
 lobalAveragePooling2D)                                          
                                                                 
 dense (Dense)               (None, 128)               262272    
                                                                 
 dense_1 (Dense)             (None, 128)               16512     
                                                                 
 dense_4 (Dense)             (None, 256)               33024     
                                                           

In [7]:
# Compile the classification model
classification_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
                             loss='sparse_categorical_crossentropy',
                             metrics=['accuracy'])

# Fine-tune the model using the labeled dataset
classification_model.fit(x_train, y_train, batch_size=256, epochs=5, validation_split=0.2)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x11d09b83f70>

In [8]:
# Evaluate the classification model on the test set
test_loss, test_accuracy = classification_model.evaluate(x_test, y_test)

# Print the test accuracy
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


Test Accuracy: 30.58%
