In [None]:
# Import required packages
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow_datasets as tfds

In [None]:
# Training split: the first 80% of the examples in the 'malaria' dataset,
# loaded as (image, label) pairs.
train_ds = tfds.load(
    name='malaria',
    split='train[:80%]',
    as_supervised=True,
)

In [None]:
# Test split: the last 20% of the examples in the 'malaria' dataset, loaded as
# (image, label) pairs. The slice starts at 80% so that it does not overlap
# with the 'train[:80%]' slice used for training ('train[:-20%]' would select
# the first 80% again, i.e. the training samples themselves).
test_ds = tfds.load('malaria', split='train[80%:]', as_supervised=True)

In [None]:
# Show the first five training images at their original sizes.
# Each subplot title is the shape of the image tensor.
fig, ax = plt.subplots(1, 5)
for axis, (img, _label) in zip(ax, train_ds.take(5)):
  axis.imshow(img, cmap='gray')
  axis.set_title(str(img.shape))
  axis.axis("off")

In [None]:
# Identify the maximum height and width over all training images.
# TensorFlow image tensors are laid out as (height, width, channels), so
# axis 0 is the height and axis 1 is the width.
height = []
width = []

for image, _label in train_ds:
  height.append(int(image.shape[0]))
  width.append(int(image.shape[1]))

maxheight = max(height)
maxwidth = max(width)

In [None]:
# Resize every image to one common size, [maxheight, maxwidth], so that the
# images can later be stacked into batches. Labels are passed through as-is.
def _resize_to_max(img, label):
  return tf.image.resize(img, [maxheight, maxwidth]), label

train_data = train_ds.map(_resize_to_max)
test_data = test_ds.map(_resize_to_max)

In [None]:
# Visualize the first five resized training images.
# The resized images are cast to an integer dtype before plotting so that
# imshow treats the values as 0..255 colour values.
fig, ax = plt.subplots(1, 5)
for axis, (img, _label) in zip(ax, train_data.take(5)):
  axis.imshow(tf.cast(img, dtype=tf.int64), cmap='gray')
  axis.set_title(str(img.shape))
  axis.axis("off")

In [None]:
# Define normalization function
# Rescales a tensor to the range 0 to 1 using that tensor's own minimum and
# maximum value (not the fixed bounds 0 and 255).
def normalization(x):
  """Min-max scale `x` so that its smallest value maps to 0 and its largest to 1.

  Args:
    x: tensor (or tensor-like) to rescale. The minimum and maximum are taken
      over all of its elements.

  Returns:
    A tensor with the same shape as `x` holding `(x - min) / (max - min)`.
    If every element of `x` is equal (max == min) the result is all zeros
    instead of NaN.
  """
  lowest = tf.math.reduce_min(x)
  highest = tf.math.reduce_max(x)
  value_range = tf.math.subtract(highest, lowest)
  scaled = tf.math.divide(tf.math.subtract(x, lowest), value_range)
  # A constant input has a zero value range, which would turn every element
  # into 0/0 = NaN; map that case to zeros.
  return tf.where(tf.math.equal(value_range, 0), tf.zeros_like(scaled), scaled)

In [None]:
# Scale every image with the normalization function and turn each integer
# label into a one-hot vector of length 2.
def _normalize_and_one_hot(img, label):
  return normalization(img), tf.one_hot(label, 2)

train_data = train_data.map(_normalize_and_one_hot)
test_data = test_data.map(_normalize_and_one_hot)

In [None]:
# Preprocessing steps: shuffling, batching, prefetching
def _shuffle_batch_prefetch(dataset):
  # Same pipeline for both splits: shuffle buffer of 128, batches of 64,
  # prefetch 4 batches.
  return dataset.shuffle(buffer_size=128).batch(64).prefetch(4)

train_data = _shuffle_batch_prefetch(train_data)
test_data = _shuffle_batch_prefetch(test_data)

In [None]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Layer

# Build a model 
# 5 convolutional layers, 5 pooling layers, 1 global layer, 1 hidden layer, 1 output layer

