# 1. Import packages



In [None]:
import re
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from keras.preprocessing import image

# Try to use a TPU strategy; fall back to the default strategy if no TPU is available.
try:
    # Create a TPUClusterResolver and use it to initialize the TPU system
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)

    # Create a TPUStrategy using the TPUClusterResolver
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except Exception as tpu_error:
    # `except Exception` instead of a bare `except:` so that KeyboardInterrupt and
    # SystemExit still propagate, and the reason for the fallback is visible.
    print('TPU not available, using the default strategy:', tpu_error)
    strategy = tf.distribute.get_strategy()

# Print the number of replicas in the strategy
print('Number of replicas:', strategy.num_replicas_in_sync)

# Print the TensorFlow version
print(tf.__version__)

Number of replicas: 1
2.8.2


In [None]:
import tensorflow as tf
# `drive.mount` is called below but `drive` was never imported, which raises
# NameError on a fresh kernel. The helper is provided by the Google Colab runtime.
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# Set the AUTOTUNE setting for the tf.data API
AUTOTUNE = tf.data.experimental.AUTOTUNE

# Get the path to the drive directory
PATH = "/content/drive/MyDrive"

# Set the batch size using the number of replicas in the strategy
BATCH_SIZE = 16 * strategy.num_replicas_in_sync

# Set the image size
IMAGE_SIZE = [180, 180]

# Set the number of epochs
EPOCHS = 100

Mounted at /content/drive


# 2. Load Input data : L labelled dataset, U unlabelled dataset

### Download the data  from Kaggle : https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia/code?datasetId=17810&sortBy=voteCount



In [None]:
# Collect every image path from the train and val directories (train first, then val)
filenames = [
    image_path
    for split_dir in ('train', 'val')
    for image_path in tf.io.gfile.glob(PATH + '/chest_xray/' + split_dir + '/*/*')
]

# Hold out 20% of the pooled files for validation; the rest is the training pool
train_filenames, val_filenames = train_test_split(filenames, test_size=0.2)

In [None]:
# Split the training filenames into a labelled set L (20%) and an unlabelled pool U (80%).
# No random_state is passed, so the split differs on every run.
L_labelled_filenames, U_unlabelled_filenames = train_test_split(train_filenames, test_size=0.8)

Run the following cell to see how many healthy/normal chest X-rays we have and how many pneumonia chest X-rays we have.

In [None]:
# Tally the labelled training files per class; the class name appears in each file path
COUNT_NORMAL = sum(1 for filename in L_labelled_filenames if "NORMAL" in filename)
COUNT_PNEUMONIA = sum(1 for filename in L_labelled_filenames if "PNEUMONIA" in filename)

print(f"Normal images count in training set: {COUNT_NORMAL}")
print(f"Pneumonia images count in training set: {COUNT_PNEUMONIA}")

Normal images count in training set: 57
Pneumonia images count in training set: 636


Notice that there are far more images classified as pneumonia than as normal. This shows that we have an imbalance in our data. We will correct for this imbalance later in the notebook.

In [None]:
# Wrap the labelled training filenames and the validation filenames as tf.data datasets
train_list_ds, val_list_ds = (
    tf.data.Dataset.from_tensor_slices(file_list)
    for file_list in (L_labelled_filenames, val_filenames)
)

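Run the following cell to see how many images we have in our labelled training dataset and how many images we have in our validation set. Note that the training dataset here contains only the labelled subset L (20% of the training split), so it is smaller than the validation set; the 80:20 ratio holds between the full training split (L plus U) and the validation split.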

In [None]:
# Element counts of the labelled training dataset and the validation dataset
TRAIN_IMG_COUNT = tf.data.experimental.cardinality(train_list_ds).numpy()
VAL_IMG_COUNT = tf.data.experimental.cardinality(val_list_ds).numpy()

print(f"Training images count: {TRAIN_IMG_COUNT}")
print(f"Validating images count: {VAL_IMG_COUNT}")

Training images count: 693
Validating images count: 867


As expected, we have two labels for our images.

In [None]:
# Class names are the sub-directory names under chest_xray/train.
# `os.path.basename` replaces the previous `str(bytes)[2:-1]` trick, which relied on
# the repr of a bytes object and would mangle non-ASCII or escaped characters.
CLASS_NAMES = np.array([os.path.basename(class_dir)
                        for class_dir in tf.io.gfile.glob(PATH + "/chest_xray/train/*")])

Currently our dataset is just a list of filenames. We want to map each filename to the corresponding (image, label) pair. The following methods will help us do that.

As we only have two labels, we will rewrite the label so that `1` or `True` indicates pneumonia and `0` or `False` indicates normal.

In [None]:
def get_label(file_path):
    """Return a boolean tensor that is True when the file sits in a PNEUMONIA directory.

    The path is split on the OS path separator and the second-to-last component
    (the directory that contains the file) is compared with "PNEUMONIA".
    """
    class_dir = tf.strings.split(file_path, os.path.sep)[-2]
    return class_dir == "PNEUMONIA"

The images originally have values that range from [0, 255]. CNNs work better with smaller numbers so we will scale this down.

In [None]:
def decode_img(img):
    """Decode JPEG bytes into a float32 image resized to IMAGE_SIZE.

    The bytes are decoded as a 3-channel uint8 tensor, converted to float32 in the
    [0, 1] range by `convert_image_dtype`, and then resized.
    """
    pixels_uint8 = tf.image.decode_jpeg(img, channels=3)
    pixels_float = tf.image.convert_image_dtype(pixels_uint8, tf.float32)
    return tf.image.resize(pixels_float, IMAGE_SIZE)

In [None]:
def process_path(file_path):
    """Map a file path to an (image, label) pair.

    The label comes from `get_label`; the image is the file content read as a
    string and passed through `decode_img`.
    """
    label = get_label(file_path)
    raw_bytes = tf.io.read_file(file_path)
    return decode_img(raw_bytes), label

In [None]:
# Turn both filename datasets into (image, label) datasets via `process_path`
train_ds, val_ds = (
    list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
    for list_ds in (train_list_ds, val_list_ds)
)

Let's visualize the shape of an (image, label) pair.

Load and format the test data as well.

In [None]:
# Plain Python list of the test image paths
test_filenames = tf.io.gfile.glob(PATH + '/chest_xray/test/*/*')

# tf.data dataset of the same test image paths
test_list_ds = tf.data.Dataset.list_files(PATH + '/chest_xray/test/*/*')

# Number of test images
TEST_IMAGE_COUNT = tf.data.experimental.cardinality(test_list_ds).numpy()

# (image, label) pairs produced by `process_path`, grouped into batches of BATCH_SIZE
test_ds = test_list_ds.map(process_path, num_parallel_calls=AUTOTUNE).batch(BATCH_SIZE)

# Display the number of test images
TEST_IMAGE_COUNT

624

In [None]:
def prepare_for_training(ds, cache=True, shuffle_buffer_size=1000):
    """Cache, shuffle, repeat, batch and prefetch a dataset.

    Args:
        ds: dataset to prepare.
        cache: falsy to skip caching; a string to cache to that filename (for data
            that does not fit in memory); any other truthy value to cache in memory.
        shuffle_buffer_size: buffer size passed to `shuffle`.

    Returns:
        The dataset repeated forever, in batches of BATCH_SIZE, with prefetching
        so batches are fetched in the background while the model trains.
    """
    if cache:
        ds = ds.cache(cache) if isinstance(cache, str) else ds.cache()

    return (
        ds.shuffle(buffer_size=shuffle_buffer_size)
        .repeat()
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=AUTOTUNE)
    )

Call the next batch iteration of the training data.

In [None]:
# Apply the caching/shuffling/batching pipeline to the training and validation datasets
train_ds, val_ds = map(prepare_for_training, (train_ds, val_ds))


Define the method to show the images in the batch.

# 4. Build the CNN



In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Dense
from tensorflow.keras.layers import AvgPool2D, GlobalAveragePooling2D, MaxPool2D
from tensorflow.keras.models import Model
from tensorflow.keras.layers import ReLU, concatenate
import tensorflow.keras.backend as K
# Creating Densenet121
def densenet(input_shape, n_classes, filters = 32):
    """Build a DenseNet-121-style Keras model (dense blocks of 6, 12, 24 and 16 layers).

    Args:
        input_shape: shape of a single input image, passed to `Input`.
        n_classes: number of units in the output layer. With a single unit the
            output uses a sigmoid; otherwise a softmax.
        filters: number of feature maps each dense layer adds (growth rate).

    Returns:
        An uncompiled `Model` mapping an image batch to class scores.
    """

    # batch norm + relu + conv
    def bn_rl_conv(x, filters, kernel=1, strides=1):
        x = BatchNormalization()(x)
        x = ReLU()(x)
        x = Conv2D(filters, kernel, strides=strides, padding='same')(x)
        return x

    def dense_block(x, repetition):
        for _ in range(repetition):
            # 1x1 bottleneck followed by a 3x3 conv, concatenated onto the running features
            y = bn_rl_conv(x, 4 * filters)
            y = bn_rl_conv(y, filters, 3)
            x = concatenate([y, x])
        return x

    def transition_layer(x):
        # Halve the channel count, then halve the spatial resolution
        x = bn_rl_conv(x, K.int_shape(x)[-1] // 2)
        x = AvgPool2D(2, strides=2, padding='same')(x)
        return x

    # `inputs` rather than `input`, to avoid shadowing the builtin
    inputs = Input(input_shape)
    x = Conv2D(64, 7, strides=2, padding='same')(inputs)
    x = MaxPool2D(3, strides=2, padding='same')(x)

    block_sizes = [6, 12, 24, 16]
    for block_index, repetition in enumerate(block_sizes):
        x = dense_block(x, repetition)
        # No transition after the last dense block: its output feeds the pooling
        # head directly (previously a final transition was built and then discarded).
        if block_index < len(block_sizes) - 1:
            x = transition_layer(x)
    x = GlobalAveragePooling2D()(x)

    # A softmax over a single unit always outputs 1.0, so a one-unit (binary) head
    # must use a sigmoid to produce a usable probability.
    activation = 'sigmoid' if n_classes == 1 else 'softmax'
    output = Dense(n_classes, activation=activation)(x)

    model = Model(inputs, output)
    return model
# Model input shape (height, width, channels) and number of output units
input_shape = (180, 180, 3)
n_classes = 1


def build_model():
    """Build the DenseNet from the module-level `input_shape` and `n_classes`."""
    network = densenet(input_shape, n_classes)
    return network

# 5. Correct for data imbalance

We saw earlier in this notebook that the data was imbalanced, with more images classified as pneumonia than normal. We will correct for that in this following section.

In [None]:
# Log of the pneumonia-to-normal count ratio, as a one-element array
initial_bias = np.log(np.array([COUNT_PNEUMONIA / COUNT_NORMAL]))
initial_bias

array([2.4121473])

In [None]:
# Each class is weighted by total / (2 * class count), so the rarer class weighs more
weight_for_0 = (1 / COUNT_NORMAL)*(TRAIN_IMG_COUNT)/2.0
weight_for_1 = (1 / COUNT_PNEUMONIA)*(TRAIN_IMG_COUNT)/2.0

class_weight = {0: weight_for_0, 1: weight_for_1}

print(f'Weight for class 0: {weight_for_0:.2f}')
print(f'Weight for class 1: {weight_for_1:.2f}')

Weight for class 0: 6.08
Weight for class 1: 0.54


The weight for class `0` (Normal) is a lot higher than the weight for class `1` (Pneumonia). Because there are fewer normal images, each normal image is weighted more to balance the data, as the CNN works best when the training data is balanced.

# Step 1: Train a DenseNet using an initial labelled training set L

In [None]:
def model_cloner(learning_rate):
    """Return a freshly built model compiled with Adam and binary cross-entropy.

    Args:
        learning_rate: learning rate passed to the Adam optimizer.

    Returns:
        The compiled model, tracking accuracy, precision and recall (in that order).
    """
    tracked_metrics = [
        'accuracy',
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
    ]
    adam = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    network = build_model()
    network.compile(optimizer=adam, loss='binary_crossentropy', metrics=tracked_metrics)
    return network

In [None]:
# Save the model to xray_model.h5, keeping only the best one seen so far
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    filepath="xray_model.h5",
    save_best_only=True,
)

# Stop after 10 epochs without improvement and roll back to the best weights
early_stopping_cb = tf.keras.callbacks.EarlyStopping(
    restore_best_weights=True,
    patience=10,
)

def exponential_decay(lr0, s):
    """Return a schedule mapping an epoch to `lr0 * 0.1 ** (epoch / s)`.

    Args:
        lr0: learning rate at epoch 0.
        s: number of epochs over which the rate shrinks by a factor of 10.
    """
    def schedule(epoch):
        return lr0 * 0.1 ** (epoch / s)

    return schedule

# Schedule starting at 0.01 and dividing the rate by 10 every 20 epochs
exponential_decay_fn = exponential_decay(lr0=0.01, s=20)

# Callback wrapping the schedule
lr_scheduler = tf.keras.callbacks.LearningRateScheduler(schedule=exponential_decay_fn)

# Step 3: Select samples from U using a query function Q,

In [None]:
from keras.preprocessing import image

# Target size used when loading images for the query step
img_width = img_height = 180
# Size of the unlabelled pool at the time this cell runs
threshold = len(U_unlabelled_filenames)
def new_labelled_dataset(U_unlabelled_filenames, model, query_fraction=0.2, batch_size=32):
    """Query function Q: select the unlabelled images the model is least confident about.

    Every file in the pool is scored with `model`, the least-confidence uncertainty
    `1 - max(p, 1 - p)` is computed from the predicted probability `p`, and the most
    uncertain files are moved out of the pool.

    Args:
        U_unlabelled_filenames: list of image paths in the unlabelled pool U.
        model: model whose `predict` returns one probability per image.
        query_fraction: fraction of the module-level `threshold` (the initial pool
            size) to select on each call.
        batch_size: number of images scored per `predict` call.

    Returns:
        A pair `(remaining, selected)`: the pool without the selected files (original
        order preserved) and the selected files, most uncertain first.
    """
    if not U_unlabelled_filenames:
        return [], []

    # Score the pool in chunks: one `predict` call per image is very slow, while
    # loading the whole pool into a single array may not fit in memory.
    scores = []
    for start in range(0, len(U_unlabelled_filenames), batch_size):
        chunk = U_unlabelled_filenames[start:start + batch_size]
        batch = np.stack([
            image.img_to_array(image.load_img(url, target_size=(img_width, img_height)))
            for url in chunk
        ])
        # `img_to_array` yields pixel values in [0, 255] but the model is trained on
        # floats in [0, 1] (see `decode_img`), so rescale before predicting.
        batch = batch / 255.0
        scores.extend(model.predict(batch, verbose=0)[:, 0])
    scores = np.asarray(scores, dtype="float64")

    # Least confidence: largest when the predicted probability is closest to 0.5.
    # Ranking by the raw score instead would pick the most confident predictions.
    uncertainty = 1.0 - np.maximum(scores, 1.0 - scores)

    # Always select at least one file so the pool shrinks on every call.
    n_query = min(len(U_unlabelled_filenames), max(1, int(threshold * query_fraction)))
    ranked = np.argsort(-uncertainty, kind="stable")[:n_query]
    new_labelled_filenames = [U_unlabelled_filenames[index] for index in ranked]

    selected = set(new_labelled_filenames)
    U_unlabelled_filenames = [ele for ele in U_unlabelled_filenames if ele not in selected]
    return U_unlabelled_filenames, new_labelled_filenames


# Step 4: request the labels for the samples selected in step 3 from the expert A,

#### we don't need this step because we have all the labels

# Step 5: remove the selected samples from the dataset U and add the selected samples to L

In [None]:
def update_L_labelled_filenames(new_labelled_filenames):
    """Return a new list: the module-level `L_labelled_filenames` followed by the new files.

    The module-level list itself is not modified; the caller rebinds it.
    """
    combined = list(L_labelled_filenames)
    combined.extend(new_labelled_filenames)
    return combined

# Step 6: retrain the DenseNet using the dataset L

In [None]:
# Learning rates to sweep, from largest to smallest
learning_rates = [1e-1, 5e-2, 1e-2, 5e-3, 1e-3]

In [None]:
# Display the learning rates that will be swept
learning_rates

[0.1, 0.05, 0.01, 0.005, 0.001]

In [None]:
def _train_and_score(model, labelled_filenames):
    """Fit `model` on the given labelled files and return (history, test accuracy).

    The training dataset, the class weights and the number of steps per epoch are
    all derived from `labelled_filenames`, so they always match the current L.
    """
    n_labelled = len(labelled_filenames)
    n_normal = sum(1 for filename in labelled_filenames if "NORMAL" in filename)
    n_pneumonia = sum(1 for filename in labelled_filenames if "PNEUMONIA" in filename)

    # total / (2 * class count); max(..., 1) avoids a ZeroDivisionError when a
    # class is absent from the labelled set.
    class_weight = {
        0: n_labelled / (2.0 * max(n_normal, 1)),
        1: n_labelled / (2.0 * max(n_pneumonia, 1)),
    }

    list_ds = tf.data.Dataset.from_tensor_slices(labelled_filenames)
    train_ds = prepare_for_training(list_ds.map(process_path, num_parallel_calls=AUTOTUNE))

    history = model.fit(
        train_ds,
        steps_per_epoch=max(1, n_labelled // BATCH_SIZE),
        epochs=EPOCHS,
        validation_data=val_ds,
        validation_steps=VAL_IMG_COUNT // BATCH_SIZE,
        class_weight=class_weight,
        callbacks=[checkpoint_cb, early_stopping_cb]
    )
    loss, acc, prec, rec = model.evaluate(test_ds)
    return history, acc


# One entry per learning rate: [learning_rate, labelled-set sizes, test accuracies]
accuracy = []

for learning_rate in learning_rates:
    # Start every learning rate from a fresh labelled (20%) / unlabelled (80%) split
    L_labelled_filenames, U_unlabelled_filenames = train_test_split(train_filenames, test_size=0.8)
    x = [len(L_labelled_filenames)]
    model = model_cloner(learning_rate)

    # Initial training on the new L. The dataset is rebuilt from this split rather
    # than reusing a `train_ds` left over from an earlier cell or learning rate.
    history, acc = _train_and_score(model, L_labelled_filenames)
    learning_rate_accuracy = [acc]
    step = 0

    while U_unlabelled_filenames:

        print("Start of step : " + str(step))

        # Move the queried samples from U to L
        U_unlabelled_filenames, new_labelled_filenames = new_labelled_dataset(U_unlabelled_filenames, model)
        if not new_labelled_filenames:
            # Nothing was selected, so U cannot shrink: stop instead of looping forever
            break
        L_labelled_filenames = update_L_labelled_filenames(new_labelled_filenames)
        x.append(len(L_labelled_filenames))

        # Retrain on the enlarged L and record the test accuracy
        history, acc = _train_and_score(model, L_labelled_filenames)
        learning_rate_accuracy.append(acc)
        print("End of step : " + str(step))
        step += 1
    accuracy.append([learning_rate, x, learning_rate_accuracy])

print("End of training")

Epoch 1/100
Epoch 2/100
Epoch 3/100
 5/43 [==>...........................] - ETA: 8s - loss: 0.8327 - accuracy: 0.9250 - precision: 0.9250 - recall: 1.0000

# proposed algorithm Uncertainty Sampling (Least confidence (LC)) with different learning rates and labelled samples

In [None]:
# Plot one accuracy curve per learning rate. Iterating over `accuracy` instead of
# hard-coding indices 0-4 keeps the plot correct if the number of learning rates changes.
for lr_value, labelled_counts, lr_accuracies in accuracy:
    plt.plot(labelled_counts, lr_accuracies, label="learning rate = " + str(lr_value))
plt.xlabel("# Labelled Samples")
plt.ylabel("Accuracy")
plt.grid()
plt.legend()
plt.show()

#  Accuracy of Uncertainty Sampling (Least confidence (LC)) on a single graph with same x-axis labelled samples with a learning rate 0.05,

In [None]:

# Accuracy curve of the second learning rate in the sweep
plt.plot(
    accuracy[1][1],
    accuracy[1][2],
    label=f"learning rate = {accuracy[1][0]}",
)
plt.xlabel("# Labelled Samples")
plt.ylabel("Accuracy")
plt.grid()
plt.legend()
plt.show()

# The accuracy gap in terms of labelled samples for the Uncertainty Sampling (Least confidence (LC)).

In [None]:
# Accuracy change between consecutive active-learning steps (0 for the first point)
L = [
    0 if i == 0 else accuracy[1][2][i] - accuracy[1][2][i - 1]
    for i in range(len(accuracy[1][2]))
]

plt.plot(accuracy[1][1], L, label=f"learning rate = {accuracy[1][0]}")
plt.xlabel("# Labelled Samples")
plt.ylabel("Accuracy")
plt.grid()
plt.legend()
plt.show()

# Accuracy with cross validation

In [None]:
n_folds = 2

# Adam learning rate used for every fold. The previous value of 1 looks like the
# index of 0.05 in `learning_rates` rather than a rate; the section headings refer
# to a learning rate of 0.05.
learning_rate = 0.05
# save the model history in a list after fitting so that we can plot later
model_history = []
# One entry per fold: [fold number, labelled-set sizes, test accuracies]
accuracy = []


def _train_and_score_fold(model, labelled_filenames):
    """Fit `model` on the given labelled files and return (history, test accuracy).

    The training dataset, the class weights and the number of steps per epoch are
    all derived from `labelled_filenames`; validation uses the current `val_ds`.
    """
    n_labelled = len(labelled_filenames)
    n_normal = sum(1 for filename in labelled_filenames if "NORMAL" in filename)
    n_pneumonia = sum(1 for filename in labelled_filenames if "PNEUMONIA" in filename)

    # total / (2 * class count); max(..., 1) avoids a ZeroDivisionError when a
    # class is absent from the labelled set.
    class_weight = {
        0: n_labelled / (2.0 * max(n_normal, 1)),
        1: n_labelled / (2.0 * max(n_pneumonia, 1)),
    }

    list_ds = tf.data.Dataset.from_tensor_slices(labelled_filenames)
    train_ds = prepare_for_training(list_ds.map(process_path, num_parallel_calls=AUTOTUNE))

    history = model.fit(
        train_ds,
        steps_per_epoch=max(1, n_labelled // BATCH_SIZE),
        epochs=EPOCHS,
        validation_data=val_ds,
        validation_steps=VAL_IMG_COUNT // BATCH_SIZE,
        class_weight=class_weight,
        callbacks=[checkpoint_cb, early_stopping_cb]
    )
    loss, acc, prec, rec = model.evaluate(test_ds)
    return history, acc


for i in range(n_folds):
    print("Training on Fold: ",i+1)

    # Each "fold" is an independent random 80/20 split (repeated random
    # sub-sampling), not a partition of the data into disjoint folds.
    train_filenames, val_filenames = train_test_split(filenames, test_size=0.2,random_state = np.random.randint(1,1000, 1)[0])
    L_labelled_filenames, U_unlabelled_filenames = train_test_split(train_filenames, test_size=0.8)

    # Validation data for this fold
    val_list_ds = tf.data.Dataset.from_tensor_slices(val_filenames)
    val_ds = val_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
    val_ds = prepare_for_training(val_ds)

    x = [len(L_labelled_filenames)]
    model = model_cloner(learning_rate)

    # Initial training on this fold's labelled set
    history, acc = _train_and_score_fold(model, L_labelled_filenames)
    learning_rate_accuracy = [acc]
    step = 0

    while U_unlabelled_filenames:

        print("Start of step : " + str(step))

        # Move the queried samples from U to L
        U_unlabelled_filenames, new_labelled_filenames = new_labelled_dataset(U_unlabelled_filenames, model)
        print(len(U_unlabelled_filenames))
        if not new_labelled_filenames:
            # Nothing was selected, so U cannot shrink: stop instead of looping forever
            break
        L_labelled_filenames = update_L_labelled_filenames(new_labelled_filenames)
        x.append(len(L_labelled_filenames))

        # Retrain on the enlarged L and record the test accuracy
        history, acc = _train_and_score_fold(model, L_labelled_filenames)
        learning_rate_accuracy.append(acc)
        print("End of step : " + str(step))
        step += 1

    # Record the fold number (previously the constant `n_folds` was stored for every fold)
    accuracy.append([i + 1, x, learning_rate_accuracy])
    # Only the history of the last fit of this fold is kept
    model_history.append(history)

print("End of training")

# 	Training and validation accuracy  for proposed algorithm with a learning rate 0.05.

In [None]:
# Training vs validation accuracy per epoch for the first stored history
plt.plot(
    model_history[0].history['accuracy'],
    color='black',
    label='Train Accuracy Fold 1',
)
plt.plot(
    model_history[0].history['val_accuracy'],
    color='black',
    linestyle="dashdot",
    label='Val Accuracy Fold 1',
)
plt.title('Train Accuracy vs Val Accuracy')

plt.legend()
plt.show()