In [None]:
seed = 42

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['MPLCONFIGDIR'] = os.getcwd()+'/configs/'

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

import numpy as np

np.random.seed(seed)

import logging

import random
random.seed(seed)

# Import tensorflow
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)
print(tf.__version__)

# Import other libraries
import cv2
from tensorflow.keras.applications.mobilenet import preprocess_input
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

from tqdm import tqdm

import copy

#functions
def add_noise(cop):
    imagee = copy.copy(cop)
    var = random.randint(0, 1)
    if var == 1:
        '''Add random noise to an image'''
        imagee = imagee/255
        VARIABILITY = 0.02
        deviation = VARIABILITY
        noise = np.random.normal(0, deviation, imagee.shape)
        imagee += noise
        # np.clip(img, 0., 255.)
        # return imagee*255
    # else:
        imagee = imagee*255
        lista = [10, 20, 25, 30, 35]

        l = random.choice(lista)
        box = np.zeros(l**2).reshape(l, l)
        xx, yy = np.random.randint(0,95-l, size=2)
        for i in range(3):
            imagee[xx:xx+l, yy:yy+l, i] = box

        return imagee
    else:
        return imagee

def orthogonal_rot(cop):
    imagee = copy.copy(cop)
    return np.rot90(imagee, np.random.choice([-1, 0, 1]))



In [None]:
import time
from tensorflow.keras.layers import Dense, Activation,Dropout,Conv2D, MaxPooling2D,BatchNormalization
from tensorflow.keras.optimizers import Adam, Adamax
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras import regularizers
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
class LR_ASK(tf.keras.callbacks.Callback):
    def __init__ (self, model, epochs,  ask_epoch, dwell=True, factor=.75): # initialization of the callback
        super(LR_ASK, self).__init__()
        self.model=model
        self.ask_epoch=ask_epoch
        self.epochs=epochs
        self.ask=True # if True query the user on a specified epoch
        self.lowest_vloss=np.inf
        self.lowest_aloss=np.inf
        self.best_weights=self.model.get_weights() # set best weights to model's initial weights
        self.best_epoch=1
        self.plist=[]
        self.alist=[]
        self.dwell= dwell
        self.factor=factor

    def get_list(self): # define a function to return the list of % validation change
        return self.plist, self.alist
    def on_train_begin(self, logs=None): # this runs on the beginning of training
        if self.ask_epoch == 0:
            print('you set ask_epoch = 0, ask_epoch will be set to 1', flush=True)
            self.ask_epoch=1
        if self.ask_epoch >= self.epochs: # you are running for epochs but ask_epoch>epochs
            print('ask_epoch >= epochs, will train for ', epochs, ' epochs', flush=True)
            self.ask=False # do not query the user
        if self.epochs == 1:
            self.ask=False # running only for 1 epoch so do not query user
        else:
            msg =f'Training will proceed until epoch {ask_epoch} then you will be asked to'
            print(msg )
            msg='enter H to halt training or enter an integer for how many more epochs to run then be asked again'
            print(msg)
            if self.dwell:
                msg='learning rate will be automatically adjusted during training'
                print(msg, (0,255,0))
        self.start_time= time.time() # set the time at which training started

    def on_train_end(self, logs=None):   # runs at the end of training
        msg=f'loading model with weights from epoch {self.best_epoch}'
        print(msg)
        model.set_weights(self.best_weights) # set the weights of the model to the best weights
        tr_duration=time.time() - self.start_time   # determine how long the training cycle lasted
        hours = tr_duration // 3600
        minutes = (tr_duration - (hours * 3600)) // 60
        seconds = tr_duration - ((hours * 3600) + (minutes * 60))
        msg = f'training elapsed time was {str(hours)} hours, {minutes:4.1f} minutes, {seconds:4.2f} seconds)'
        print (msg) # print out training duration time

    def on_epoch_end(self, epoch, logs=None):  # method runs on the end of each epoch
        vloss=logs.get('val_loss')  # get the validation loss for this epoch
        aloss=logs.get('loss')
        if epoch >0:
            deltav = self.lowest_vloss- vloss
            pimprov=(deltav/self.lowest_vloss) * 100
            self.plist.append(pimprov)
            deltaa=self.lowest_aloss-aloss
            aimprov=(deltaa/self.lowest_aloss) * 100
            self.alist.append(aimprov)
        else:
            pimprov=0.0
            aimprov=0.0
        if vloss< self.lowest_vloss:
            self.lowest_vloss=vloss
            self.best_weights=self.model.get_weights() # set best weights to model's initial weights
            self.best_epoch=epoch + 1
            msg=f'\n validation loss of {vloss:7.4f} is {pimprov:7.4f} % below lowest loss, saving weights from epoch {str(epoch + 1):3s} as best weights'
            print(msg) # green foreground
        else: # validation loss increased
            pimprov=abs(pimprov)
            msg=f'\n validation loss of {vloss:7.4f} is {pimprov:7.4f} % above lowest loss of {self.lowest_vloss:7.4f} keeping weights from epoch {str(self.best_epoch)} as best weights'
            print(msg) # yellow foreground
            if self.dwell: # if dwell is True when the validation loss increases the learning rate is automatically reduced and model weights are set to best weights
                lr=float(tf.keras.backend.get_value(self.model.optimizer.lr)) # get the current learning rate
                new_lr=lr * self.factor
                msg=f'learning rate was automatically adjusted from {lr:8.6f} to {new_lr:8.6f}, model weights set to best weights'
                print(msg) # cyan foreground
                tf.keras.backend.set_value(self.model.optimizer.lr, new_lr) # set the learning rate in the optimizer
                model.set_weights(self.best_weights) # set the weights of the model to the best weights

        if aloss< self.lowest_aloss:
            self.lowest_aloss=aloss
        if self.ask: # are the conditions right to query the user?
            if epoch + 1 ==self.ask_epoch: # is this epoch the one for quering the user?
                msg='\n Enter H to end training or  an integer for the number of additional epochs to run then ask again'
                print(msg) # cyan foreground
                ans=input()

                if ans == 'H' or ans =='h' or ans == '0': # quit training for these conditions
                    msg=f'you entered {ans},  Training halted on epoch {epoch+1} due to user input\n'
                    print(msg)
                    self.model.stop_training = True # halt training
                else: # user wants to continue training
                    self.ask_epoch += int(ans)
                    if self.ask_epoch > self.epochs:
                        print('\nYou specified maximum epochs of as ', self.epochs, ' cannot train for ', self.ask_epoch, flush =True)
                    else:
                        msg=f'you entered {ans} Training will continue to epoch {self.ask_epoch}'
                        print(msg) # cyan foreground
                        if self.dwell==False:
                            lr=float(tf.keras.backend.get_value(self.model.optimizer.lr)) # get the current learning rate
                            msg=f'current LR is  {lr:8.6f}  hit enter to keep  this LR or enter a new LR'
                            print(msg) # cyan foreground
                            ans=input(' ')
                            if ans =='':
                                msg=f'keeping current LR of {lr:7.5f}'
                                print(msg) # cyan foreground
                            else:
                                new_lr=float(ans)
                                tf.keras.backend.set_value(self.model.optimizer.lr, new_lr) # set the learning rate in the optimizer
                                msg=f' changing LR to {ans}'
                                print(msg) # cyan foreground

