<a href="https://colab.research.google.com/github/Tawheed-tariq/Machine-learning-course/blob/main/simple%20projects/malaria%20diagnosis/malaria_diagnosis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import tensorflow_datasets as tfds
import tensorflow_probability as tfp
from tensorflow.keras.layers import Dense, InputLayer, Conv2D, MaxPool2D, Flatten, BatchNormalization, Input, Layer, Dropout, RandomFlip, RandomRotation, Resizing, Rescaling
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import RootMeanSquaredError, FalsePositives, FalseNegatives, TrueNegatives, TruePositives, Precision, Recall, AUC, BinaryAccuracy
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import L2, L1
from tensorflow.image import flip_left_right
import sklearn
from sklearn.metrics import confusion_matrix, roc_curve
import seaborn as sns
from tensorflow.keras.callbacks import Callback, CSVLogger, EarlyStopping, LearningRateScheduler, ModelCheckpoint


In [None]:
# Load the TFDS 'malaria' dataset as (image, label) pairs together with its
# metadata. `split` is passed as a list, so `dataset` is a list of datasets;
# the single 'train' split is accessed later as `dataset[0]`.
dataset, dataset_info = tfds.load('malaria', with_info = True, as_supervised = True, shuffle_files= True, split = ['train'])

In [None]:
# Display the loaded dataset object.
dataset

In [None]:
def splits(dataset, TRAIN_RATIO, VAL_RATIO, TEST_RATIO):
  """Partition `dataset` sequentially into train, validation and test subsets.

  Args:
    dataset: Object supporting `len()`, `.take(n)` and `.skip(n)`.
    TRAIN_RATIO: Fraction of the elements placed in the training subset.
    VAL_RATIO: Fraction of the elements placed in the validation subset.
    TEST_RATIO: Not read by this function; the test subset is everything
      left after the training and validation elements.

  Returns:
    A `(train_dataset, val_dataset, test_dataset)` tuple.
  """
  total = len(dataset)
  # Element counts are truncated toward zero by int().
  n_train = int(TRAIN_RATIO * total)
  n_val = int(VAL_RATIO * total)

  remainder = dataset.skip(n_train)
  return dataset.take(n_train), remainder.take(n_val), remainder.skip(n_val)

In [None]:
# Fractions of the dataset assigned to each subset.
TRAIN_RATIO = 0.8
VAL_RATIO = 0.1
TEST_RATIO = 0.1

# `dataset` is a list (tfds.load was given a list of splits), so index 0 is
# the actual tf.data.Dataset to split.
train_dataset , val_dataset, test_dataset = splits(dataset[0], TRAIN_RATIO, VAL_RATIO, TEST_RATIO)

# print(list(train_dataset.take(1).as_numpy_iterator()), list(val_dataset.take(1).as_numpy_iterator()), list(test_dataset.take(1).as_numpy_iterator()))

# Data Visualization

In [None]:

# Show the first 16 training images in a 4x4 grid, each titled with its class name.
for i , (image, label) in enumerate(train_dataset.take(16)):
  ax = plt.subplot(4, 4, i+1)
  plt.imshow(image)
  # int2str converts the integer label into the dataset's class-name string.
  plt.title(dataset_info.features['label'].int2str(label))
  plt.axis('off')

# Data preprocessing

## Data Augmentation

In [None]:
def visualize(orignal, argumented):
    """Show two images side by side: `orignal` on the left, `argumented` on the right."""
    for position, picture in enumerate((orignal, argumented), start=1):
        plt.subplot(1, 2, position)
        plt.imshow(picture)

In [None]:
# Take the first (image, label) pair from the training dataset.
orignal_image, label = next(iter(train_dataset))

In [None]:
# Crop the image around its centre with a central fraction of 0.8.
argumented_image = tf.image.central_crop(orignal_image, 0.8)

In [None]:
# Compare the original image with its centrally cropped version.
visualize(orignal_image, argumented_image)

In [None]:
# Side length (in pixels) that images are resized to.
IM_SIZE = 224


def resize_rescale(image, label):
  """Resize `image` to IM_SIZE x IM_SIZE and divide it by 255.

  Returns:
    A `(image, label)` tuple; `label` is passed through unchanged.
  """
  resized = tf.image.resize(image, (IM_SIZE, IM_SIZE))
  return resized / 255.0, label

