In [None]:
import os
import math
import numpy as np
import pickle as p
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
from tensorflow.keras import layers
import tensorflow_addons as tfa
import numpy as np
import tensorflow as tf
from tensorflow import keras
import math
from PIL import Image
import cv2
from tensorflow.random import set_seed
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.metrics import accuracy_score
from utils.ResNet18 import *
from utils.Image_reader import Imagereader
from tensorflow.keras.preprocessing.image import ImageDataGenerator
%matplotlib inline

Set GPU Environment

In [None]:
# Enumerate CUDA devices in PCI bus order and expose only device 0 to this
# process; both variables are set before the GPU query below.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

gpus = tf.config.list_physical_devices('GPU')
# Turn on on-demand memory allocation for every visible GPU. Iterating over
# the list (rather than indexing gpus[0]) keeps this cell runnable on a
# CPU-only machine, where the list is empty and gpus[0] would raise IndexError.
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
print('Num GPUs Available:', len(gpus))

from tensorflow.python.client import device_lib


def get_available_gpus():
    """Return the local device descriptors whose ``device_type`` is ``'GPU'``."""
    gpu_devices = []
    for device in device_lib.list_local_devices():
        if device.device_type == 'GPU':
            gpu_devices.append(device)
    return gpu_devices


print(get_available_gpus())

Prepare the dataset

In [None]:
# Build the train and test dataframes with the project's Imagereader helper.
# Later cells read the 'image' and 'class' columns of these frames.
Data = Imagereader()
df_train, df_test = Data.datasetGen(
    'Kaggle_dataset',
    testGen=True,
)

In [None]:
# Seed TensorFlow's global random generator.
set_seed(1)

# Seed used for the train/validation split and the data generators.
SEED = 42

# Batch size; BS and BATCH are two names for the same value.
BS = 16
BATCH = BS

# Input image height and width.
H, W = 224, 224

In [None]:
# Hold out 20% of the training dataframe for validation, stratified on the
# 'class' column and made repeatable through SEED.
train_df, val_df = train_test_split(
    df_train,
    test_size=0.2,
    stratify=df_train['class'],
    random_state=SEED,
)

In [None]:
# Arguments shared by every flow_from_dataframe call below.
flow_kwargs = dict(x_col='image', y_col='class', target_size=(H, W))

# Training images are scaled by 1/255 and randomly zoomed, rotated and shifted.
# NOTE(review): Keras interprets rotation_range in degrees, so 0.1 is a very
# small rotation -- confirm this is the intended amount.
train_datagen = ImageDataGenerator(
    rescale=1/255.,
    zoom_range=0.1,
    rotation_range=0.1,
    width_shift_range=0.1,
    height_shift_range=0.1,
)

# Validation and test images are only scaled by 1/255 (no augmentation).
val_datagen = ImageDataGenerator(rescale=1/255.)

ds_train = train_datagen.flow_from_dataframe(
    train_df, batch_size=BATCH, seed=SEED, **flow_kwargs
)

ds_val = val_datagen.flow_from_dataframe(
    val_df, batch_size=BATCH, seed=SEED, **flow_kwargs
)

# The test iterator yields one image per batch, without shuffling.
ds_test = val_datagen.flow_from_dataframe(
    df_test, batch_size=1, shuffle=False, **flow_kwargs
)

Build PneuNet

In [None]:
class Encoder(layers.Layer):
    """Project each patch to ``projection_dim`` and add a learned position embedding.

    Args:
        num_patches: Number of patches in the sequence; also the size of the
            position-embedding table.
        projection_dim: Output width of the dense projection and of every
            position embedding.
        **kwargs: Standard ``keras.layers.Layer`` arguments (``name``,
            ``dtype``, ...), forwarded to the base class. Needed so the layer
            can be rebuilt from its config.
    """

    def __init__(self, num_patches, projection_dim, **kwargs):
        super(Encoder, self).__init__(**kwargs)
        self.num_patches = num_patches
        self.projection_dim = projection_dim
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        """Return the projected patches plus the embeddings of positions 0..num_patches-1."""
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patch) + self.position_embedding(positions)
        return encoded

    def get_config(self):
        """Return the constructor arguments so Keras can serialize and clone the layer."""
        config = super(Encoder, self).get_config()
        config.update(
            {
                "num_patches": self.num_patches,
                "projection_dim": self.projection_dim,
            }
        )
        return config

In [None]:
# Transformer encoder hyper-parameters.
transformer_layers = 6   # number of stacked transformer blocks
num_heads = 16           # attention heads per block
projection_dim = 80      # width of each encoded patch