In [None]:


# Load images from the 'items/' folder
temp = np.load('/content/drive/MyDrive/Homework’s/public_data.npz', allow_pickle = True)
img = temp["data"]
label = temp["labels"]
  # Normalize image pixel values to a float range [0, 1]
img = (img).astype(np.float32)

# set 0,1 label
for i in range(len(label)):
    if(label[i] == 'healthy'):
        label[i] = 0
    else:
        label[i] = 1

# cleaning images from trol and shrek
ref_img = img[58]
ref_img2 = img[2150]
c = 0
c2 = 0
rm_indexes = []
rm2_indexes = []
for i in range(0, len(img)):
    deviation = np.mean(np.abs(ref_img - img[i]))
    deviation2 = np.mean(np.abs(ref_img2 - img[i]))
    if(deviation == 0.0):
        #print(i)
        c += 1
        rm_indexes.append(i)
    elif(deviation2 == 0.0):
        c2 += 1
        rm2_indexes.append(i)

clean_img = np.delete(img, rm_indexes + rm2_indexes, axis=0)
clean_label = np.delete(label, rm_indexes + rm2_indexes, axis=0)


#DATA AUGMENTATION TO BALANCE CLASSES!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

from keras.preprocessing.image import ImageDataGenerator
import numpy as np

# Create an instance of ImageDataGenerator
datagen = ImageDataGenerator(
    rotation_range=20,
#     shear_range=[0.3, 0.7],
#     zoom_range=[0.3, 0.9],
    horizontal_flip=True,
    vertical_flip=True,
    # brightness_range=[0, 0.7],
    fill_mode='reflect',
    preprocessing_function = orthogonal_rot
)

# Reshape the numpy array to fit the requirements of the flow method
x = clean_img[clean_label == 1]  # Replace 'your_numpy_array' with your actual numpy array

