In [None]:
from utils.utils import *

from loss.CosFace import *

In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications.vgg19 import VGG19

from tensorflow.keras.layers import Input, Lambda, Dense, Flatten
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, BatchNormalization, Conv2D, MaxPool2D, MaxPooling2D
from tensorflow.keras.models import Sequential

from keras.callbacks import LearningRateScheduler, Callback,ModelCheckpoint
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score


from sklearn.metrics import classification_report, precision_score, recall_score, f1_score, accuracy_score, matthews_corrcoef, cohen_kappa_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import average_precision_score

import os
import matplotlib.pyplot as plt
import numpy as np


from keras import backend as K
from tensorflow.keras import layers
from keras.regularizers import l2
from keras.layers import PReLU
from keras import initializers


from keras.layers import Input,Conv2D,Activation,Dense,Lambda,Flatten,Embedding,PReLU,BatchNormalization
from keras.models import Model
import keras.backend as K
import tensorflow as tf
import numpy as np
from keras.callbacks import EarlyStopping


# Load KCRC Data (NCT-CRC)

In [None]:
# Root folders handed to `flow_from_directory` for the training/validation
# split and for the held-out test set.
# NOTE(review): the leading "..." looks like a redacted placeholder — replace
# it with the real dataset location before running.
train_dir = r"...\NCT-CRC-HE-100K/"
test_dir = r"...\CRC-VAL-HE-7K/"

In [None]:
# Class sub-folder names; the list order is the class order passed to every
# `flow_from_directory` call below.
classes = ['ADI', 'BACK', 'DEB', 'LYM', 'MUC', 'MUS', 'NORM', 'STR', 'TUM']

# One generator serves both the training and the validation subset
# (85 % / 15 % split of `train_dir`), with VGG19 preprocessing and random flips.
# NOTE(review): because the validation flow is created from this same
# generator, it is configured with the random flips as well — confirm that
# augmented validation data is intended.
datagen = ImageDataGenerator(horizontal_flip=True, vertical_flip=True, validation_split=0.15, preprocessing_function=tf.keras.applications.vgg19.preprocess_input)

# Images are resized to 112x112 and served in batches of 32.
train_batches = datagen.flow_from_directory(directory=train_dir, target_size=(112,112), 
                                            classes=classes, batch_size=32,subset='training')
# shuffle=False keeps the validation order fixed between evaluations.
valid_batches= datagen.flow_from_directory(directory=train_dir, target_size=(112,112), 
                                           classes=classes, batch_size=32,subset='validation',shuffle=False)

# Test set: same preprocessing, no augmentation, fixed order.
test_batches = ImageDataGenerator(preprocessing_function=tf.keras.applications.vgg19.preprocess_input).flow_from_directory(directory=test_dir, target_size=(112,112), classes=classes, batch_size=32, shuffle=False)

# Passed as `counts` to `calc_effective_weights` below.
# NOTE(review): presumably the per-class image counts of the training folder,
# listed in the same order as `classes` — verify against the dataset.
weights_kcrc = [10407, 10566, 11512, 11557, 8896, 13536, 8763, 10446, 14317]


In [None]:
def calc_effective_weights(counts, beta=0.999):
    """Convert per-class sample counts into normalised class weights.

    Each class receives the raw weight ``(1 - beta) / (1 - beta ** count)``;
    the raw weights are then rescaled so that they sum to the number of
    classes.

    Args:
        counts: Sequence of per-class sample counts.
        beta: Base of the ``beta ** count`` term.

    Returns:
        numpy array of weights, one per entry of ``counts``.
    """
    n_classes = int(len(counts))
    # 1 - beta ** count for every class.
    denominators = np.array(1.0 - np.power(beta, counts))
    raw = (1.0 - beta) / denominators
    # Rescale so the weights add up to the number of classes.
    return raw / np.sum(raw) * n_classes

# Class weights derived from `weights_kcrc`; later handed to
# `weighted_categorical_crossentropy` when the model is compiled.
eff_weights = calc_effective_weights(weights_kcrc, beta=0.9999)
print(eff_weights)

# Define Model

In [None]:
from tensorflow.keras import layers
from tensorflow import keras

import matplotlib.pyplot as plt
import tensorflow_addons as tfa