In [None]:
# Keras-layer counterpart of resize_rescale: resize to IM_SIZE x IM_SIZE,
# then multiply by 1/255.
resize_rescale_layers = tf.keras.Sequential([
    Resizing(IM_SIZE, IM_SIZE),
    Rescaling(1.0/255.0)
])

In [None]:
# Data augmentation using tf.image
def argument(image, label):
    """Resize/rescale `image`, rotate it by 90 degrees, then flip it left-to-right.

    Returns:
        A `(image, label)` tuple, with the label as returned by `resize_rescale`.
    """
    image, label = resize_rescale(image, label)
    transformed = tf.image.flip_left_right(tf.image.rot90(image, k=1))
    return transformed, label

In [None]:
class RotNienty(Layer):
  """Keras layer that applies `tf.image.rot90` with `k = 1` to its input."""

  def __init__(self):
    super(RotNienty, self).__init__()

  def call(self, image):
    """Return `image` rotated once by 90 degrees."""
    rotated = tf.image.rot90(image, k = 1)
    return rotated

In [None]:
# Data augmentation using tf.keras.layers: a fixed 90-degree rotation followed
# by a random horizontal flip.
argument_layers = tf.keras.Sequential([
    RotNienty(),
    RandomFlip(mode = 'horizontal')
])

def argument_layer(image, label):
    """Apply the resize/rescale layers and then the augmentation layers to `image`.

    The augmentation layers are called with `training = True`; `label` is
    returned unchanged.
    """
    prepared = resize_rescale_layers(image)
    augmented = argument_layers(prepared, training = True)
    return augmented, label

## Data Loading

In [None]:
# test_dataset = test_dataset.map(resize_rescale_layers)

In [None]:
# for image, label in train_dataset.take(1):
#   print(image, label)

In [None]:
# NOTE(review): BATCH_SIZE is defined here but not used in this cell; both
# pipelines below batch one element at a time. Presumably this is because no
# resizing is mapped here and the raw images differ in size -- confirm.
BATCH_SIZE = 32
# Training pipeline: shuffle with a buffer of 8, batch singly, prefetch.
train_dataset = (
    train_dataset
    .shuffle(buffer_size= 8, reshuffle_each_iteration= True)
   # .map(argument_layer)
    .batch(1)
    .prefetch(tf.data.AUTOTUNE)
)
# Validation pipeline: same steps as the training pipeline.
val_dataset = (
    val_dataset
    .shuffle(buffer_size= 8, reshuffle_each_iteration= True)
    #.map(resize_rescale_layers)
    .batch(1)
    .prefetch(tf.data.AUTOTUNE)
)

## MixUp Data Augmentation

In [None]:
# Build two independently shuffled, resized and rescaled views of the training
# data. `train_dataset` was batched with `.batch(1)` in the loading cell, so it
# is unbatched first: mixup works on single (image, label) samples, and the
# pipeline built from `mixed_dataset` batches again with BATCH_SIZE. Without
# `.unbatch()` that second batching would add another leading axis and the
# images would end up 5-D.
train_dataset_1 = train_dataset.unbatch().shuffle(buffer_size= 8, reshuffle_each_iteration= True).map(resize_rescale)
train_dataset_2 = train_dataset.unbatch().shuffle(buffer_size= 8, reshuffle_each_iteration= True).map(resize_rescale)

# Pair the two streams element by element; each pair is the input to mixup.
mixed_dataset = tf.data.Dataset.zip((train_dataset_1, train_dataset_2))

In [None]:
import cv2
def mixup(train_dataset_1, train_dataset_2):
    """Blend two samples into one using a Beta(0.1, 0.1)-distributed weight.

    Args:
        train_dataset_1: An `(image, label)` pair.
        train_dataset_2: A second `(image, label)` pair; the images must be
            combinable element-wise with those of the first pair.

    Returns:
        `(image, label)` where both are the convex combination
        `lamda * first + (1 - lamda) * second` of the two inputs.
    """
    (image_1, label_1), (image_2, label_2) = train_dataset_1, train_dataset_2

    # Draw one mixing coefficient for this pair of samples.
    lamda = tfp.distributions.Beta(0.1, 0.1).sample(1)[0]

    image = lamda * image_1 + (1 - lamda) * image_2
    # Labels are cast to float so they can be blended with the same weight.
    # (The dtype must be spelled tf.float32; a bare `float32` is undefined.)
    label = lamda * tf.cast(label_1, dtype=tf.float32) + (1 - lamda) * tf.cast(label_2, dtype=tf.float32)

    return image, label