# Create a generator to augment the images
augmented_images = []
for x_batch in datagen.flow(x, batch_size=1, seed=42, shuffle=False):
    augmented_images.append(x_batch[0])
    if len(augmented_images) >= 1198:
        break

# Convert the list of augmented images to a numpy array
augmented_images_array = np.array(augmented_images)

clean_img = np.append(clean_img, augmented_images_array, axis = 0)
clean_label = np.append(clean_label, np.ones(1198))
print(clean_img.shape)
#!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!


# Concatenate 'animals' and 'items' arrays along axis 0
X = clean_img
# Create labels: 1 for 'animals', 0 for 'items'
y = clean_label

y = tfk.utils.to_categorical(y, 2) #one hot encodi

# Split data into train_val and test sets
#X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, random_state=seed, test_size=1000, stratify=np.argmax(y,axis=1))
X_train, X_val, y_train, y_val = train_test_split(X, y, random_state=seed, test_size=.15, stratify=np.argmax(y,axis=1))
del X, y, img, clean_img, clean_label, temp


# NEW PART

In [None]:
!wget https://raw.githubusercontent.com/wangz10/contrastive_loss/master/losses.py
!pip install tensorflow_addons
import math
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tqdm.notebook import tqdm
# from wandb.keras import WandbCallback
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
import numpy as np
import losses
import time

tf.random.set_seed(666)
np.random.seed(666)

tfds.disable_progress_bar()

In [None]:
AUTO = tf.data.experimental.AUTOTUNE
HEIGHT = 96
WIDTH = 96

yy_train = np.argmax(y_train, axis = -1)
yy_val = np.argmax(y_val, axis = -1)
train_tf = tf.data.Dataset.from_tensor_slices((X_train, yy_train.astype(int)))#remove astype(int) at the end
validation_tf = tf.data.Dataset.from_tensor_slices((X_val, yy_val.astype(int)))

IMG_SHAPE = 96
BS = 128
AUTO = tf.data.experimental.AUTOTUNE

# Referred from: https://arxiv.org/pdf/2002.05709.pdf (Appendxi A
# corresponding GitHub: https://github.com/google-research/simclr/)
# I did not use all the augmentation policies proposed in the above paper

@tf.function
def custom_augment(image, label):
    # Random flips
    image = random_apply(tf.image.flip_left_right, image, p=0.4)
    image = random_apply(tf.image.flip_up_down, image, p = 0.2)
    image = random_apply(tf.image.rot90, image, p = 0.1)
    image = random_apply(cut_out, image, p = 0.6)
    image = random_apply(aug_shear, image, p = 0.4)
    image = random_apply(crop_img, image, p = 0.2)


    # Randomly apply transformation (color distortions) with probability p.
    # image = random_apply(color_jitter, image, p=0.4)
#     image = random_apply(color_drop, image, p=0.2)
#     image = random_apply(cacca_su_img, image, p = 0.4)
    image = random_apply(tf.image.transpose, image, p = 0.5)
    return (image, label)

@tf.function
def color_jitter(x, s=0.2):
    # one can also shuffle the order of following augmentations
    # each time they are applied.
    x = tf.image.random_brightness(x, max_delta=0.8*s)
    x = tf.image.random_contrast(x, lower=1-0.8*s, upper=1+0.8*s)
    x = tf.image.random_saturation(x, lower=1-0.8*s, upper=1+0.8*s)
    x = tf.image.random_hue(x, max_delta=0.2*s)
    x = tf.clip_by_value(x, 0, 1)
    return x

@tf.function
def color_drop(x):
    x = tf.image.rgb_to_grayscale(x)
    x = tf.tile(x, [1, 1, 3])
    return x

@tf.function
def cut_out(x):
  return data_augment_cutout(x)

def data_augment_cutout(image, min_mask_size=(int(HEIGHT * .1), int(HEIGHT * .1)),
                        max_mask_size=(int(HEIGHT * .125), int(HEIGHT * .125))):
    p_cutout = tf.random.uniform([], 0, 1.0, dtype=tf.float32)

    if p_cutout > .85: # 10~15 cut outs
        n_cutout = tf.random.uniform([], 10, 15, dtype=tf.int32)
        image = random_cutout(image, HEIGHT, WIDTH,
                              min_mask_size=min_mask_size, max_mask_size=max_mask_size, k=n_cutout)
    elif p_cutout > .6: # 5~10 cut outs
        n_cutout = tf.random.uniform([], 5, 10, dtype=tf.int32)
        image = random_cutout(image, HEIGHT, WIDTH,
                              min_mask_size=min_mask_size, max_mask_size=max_mask_size, k=n_cutout)
    elif p_cutout > .25: # 2~5 cut outs
        n_cutout = tf.random.uniform([], 2, 5, dtype=tf.int32)
        image = random_cutout(image, HEIGHT, WIDTH,
                              min_mask_size=min_mask_size, max_mask_size=max_mask_size, k=n_cutout)
    else: # 1 cut out
        image = random_cutout(image, HEIGHT, WIDTH,
                              min_mask_size=min_mask_size, max_mask_size=max_mask_size, k=1)

    return image
