In [None]:
import os
import pandas as pd
import glob2
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
# Dataset root folder, relative to the working directory. Later cells read
# 'Data_Entry_2017.csv' and the PNG images from below this folder.
NIH_14_DATASET_PATH = './NIH_14/'

In [None]:
# Resolve the dataset folder to an absolute path and show its entries as the cell output.
dataset_path = os.path.abspath(NIH_14_DATASET_PATH)
os.listdir(dataset_path)

In [None]:
# Load the metadata table that sits next to the images. The rest of the notebook
# relies on its 'Image Index', 'Finding Labels' and 'Patient Age' columns.
data_entry_csv_path = os.path.join(dataset_path, 'Data_Entry_2017.csv')
data = pd.read_csv(data_entry_csv_path)
print(f"Data Shape : {data.shape}")
data.head()

In [None]:
# Keep only the rows whose recorded patient age is below 100.
data = data.loc[data['Patient Age'] < 100]

print(f"New dataset dimensions: {data.shape}")

In [None]:
# Keep only the image file name and its '|'-separated finding labels.
# .copy() detaches the result from the filtered frame it was sliced from, so the
# column assignments made in later cells (data['Path'] and one column per label)
# write to an independent DataFrame instead of triggering SettingWithCopyWarning.
data = data[['Image Index', 'Finding Labels']].copy()
print(data.shape)

In [None]:
# Collect every PNG below the dataset folder ('**' lets glob2 descend into
# sub-folders) and keep the paths in sorted order.
all_images = sorted(glob2.glob(dataset_path + '/**/*.png'))
print(f'Number of Images: {len(all_images)}')

# Lookup table: bare file name -> full path on disk.
all_image_paths = dict(zip(map(os.path.basename, all_images), all_images))

# Attach the on-disk location to each row; a file name that is absent from the
# lookup table ends up with a missing value in 'Path'.
data['Path'] = data['Image Index'].map(all_image_paths.get)
data.sample(5, random_state=3)

In [None]:
from itertools import chain

# Split every 'Finding Labels' entry on '|' and gather the distinct label names
# (np.unique returns them sorted).
all_labels = np.unique(list(chain.from_iterable(
    finding.split('|') for finding in data['Finding Labels']
)))
print(all_labels)

In [None]:
# Drop the 'No Finding' entry and keep the remaining labels as a plain Python list.
all_labels = [label for label in np.asarray(all_labels) if label != 'No Finding']
all_labels

In [None]:
# Add one 0/1 indicator column per label.
for c_label in all_labels:
    if len(c_label) > 1:  # leave out empty labels
        # Test membership in the individual '|'-separated entries instead of doing a
        # substring search on the raw string, so a label can never match merely
        # because its name is contained in another label's name.
        data[c_label] = data['Finding Labels'].map(
            lambda finding: 1 if c_label in finding.split('|') else 0
        )

print(f"Dataset Dimension: {data.shape}")
data.head()

In [None]:
# Number of rows for each distinct 'Finding Labels' string (i.e. each label combination).
label_counts = data['Finding Labels'].value_counts()
label_counts

In [None]:
# Keep only the label combinations that occur in more than 11 rows, then recount.
data = (
    data
    .groupby('Finding Labels')
    .filter(lambda group: group.shape[0] > 11)
)
label_counts = data['Finding Labels'].value_counts()
print(label_counts.shape)
print(label_counts)

In [None]:
# Two-stage random split with a fixed seed: hold out 30% of the rows for testing,
# then hold out 30% of what remains for validation.
train_and_valid_df, test_df = train_test_split(
    data, test_size=0.30, random_state=2018
)
train_df, valid_df = train_test_split(
    train_and_valid_df, test_size=0.30, random_state=2018
)

print(f'Training: {train_df.shape[0]} Validation: {valid_df.shape[0]} Testing: {test_df.shape[0]}')

In [None]:
# Image generator whose only configured transform is scaling pixel values by 1/255.
base_generator = ImageDataGenerator(rescale=1./255)

