In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive; a later cell copies the dataset
# archive out of MyDrive from this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install tensorflow-addons

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow-addons
  Downloading tensorflow_addons-0.19.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorflow-addons
Successfully installed tensorflow-addons-0.19.0


In [3]:
# Copy the dataset archive from the mounted Drive into the current working
# directory so it can be unpacked locally.
!cp /content/drive/MyDrive/kvasir_dataset.zip .

In [4]:
!unzip -qq kvasir_dataset.zip

In [5]:
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers

In [6]:
# Augmentation pipeline applied to raw images: a random brightness shift of up
# to 20%. value_range is (0, 255) because the pipeline receives un-normalised
# pixel values; the model rescales to [0, 1] after augmenting.
data_augmentation = Sequential(
    [
        layers.RandomBrightness(factor=0.2, value_range=(0, 255), seed=123),
    ]
)

In [7]:
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.layers import (
    Dense,
    Dropout,
    LayerNormalization,
)
from tensorflow.keras.layers.experimental.preprocessing import Rescaling


class MultiHeadSelfAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values by three ``Dense``
    layers, split into ``num_heads`` heads of ``embed_dim // num_heads``
    features each, attended per head, merged back together and passed through
    a final ``Dense`` projection.

    Args:
        embed_dim: Width of the query/key/value and output projections.
        num_heads: Number of attention heads; must divide ``embed_dim``.

    Raises:
        ValueError: If ``embed_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_dim, num_heads=8):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
            )
        # Number of features handled by each individual head.
        self.projection_dim = embed_dim // num_heads
        self.query_dense = Dense(embed_dim)
        self.key_dense = Dense(embed_dim)
        self.value_dense = Dense(embed_dim)
        self.combine_heads = Dense(embed_dim)

    def attention(self, query, key, value):
        """Compute scaled dot-product attention.

        Returns:
            A tuple ``(output, weights)``: the attention-weighted sum of
            ``value`` and the softmax-normalised attention weights.
        """
        score = tf.matmul(query, key, transpose_b=True)
        # Cast the scale to the dtype of the scores instead of hard-coding
        # float32: TensorFlow does not promote dtypes implicitly, so dividing
        # float16/bfloat16 scores (e.g. under a mixed-precision policy) by a
        # float32 scalar would raise a dtype mismatch error.
        dim_key = tf.cast(tf.shape(key)[-1], score.dtype)
        scaled_score = score / tf.math.sqrt(dim_key)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        """Reshape ``x`` to ``(batch, num_heads, tokens, projection_dim)``."""
        x = tf.reshape(
            x, (batch_size, -1, self.num_heads, self.projection_dim)
        )
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply self-attention to ``inputs`` and return the projected result."""
        batch_size = tf.shape(inputs)[0]
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)
        query = self.separate_heads(query, batch_size)
        key = self.separate_heads(key, batch_size)
        value = self.separate_heads(value, batch_size)

        # The attention weights are computed but not needed by the caller.
        attention, _ = self.attention(query, key, value)
        # Undo the head split: move the head axis back next to the features
        # and merge the two into a single embed_dim-wide axis.
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(
            attention, (batch_size, -1, self.embed_dim)
        )
        output = self.combine_heads(concat_attention)
        return output


class TransformerBlock(tf.keras.layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward net.

    Each of the two sub-layers is followed by dropout, a residual addition and
    a layer normalisation applied to the sum.

    Args:
        embed_dim: Width of the token embeddings entering and leaving the block.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Dropout rate applied to both sub-layer outputs.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = tf.keras.Sequential(
            [Dense(ff_dim, activation="relu"), Dense(embed_dim)]
        )
        # The block deliberately holds no image-augmentation layer: it operates
        # on token embeddings and never used one, and referencing a notebook
        # global here made the class depend on cell execution order.
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(dropout)
        self.dropout2 = Dropout(dropout)

    def call(self, inputs, training=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Token embeddings to transform.
            training: Forwarded to the dropout layers. ``None`` lets Keras
                infer the mode from the surrounding call.

        Returns:
            A tensor with the same shape as ``inputs``.
        """
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)