In [None]:
# --- Tokenizer / embedding settings ---
# Whether learned positional embeddings are added to the token sequence.
positional_emb = True
# Default number of conv + pool stages in `CCTTokenizer`.
conv_layers = 2
# Used as `key_dim` of the attention layers and as the transformer MLP widths.
projection_dim = 128

# --- Transformer settings ---
num_heads = 2
# Dense layer sizes of the MLP inside every transformer block.
transformer_units = [
    projection_dim,
    projection_dim,
]
# Number of transformer blocks stacked in `create_cct_model`.
transformer_layers = 2
# Upper end of the per-block drop probabilities (linspace from 0 to this value).
stochastic_depth_rate = 0.1

# --- Optimiser settings (AdamW in `run_experiment` is configured with these same values) ---
learning_rate = 0.001
weight_decay = 0.0001
# NOTE(review): the data generators are created with batch_size=32; this
# constant is not passed to them.
batch_size = 128
# Side length of the square input images; default `image_size` of `create_cct_model`.
image_size = 112

In [None]:
# Number of passes over the training generator in `run_experiment`.
num_epochs = 10
# Number of tissue classes (matches the length of `classes`).
num_classes = 9 
# Shape of one input image: height, width, channels.
input_shape = (112, 112, 3) 

In [None]:
class CCTTokenizer(layers.Layer):
    """Convolutional tokenizer that turns an image into a sequence of tokens.

    A small stack of ``Conv2D -> ZeroPadding2D -> MaxPool2D`` stages is applied
    to the image and the resulting feature map is flattened along its spatial
    axes, giving a tensor of shape ``(batch, height * width, channels)``.

    Args:
        kernel_size: Kernel size of every convolution.
        stride: Stride of every convolution.
        padding: Amount of zero padding applied after every convolution.
        pooling_kernel_size: Pool size of every max-pooling layer.
        pooling_stride: Stride of every max-pooling layer.
        num_conv_layers: Number of conv/pad/pool stages.
        num_output_channels: Output channels per stage; must provide at least
            ``num_conv_layers`` entries.
        positional_emb: Whether `positional_embedding` should build an
            embedding layer.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(
        self,
        kernel_size=3,
        stride=1,
        padding=1,
        pooling_kernel_size=3,
        pooling_stride=2,
        num_conv_layers=conv_layers,
        num_output_channels=(64, 128),
        positional_emb=positional_emb,
        **kwargs,
    ):
        super().__init__(**kwargs)

        # This is our tokenizer.
        self.conv_model = keras.Sequential()
        for i in range(num_conv_layers):
            self.conv_model.add(
                layers.Conv2D(
                    num_output_channels[i],
                    kernel_size,
                    stride,
                    padding="valid",
                    use_bias=False,
                    activation="relu",
                    kernel_initializer="he_normal",
                )
            )
            # Explicit zero padding is applied after the "valid" convolution.
            self.conv_model.add(layers.ZeroPadding2D(padding))
            self.conv_model.add(
                layers.MaxPool2D(pooling_kernel_size, pooling_stride, "same")
            )

        self.positional_emb = positional_emb

    def call(self, images):
        """Run the conv stack and flatten the spatial axes into a sequence."""
        outputs = self.conv_model(images)
        # After passing the images through our mini-network the spatial dimensions
        # are flattened to form sequences.
        reshaped = tf.reshape(
            outputs,
            (-1, tf.shape(outputs)[1] * tf.shape(outputs)[2], tf.shape(outputs)[-1]),
        )
        return reshaped

    def positional_embedding(self, image_size):
        """Build the positional ``Embedding`` layer for a given image size.

        A dummy image of shape ``(1, image_size, image_size, 3)`` is pushed
        through the tokenizer to discover the sequence length and the token
        width.

        Args:
            image_size: Side length of the (square) input images.

        Returns:
            ``(embedding_layer, sequence_length)`` when positional embeddings
            are enabled, otherwise ``None``. ``sequence_length`` is a plain
            Python ``int``.
        """
        # Positional embeddings are optional in CCT. Here, we calculate
        # the number of sequences and initialize an `Embedding` layer to
        # compute the positional embeddings later.
        if self.positional_emb:
            dummy_inputs = tf.ones((1, image_size, image_size, 3))
            dummy_outputs = self.call(dummy_inputs)
            # Read the static shape so the Embedding layer is configured with
            # Python ints rather than scalar tensors; tensor-valued
            # `input_dim`/`output_dim` cannot be serialised with the layer config.
            sequence_length = int(dummy_outputs.shape[1])
            projection_dim = int(dummy_outputs.shape[-1])

            embed_layer = layers.Embedding(
                input_dim=sequence_length, output_dim=projection_dim
            )
            return embed_layer, sequence_length
        else:
            return None


In [None]:
# Referred from: github.com:rwightman/pytorch-image-models.
class StochasticDepth(layers.Layer):
    """Per-sample stochastic depth (drop-path) layer.

    While training, each sample in the batch is zeroed with probability
    ``drop_prop`` and the surviving samples are divided by the keep
    probability. Outside of training the input is returned untouched.

    Args:
        drop_prop: Probability of dropping a sample's tensor.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, drop_prop, **kwargs):
        super().__init__(**kwargs)
        self.drop_prob = drop_prop

    def call(self, x, training=None):
        """Apply the random per-sample mask when ``training`` is truthy."""
        if training:
            survival = 1 - self.drop_prob
            # Mask shape is (batch, 1, ..., 1) so one draw covers a whole sample.
            batch_dim = tf.shape(x)[0]
            trailing_ones = (1,) * (tf.shape(x).shape[0] - 1)
            mask_shape = (batch_dim,) + trailing_ones
            # floor(survival + U[0, 1)) is 1 with probability `survival`, else 0.
            mask = tf.floor(survival + tf.random.uniform(mask_shape, 0, 1))
            return (x / survival) * mask
        return x


