In [10]:
!pip install tensorflow-macos protobuf==3.20.3 wrapt==1.12.1 tensorflow-metal  # Optional for M1/M2 Macs


Collecting protobuf==3.20.3
  Downloading protobuf-3.20.3-py2.py3-none-any.whl.metadata (720 bytes)
Collecting tensorflow-metal
  Downloading tensorflow_metal-1.1.0-cp310-cp310-macosx_12_0_arm64.whl.metadata (1.2 kB)
Downloading protobuf-3.20.3-py2.py3-none-any.whl (162 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m162.1/162.1 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading tensorflow_metal-1.1.0-cp310-cp310-macosx_12_0_arm64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[0mInstalling collected packages: tensorflow-metal, protobuf
  Attempting uninstall: protobuf
    Found existing installation: protobuf 3.20.0
    Uninstalling protobuf-3.20.0:
      Successfully uninstalled protobuf-3.20.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of 

In [1]:
import tensorflow as tf

def data_augmentation(image):
    image = tf.image.random_crop(image, size=[224, 224, 3])
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, max_delta=0.5)
    image = tf.image.random_contrast(image, lower=0.5, upper=1.5)
    return image

In [2]:
import tensorflow as tf
from tensorflow.keras.datasets import cifar10

# Load CIFAR-10 dataset
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Resize images to (224, 224, 3) and normalize to [0, 1] range
def preprocess_image(image):
    image = tf.image.resize(image, [224, 224])  # Resize to match SimCLR input
    image = tf.cast(image, tf.float32) / 255.0  # Normalize
    return image

# Apply two distinct augmentations for SimCLR views
def create_views(image):
    augmented_view_1 = data_augmentation(preprocess_image(image))
    augmented_view_2 = data_augmentation(preprocess_image(image))
    return augmented_view_1, augmented_view_2

# Prepare the dataset with two augmented views per image
def prepare_dataset(x_data, batch_size=64):
    dataset = tf.data.Dataset.from_tensor_slices(x_data)
    dataset = dataset.map(create_views, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset

train_dataset = prepare_dataset(x_train)
test_dataset = prepare_dataset(x_test)


In [3]:
train_dataset

<PrefetchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None))>

In [4]:
# Check the shape of one batch from the dataset
for batch in train_dataset.take(1):
    view_1, view_2 = batch
    print("View 1 batch shape:", view_1.shape)
    print("View 2 batch shape:", view_2.shape)

View 1 batch shape: (64, 224, 224, 3)
View 2 batch shape: (64, 224, 224, 3)


2024-11-04 15:07:08.448463: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz


In [7]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras import layers, models

def create_encoder():
    base_model = ResNet50(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D()
    ])
    return model

In [8]:
def create_projection_head(encoder_output_dim=2048, projection_dim=128):
    return models.Sequential([
        layers.Dense(encoder_output_dim, activation='relu'),
        layers.Dense(projection_dim)
    ])

In [9]:
import tensorflow.keras.backend as K

def nt_xent_loss(batch_size, temperature=0.5):
    def loss_fn(z_i, z_j):
        z = tf.concat([z_i, z_j], axis=0)
        similarity_matrix = K.dot(z, K.transpose(z))
        
        # Create labels for positive pairs
        mask = tf.one_hot(tf.range(batch_size), batch_size * 2)
        mask = tf.concat([mask, mask], axis=0)
        
        # Exclude self-similarity
        logits_mask = 1 - tf.eye(batch_size * 2)
        mask *= logits_mask
        
        # Compute NT-Xent loss
        logits = similarity_matrix / temperature
        exp_logits = tf.exp(logits) * logits_mask
        log_prob = logits - tf.math.log(tf.reduce_sum(exp_logits, axis=1, keepdims=True))
        
        mean_loss = -tf.reduce_sum(mask * log_prob) / (batch_size * 2)
        return mean_loss
    return loss_fn

In [10]:
class SimCLR(tf.keras.Model):
    def __init__(self, encoder, projection_head):
        super(SimCLR, self).__init__()
        self.encoder = encoder
        self.projection_head = projection_head

    def call(self, x):
        h = self.encoder(x)                   # Encoder output
        z = self.projection_head(h)            # Projected output for contrastive loss
        return z


In [11]:
# Instantiate the encoder and projection head
encoder = create_encoder()
projection_head = create_projection_head()

# Create SimCLR model
simclr_model = SimCLR(encoder, projection_head)

# Define optimizer and loss function
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
contrastive_loss_fn = nt_xent_loss(batch_size=64, temperature=0.5)

# Training loop
epochs = 10
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}/{epochs}")
    for step, (view_1, view_2) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            # Pass both views through SimCLR model
            z_i = simclr_model(view_1)
            z_j = simclr_model(view_2)

            # Calculate contrastive loss
            loss = contrastive_loss_fn(z_i, z_j)
        
        # Apply gradients
        gradients = tape.gradient(loss, simclr_model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, simclr_model.trainable_variables))
        
        if step % 100 == 0:
            print(f"Step {step}: Loss = {loss.numpy()}")


Epoch 1/10
Step 0: Loss = nan


KeyboardInterrupt: 