class VisionTransformer(tf.keras.Model):
    """Vision Transformer image classifier.

    Images are optionally augmented, rescaled by 1/255, cut into
    non-overlapping square patches, linearly projected to ``d_model``
    features, prefixed with a learned class token, offset by learned position
    embeddings and passed through ``num_layers`` ``TransformerBlock`` layers.
    The class-token output is fed to an MLP head that produces
    ``num_classes`` logits.

    Args:
        augmentation: Layer/model applied to the raw images before rescaling,
            or ``None`` to skip augmentation.
        image_size: Side length of the (square) input images.
        patch_size: Side length of each square patch.
        num_layers: Number of transformer blocks.
        num_classes: Number of output logits.
        d_model: Token embedding width.
        num_heads: Attention heads per transformer block.
        mlp_dim: Hidden width of the feed-forward nets and of the MLP head.
        channels: Number of image channels.
        dropout: Dropout rate used in the blocks and in the head.
    """

    def __init__(
        self,
        augmentation,
        image_size,
        patch_size,
        num_layers,
        num_classes,
        d_model,
        num_heads,
        mlp_dim,
        channels=3,
        dropout=0.1,
    ):
        super(VisionTransformer, self).__init__()
        num_patches = (image_size // patch_size) ** 2
        # Number of raw values in one flattened patch.
        self.patch_dim = channels * patch_size ** 2

        self.patch_size = patch_size
        self.d_model = d_model
        self.num_layers = num_layers

        # Use the pipeline handed in by the caller; previously the argument was
        # ignored in favour of a notebook-level global.
        self.augmentation = augmentation

        self.rescale = Rescaling(1./255)
        # One position embedding per patch plus one for the class token.
        self.pos_emb = self.add_weight(
            name="pos_emb", shape=(1, num_patches + 1, d_model)
        )
        self.class_emb = self.add_weight(
            name="class_emb", shape=(1, 1, d_model)
        )
        self.patch_proj = Dense(d_model)
        self.enc_layers = [
            TransformerBlock(d_model, num_heads, mlp_dim, dropout)
            for _ in range(num_layers)
        ]
        self.mlp_head = tf.keras.Sequential(
            [
                Dense(mlp_dim, activation=tfa.activations.gelu),
                Dropout(dropout),
                Dense(num_classes),
            ]
        )

    def extract_patches(self, images):
        """Split ``images`` into flattened non-overlapping patches.

        Returns:
            A tensor reshaped to ``(batch, -1, patch_dim)``.
        """
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patches = tf.reshape(patches, [batch_size, -1, self.patch_dim])
        return patches

    def call(self, x, training=None):
        """Return class logits for a batch of images.

        Args:
            x: Batch of images with pixel values on the 0-255 scale (they are
                rescaled by 1/255 here).
            training: Forwarded to the augmentation, the transformer blocks
                and the head. ``None`` lets Keras infer the mode.

        Returns:
            Logits of shape ``(batch, num_classes)``.
        """
        batch_size = tf.shape(x)[0]
        if self.augmentation is not None:
            # Passed explicitly so augmentation is only active in training.
            x = self.augmentation(x, training=training)
        x = self.rescale(x)
        patches = self.extract_patches(x)
        x = self.patch_proj(patches)

        class_emb = tf.broadcast_to(
            self.class_emb, [batch_size, 1, self.d_model]
        )
        x = tf.concat([class_emb, x], axis=1)
        x = x + self.pos_emb

        for layer in self.enc_layers:
            x = layer(x, training=training)

        # First (class token) is used for classification
        x = self.mlp_head(x[:, 0], training=training)
        return x


In [8]:
# Root folder of the training images; the train and validation loaders both
# read from it.
imagePaths = "kvasir_dataset/train/"
# Root folder of the held-out test images.
test_path = "kvasir_dataset/test/"

In [9]:
# Number of images per batch produced by every dataset loader.
batch_size = 64
# Target resolution the loaders resize each image to.
img_height = 128
img_width = 128

In [10]:
import tensorflow as tf

In [11]:
# Delete Jupyter's hidden checkpoint folder from the training directory.
# NOTE(review): presumably so the directory-based loader does not pick it up
# as an extra class folder - confirm.
!rm -rf kvasir_dataset/train/.ipynb_checkpoints

In [12]:
# Training split: the 85% of the images under `imagePaths` that are not held
# out for validation. color_mode and label_mode are left at their defaults.
# The same seed and validation_split are used for the validation loader so the
# two subsets do not overlap.
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=imagePaths,
    image_size=(img_height, img_width),
    batch_size=batch_size,
    validation_split=0.15,
    subset="training",
    seed=123,
)

