##### Import required libraries

In [None]:
import cv2
import numpy as np
from tqdm import tqdm
import tensorflow as tf
from imutils import paths
from google.colab import drive
import matplotlib.pyplot as plt
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from sklearn.preprocessing import LabelEncoder
tqdm.pandas()
le = LabelEncoder()
np.random.seed(666)
tf.random.set_seed(666)
drive.mount('/content/gdrive')

##### Define Augmentation Class to augment image slices

In [None]:
class AugmentSlices(object):
    def __call__(self, sample):        
        # Random flips
        sample = self._random_apply(tf.image.flip_left_right, sample, p=0.5)
        # Randomly apply transformation (color distortions) with probability p.
        sample = self._random_apply(self._color_jitter, sample, p=0.8)
        sample = self._random_apply(self._color_drop, sample, p=0.2)
        return sample

    def _color_jitter(self, x, s=1):
        # one can also shuffle the order of following augmentations
        # each time they are applied.
        x = tf.image.random_brightness(x, max_delta=0.8*s)
        x = tf.image.random_contrast(x, lower=1-0.8*s, upper=1+0.8*s)
        x = tf.image.random_saturation(x, lower=1-0.8*s, upper=1+0.8*s)
        x = tf.image.random_hue(x, max_delta=0.2*s)
        x = tf.clip_by_value(x, 0, 1)
        return x
    
    def _color_drop(self, x):
        x = tf.image.rgb_to_grayscale(x)
        x = tf.tile(x, [1, 1, 1, 3])
        return x
    
    def _random_apply(self, func, x, p):
        return tf.cond(
          tf.less(tf.random.uniform([], minval=0, maxval=1, dtype=tf.float32),
                  tf.cast(p, tf.float32)),
          lambda: func(x),
          lambda: x)

# Build the augmentation pipeline
data_augmentation = Sequential([Lambda(AugmentSlices())])

##### Implementation of Gaussian Blur 

In [None]:
class GaussianBlur(object):
    def __init__(self, kernel_size, min=0.1, max=2.0):
        self.min = min
        self.max = max
        # kernel size is set to be 10% of the image height/width
        self.kernel_size = kernel_size

    def __call__(self, sample):
        sample = np.array(sample)
        # blur the image with a 50% chance
        prob = np.random.random_sample()
        if prob < 0.5:
            sigma = (self.max - self.min) * np.random.random_sample() + self.min
            sample = cv2.GaussianBlur(sample, (self.kernel_size, self.kernel_size), sigma)
        return sample

##### Image preprocessing utils
* Parse and process image

In [None]:

def parse_images(image_path):
    image_string = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, size=[224, 224])
    return image

##### Create Custom TensorFlow dataset