def random_cutout(image, height, width, channels=3, min_mask_size=(10, 10), max_mask_size=(80, 80), k=1):
    assert height > min_mask_size[0]
    assert width > min_mask_size[1]
    assert height > max_mask_size[0]
    assert width > max_mask_size[1]
    for i in range(k):
      mask_height = tf.random.uniform(shape=[], minval=min_mask_size[0], maxval=max_mask_size[0], dtype=tf.int32)
      mask_width = tf.random.uniform(shape=[], minval=min_mask_size[1], maxval=max_mask_size[1], dtype=tf.int32)

      pad_h = height - mask_height
      pad_top = tf.random.uniform(shape=[], minval=0, maxval=pad_h, dtype=tf.int32)
      pad_bottom = pad_h - pad_top

      pad_w = width - mask_width
      pad_left = tf.random.uniform(shape=[], minval=0, maxval=pad_w, dtype=tf.int32)
      pad_right = pad_w - pad_left

      cutout_area = tf.zeros(shape=[mask_height, mask_width, channels], dtype=tf.uint8)

      cutout_mask = tf.pad([cutout_area], [[0,0],[pad_top, pad_bottom], [pad_left, pad_right], [0,0]], constant_values=1)
      cutout_mask = tf.squeeze(cutout_mask, axis=0)
      image = tf.multiply(tf.cast(image, tf.float32), tf.cast(cutout_mask, tf.float32))

    return image

@tf.function
def aug_shear(x):
  p_shear = tf.random.uniform([], 0, 1.0, dtype=tf.float32)
  if p_shear > .5:
      x = transform_shear(x, HEIGHT, shear=20.)
  else:
      x = transform_shear(x, HEIGHT, shear=-20.)
  return x