Found 15300 files belonging to 6 classes.
Using 13005 files for training.


In [13]:
# Validation split: the 15% of the images under `imagePaths` held out from
# training. seed and validation_split match the training loader so the two
# subsets are complementary. color_mode and label_mode are left at their
# defaults.
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=imagePaths,
    image_size=(img_height, img_width),
    batch_size=batch_size,
    validation_split=0.15,
    subset="validation",
    seed=123,
)

Found 15300 files belonging to 6 classes.
Using 2295 files for validation.


In [14]:
# Held-out test set, loaded at the same resolution and batch size as the
# training data. label_mode is left at its default.
test_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=test_path,
    image_size=(img_height, img_width),
    batch_size=batch_size,
    seed=123,
)

Found 2700 files belonging to 6 classes.


In [15]:
import os

import tensorflow as tf
import tensorflow_addons as tfa
import tensorflow_datasets as tfds

from tensorflow.keras.callbacks import TensorBoard


AUTOTUNE = tf.data.experimental.AUTOTUNE

if __name__ == "__main__":
    # --- Hyper-parameters and output locations ------------------------------
    logdir = "logs"
    # The model must be built for the resolution the dataset loaders produce,
    # so derive it from the loader setting instead of repeating the literal.
    image_size = img_height
    patch_size = 4
    num_layers = 8
    d_model = 64
    num_heads = 4
    mlp_dim = 512
    lr = 3e-4
    weight_decay = 1e-4
    epochs = 25
    augmentation = data_augmentation
    # One output logit per class folder found by the training loader.
    num_classes = len(train_ds.class_names)

    # VisionTransformer sizes its position embeddings for square inputs.
    if img_height != img_width:
        raise ValueError(
            f"VisionTransformer expects square images, got {img_height}x{img_width}"
        )

    strategy = tf.distribute.MirroredStrategy()

    # Model and optimizer variables have to be created inside the strategy
    # scope so that they are mirrored across the available devices.
    with strategy.scope():
        model = VisionTransformer(
            augmentation=augmentation,
            image_size=image_size,
            patch_size=patch_size,
            num_layers=num_layers,
            num_classes=num_classes,
            d_model=d_model,
            num_heads=num_heads,
            mlp_dim=mlp_dim,
            channels=3,
            dropout=0.1,
        )
        model.compile(
            # The model outputs raw logits and the loaders yield integer labels.
            loss=tf.keras.losses.SparseCategoricalCrossentropy(
                from_logits=True
            ),
            optimizer=tfa.optimizers.AdamW(
                learning_rate=lr, weight_decay=weight_decay
            ),
            metrics=["accuracy"],
        )

    # --- Callbacks -----------------------------------------------------------
    # No trailing comma here: it used to turn this into a one-element tuple
    # instead of a callback object.
    # NOTE(review): patience=100 exceeds `epochs`, so early stopping can never
    # trigger in this configuration - lower it if early stopping is wanted.
    early_stop = tf.keras.callbacks.EarlyStopping(patience=100)
    # Keep only the weights of the epoch with the best validation accuracy.
    checkpoint_filepath = "./my_checkpoints/checkpoint"
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        checkpoint_filepath,
        monitor="val_accuracy",
        save_best_only=True,
        save_weights_only=True,
    )
    # Cut the learning rate by 10x after 3 epochs without val_loss improvement.
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss', factor=0.1, patience=3, verbose=0, mode='auto',
        min_delta=0.0001, cooldown=0, min_lr=0)

    # --- Training ------------------------------------------------------------
    # Prefetch so input loading overlaps with the training step. The original
    # dataset objects are left untouched because later cells iterate them.
    model.fit(
        train_ds.prefetch(AUTOTUNE),
        validation_data=val_ds.prefetch(AUTOTUNE),
        epochs=epochs,
        callbacks=[early_stop, checkpoint_callback, reduce_lr],
    )
    # Final-epoch weights; the best-epoch weights live in checkpoint_filepath.
    model.save_weights(os.path.join(logdir, "vit"))


