<a href="https://colab.research.google.com/github/Lee-Gunju/AI-paper-code-review-for-personal-project/blob/master/Supervised_Contrastive_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install tensorflow-addons


Collecting tensorflow-addons
[?25l  Downloading https://files.pythonhosted.org/packages/66/4b/e893d194e626c24b3df2253066aa418f46a432fdb68250cde14bf9bb0700/tensorflow_addons-0.13.0-cp37-cp37m-manylinux2010_x86_64.whl (679kB)
[K     |▌                               | 10kB 18.8MB/s eta 0:00:01[K     |█                               | 20kB 25.6MB/s eta 0:00:01[K     |█▌                              | 30kB 24.0MB/s eta 0:00:01[K     |██                              | 40kB 17.9MB/s eta 0:00:01[K     |██▍                             | 51kB 8.6MB/s eta 0:00:01[K     |███                             | 61kB 8.8MB/s eta 0:00:01[K     |███▍                            | 71kB 9.2MB/s eta 0:00:01[K     |███▉                            | 81kB 10.2MB/s eta 0:00:01[K     |████▍                           | 92kB 10.6MB/s eta 0:00:01[K     |████▉                           | 102kB 8.2MB/s eta 0:00:01[K     |█████▎                          | 112kB 8.2MB/s eta 0:00:01[K     |█████▉   

In [None]:
import tensorflow as tf
import tensorflow_addons as tfa
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
# Dataset constants: image dimensions (height, width, channels) and label count.
input_shape = (32, 32, 3)
num_classes = 10

# Fetch CIFAR-10 and unpack it into its train / test image and label arrays.
cifar10_splits = keras.datasets.cifar10.load_data()
(x_train, y_train), (x_test, y_test) = cifar10_splits

# Report the array shapes of both splits as a quick sanity check.
print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
x_train shape: (50000, 32, 32, 3) - y_train shape: (50000, 1)
x_test shape: (10000, 32, 32, 3) - y_test shape: (10000, 1)


In [None]:
# Short alias for the (experimental) Keras preprocessing-layer namespace.
preprocessing = layers.experimental.preprocessing

# The normalization layer is kept under its own name so it can be adapted
# directly instead of being looked up by position inside the pipeline.
normalizer = preprocessing.Normalization()

# Augmentation pipeline applied to every image entering the encoder:
# normalization followed by random flip, rotation, width and height changes.
data_augmentation = keras.Sequential([
    normalizer,
    preprocessing.RandomFlip('horizontal'),
    preprocessing.RandomRotation(0.02),
    preprocessing.RandomWidth(0.2),
    preprocessing.RandomHeight(0.2),
])

# Compute the normalization statistics from the training images.
normalizer.adapt(x_train)

In [None]:
def create_encoder():
  """Build the CIFAR-10 feature encoder.

  Input images go through the module-level `data_augmentation` pipeline and
  then through a ResNet50V2 backbone that is created without its
  classification top, without pretrained weights, and with average pooling.

  Returns:
    A `keras.Model` named 'cifar10-encoder' taking inputs of shape
    `input_shape`.
  """
  backbone = keras.applications.ResNet50V2(
      weights=None,
      include_top=False,
      pooling='avg',
      input_shape=input_shape,
  )
  images = keras.Input(shape=input_shape)
  augmented_images = data_augmentation(images)
  embeddings = backbone(augmented_images)
  return keras.Model(inputs=images, outputs=embeddings, name='cifar10-encoder')

# Training hyperparameters shared by the cells below.
# NOTE(review): batch_size of 265 looks like it may be a typo for 256 - confirm.
batch_size = 265
num_epochs = 50
learning_rate = 0.001
dropout_rate = 0.5       # dropout probability used in the classifier head
hidden_units = 512       # width of the classifier's hidden dense layer
projection_units = 128   # width of the projection head's dense layer
temperature = 0.05       # divisor applied to the contrastive-loss logits

# Instantiate an encoder and print its layer summary.
encoder = create_encoder()
encoder.summary()

Model: "cifar10-encoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_2 (InputLayer)         [(None, 32, 32, 3)]       0         
_________________________________________________________________
sequential (Sequential)      (None, None, None, 3)     7         
_________________________________________________________________
resnet50v2 (Functional)      (None, 2048)              23564800  
Total params: 23,564,807
Trainable params: 23,519,360
Non-trainable params: 45,447
_________________________________________________________________


In [None]:
def create_classifier(encoder, trainable=True):
  """Stack a classification head on top of `encoder` and compile the result.

  Args:
    encoder: Model mapping inputs of shape `input_shape` to feature vectors.
    trainable: Value assigned to the `trainable` flag of every layer in
      `encoder`; pass False to freeze the encoder.

  Returns:
    A compiled `keras.Model` named 'cifar10-classifier' whose output is a
    softmax over `num_classes` classes.
  """
  # Freeze or unfreeze the encoder one layer at a time.
  for encoder_layer in encoder.layers:
    encoder_layer.trainable = trainable

  images = keras.Input(shape=input_shape)

  # Head: dropout -> dense(relu) -> dropout -> dense(softmax).
  embeddings = encoder(images)
  embeddings = layers.Dropout(dropout_rate)(embeddings)
  hidden = layers.Dense(hidden_units, activation='relu')(embeddings)
  hidden = layers.Dropout(dropout_rate)(hidden)
  probabilities = layers.Dense(num_classes, activation='softmax')(hidden)

  classifier_model = keras.Model(
      inputs=images, outputs=probabilities, name='cifar10-classifier'
  )
  classifier_model.compile(
      optimizer=keras.optimizers.Adam(learning_rate),
      loss=keras.losses.SparseCategoricalCrossentropy(),
      metrics=[keras.metrics.SparseCategoricalAccuracy()],
  )
  return classifier_model

In [None]:
# Baseline: train encoder and classification head together, end to end.
# A fresh encoder is created here so this cell does not silently reuse
# whatever `encoder` an earlier cell left in the kernel. The original
# assignment was misspelled and its result was never used.
encoder = create_encoder()
classifier = create_classifier(encoder)
classifier.summary()

history = classifier.fit(x=x_train, y=y_train, batch_size=batch_size, epochs=num_epochs)

# evaluate() returns the loss followed by the compiled metrics; index 1 is
# the sparse categorical accuracy.
accuracy = classifier.evaluate(x_test, y_test)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")

Model: "cifar10-classifier"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_5 (InputLayer)         [(None, 32, 32, 3)]       0         
_________________________________________________________________
cifar10-encoder (Functional) (None, 2048)              23564807  
_________________________________________________________________
dropout (Dropout)            (None, 2048)              0         
_________________________________________________________________
dense (Dense)                (None, 512)               1049088   
_________________________________________________________________
dropout_1 (Dropout)          (None, 512)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 10)                5130      
Total params: 24,619,025
Trainable params: 24,573,578
Non-trainable params: 45,447
_______________________________

KeyboardInterrupt: ignored

In [None]:
class SupervisedContrastiveLoss(keras.losses.Loss):
  """Supervised contrastive loss computed over one batch of feature vectors.

  Feature vectors are L2-normalized, their pairwise dot products are divided
  by `temperature`, and the resulting similarity matrix is scored against the
  labels with `tfa.losses.npairs_loss`.

  Args:
    temperature: Divisor applied to the pairwise similarities.
    name: Optional name forwarded to `keras.losses.Loss`.
  """

  def __init__(self, temperature = 1, name = None):
    super().__init__(name = name)
    self.temperature = temperature

  def __call__(self, labels, feature_vectors, sample_weight = None):
    """Compute the loss for one batch.

    Args:
      labels: Class labels, one per sample, e.g. shape (batch, 1) or (batch,).
      feature_vectors: Rank-2 tensor of per-sample features, (batch, dim).
      sample_weight: Accepted for compatibility with the Keras loss call
        signature; it is not used.

    Returns:
      The value returned by `tfa.losses.npairs_loss`.
    """
    # Flatten the labels to 1-D. Unlike an axis-less tf.squeeze, this keeps the
    # batch axis when the batch holds a single sample, so the labels never
    # degenerate to a scalar.
    flat_labels = tf.reshape(labels, [-1])
    # Normalize feature vectors
    feature_vectors_normalized = tf.math.l2_normalize(feature_vectors, axis = 1)
    # Compute logits: temperature-scaled pairwise similarities
    logits = tf.divide(
        tf.matmul(feature_vectors_normalized, tf.transpose(feature_vectors_normalized)),
        self.temperature,
    )
    return tfa.losses.npairs_loss(flat_labels, logits)



def add_projection_head(encoder):
  """Wrap `encoder` with a dense projection layer.

  Args:
    encoder: Model mapping inputs of shape `input_shape` to feature vectors.

  Returns:
    A `keras.Model` named 'cifar-encoder_with_projection-head' whose output is
    a ReLU-activated dense layer with `projection_units` units applied to the
    encoder output.
  """
  images = keras.Input(shape=input_shape)
  embeddings = encoder(images)
  projection_layer = layers.Dense(projection_units, activation='relu')
  projections = projection_layer(embeddings)
  return keras.Model(
      inputs=images,
      outputs=projections,
      name='cifar-encoder_with_projection-head',
  )

In [None]:
# Stage 1: train a fresh encoder, topped with a projection head, using the
# supervised contrastive loss.
encoder = create_encoder()
encoder_with_projection_head = add_projection_head(encoder)

encoder_with_projection_head.compile(
    loss=SupervisedContrastiveLoss(temperature),
    optimizer=keras.optimizers.Adam(learning_rate),
)
encoder_with_projection_head.summary()

history = encoder_with_projection_head.fit(
    x=x_train,
    y=y_train,
    batch_size=batch_size,
    epochs=num_epochs,
)

Model: "cifar-encoder_with_projection-head"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_8 (InputLayer)         [(None, 32, 32, 3)]       0         
_________________________________________________________________
cifar10-encoder (Functional) (None, 2048)              23564807  
_________________________________________________________________
dense_2 (Dense)              (None, 128)               262272    
Total params: 23,827,079
Trainable params: 23,781,632
Non-trainable params: 45,447
_________________________________________________________________
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
  8/189 [>.............................] - ETA: 25s - loss: 4.7224

KeyboardInterrupt: ignored

In [None]:
# Stage 2: train only a classification head on top of the frozen encoder.
classifier = create_classifier(encoder, trainable=False)

history = classifier.fit(
    x=x_train,
    y=y_train,
    batch_size=batch_size,
    epochs=num_epochs,
)

# evaluate() returns the loss followed by the compiled metrics; the accuracy
# sits at index 1.
evaluation = classifier.evaluate(x_test, y_test)
accuracy = evaluation[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
Test accuracy: 60.7%