def transform_shear(image, height, shear):
    # input image - is one image of size [dim,dim,3] not a batch of [b,dim,dim,3]
    # output - image randomly sheared
    DIM = height
    XDIM = DIM%2 #fix for size 331

    shear = shear * tf.random.uniform([1],dtype='float32')
    shear = math.pi * shear / 180.

    # SHEAR MATRIX
    one = tf.constant([1],dtype='float32')
    zero = tf.constant([0],dtype='float32')
    c2 = tf.math.cos(shear)
    s2 = tf.math.sin(shear)
    shear_matrix = tf.reshape(tf.concat([one,s2,zero, zero,c2,zero, zero,zero,one],axis=0),[3,3])

    # LIST DESTINATION PIXEL INDICES
    x = tf.repeat( tf.range(DIM//2,-DIM//2,-1), DIM )
    y = tf.tile( tf.range(-DIM//2,DIM//2),[DIM] )
    z = tf.ones([DIM*DIM],dtype='int32')
    idx = tf.stack( [x,y,z] )

    # ROTATE DESTINATION PIXELS ONTO ORIGIN PIXELS
    idx2 = K.dot(shear_matrix,tf.cast(idx,dtype='float32'))
    idx2 = K.cast(idx2,dtype='int32')
    idx2 = K.clip(idx2,-DIM//2+XDIM+1,DIM//2)

    # FIND ORIGIN PIXEL VALUES
    idx3 = tf.stack( [DIM//2-idx2[0,], DIM//2-1+idx2[1,]] )
    d = tf.gather_nd(image, tf.transpose(idx3))

    return tf.reshape(d,[DIM,DIM,3])

@tf.function
def crop_img(x):
  p_crop = tf.random.uniform([], 0, 1.0, dtype=tf.float32)
  if p_crop > .6:
    if p_crop > .9:
        x = tf.image.central_crop(x, central_fraction=.5)
    elif p_crop > .8:
        x = tf.image.central_crop(x, central_fraction=.6)
    elif p_crop > .7:
        x = tf.image.central_crop(x, central_fraction=.7)
    else:
        x = tf.image.central_crop(x, central_fraction=.8)
  elif p_crop > .3:
    crop_size = tf.random.uniform([], int(HEIGHT*.6), HEIGHT, dtype=tf.int32)
    x = tf.image.random_crop(x, size=[crop_size, crop_size, 3])
  x = tf.image.resize(x, size=[HEIGHT, WIDTH])
  return x



@tf.function
def random_apply(func, x, p):
    return tf.cond(
        tf.less(tf.random.uniform([], minval=0, maxval=1, dtype=tf.float32),
                tf.cast(p, tf.float32)),
        lambda: func(x),
        lambda: x)

# # @tf.function
# def cacca_su_img(imageee):
#     imagee = 255*imageee.numpy()
#     lista = [20, 25, 30, 35]
#     l = random.choice(lista)
#     box = np.zeros(l**2).reshape(l, l)
#     xx, yy = np.random.randint(0,95-l, size=2)
#     for i in range(3):
#         imagee[xx:xx+l, yy:yy+l, i] = box

#     imagee = tf.convert_to_tensor(imagee)

#     return imagee/255



@tf.function
def preprocess_image(image, label):
  # image = image/255
  image = tf.image.convert_image_dtype(image, tf.float32)
  #     image = tf.image.resize(image, (IMG_SHAPE, IMG_SHAPE))
  image = image/255
  return (image, label)





@tf.function
def preprocess_image_class(image, label):
  image = image/255
  label = tf.one_hot()
  return (image, label)

train_tf = (
    train_tf
    .map(preprocess_image, num_parallel_calls=AUTO)
    .shuffle(100)
    .map(custom_augment, num_parallel_calls=AUTO)
    .batch(BS)
    .prefetch(AUTO)
)

validation_tf = (
    validation_tf
    .map(preprocess_image, num_parallel_calls=AUTO)
    .shuffle(100)
    .batch(32)#BS
    .prefetch(AUTO)
)

Plotting the augmented images

In [None]:
image_batch, label_batch = next(iter(train_tf))

plt.figure(figsize=(10, 10))
for n in range(25):
    ax = plt.subplot(5, 5, n+1)
    plt.imshow(image_batch[n])
    plt.title(label_batch[n].numpy())
    plt.axis('off')

# Training Convnext again

In [None]:
train_tf = (train_tf.map(lambda x, y: (x, tf.one_hot(y, depth =2)))) #let's go back to categorical values
validation_tf = (validation_tf.map(lambda x, y: (x, tf.one_hot(y, depth = 2))))

from tensorflow.keras.layers import Dense, Activation,Dropout,Conv2D, MaxPooling2D,BatchNormalization
from tensorflow.keras.optimizers import Adam, Adamax
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras import regularizers
from tensorflow.keras.models import Model
convlarge = tf.keras.applications.ConvNeXtBase(
    include_top=False,
    include_preprocessing=True,
    weights="imagenet",
    input_tensor=None,
    input_shape= (96,96, 3),
    pooling="max",
    classes=2,
    classifier_activation="softmax",
)

convlarge.trainable = False


from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
# Build the neural network layer by layer
inputs = tfkl.Input(shape=(96, 96, 3))
# x = img_augmentation(inputs)
x = convlarge(inputs*255)
# x = layers.GlobalAveragePooling2D(name = "gap")(x)
x=BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001, name = "batch")(x)


x = layers.Dense(256, kernel_regularizer = regularizers.l2(l = 0.016),activity_regularizer=regularizers.l1(0.006),
                    bias_regularizer=regularizers.l1(0.006) ,activation='relu')(x)
# x = layers.Dense(256, kernel_initializer=tf.keras.initializers.HeUniform(seed=42))(x)
x = layers.BatchNormalization(name = "batch2")(x)
x = layers.Dropout(0.3, seed = 42, name = "dropout")(x)

x = layers.Dense(128, kernel_regularizer = regularizers.l2(l = 0.016),activity_regularizer=regularizers.l1(0.006),
                    bias_regularizer=regularizers.l1(0.006) ,activation='relu')(x)
x = layers.BatchNormalization(name = "batch3")(x)
x = layers.Dropout(0.3, seed = 42, name = "dropout2")(x)

outputs = tfkl.Dense(units=2, activation='softmax',name='Output2')(x)  #you cannnot use softmax with only one neuron since it normalizes over the output neurons
model = tfk.Model(inputs, outputs)
model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adamax(learning_rate=0.01), metrics=['accuracy'])

In [None]:
epochs=100
ask_epoch=20
ask=LR_ASK(model, epochs,  ask_epoch)
callbacks=[ask]

history=model.fit(x=train_tf,  epochs=epochs, verbose=1, callbacks=callbacks,  validation_data=validation_tf,
               validation_steps=None,  shuffle=False,  initial_epoch=0)


## FINE TUNING