In [1]:
import warnings
warnings.filterwarnings("ignore")

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

In [5]:
import zipfile
zip_ref = zipfile.ZipFile("/content/drive/MyDrive/dataset/data_1.zip","r")
zip_ref.extractall("./")
zip_ref.close()

In [6]:
!pip install split-folders
!pip install tensorflow==2.3.1
!pip install tensorflow-datasets==4.0.0
!pip install tensorflow-addons

Collecting split-folders
  Downloading split_folders-0.5.1-py3-none-any.whl (8.4 kB)
Installing collected packages: split-folders
Successfully installed split-folders-0.5.1
[31mERROR: Could not find a version that satisfies the requirement tensorflow==2.3.1 (from versions: 2.8.0rc0, 2.8.0rc1, 2.8.0, 2.8.1, 2.8.2, 2.8.3, 2.8.4, 2.9.0rc0, 2.9.0rc1, 2.9.0rc2, 2.9.0, 2.9.1, 2.9.2, 2.9.3, 2.10.0rc0, 2.10.0rc1, 2.10.0rc2, 2.10.0rc3, 2.10.0, 2.10.1, 2.11.0rc0, 2.11.0rc1, 2.11.0rc2, 2.11.0, 2.11.1, 2.12.0rc0, 2.12.0rc1, 2.12.0, 2.12.1, 2.13.0rc0, 2.13.0rc1, 2.13.0rc2, 2.13.0, 2.13.1, 2.14.0rc0, 2.14.0rc1, 2.14.0)[0m[31m
[0m[31mERROR: No matching distribution found for tensorflow==2.3.1[0m[31m
[0mCollecting tensorflow-datasets==4.0.0
  Downloading tensorflow_datasets-4.0.0-py3-none-any.whl (3.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
Collecting dill (from tensorflow-datasets==4.0.0)
  Downloading dill-0.3.

In [7]:
import numpy as np
import matplotlib.pyplot as plt
import splitfolders
import tensorflow as tf
from tensorflow import keras
from keras import Sequential
import math
import random
import os

In [8]:
# count the number of images in the respective classes 1 - Kidney tumor and 0 - Normal
ROOT_DIR ="/content/data/"

In [9]:
splitfolders.ratio(ROOT_DIR,output="splited_data",
                   seed=42,
                   ratio=(.7,.0,.3),
                   group_prefix=None)

Copying files: 0 files [00:00, ? files/s]


In [11]:
# generators
train_ds = tf.keras.utils.image_dataset_from_directory(
    directory="/content/splited_data/train",
    labels = "inferred",
    label_mode ="int",
    batch_size=16,
    image_size=(128,128)
)

test_ds = tf.keras.utils.image_dataset_from_directory(
    directory="/content/splited_data/test",
    labels = "inferred",
    label_mode ="int",
    batch_size=16,
    image_size=(128,128)
)

Found 0 files belonging to 1 classes.


ValueError: ignored

In [None]:
# Normalize
def process(image,label):
  image = tf.cast(image/255.,tf.float32)
  return image,label

train_ds = train_ds.map(process)
test_ds = test_ds.map(process)

In [None]:
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.layers import (
    Dense,
    Dropout,
    LayerNormalization,
)
from tensorflow.keras.layers.experimental.preprocessing import Rescaling


class MultiHeadSelfAttention(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads=8):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
            )
        self.projection_dim = embed_dim // num_heads
        self.query_dense = Dense(embed_dim)
        self.key_dense = Dense(embed_dim)
        self.value_dense = Dense(embed_dim)
        self.combine_heads = Dense(embed_dim)

    def attention(self, query, key, value):
        score = tf.matmul(query, key, transpose_b=True)
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
        scaled_score = score / tf.math.sqrt(dim_key)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        x = tf.reshape(
            x, (batch_size, -1, self.num_heads, self.projection_dim)
        )
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)
        query = self.separate_heads(query, batch_size)
        key = self.separate_heads(key, batch_size)
        value = self.separate_heads(value, batch_size)

        attention, weights = self.attention(query, key, value)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(
            attention, (batch_size, -1, self.embed_dim)
        )
        output = self.combine_heads(concat_attention)
        return output


class TransformerBlock(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, mlp_dim, dropout=0.1):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.mlp = tf.keras.Sequential(
            [
                Dense(mlp_dim, activation=tfa.activations.gelu),
                Dropout(dropout),
                Dense(embed_dim),
                Dropout(dropout),
            ]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(dropout)
        self.dropout2 = Dropout(dropout)

    def call(self, inputs, training):
        inputs_norm = self.layernorm1(inputs)
        attn_output = self.att(inputs_norm)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = attn_output + inputs

        out1_norm = self.layernorm2(out1)
        mlp_output = self.mlp(out1_norm)
        mlp_output = self.dropout2(mlp_output, training=training)
        return mlp_output + out1


class VisionTransformer(tf.keras.Model):
    def __init__(
        self,
        image_size,
        patch_size,
        num_layers,
        num_classes,
        d_model,
        num_heads,
        mlp_dim,
        channels=3,
        dropout=0.1,
    ):
        super(VisionTransformer, self).__init__()
        num_patches = (image_size // patch_size) ** 2
        self.patch_dim = channels * patch_size ** 2

        self.patch_size = patch_size
        self.d_model = d_model
        self.num_layers = num_layers

        self.rescale = Rescaling(1.0 / 255)
        self.pos_emb = self.add_weight(
            "pos_emb", shape=(1, num_patches + 1, d_model)
        )
        self.class_emb = self.add_weight("class_emb", shape=(1, 1, d_model))
        self.patch_proj = Dense(d_model)
        self.enc_layers = [
            TransformerBlock(d_model, num_heads, mlp_dim, dropout)
            for _ in range(num_layers)
        ]
        self.mlp_head = tf.keras.Sequential(
            [
                LayerNormalization(epsilon=1e-6),
                Dense(mlp_dim, activation=tfa.activations.gelu),
                Dropout(dropout),
                Dense(num_classes-1),
            ]
        )

    def extract_patches(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patches = tf.reshape(patches, [batch_size, -1, self.patch_dim])
        return patches

    def call(self, x, training):
        batch_size = tf.shape(x)[0]
        x = self.rescale(x)
        patches = self.extract_patches(x)
        x = self.patch_proj(patches)

        class_emb = tf.broadcast_to(
            self.class_emb, [batch_size, 1, self.d_model]
        )
        x = tf.concat([class_emb, x], axis=1)
        x = x + self.pos_emb

        for layer in self.enc_layers:
            x = layer(x, training)

        # First (class token) is used for classification
        x = self.mlp_head(x[:, 0])
        return x

In [None]:
# Path to the folders containing the images
normal_folder = '/content/splited_data/train/Normal'
tumor_folder = '/content/splited_data/train/Tumor'

# Get the list of image filenames in the folders
normal_images = [os.path.join(normal_folder, filename) for filename in os.listdir(normal_folder)]
tumor_images = [os.path.join(tumor_folder, filename) for filename in os.listdir(tumor_folder)]

# Combine the lists of images
all_images = normal_images + tumor_images
random.shuffle(all_images)  # Shuffle the images

In [None]:
# Function to plot images with a fixed size
def plotImages(image_paths, image_size=(256, 256)):
    fig, axes = plt.subplots(2, 5, figsize=(15, 7))
    for img_path, ax in zip(image_paths, axes.ravel()):
        img = plt.imread(img_path)
        ax.imshow(img, extent=[0, image_size[0], 0, image_size[1]])
        ax.set_title(os.path.basename(os.path.dirname(img_path)))
        ax.axis('off')
    plt.tight_layout()
    plt.show()

In [None]:
# Select 10 random images
selected_images = random.sample(all_images, 10)

# Plot the selected images
plotImages(selected_images)

In [None]:
import os
from argparse import ArgumentParser

import tensorflow as tf
import tensorflow_addons as tfa
# import tensorflow_datasets as tfds
from tensorflow.keras.callbacks import TensorBoard


AUTOTUNE = tf.data.experimental.AUTOTUNE

# if __name__ == "__main__":
#     parser = ArgumentParser()
#     parser.add_argument("--logdir", default="logs")
#     parser.add_argument("--image-size", default=32, type=int)
#     parser.add_argument("--patch-size", default=4, type=int)
#     parser.add_argument("--num-layers", default=4, type=int)
#     parser.add_argument("--d-model", default=64, type=int)
#     parser.add_argument("--num-heads", default=4, type=int)
#     parser.add_argument("--mlp-dim", default=128, type=int)
#     parser.add_argument("--lr", default=3e-4, type=float)
#     parser.add_argument("--weight-decay", default=1e-4, type=float)
#     parser.add_argument("--batch-size", default=4096, type=int)
#     parser.add_argument("--epochs", default=300, type=int)
#     args = parser.parse_args()

args = {
    "logdir": "logs",
    "image_size": 128,
    "patch_size": 4,
    "num_layers": 4,
    "d_model": 64,
    "num_heads": 4,
    "mlp_dim": 128,
    "lr": 3e-4,
    "weight_decay": 1e-4,
    "batch_size": 16,
    "epochs": 50,
}



#     ds = tfds.load("imagenet_resized/32x32", as_supervised=True)
#     ds_train = (
#         ds["train"]
#         .cache()
#         .shuffle(5 * args.batch_size)
#         .batch(args.batch_size)
#         .prefetch(AUTOTUNE)
#     )
#     ds_test = (
#         ds["validation"]
#         .cache()
#         .batch(args.batch_size)
#         .prefetch(AUTOTUNE)
#     )

#     strategy = tf.distribute.MirroredStrategy()

#     with strategy.scope():

# model = VisionTransformer(
#             image_size=args.image_size,
#             patch_size=args.patch_size,
#             num_layers=args.num_layers,
#             num_classes=2,
#             d_model=args.d_model,
#             num_heads=args.num_heads,
#             mlp_dim=args.mlp_dim,
#             channels=3,
#             dropout=0.1,
#         )

model = VisionTransformer(
    image_size=args["image_size"],
    patch_size=args["patch_size"],
    num_layers=args["num_layers"],
    num_classes=2,
    d_model=args["d_model"],
    num_heads=args["num_heads"],
    mlp_dim=args["mlp_dim"],
    channels=3,
    dropout=0.1,
)

# Build the model
model.build(input_shape=(None, args["image_size"], args["image_size"], 3))

In [None]:
model.compile(optimizer="adam",loss="binary_crossentropy",metrics=['accuracy',tf.keras.metrics.Precision(),tf.keras.metrics.Recall()])

In [None]:
model.summary()

In [None]:
hist=model.fit(
        train_ds,
        validation_data=test_ds,
        epochs=args['epochs'],
        # callbacks=[TensorBoard(log_dir=args['logdir'], profile_batch=0),],
    )

In [None]:
model.save_weights(os.path.join(args.logdir, "vit"))
# Save the model
model_save_path = '/content/drive/MyDrive/model/vit.h5'
model.save(model_save_path)

In [None]:
plt.plot(hist.history["loss"],color="red",label="train")
plt.plot(hist.history['val_loss'],color="blue",label="validation")
plt.legend()
plt.show()

In [None]:
plt.plot(hist.history["accuracy"],color="red",label="train")
plt.plot(hist.history['val_accuracy'],color="blue",label="validation")
plt.legend()
plt.show()

In [None]:
# Load the saved model
model_path = 'simple_cnn_with_attention.h5'
loaded_model = tf.keras.models.load_model(model_path)