class Model(tf.keras.Model):
    """Convolutional classifier with two softmax outputs.

    Architecture: five blocks of (3x3 'valid' convolution with ReLU, followed
    by 2x2 max pooling with stride 2) using 16, 32, 64, 128 and 256 filters,
    then global average pooling, a 256-unit ReLU dense layer and a 2-unit
    softmax output layer.

    The base class is referenced as `tf.keras.Model` so that this class does
    not inherit from a name it simultaneously redefines. No input shape is
    declared; the layers are built from the first batch passed to the model.
    Because every convolution uses 'valid' padding, the spatial size shrinks
    in each block; inputs need to be at least 94x94 pixels for the last
    pooling layer to still produce a 1x1 output.
    """

    def __init__(self):
        super().__init__()

        # Feature extractor: the number of filters doubles in every block.
        self.conv_1 = self._make_conv(16)
        self.max_pool_1 = self._make_pool()
        self.conv_2 = self._make_conv(32)
        self.max_pool_2 = self._make_pool()
        self.conv_3 = self._make_conv(64)
        self.max_pool_3 = self._make_pool()
        self.conv_4 = self._make_conv(128)
        self.max_pool_4 = self._make_pool()
        self.conv_5 = self._make_conv(256)
        self.max_pool_5 = self._make_pool()

        # Classifier head.
        self.global_pool = tf.keras.layers.GlobalAveragePooling2D()
        self.hidden_layer = tf.keras.layers.Dense(units=256,
                                                  activation=tf.keras.activations.relu)
        self.output_layer = tf.keras.layers.Dense(units=2,
                                                  activation=tf.keras.activations.softmax)

    @staticmethod
    def _make_conv(filters):
        """Return a 3x3, stride-1, 'valid'-padded ReLU convolution with `filters` filters."""
        return tf.keras.layers.Conv2D(filters=filters,
                                      kernel_size=3,
                                      strides=(1, 1),
                                      padding='valid',
                                      activation=tf.keras.activations.relu)

    @staticmethod
    def _make_pool():
        """Return a 2x2 max-pooling layer with stride 2."""
        return tf.keras.layers.MaxPool2D(pool_size=(2, 2), strides=(2, 2))

    def call(self, x):
        """Run the forward pass.

        Args:
            x: batch of images, assumed to be a 4-D tensor laid out as
                (batch, height, width, channels).

        Returns:
            Tensor of shape (batch, 2) with the softmax output per sample.
        """
        layers_in_order = (
            self.conv_1, self.max_pool_1,
            self.conv_2, self.max_pool_2,
            self.conv_3, self.max_pool_3,
            self.conv_4, self.max_pool_4,
            self.conv_5, self.max_pool_5,
            self.global_pool,
            self.hidden_layer,
            self.output_layer,
        )
        for layer in layers_in_order:
            x = layer(x)
        return x

In [None]:
### Hyperparameters
num_epochs = 20
learning_rate = 0.0008
# Weight given to the previous value when updating the running average of
# the training loss.
running_average_factor = 0.95

# Loss function and optimizer
lossfunction = tf.keras.losses.BinaryCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# Initialize the model
model = Model()

In [None]:
def train_step(model, input, target, loss_function, optimizer):
  """Run one optimization step on a single batch.

  Args:
    model: callable model exposing `trainable_variables`.
    input: batch of inputs passed to `model`.
    target: batch of targets passed to `loss_function` together with the
      model's prediction.
    loss_function: callable invoked as `loss_function(target, prediction)`.
    optimizer: object with an `apply_gradients` method that accepts
      (gradient, variable) pairs.

  Returns:
    The loss value computed for this batch (before the weight update).
  """
  # Only the forward pass and the loss have to be recorded on the tape.
  with tf.GradientTape() as tape:
    prediction = model(input)
    loss = loss_function(target, prediction)
  # Differentiate after leaving the recording context; the gradient
  # computation itself does not need to be traced.
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

def test(model, test_data, loss_function):
  # Test over complete test data

  test_accuracy_aggregator = []
  test_loss_aggregator = []

  for (input, target) in test_data:
    prediction = model(input)
    sample_test_loss = loss_function(target, prediction)
    sample_test_accuracy =  np.argmax(target, axis=1) == np.argmax(prediction, axis=1)
    sample_test_accuracy = np.mean(sample_test_accuracy)
    test_loss_aggregator.append(sample_test_loss.numpy())
    test_accuracy_aggregator.append(np.mean(sample_test_accuracy))

  test_loss = np.mean(test_loss_aggregator)
  test_accuracy = np.mean(test_accuracy_aggregator)

  return test_loss, test_accuracy
  



In [None]:
tf.keras.backend.clear_session()

# Evaluate the untrained model once on the test data. The results become the
# first entries of the lists used for the later visualization.
test_loss, test_accuracy = test(model, test_data, lossfunction)
test_losses = [test_loss]
test_accuracies = [test_accuracy]

# Same check on the training data; only the loss is kept.
train_loss, _ = test(model, train_data, lossfunction)
train_losses = [train_loss]



In [None]:
# We train for num_epochs epochs.
for epoch in range(num_epochs):
    print('Epoch: __ ' + str(epoch))

    # No per-epoch shuffle call is made here: train_data already contains a
    # shuffle stage from the preprocessing step, and the names
    # `train_dataset` / `test_dataset` are not defined in this notebook.

    # Training: track an exponential running average of the batch losses.
    running_average = 0
    for (inputs, targets) in train_data:
        train_loss = train_step(model, inputs, targets, lossfunction, optimizer)
        running_average = running_average_factor * running_average  + (1 - running_average_factor) * train_loss
    # Store a plain float so the list holds numbers rather than tensors.
    train_losses.append(float(running_average))

    # Testing
    test_loss, test_accuracy = test(model, test_data, lossfunction)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)


In [None]:
# Visualize the recorded metrics:
# first figure  - training and test loss,
# second figure - test accuracy (the title shows the best value reached).
loss_fig, loss_ax = plt.subplots()
train_line, = loss_ax.plot(train_losses)
test_line, = loss_ax.plot(test_losses)
loss_ax.set_xlabel("Training steps")
loss_ax.set_ylabel("Loss")
loss_ax.legend((train_line, test_line), ("training", "test"))
plt.show()

acc_fig, acc_ax = plt.subplots()
acc_line, = acc_ax.plot(test_accuracies)
acc_ax.set_title('Accuracy: ' + str(np.max(test_accuracies)))
acc_ax.set_xlabel("Training steps")
acc_ax.set_ylabel("Accuracy")
plt.show()