In [None]:
IMG_SIZE = (224, 224)
def flow_from_dataframe(image_generator, dataframe, batch_size):
    """Build a batch iterator over the images listed in ``dataframe``.

    Args:
        image_generator: Object exposing ``flow_from_dataframe`` (an
            ``ImageDataGenerator`` in this notebook).
        dataframe: Frame with a 'Path' column and one column per entry of
            the notebook-level ``all_labels`` list.
        batch_size: Number of samples per batch.

    Returns:
        Whatever ``image_generator.flow_from_dataframe`` returns. Images are
        requested as RGB at ``IMG_SIZE``, the label columns are passed through
        with ``class_mode='raw'``, and ``shuffle=False`` keeps the dataframe's
        row order.
    """
    return image_generator.flow_from_dataframe(
        dataframe,
        x_col='Path',
        y_col=all_labels,
        target_size=IMG_SIZE,
        classes=all_labels,
        color_mode='rgb',
        class_mode='raw',
        shuffle=False,
        batch_size=batch_size,
    )

In [None]:
# One unshuffled batch iterator per split, created in train / valid / test order,
# all sharing the same rescaling generator and a batch size of 32.
train_gen, valid_gen, test_gen = (
    flow_from_dataframe(image_generator=base_generator,
                        dataframe=split_df,
                        batch_size=32)
    for split_df in (train_df, valid_df, test_df)
)

In [None]:
# Sanity check: pull one batch from the training iterator (this advances it by one
# batch) and print the shape and labels of the batch's second sample (index 1).
train_x, train_y = next(train_gen)
print(f"Image Dimensions: {train_x[1].shape}")
print(f"Labels: {train_y[1]}")

In [None]:
from tensorflow.keras.applications.densenet import DenseNet121
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.models import Model

input_shape = (224, 224, 3)
img_input = Input(shape=input_shape)

# DenseNet121 backbone without its classification head, initialised from the
# 'imagenet' weights, with average pooling applied to its output.
base_model = DenseNet121(
    include_top=False,
    weights='imagenet',
    input_tensor=img_input,
    input_shape=input_shape,
    pooling="avg",
)
x = base_model.output

# Multi-label head: one independent sigmoid output per entry in all_labels.
predictions = Dense(len(all_labels), activation="sigmoid", name="predictions")(x)
model = Model(inputs=img_input, outputs=predictions)

In [None]:
from evaluation_helper import EvaluationHelper

In [None]:
# Helper imported from the local evaluation_helper module; its implementation is
# not part of this notebook.
evaluation_helper_instance = EvaluationHelper()

In [None]:
# Absolute paths of the saved model artefacts under ./weights (resolved against the
# current working directory): two Keras .h5 weight files and four .tflite models.
BASELINE_FP32_MODEL_PATH = os.path.abspath('./weights/baseline_FP32.h5')
BASELINE_QAT_FP32_MODEL_PATH = os.path.abspath('./weights/baseline_QAT_FP32.h5')
INT8_TFLITE_MODEL_PATH = os.path.abspath('./weights/QAT_INT8.tflite')
FP16_TFLITE_MODEL_PATH = os.path.abspath('./weights/FP16.tflite')
DYNAMIC_QUANTIZED_TFLITE_MODEL_PATH = os.path.abspath('./weights/dynamic_quantized.tflite')
# NOTE(review): FP32_NO_QUANTIZED_TFLITE_MODEL_PATH is not referenced in the cells visible here.
FP32_NO_QUANTIZED_TFLITE_MODEL_PATH = os.path.abspath('./weights/FP32_no_quantization.tflite')

In [None]:
# Hand the unshuffled test iterator to the helper.
# NOTE(review): presumably the helper's prediction / AUC methods read this attribute — confirm in evaluation_helper.
evaluation_helper_instance.test_generator = test_gen

In [None]:
# Load the saved FP32 baseline weights into the DenseNet121-based model.
model.load_weights(BASELINE_FP32_MODEL_PATH)

In [None]:
# Predictions of the Keras baseline model.
# NOTE(review): presumably computed over evaluation_helper_instance.test_generator — confirm in evaluation_helper.
baseline_pred = evaluation_helper_instance.get_model_predictions(model)