In [None]:
BATCH_SIZE = 32
# Final training pipeline: shuffle the paired samples, blend each pair with
# mixup, group into batches of BATCH_SIZE and prefetch.
train_dataset = (
    mixed_dataset
    .shuffle(buffer_size= 8, reshuffle_each_iteration= True)
   .map(mixup)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Validation pipeline: resize/rescale every image and batch with BATCH_SIZE.
# `val_dataset` was batched with `.batch(1)` in the loading cell, so it is
# unbatched first; otherwise batching again here would stack a second batch
# axis onto every element.
val_dataset = (
    val_dataset
    .unbatch()
    .map(resize_rescale)
    .batch(BATCH_SIZE)
)

# Model Creation and Training

In [None]:
# Hyperparameters shared by the Dropout layers and the L2 kernel regularizers.
dropout_rate = 0.2
regularizing_rate = 0.01
# Sequential LeNet-style classifier. The input shape leaves height and width
# unspecified because resizing/rescaling and augmentation are part of the model.
# NOTE(review): datasets mapped through resize_rescale are already divided by
# 255 and are rescaled again by resize_rescale_layers here -- confirm this is
# intended.
model = tf.keras.Sequential([
    InputLayer(input_shape=(None, None, 3)),
    resize_rescale_layers,
    argument_layers,

    # First convolutional stage.
    Conv2D(filters = 6, kernel_size = 3, padding = 'valid', strides = 1, activation = 'relu', kernel_regularizer = L2(regularizing_rate)),
    BatchNormalization(),
    MaxPool2D(pool_size = 2, strides = 2),
    Dropout(rate = dropout_rate),

    # Second convolutional stage.
    Conv2D(filters = 16, kernel_size = 3, padding = 'valid', strides = 1, activation = 'relu', kernel_regularizer = L2(regularizing_rate)),
    BatchNormalization(),
    MaxPool2D(pool_size = 2, strides = 2),

    # Dense classifier head ending in a single sigmoid unit.
    Flatten(),
    Dense(100, activation = 'relu', kernel_regularizer = L2(regularizing_rate)),
    BatchNormalization(),
    Dropout(rate = dropout_rate),
    Dense(10, activation = 'relu', kernel_regularizer = L2(regularizing_rate)),
    BatchNormalization(),
    Dense(1, activation = 'sigmoid'),
])
model.summary()

In [None]:
# Functional-API feature extractor: two Conv2D -> BatchNormalization ->
# MaxPool2D stages on an IM_SIZE x IM_SIZE x 3 input.
func_input = Input(shape = (IM_SIZE, IM_SIZE, 3), name = 'Input Image')
x = Conv2D(filters = 6, kernel_size = 3, padding = 'valid', strides = 1, activation = 'relu')(func_input)
x = BatchNormalization()(x)
x = MaxPool2D(pool_size = 2, strides = 2)(x)
x = Conv2D(filters = 16, kernel_size = 3, padding = 'valid', strides = 1, activation = 'relu')(x)
x = BatchNormalization()(x)
output = MaxPool2D(pool_size = 2, strides = 2)(x)

# Wrap the graph from func_input to output as a named model.
feature_extractor_model = Model(func_input, output, name = "feature_extractor")
feature_extractor_model.summary()


In [None]:
# The same two-stage feature extractor, written with the Sequential API.
feature_extractor_seq = tf.keras.Sequential([
    InputLayer(input_shape=(IM_SIZE, IM_SIZE, 3)),
    Conv2D(filters = 6, kernel_size = 3, padding = 'valid', strides = 1, activation = 'relu'),
    BatchNormalization(),
    MaxPool2D(pool_size = 2, strides = 2),

    Conv2D(filters = 16, kernel_size = 3, padding = 'valid', strides = 1, activation = 'relu'),
    BatchNormalization(),
    MaxPool2D(pool_size = 2, strides = 2),
])
feature_extractor_seq.summary()

In [None]:
# Functional model that reuses the Sequential feature extractor as a single
# layer and adds the dense classifier head.
func_input = Input(shape = (IM_SIZE, IM_SIZE, 3), name = 'Input Image')
# x = feature_extractor_model(func_input)
x = feature_extractor_seq(func_input)

x = Flatten()(x)
x = Dense(100, activation = 'relu')(x)
x = BatchNormalization()(x)
x = Dense(10, activation = 'relu')(x)
x = BatchNormalization()(x)
# Single sigmoid unit for the binary output.
func_output = Dense(1, activation = 'sigmoid')(x)


lenet_model_func = Model(func_input, func_output, name = "lenet_Model")
lenet_model_func.summary()


# Model Subclassing

In [None]:
class FeatureExtractor(Layer):
  """Two Conv2D -> BatchNormalization -> MaxPool2D stages applied in sequence.

  Args:
    filters: Number of filters of the first convolution; the second
      convolution uses `2 * filters`.
    kernel_size: Kernel size of both convolutions.
    padding: Padding mode of both convolutions.
    strides: Stride of both convolutions; both pooling layers use
      `2 * strides` as their stride.
    activation: Activation of both convolutions.
    pool_size: Pool size of both pooling layers.
  """

  def __init__(self, filters, kernel_size, padding, strides, activation, pool_size):
    super(FeatureExtractor, self).__init__()
    self.conv1 = Conv2D(filters = filters , kernel_size = kernel_size, padding = padding, strides = strides, activation = activation)
    self.bn1 = BatchNormalization()
    self.maxpool1 = MaxPool2D(pool_size = pool_size, strides = 2*strides)

    self.conv2 = Conv2D(filters = 2*filters , kernel_size = kernel_size, padding = padding, strides = strides, activation = activation)
    self.bn2 = BatchNormalization()
    self.maxpool2 = MaxPool2D(pool_size = pool_size, strides = 2*strides)

  def call(self, x, training = None):
    """Run `x` through both stages.

    Args:
      x: Input tensor.
      training: Optional training flag. It is forwarded explicitly to the
        BatchNormalization layers, whose behaviour differs between training
        and inference; `None` leaves the decision to Keras.

    Returns:
      The output of the second pooling layer.
    """
    x = self.conv1(x)
    x = self.bn1(x, training = training)
    x = self.maxpool1(x)

    x = self.conv2(x)
    x = self.bn2(x, training = training)
    x = self.maxpool2(x)
    return x

# Extractor with 8 filters in the first stage (16 in the second), kernel size 3,
# 'valid' padding, stride 1, ReLU activation and pool size 2.
feature_sub_classed = FeatureExtractor(8, 3, 'valid', 1, 'relu', 2)


In [None]:
# Functional model built around the subclassed FeatureExtractor layer.
func_input = Input(shape = (IM_SIZE, IM_SIZE, 3), name = 'Input Image')
x = feature_sub_classed(func_input)

x = Flatten()(x)
x = Dense(100, activation = 'relu')(x)
x = BatchNormalization()(x)
x = Dense(10, activation = 'relu')(x)
x = BatchNormalization()(x)
# Single sigmoid unit for the binary output.
func_output = Dense(1, activation = 'sigmoid')(x)


# This rebinds lenet_model_func, replacing any model previously stored under that name.
lenet_model_func = Model(func_input, func_output, name = "lenet_Model")
lenet_model_func.summary()


In [None]:
class LenetModel(Model):
  """Subclassed LeNet-style binary classifier.

  A FeatureExtractor is followed by Flatten, two Dense + BatchNormalization
  blocks (100 and 10 units) and a single sigmoid output unit.
  """

  def __init__(self):
    super(LenetModel, self).__init__()
    self.feature_extractor = FeatureExtractor(8, 3, 'valid', 1, 'relu', 2)

    self.flatten = Flatten()
    self.dense1 = Dense(100, activation = 'relu')

    self.bn1 = BatchNormalization()
    self.dense2 = Dense(10, activation = 'relu')
    self.bn2 = BatchNormalization()

    self.dense3 = Dense(1, activation = 'sigmoid')

  def call(self, x, training = None):
    """Compute the sigmoid output for a batch of images.

    Args:
      x: Input image batch.
      training: Optional training flag. It is forwarded explicitly to the
        feature extractor and to the BatchNormalization layers; `None`
        leaves the decision to Keras.

    Returns:
      The output of the final sigmoid Dense layer.
    """
    x = self.feature_extractor(x, training = training)

    x = self.flatten(x)

    x = self.dense1(x)
    x = self.bn1(x, training = training)

    x = self.dense2(x)
    x = self.bn2(x, training = training)

    x = self.dense3(x)
    return x

lenet_subclassed_model = LenetModel()
# Call the model once on an all-zeros batch of one 224x224x3 image so that it
# is built before summary() is requested.
lenet_subclassed_model(tf.zeros([1, 224, 224, 3]), training = False)
lenet_subclassed_model.summary()

# Callbacks

In [None]:
class LossCallback(Callback):
    """Print the loss after every epoch and the full logs after every batch."""

    def on_epoch_end(self, epoch, logs = None):
        """Print the `loss` entry of `logs` for the finished epoch.

        `logs` defaults to None, as in the Keras Callback hooks, and a missing
        `loss` entry is printed as None instead of raising KeyError.
        """
        logs = logs or {}
        print(f'\n for epoch number {epoch} the model has loss of {logs.get("loss")}')

    def on_batch_end(self, batch, logs = None):
        """Print the whole `logs` object for the finished batch."""
        print(f'\n for batch number {batch} the model has loss of {logs}')

## CSV Logger

In [None]:
# Log training results to 'logs.csv', comma-separated; append = False means
# the file is not appended to.
csv_callback = CSVLogger(
    'logs.csv',
    separator = ',',
    append = False
)

## Early Stopping

In [None]:
# Early stopping on 'val_loss' with a patience of 2 epochs; the best weights
# are not restored (restore_best_weights = False).
es_callback = EarlyStopping(
    monitor='val_loss', min_delta = 0, patience = 2, verbose = 1, mode = 'auto', baseline = None, restore_best_weights = False
)

## Learning Rate Scheduler

In [None]:
def scheduler(epoch, lr):
    """Learning-rate schedule: hold `lr` for three epochs, then decay it.

    Args:
        epoch: Index of the epoch about to start.
        lr: Current learning rate.

    Returns:
        `lr` unchanged while `epoch < 3`; afterwards `lr * exp(-0.1)` as a
        plain Python float. A float is returned (rather than a tensor) because
        that is the type LearningRateScheduler expects from its schedule
        function.
    """
    import math  # local import: `math` is not imported in the notebook's import cell

    if epoch < 3:
        return lr
    return float(lr * math.exp(-0.1))

# Callback that applies `scheduler` during training (verbose = 1).
scheduler_callack = LearningRateScheduler(scheduler, verbose = 1)

## Model Checkpointing

In [None]:
# Save the full model (not just its weights) once per epoch, keeping only the
# best one as measured by 'val_loss'.
# NOTE(review): the target path is spelled 'ceheckpoints/' -- presumably
# 'checkpoints/' was meant; confirm before relying on the location.
checkpoint_callback = ModelCheckpoint(
    'ceheckpoints/', mode = 'auto', monitor = 'val_loss', save_best_only = True, save_weights_only = False, save_freq = 'epoch', verbose = 1
)

# Model Training

In [None]:
# Metrics tracked during training and evaluation: confusion-matrix counts,
# binary accuracy, precision, recall and AUC.
metrics = [TruePositives(name= 'tp'), FalsePositives(name = 'fp'), TrueNegatives(name = 'tn'), FalseNegatives(name = 'fn'),
           BinaryAccuracy(name = 'accuracy'), Precision(name = 'precision'), Recall(name = 'recall'), AUC(name = 'auc')]

In [None]:
# Compile the Sequential model: Adam with learning rate 0.01, binary
# cross-entropy loss and the metric list defined above.
model.compile(
    optimizer = Adam(learning_rate = 0.01),
    loss = BinaryCrossentropy(),
    metrics = metrics
)

In [None]:
# Train for 5 epochs with the learning-rate scheduler and checkpoint callbacks;
# the returned History object is used for the plots below.
history = model.fit(train_dataset, validation_data=val_dataset, epochs=5, verbose=1, callbacks = [scheduler_callack, checkpoint_callback])

In [None]:
# Plot training and validation loss per epoch.
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(['train', 'val_loss'])
plt.show()

In [None]:
# Plot training and validation accuracy per epoch.
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model performance')
plt.xlabel('epoch')
# The curves are accuracy values, so label the axis accordingly (was 'rmse').
plt.ylabel('accuracy')
plt.legend(['train', 'val performance'])
plt.show()

# Model Evaluation and Testing

In [None]:
# Display the test dataset object.
test_dataset

In [None]:
# Display the training dataset object.
train_dataset

In [None]:
# Batch the test set one element at a time. No resizing is mapped here; the
# Sequential model contains its own resize/rescale layers.
test_dataset = test_dataset.batch(1)

In [None]:
# Display the batched test dataset object.
test_dataset

In [None]:
# Compute the loss and the compiled metrics on the test set.
model.evaluate(test_dataset)

In [None]:
# Predicted score for the first test batch: [0][0] selects the first sample
# and its single output value.
model.predict(test_dataset.take(1))[0][0]

# Visualizing Confusion matrix

In [None]:
# Collect every test batch as NumPy arrays in a single pass, so that inputs
# and labels stay aligned with each other.
labels = []
inp = []
for x,y in test_dataset.as_numpy_iterator():
    labels.append(y)
    inp.append(x)

In [None]:
# Shape of the collected inputs, with and without the batch-of-one axis.
print(np.array(inp).shape)
print(np.array(inp)[:,0, ...].shape)

In [None]:
# Each collected label comes from a batch of one; keep its single element.
labels = np.array([i[0] for i in labels])

In [None]:
# Display the flattened label array.
labels

In [None]:
# Predict on all collected inputs at once; [:,0, ...] drops the batch-of-one axis.
# NOTE(review): stacking with np.array assumes all collected images share one
# shape -- confirm for this dataset.
predicted = model.predict(np.array(inp)[:,0, ...])
# First (only) output column of the predictions.
predicted[:, 0]

In [None]:
# Decision threshold: scores above it are counted as the positive class.
threshold = 0.41

cm = confusion_matrix(labels, predicted > threshold)
print(cm)

plt.figure(figsize = (8,8))

# Draw the confusion matrix as an annotated heatmap.
sns.heatmap(cm, annot=True)
plt.title('confusion matrix - {}'.format(threshold))
plt.ylabel('actual')
plt.xlabel('predicted')


# ROC plot

In [None]:
# ROC curve from the true labels and the predicted scores.
fp, tp, thresholds = roc_curve(labels, predicted)
plt.figure(figsize=(16, 12))
plt.plot(fp, tp)
plt.xlabel('False positive rate')
plt.ylabel('true positive rate')
plt.grid()

# Annotate every `skip`-th point of the curve with its threshold value.
skip = 20
for i in range(0, len(thresholds), skip):
    plt.text(fp[i], tp[i], thresholds[i])
plt.show()

In [None]:
def parasite_or_not(x):
  """Map a score to a class letter: 'P' when `x` is below 0.5, otherwise 'U'."""
  return 'P' if x < 0.5 else 'U'

In [None]:
# Class letter for the model's prediction on the first test batch.
parasite_or_not(model.predict(test_dataset.take(1))[0][0])


In [None]:
# Show the first 16 test images in a 4x4 grid, titled "<actual>:<predicted>".
for i , (image, label) in enumerate(test_dataset.take(16)):
  ax = plt.subplot(4, 4, i+1)
  # Each element is a batch of one, so index 0 selects the image itself.
  plt.imshow(image[0])
  plt.title(str(parasite_or_not(label.numpy()[0])) + ':' + str(parasite_or_not(model.predict(image)[0][0])))
  plt.axis('off')

In [None]:
# model.save('malaria_diagnosis.keras')