# Widths of the dense layers in each transformer block's MLP.
transformer_units = [projection_dim * 2, projection_dim]

# Widths of the dense layers in the classification head.
mlp_head_units = [1024, 64, 16]

In [None]:
def mlp(x, hidden_units, dropout_rate):
    """Apply a GELU ``Dense`` layer followed by ``Dropout`` for each width.

    Args:
        x: Input tensor.
        hidden_units: Iterable of dense-layer widths, applied in order.
        dropout_rate: Dropout rate used after every dense layer.

    Returns:
        The output of the last dropout layer, or ``x`` unchanged when
        ``hidden_units`` is empty.
    """
    out = x
    for width in hidden_units:
        hidden = layers.Dense(width, activation=tf.nn.gelu)(out)
        out = layers.Dropout(dropout_rate)(hidden)
    return out

In [None]:
# Backbone from utils.ResNet18.
# NOTE(review): nclass=2 is presumably unused when Denselayer=False -- verify
# against resnet_18, since the classifier below has 3 outputs.
baseModel = resnet_18(input_shape=(H, W, 3), nclass=2, Denselayer=False)

Inputs_ = layers.Input(shape=(H, W, 3))
x = baseModel(Inputs_)
x = layers.MaxPool2D((2, 2))(x)
x = layers.BatchNormalization()(x)

# Static shape of the feature map: (batch, height, width, channels).
temp_shape = keras.backend.int_shape(x)
patch_dim = temp_shape[1] * temp_shape[2]
channel = temp_shape[-1]
# Each channel is treated as one patch whose features are its spatial map.
num_patches = channel
print(temp_shape)

# Turn (batch, h, w, C) into (batch, C, h*w). The data is channels-last, so a
# direct reshape to (C, h*w) would only relabel the flat buffer and mix
# several channels into each row. Flatten the spatial axes first, then move
# the channel axis forward so row i really is the spatial map of channel i.
x = layers.Reshape((patch_dim, channel))(x)
x = layers.Permute((2, 1))(x)

encoded_patches = Encoder(num_patches, projection_dim)(x)
for _ in range(transformer_layers):
    # Layer normalization 1.
    x1 = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
    # Create a multi-head attention layer (self-attention over the patches).
    attention_output = layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=projection_dim, dropout=0.2
    )(x1, x1)
    # Skip connection 1.
    x2 = layers.Add()([attention_output, encoded_patches])
    # Layer normalization 2.
    x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
    # MLP.
    x3 = mlp(x3, hidden_units=transformer_units, dropout_rate=0.2)
    # Skip connection 2.
    encoded_patches = layers.Add()([x3, x2])

representation = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
representation = layers.Flatten()(representation)
representation = layers.Dropout(0.2)(representation)
# Add MLP.
features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0.2)
# Classify outputs: softmax probabilities over 3 classes.
# NOTE(review): 3 must match the number of classes produced by the data
# generators -- confirm against the dataset.
logits = layers.Dense(3, activation='softmax')(features)

# Create the Keras model.
model = keras.Model(inputs=Inputs_, outputs=logits)

model.summary()

Set up optimizer, metric and train the model

In [None]:
# Step size and decoupled weight-decay coefficient for the AdamW optimizer.
learning_rate = 1e-4
weight_decay = 1e-5

In [None]:
# AdamW optimizer from TensorFlow Addons.
optimizer = tfa.optimizers.AdamW(weight_decay=weight_decay, learning_rate=learning_rate)

# from_logits=False: the loss is given probabilities rather than raw logits.
model.compile(
    loss=keras.losses.CategoricalCrossentropy(from_logits=False),
    optimizer=optimizer,
    metrics=[keras.metrics.CategoricalAccuracy(name="accuracy")],
)

# Optional checkpointing (disabled): keep only the weights with the best
# validation accuracy.
# checkpoint_callback = keras.callbacks.ModelCheckpoint(
#         checkpoint_filepath,
#         monitor="val_accuracy",
#         save_best_only=True,
#         save_weights_only=True,
#     )

In [None]:
# Train for 100 epochs, validating on ds_val after each epoch.
# The batch size is already fixed by the generators (batch_size=BATCH in
# flow_from_dataframe). Keras documents that `batch_size` must not be passed
# to fit() when the input is a generator or keras.utils.Sequence, so it is
# omitted here.
history = model.fit(
    ds_train,
    epochs=100,
    validation_data=ds_val,
    # callbacks=[checkpoint_callback],
)
# model.load_weights(checkpoint_filepath)