In [None]:
def mlp(x, hidden_units, dropout_rate):
    """Chain GELU ``Dense`` layers, each followed by ``Dropout``.

    Args:
        x: Input tensor.
        hidden_units: Iterable with the width of every dense layer, in order.
        dropout_rate: Rate used by every dropout layer.

    Returns:
        The tensor produced by the last dropout layer (``x`` itself when
        ``hidden_units`` is empty).
    """
    out = x
    for width in hidden_units:
        dense = layers.Dense(width, activation=tf.nn.gelu)
        out = dense(out)
        dropout = layers.Dropout(dropout_rate)
        out = dropout(out)
    return out


In [None]:
def create_cct_model(
    image_size=image_size,
    input_shape=input_shape,
    num_heads=num_heads,
    projection_dim=projection_dim,
    transformer_units=transformer_units,
    num_classes=num_classes,
):
    """Build the CCT model with a CosFace output head.

    The model has two inputs, the image batch and a label tensor of width
    ``num_classes``, because the `CosFace` layer is called with both the pooled
    representation and the labels.

    Args:
        image_size: Side length of the square input; used to size the
            positional embedding.
        input_shape: Shape of one input image.
        num_heads: Number of attention heads per transformer block.
        projection_dim: ``key_dim`` of the attention layers.
        transformer_units: Dense widths of the MLP in each transformer block.
        num_classes: Width of the label input and number of classes passed to
            `CosFace`.

    Returns:
        A ``keras.Model`` mapping ``[images, labels]`` to the CosFace output.
    """
    inputs = layers.Input(input_shape)
    # Labels are a model input because the CosFace layer consumes them.
    label = layers.Input(shape=(num_classes,))

    # Encode patches.
    cct_tokenizer = CCTTokenizer()
    encoded_patches = cct_tokenizer(inputs)

    # Apply positional embedding.
    if positional_emb:
        pos_embed, seq_length = cct_tokenizer.positional_embedding(image_size)
        positions = tf.range(start=0, limit=seq_length, delta=1)
        position_embeddings = pos_embed(positions)
        encoded_patches += position_embeddings

    # Calculate Stochastic Depth probabilities: one value per transformer
    # block, rising linearly from 0 to `stochastic_depth_rate`.
    dpr = list(np.linspace(0, stochastic_depth_rate, transformer_layers))

    # Create multiple layers of the Transformer block.
    for i in range(transformer_layers):
        # Layer normalization 1.
        x1 = layers.LayerNormalization(epsilon=1e-5)(encoded_patches)

        # Create a multi-head attention layer.
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(x1, x1)

        # Skip connection 1.
        attention_output = StochasticDepth(dpr[i])(attention_output)
        x2 = layers.Add()([attention_output, encoded_patches])

        # Layer normalization 2.
        x3 = layers.LayerNormalization(epsilon=1e-5)(x2)

        # MLP.
        x3 = mlp(x3, hidden_units=transformer_units, dropout_rate=0.1)

        # Skip connection 2.
        x3 = StochasticDepth(dpr[i])(x3)
        encoded_patches = layers.Add()([x3, x2])

    # Apply sequence pooling: a learned softmax over the token axis weights
    # the tokens, and the weighted sum collapses the sequence to one vector.
    representation = layers.LayerNormalization(epsilon=1e-5)(encoded_patches)
    attention_weights = tf.nn.softmax(layers.Dense(1)(representation), axis=1)
    weighted_representation = tf.matmul(
        attention_weights, representation, transpose_a=True
    )
    weighted_representation = tf.squeeze(weighted_representation, -2)

    # Classify outputs with the CosFace head instead of a plain softmax Dense
    # layer. NOTE(review): `s` and `m` are presumably the CosFace scale and
    # margin — confirm against loss/CosFace.
    output = CosFace(n_classes=num_classes, s=11.0, m=0.1)(
        [weighted_representation, label]
    )
    # Create the Keras model.
    model = keras.Model(inputs=[inputs, label], outputs=output)
    return model