In [None]:
train_images = list(paths.list_images("/content/gdrive/MyDrive/FYP/SSL_BinaryData/train"))
BATCH_SIZE = 16
train_ds = tf.data.Dataset.from_tensor_slices(train_images)
train_ds = (
    train_ds
    .map(parse_images, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .shuffle(1024)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

##### Defining the Architecture for the pretext task

In [None]:
def get_resnet_simclr(hidden_1, hidden_2, hidden_3):
    base_model = tf.keras.applications.ResNet50(include_top=False, weights=None, input_shape=(224, 224, 3))
    base_model.trainable = True
    inputs = Input((224, 224, 3))
    h = base_model(inputs, training=True)
    h = GlobalAveragePooling2D()(h)

    projection_1 = Dense(hidden_1)(h)
    projection_1 = Activation("relu")(projection_1)
    projection_2 = Dense(hidden_2)(projection_1)
    projection_2 = Activation("relu")(projection_2)
    projection_3 = Dense(hidden_3)(projection_2)
    resnet_simclr = Model(inputs, projection_3)
    return resnet_simclr

In [None]:
def get_negative_mask(batch_size):
    '''
    Return a mask that removes the similarity score of equal/similar images.
    This function ensures that only distinct pair of images get their similarity scores passed as negative examples
    '''
    negative_mask = np.ones((batch_size, 2 * batch_size), dtype=bool)
    for i in range(batch_size):
        negative_mask[i, i] = 0
        negative_mask[i, i + batch_size] = 0
    return tf.constant(negative_mask)

##### Defining Functions to calculate similarity values between vectors

In [None]:
cosine_sim_1d = tf.keras.losses.CosineSimilarity(axis=1, reduction=tf.keras.losses.Reduction.NONE)
cosine_sim_2d = tf.keras.losses.CosineSimilarity(axis=2, reduction=tf.keras.losses.Reduction.NONE)

# Mask to remove positive examples from the batch of negative samples
negative_mask = get_negative_mask(BATCH_SIZE)

In [None]:
def _cosine_simililarity_dim1(x, y):
    v = cosine_sim_1d(x, y)
    return v

def _cosine_simililarity_dim2(x, y):
    '''
    x shape: (N, 1, C)
    y shape: (1, 2N, C)
    v shape: (N, 2N)
    '''
    v = cosine_sim_2d(tf.expand_dims(x, 1), tf.expand_dims(y, 0))
    return v

def _dot_simililarity_dim1(x, y):
    '''
    x shape: (N, 1, C)
    y shape: (N, C, 1)
    v shape: (N, 1, 1)
    '''
    v = tf.matmul(tf.expand_dims(x, 1), tf.expand_dims(y, 2))
    return v

def _dot_simililarity_dim2(x, y):
    '''
    x shape: (N, 1, C)
    y shape: (1, C, 2N)
    v shape: (N, 2N)
    '''
    v = tf.tensordot(tf.expand_dims(x, 1), tf.expand_dims(tf.transpose(y), 0), axes=2)
    return v

##### Defining the train function

In [None]:
@tf.function
def train_step(xis, xjs, model, optimizer, criterion, temperature):
    with tf.GradientTape() as tape:
        zis = model(xis)
        zjs = model(xjs)
        # normalize projection feature vectors
        zis = tf.math.l2_normalize(zis, axis=1)
        zjs = tf.math.l2_normalize(zjs, axis=1)
        l_pos = _dot_simililarity_dim1(zis, zjs)
        l_pos = tf.reshape(l_pos, (BATCH_SIZE, 1))
        l_pos /= temperature

        negatives = tf.concat([zjs, zis], axis=0)
        loss = 0
        for positives in [zis, zjs]:
            l_neg = _dot_simililarity_dim2(positives, negatives)
            labels = tf.zeros(BATCH_SIZE, dtype=tf.int32)
            l_neg = tf.boolean_mask(l_neg, negative_mask)
            l_neg = tf.reshape(l_neg, (BATCH_SIZE, -1))
            l_neg /= temperature
            logits = tf.concat([l_pos, l_neg], axis=1) 
            loss += criterion(y_pred=logits, y_true=labels)
        loss = loss / (2 * BATCH_SIZE)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

In [None]:
def train_simclr(model, dataset, optimizer, criterion, temperature = 0.1, epochs = 100):
    step_wise_loss = []
    epoch_wise_loss = []
    for epoch in tqdm(range(epochs)):
        for image_batch in dataset:
            a = data_augmentation(image_batch)
            b = data_augmentation(image_batch)
            loss = train_step(a, b, model, optimizer, criterion, temperature)
            step_wise_loss.append(loss)
        epoch_wise_loss.append(np.mean(step_wise_loss))
        if epoch % 10 == 0:
            print("epoch: {} loss: {:.3f}".format(epoch + 1, np.mean(step_wise_loss)))
    return epoch_wise_loss, model

##### Defining the Hyperparameter settings, loss functions, Decays, Optimizers

In [None]:
criterion = tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True, reduction = tf.keras.losses.Reduction.SUM)
decay_steps = 1000
lr_decayed_fn = tf.keras.experimental.CosineDecay(initial_learning_rate = 0.1, decay_steps = decay_steps)
optimizer = tf.keras.optimizers.SGD(lr_decayed_fn)
resnet_simclr_2 = get_resnet_simclr(256, 128, 50)
epoch_wise_loss, resnet_simclr  = train_simclr(resnet_simclr_2, train_ds, optimizer, criterion, temperature = 0.1, epochs = 2)

##### Plotting Training Results

In [None]:
with plt.xkcd():
    plt.plot(epoch_wise_loss)
    plt.title("tau = 0.1, h1 = 256, h2 = 128, h3 = 50")
    plt.show()

##### Storing Model Weights

In [None]:
weights_folder = '/content/gdrive/MyDrive/FYP' + "_resnet_simclr.h5"
resnet_simclr.save_weights(weights_folder)
simclr_weights = resnet_simclr.weights

##### Testing (Downstream Task)

In [None]:
# Train and test image paths
train_images = list(paths.list_images("/content/gdrive/MyDrive/FYP/SSL_BinaryData/train"))
test_images = list(paths.list_images("/content/gdrive/MyDrive/FYP/SSL_BinaryData/test"))
print(len(train_images), len(test_images))

In [None]:
# 10% of the dataset
train_images_10 = np.random.choice(train_images, len(train_images)//10)
len(train_images_10)

In [None]:
def prepare_images(image_paths):
    '''
    Prepare the image for modelling
    '''
    images = []
    labels = []

    for image in tqdm(image_paths):
        image_pixels = tf.io.read_file(image)
        image_pixels = tf.image.decode_jpeg(image_pixels, channels=3)
        image_pixels = tf.image.convert_image_dtype(image_pixels, tf.float32)
        image_pixels = tf.image.resize(image_pixels, size=[224, 224])
        label = image.split("/")[-1].split("_")[0]
        images.append(image_pixels)
        labels.append(label)

    images = np.array(images)
    labels = np.array(labels)
    return images, labels

In [None]:
X_train, y_train = prepare_images(train_images_10)
X_test, y_test = prepare_images(test_images)

##### Label Encoding the labels (abnormal/normal)

In [None]:
y_train_enc = le.fit_transform(y_train)
y_test_enc = le.transform(y_test)

##### Loading Weights of the model from the pre-text task

In [None]:
resnet_simclr = get_resnet_simclr(256, 128, 50)
resnet_simclr.load_weights('/content/20220417-163415_resnet_simclr.h5')
resnet_simclr.summary()

##### Plotting Results

In [None]:
def plot_training(H):
	with plt.xkcd():
		plt.plot(H.history["loss"], label="train_loss")
		plt.plot(H.history["val_loss"], label="val_loss")
		plt.plot(H.history["accuracy"], label="train_acc")
		plt.plot(H.history["val_accuracy"], label="val_acc")
		plt.title("Training Loss and Accuracy")
		plt.xlabel("Epoch #")
		plt.ylabel("Loss/Accuracy")
		plt.legend(loc="lower left")
		plt.show()

In [None]:
def get_linear_model(features):
    '''
    Obtain the Linear layer for Downstream task
    '''
    linear_model = Sequential([Dense(2, input_shape = (features, ), activation = "softmax")])
    return linear_model

In [None]:
resnet_simclr.layers[1].trainable = False
resnet_simclr.summary()

# Encoder model with non-linear projections
projection = Model(resnet_simclr.input, resnet_simclr.layers[-2].output)
# Extract train and test features
train_features = projection.predict(X_train)
test_features = projection.predict(X_test)

##### Training model for Downstream task and Plotting the Train Results

In [None]:
# Early Stopping to prevent overfitting
es = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience = 2, verbose = 2, restore_best_weights=True)
# Linear model
linear_model = get_linear_model(128)
linear_model.compile(
    loss = "sparse_categorical_crossentropy",
    metrics = ["accuracy"],
    optimizer = "adam"
    )
history = linear_model.fit(
    train_features, 
    y_train_enc,
    validation_data=(test_features, y_test_enc),
    batch_size = 64,
    epochs = 35
)
plot_training(history)