Epoch 1/25




Epoch 2/25
Epoch 3/25
Epoch 4/25
Epoch 5/25
Epoch 6/25
Epoch 7/25
Epoch 8/25
Epoch 9/25
Epoch 10/25
Epoch 11/25
Epoch 12/25
Epoch 13/25
Epoch 14/25
Epoch 15/25
Epoch 16/25
Epoch 17/25
Epoch 18/25
Epoch 19/25
Epoch 20/25
Epoch 21/25
Epoch 22/25
Epoch 23/25
Epoch 24/25
Epoch 25/25


In [16]:
from sklearn.metrics import classification_report, confusion_matrix

In [17]:
import numpy as np

In [18]:
# Restore the weights written by the ModelCheckpoint callback (the epoch with
# the best validation accuracy) before evaluating on the test set.
model.load_weights("./my_checkpoints/checkpoint")

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7fef6021c8e0>

In [19]:
# Gather predictions and labels from the same pass over test_ds so that each
# prediction stays paired with its label even if the dataset is shuffled.
# Results are accumulated in lists and joined once at the end; concatenating
# onto a growing array every iteration re-copies all previous results.
batch_predictions = []
batch_labels = []
for x, y in test_ds:
    # predict_on_batch runs one forward pass on the given batch; model.predict
    # is meant for whole datasets and sets up a new prediction loop per call.
    logits = model.predict_on_batch(x)
    batch_predictions.append(np.argmax(logits, axis=1))
    batch_labels.append(y.numpy())

if batch_predictions:
    # Integer class indices (the previous version stored them as floats).
    predictions = np.concatenate(batch_predictions)
    labels = np.concatenate(batch_labels)
else:
    # Empty dataset: np.concatenate rejects an empty list.
    predictions = np.array([], dtype=np.int64)
    labels = np.array([], dtype=np.int64)



In [24]:
# Rows correspond to the true class, columns to the predicted class.
print('Confusion Matrix')
cm = confusion_matrix(labels, predictions)
print(cm)

Confusion Matrix
[[334 115   0   0   1   0]
 [115 334   0   0   0   1]
 [  0   0 416  34   0   0]
 [  0   0  47 386  12   5]
 [  4   0   4  16 357  69]
 [  1   0   1  12 116 320]]


In [25]:
# Display names for the six classes, listed in label-index order.
# NOTE(review): this list must match the class order used by the dataset
# loader (test_ds.class_names) - confirm.
classes = [
    'dyed-lifted-polyps',
    'dyed-resection-margins',
    'esophagitis',
    'normal',
    'polyps',
    'ulcerative-colitis',
]
target_names = classes

print('Classification Report')
report = classification_report(
    y_true=labels, y_pred=predictions, target_names=target_names
)
print(report)

Classification Report
                        precision    recall  f1-score   support

    dyed-lifted-polyps       0.74      0.74      0.74       450
dyed-resection-margins       0.74      0.74      0.74       450
           esophagitis       0.89      0.92      0.91       450
                normal       0.86      0.86      0.86       450
                polyps       0.73      0.79      0.76       450
    ulcerative-colitis       0.81      0.71      0.76       450

              accuracy                           0.80      2700
             macro avg       0.80      0.80      0.79      2700
          weighted avg       0.80      0.80      0.79      2700



In [22]:
# Evaluate the restored model on the test set using the loss and the
# "accuracy" metric configured in model.compile.
results = model.evaluate(test_ds)



In [23]:
# Bundle the saved checkpoint files into a single archive.
!zip -r vit_checkpoints.zip my_checkpoints/

  adding: my_checkpoints/ (stored 0%)
  adding: my_checkpoints/checkpoint (deflated 48%)
  adding: my_checkpoints/checkpoint.data-00000-of-00001 (deflated 8%)
  adding: my_checkpoints/checkpoint.index (deflated 81%)