In [None]:
# Score the baseline predictions, print the result and persist it to a text file in
# the working directory (the file is overwritten on every run).
auc_score = evaluation_helper_instance.get_auc_roc_score(baseline_pred)
print(f"Baseline FP32 AUC Score : {auc_score}")
with open('baseline_FP32_auc_score.txt', 'w') as fp:
    fp.write(f"AUC-ROC score for FP32 baseline model is {auc_score}")

In [None]:
# NOTE(review): presumably plots per-label ROC curves and saves them as 'baseline_FP32.png' — confirm in evaluation_helper.
evaluation_helper_instance.get_auc_plot(baseline_pred, all_labels, 'baseline_FP32.png')

In [None]:
# Point the helper at the FP16 TFLite file for the next TFLite evaluation.
# NOTE(review): 'qunantized' (sic) must match the attribute EvaluationHelper actually reads — check before renaming.
evaluation_helper_instance.qunantized_model_path = FP16_TFLITE_MODEL_PATH

In [None]:
# NOTE(review): presumably runs the TFLite model selected via qunantized_model_path over the test generator — confirm.
fp_16_pred = evaluation_helper_instance.get_tflite_predictions()

In [None]:
# Score the FP16 predictions, print the result and persist it to a text file
# (auc_score is reused, so the previous model's value is replaced).
auc_score = evaluation_helper_instance.get_auc_roc_score(fp_16_pred)
print(f"FP16 AUC Score : {auc_score}")
with open('FP16_auc_score.txt', 'w') as fp:
    fp.write(f"AUC-ROC score for FP16 Quantized model is {auc_score}")

In [None]:
# NOTE(review): presumably plots per-label ROC curves and saves them as 'FP16.png' — confirm in evaluation_helper.
evaluation_helper_instance.get_auc_plot(fp_16_pred, all_labels, 'FP16.png')

In [None]:
# Point the helper at the dynamically quantized TFLite file for the next TFLite evaluation.
# NOTE(review): 'qunantized' (sic) must match the attribute EvaluationHelper actually reads — check before renaming.
evaluation_helper_instance.qunantized_model_path = DYNAMIC_QUANTIZED_TFLITE_MODEL_PATH

In [None]:
# NOTE(review): presumably runs the TFLite model selected via qunantized_model_path over the test generator — confirm.
dynamic_quant_pred = evaluation_helper_instance.get_tflite_predictions()

In [None]:
# Score the dynamic-quantization predictions, print the result and persist it to a text file.
auc_score = evaluation_helper_instance.get_auc_roc_score(dynamic_quant_pred)
print(f"Dynamic Quant AUC Score : {auc_score}")
with open('DynamicQuant_auc_score.txt', 'w') as fp:
    fp.write(f"AUC-ROC score for Dynamic Quantized model is {auc_score}")

In [None]:
# NOTE(review): the file name passed here is spelled 'DynamicQunat.png' (sic), unlike
# 'DynamicQuant_auc_score.txt'; left unchanged so existing consumers of the file keep working.
evaluation_helper_instance.get_auc_plot(dynamic_quant_pred, all_labels, 'DynamicQunat.png')

In [None]:
# Point the helper at the INT8 (QAT) TFLite file for the next TFLite evaluation.
# NOTE(review): 'qunantized' (sic) must match the attribute EvaluationHelper actually reads — check before renaming.
evaluation_helper_instance.qunantized_model_path = INT8_TFLITE_MODEL_PATH

In [None]:
# NOTE(review): presumably runs the TFLite model selected via qunantized_model_path over the test generator — confirm.
int8_quant_pred = evaluation_helper_instance.get_tflite_predictions()

In [None]:
# Score the INT8 predictions, print the result and persist it to a text file.
auc_score = evaluation_helper_instance.get_auc_roc_score(int8_quant_pred)
print(f"INT8 Quant AUC Score : {auc_score}")
with open('INT8_auc_score.txt', 'w') as fp:
    fp.write(f"AUC-ROC score for INT8 Quantized model is {auc_score}")

In [None]:
# NOTE(review): presumably plots per-label ROC curves and saves them as 'INT8.png' — confirm in evaluation_helper.
evaluation_helper_instance.get_auc_plot(int8_quant_pred, all_labels, 'INT8.png')