In [None]:
# Build the model with the defaults taken from the configuration cells above.
cct_model = create_cct_model()

In [None]:
# NOTE(review): this path is not used by `run_experiment`; no checkpoint is
# written during training.
checkpoint_filepath = "/content/checkpoint"

def run_experiment(model):
    """Train `model` on `train_batches` and report validation accuracy per epoch.

    The model is compiled with AdamW and the class-weighted categorical
    cross-entropy built from `eff_weights`. Training uses a manual loop of
    `train_on_batch` calls because the model takes the labels as a second
    input. After every epoch the validation generator is evaluated through
    `get_generator_output` and the accuracy is printed.

    Args:
        model: Keras model accepting ``[images, labels]`` as input.

    Returns:
        None.
    """
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

    loss_weighted = weighted_categorical_crossentropy(eff_weights)
    model.compile(
        optimizer=optimizer,
        loss=loss_weighted,
        metrics=[
            keras.metrics.CategoricalAccuracy(name="accuracy"),
        ],
    )
    datagen_to_use = train_batches
    steps_per_epoch = len(datagen_to_use)

    for epoch in range(num_epochs):
        print("\nStart of epoch %d" % (epoch,))
        # Iterate over the batches of the dataset.
        for step, (x_batch_train, y_batch_train) in enumerate(datagen_to_use):
            # The labels are fed both as the second model input (for the
            # CosFace head) and as the training target.
            model.train_on_batch(x=[x_batch_train, y_batch_train], y=y_batch_train)

            # The generator never stops on its own, so the end of the epoch is
            # detected manually. Checking `step + 1` right after the update
            # yields exactly `steps_per_epoch` batches per epoch and avoids
            # pulling an extra batch that belongs to the next epoch.
            if step + 1 >= steps_per_epoch:
                break

        # Validation score for this epoch. A fresh metric per epoch keeps the
        # reported value from accumulating results of earlier epochs.
        y_true, y_pred = get_generator_output(valid_batches, model)
        val_acc_metric = tf.keras.metrics.Accuracy()
        val_acc_metric.update_state(y_true, y_pred)
        print("Validation acc over epoch: %.4f" % (float(val_acc_metric.result()),))
    return None





In [None]:
# Compile and train the CCT model in place; the function returns None.
run_experiment(cct_model)

# Load Best Model and Test

In [None]:
# NOTE(review): `filename` is an empty placeholder — set it to the path of a
# saved weights file before running this cell. The training loop visible in
# this notebook does not write one.
filename = ''
cct_model.load_weights(filename)

In [None]:
# Evaluate on the held-out test generator and print per-class metrics with
# four decimal places.
y_test, y_pred = get_generator_output(test_batches, cct_model)
print(classification_report(y_test, y_pred,digits=4))