In [None]:
import tensorflow_model_optimization as tfmot

In [None]:
class DefaultBNQuantizeConfig(tfmot.quantization.keras.QuantizeConfig):
    """QuantizeConfig that registers an output quantizer only.

    No weights and no activations are reported for quantization, the two
    setters are no-ops, and the single output quantizer is an 8-bit
    ``MovingAverageQuantizer``. In this notebook it is attached to
    ``BatchNormalization`` layers.
    """

    def get_weights_and_quantizers(self, layer):
        """Report no weights to quantize."""
        return []

    def get_activations_and_quantizers(self, layer):
        """Report no activations to quantize."""
        return []

    def set_quantize_weights(self, layer, quantize_weights):
        """Do nothing: no weights were reported for quantization."""

    def set_quantize_activations(self, layer, quantize_activations):
        """Do nothing: no activations were reported for quantization."""

    def get_output_quantizers(self, layer):
        """Return a one-element list holding the output quantizer."""
        output_quantizer = tfmot.quantization.keras.quantizers.MovingAverageQuantizer(
            num_bits=8,
            per_axis=False,
            symmetric=False,
            narrow_range=False,
        )
        return [output_quantizer]

    def get_config(self):
        """Return an empty config: the class has no constructor arguments."""
        return {}

In [None]:
def apply_quantization_to_batch_normalization(layer):
    """Clone function that marks BatchNormalization layers for quantization.

    Intended as the ``clone_function`` of ``tf.keras.models.clone_model``.

    Args:
        layer: A Keras layer of the model being cloned.

    Returns:
        ``layer`` wrapped by ``quantize_annotate_layer`` with a
        ``DefaultBNQuantizeConfig`` when it is a ``BatchNormalization``
        layer; otherwise ``layer`` itself, unchanged.
    """
    if isinstance(layer, tf.keras.layers.BatchNormalization):
        # The notebook only imports the package as `tfmot`, so the annotate helper
        # must be referenced through it; the bare name raised NameError.
        return tfmot.quantization.keras.quantize_annotate_layer(
            layer, DefaultBNQuantizeConfig()
        )

    return layer

In [None]:
# Clone the baseline model, passing every layer through the clone function so that
# BatchNormalization layers come back annotated for quantization.
annotated_model = tf.keras.models.clone_model(
                    model,
                    clone_function=apply_quantization_to_batch_normalization,
)

In [None]:
# The custom QuantizeConfig has to be registered by name while quantize_apply
# rebuilds the annotated layers. quantize_scope is reached through `tfmot`
# because the notebook never imports it as a bare name (it raised NameError).
with tfmot.quantization.keras.quantize_scope(
    {'DefaultBNQuantizeConfig': DefaultBNQuantizeConfig}
):
    # Use `quantize_apply` to actually make the model quantization aware.
    quant_aware_model = tfmot.quantization.keras.quantize_apply(annotated_model)

In [None]:
# Load the saved QAT FP32 weights into the quantization-aware model.
quant_aware_model.load_weights(BASELINE_QAT_FP32_MODEL_PATH)

In [None]:
# Predictions of the quantization-aware Keras model.
# NOTE(review): presumably computed over evaluation_helper_instance.test_generator — confirm in evaluation_helper.
qat_baseline_FP32_pred = evaluation_helper_instance.get_model_predictions(quant_aware_model)

In [None]:
# Score the QAT model's predictions, print the result and persist it to a text file.
auc_score = evaluation_helper_instance.get_auc_roc_score(qat_baseline_FP32_pred)
print(f"QAT Baseline FP32 AUC Score : {auc_score}")
with open('QAT_baseline_FP32_auc_score.txt', 'w') as fp:
    fp.write(f"AUC-ROC score for QAT FP32 baseline model is {auc_score}")

In [None]:
# NOTE(review): presumably plots per-label ROC curves and saves them as 'QAT_baseline_FP32.png' — confirm in evaluation_helper.
evaluation_helper_instance.get_auc_plot(qat_baseline_FP32_pred, all_labels, 'QAT_baseline_FP32.png')