# Imports

In [None]:
import glob
import cv2
import os

import numpy as np 
import pandas as pd 
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns


from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score, roc_auc_score, cohen_kappa_score, precision_score, recall_score, accuracy_score, confusion_matrix
from tensorflow.keras.utils import to_categorical

%matplotlib inline

In [None]:
# Sanity check: list the entries of the dataset directory (path is relative to the notebook).
print(os.listdir("../input/intel-mobileodt-cervical-cancer-screening"))

# Data import

In [None]:
# Collect the paths of all training images (base set + additional sets), grouped by type.

base_dir = '../input/intel-mobileodt-cervical-cancer-screening'

train_dir = os.path.join(base_dir, 'train', 'train')

# The per-type folders are resolved under train_dir. Joining them onto base_dir would
# make train_dir unused and the globs below would silently return no base images.
# NOTE(review): confirm the dataset layout is <base_dir>/train/train/Type_N.
type1_dir = os.path.join(train_dir, 'Type_1')
type2_dir = os.path.join(train_dir, 'Type_2')
type3_dir = os.path.join(train_dir, 'Type_3')

# glob returns paths in filesystem-dependent order; sorting makes the seeded
# shuffle / split performed later give the same result on every machine.
type1_files = sorted(glob.glob(os.path.join(type1_dir, '*.jpg')))
type2_files = sorted(glob.glob(os.path.join(type2_dir, '*.jpg')))
type3_files = sorted(glob.glob(os.path.join(type3_dir, '*.jpg')))

added_type1_files = sorted(glob.glob(os.path.join(base_dir, "additional_Type_1_v2", "Type_1", '*.jpg')))
added_type2_files = sorted(glob.glob(os.path.join(base_dir, "additional_Type_2_v2", "Type_2", '*.jpg')))
added_type3_files = sorted(glob.glob(os.path.join(base_dir, "additional_Type_3_v2", "Type_3", '*.jpg')))

# Merge the base and the additional images of each type.
type1_files = type1_files + added_type1_files
type2_files = type2_files + added_type2_files
type3_files = type3_files + added_type3_files


print('Number of images in a train set of type 1: ', len(type1_files))
print('Number of images in a train set of type 2: ', len(type2_files))
print('Number of images in a train set of type 3: ', len(type3_files))
print('Total number of images in a train set: ', sum([len(type1_files), len(type2_files), len(type3_files)]))

In [None]:
# Build a two-column frame that pairs every image path with its cervix type label.

labelled_groups = (
    ('Type_1', type1_files),
    ('Type_2', type2_files),
    ('Type_3', type3_files),
)

files_df = pd.DataFrame({
    'filename': [path for _, paths in labelled_groups for path in paths],
    'label': [label for label, paths in labelled_groups for _ in paths],
})

files_df

In [None]:
# Shuffle the rows; the fixed seed (also reused by the train/val/test split) keeps the order repeatable.

random_state = 42

# frac=1 samples every row, i.e. returns the whole frame in random order.
files_df = files_df.sample(frac=1, random_state=random_state)
# Debug option: uncomment to continue with a 100-row random subsample instead.
# files_df = files_df.sample(n=100, random_state=random_state)

files_df

# Data exploration

In [None]:
# Summary statistics of the filename / label columns.
files_df.describe()

In [None]:
# Number of rows that are exact repeats of an earlier row.
int(files_df.duplicated().sum())

In [None]:
# Tabulate how many images carry each label.
type_count = files_df['label'].value_counts().to_frame()
type_count

In [None]:
# Name of the first (count) column of type_count.
print(type_count.columns[0])

In [None]:
# Horizontal bar chart: number of images per cervix type.

counts_column = type_count.columns[0]
fig, ax = plt.subplots(figsize=(15, 6))
sns.barplot(x=type_count[counts_column], y=type_count.index.to_list(), ax=ax)
ax.set_title('Cervical Cancer Type Distribution')
ax.grid(True)
plt.show()

In [None]:
# Display up to five sample images per cervix type, one figure (row) per type.
for label in ('Type_1', 'Type_2', 'Type_3'):
    filepaths = files_df[files_df['label'] == label]['filename'].values[:5]
    fig = plt.figure(figsize=(15, 6))
    for i, path in enumerate(filepaths):
        ax = fig.add_subplot(1, 5, i + 1)
        img = cv2.imread(path)
        # cv2.imread returns None (it does not raise) when a file cannot be decoded;
        # leave that slot blank instead of crashing in cvtColor.
        if img is not None:
            # OpenCV loads images as BGR; matplotlib expects RGB.
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, (224, 224))
            ax.imshow(img)
        ax.axis('off')
        ax.set_title(label)
    # Layout only needs adjusting once per figure, not once per image.
    fig.subplots_adjust(hspace=0.5)
    plt.show()

# Data preprocessing

In [None]:
# Split the data into training, validation and test sets (70:15:15).

# First cut: 70% of the rows go to training, the remaining 30% are held out.
train_files, holdout_files, train_labels, holdout_labels = train_test_split(
    files_df['filename'].values,
    files_df['label'].values,
    test_size=0.3,
    random_state=random_state,
)

# Second cut: the held-out 30% is halved into the test and validation sets.
test_files, val_files, test_labels, val_labels = train_test_split(
    holdout_files,
    holdout_labels,
    test_size=0.5,
    random_state=random_state,
)

# Report the size of each split ...
print('Number of images in train set: ', train_files.shape)
print('Number of images in validation set: ', val_files.shape)
print('Number of images in test set: ', test_files.shape, '\n')

# ... and the class balance inside each split.
train_counts = Counter(train_labels)
val_counts = Counter(val_labels)
test_counts = Counter(test_labels)
print('Train:', train_counts, '\nVal:', val_counts, '\nTest:', test_counts)

In [None]:
def load_images(files, labels, size=(160, 160)):
    """Read images from disk as resized RGB arrays, skipping files that fail to load.

    Args:
        files: sequence of image file paths.
        labels: sequence of labels, aligned one-to-one with ``files``.
        size: ``(width, height)`` passed to ``cv2.resize``; defaults to 160x160.

    Returns:
        A tuple ``(features, labels)`` of NumPy arrays containing the images that
        loaded successfully and the labels of exactly those images.

    Raises:
        ValueError: if ``files`` and ``labels`` differ in length (they would
            otherwise be paired incorrectly).
    """
    if len(files) != len(labels):
        raise ValueError(
            'files and labels must have the same length, got {} and {}'.format(len(files), len(labels))
        )

    features = []
    correct_labels = []
    bad_images = 0

    for path, label in zip(files, labels):
        try:
            img = cv2.imread(path)
            # cv2.imread signals an unreadable file by returning None, not by raising.
            if img is None:
                raise ValueError('image could not be read')
            # OpenCV decodes to BGR; ImageNet-pretrained models and matplotlib expect RGB.
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            resized_img = cv2.resize(img, size)
        except Exception:
            # Best effort: count and report the bad file, then carry on with the rest.
            bad_images += 1
            print('Encoutered bad image')
            continue

        # Append image and label together so the two lists can never drift apart.
        features.append(np.array(resized_img))
        correct_labels.append(label)

    print('Bad images ecountered:', bad_images)
    return np.array(features), np.array(correct_labels)

In [None]:
# Load training and evaluation data
# NOTE: load_images drops files that fail to load and returns only the labels of the
# images it kept; each *_labels variable is overwritten with that filtered array while
# the *_files arrays stay unfiltered. Re-run the split cell before re-running this one,
# otherwise files and labels no longer line up.
train_features, train_labels = load_images(train_files, train_labels)
print('Train images loaded')

val_features, val_labels = load_images(val_files, val_labels)
print('Validation images loaded')

test_features, test_labels = load_images(test_files, test_labels)
print('test images loaded')

In [None]:
# Check the lengths of the training and evaluation sets: within each split the
# number of feature arrays should equal the number of labels.
len(train_features), len(train_labels), len(val_features), len(val_labels), len(test_features), len(test_labels) 

In [None]:
# Training configuration shared by the cells below.
BATCH_SIZE = 32  # default batch_size of fit_model
NUM_CLASSES = 3  # one class per label: Type_1, Type_2, Type_3
EPOCHS = 10  # default (maximum) number of epochs of fit_model
INPUT_SHAPE = (160, 160, 3)  # shape given to tf.keras.Input; 160x160 matches the resize in load_images

In [None]:
# Turn the text labels of the train and validation sets into integer ids and one-hot vectors.
le = LabelEncoder()

# Learn the label -> id mapping on the training labels and apply it to both sets.
train_labels_enc = le.fit_transform(train_labels)
val_labels_enc = le.transform(val_labels)

train_labels_1hotenc = to_categorical(train_labels_enc, num_classes=NUM_CLASSES)
val_labels_1hotenc = to_categorical(val_labels_enc, num_classes=NUM_CLASSES)

# Show the first six training labels next to each encoding.
first_labels = train_labels[:6]
print(first_labels, train_labels_enc[:6])
print(first_labels, train_labels_1hotenc[:6])

In [None]:

# Encode the test labels with the encoder that was fitted on the training labels
# (`le`) rather than fitting a new one on the test labels: a separately fitted
# encoder assigns ids from whatever classes the test split happens to contain, so
# the ids could disagree with the ones the model was trained on.
test_labels_enc = le.transform(test_labels)

test_labels_1hotenc = tf.keras.utils.to_categorical(test_labels_enc, num_classes=NUM_CLASSES)


# Show the first six test labels next to each encoding.
print(test_labels[:6], test_labels_enc[:6])
print(test_labels[:6], test_labels_1hotenc[:6])

# Data augmentation

In [None]:
# Augmentation pipeline: random horizontal flip followed by a random rotation (factor 0.2).
augmentation_layers = [
    tf.keras.layers.RandomFlip('horizontal'),
    tf.keras.layers.RandomRotation(0.2),
]
data_augmentation = tf.keras.Sequential(augmentation_layers)

In [None]:
# Preview nine augmented versions of the first training image in a 3x3 grid.
fig, axes = plt.subplots(3, 3, figsize=(10, 10))
first_image = train_features[0]
# The augmentation pipeline is called on a batch, so add a leading batch axis.
image_batch = tf.expand_dims(first_image, 0)
for ax in axes.flat:
    augmented_image = data_augmentation(image_batch)
    ax.imshow(augmented_image[0] / 255)
    ax.axis('off')
        

# Metrics

In [None]:
def get_accuracy_metrics(model, train_features=train_features, train_labels=train_labels_enc, test_features=test_features, test_labels=test_labels_enc, val_features=val_features, val_labels=val_labels_enc):
    """Print classification metrics for ``model`` and plot val/test confusion matrices.

    Accuracy is printed for the train, validation and test sets. F1, Cohen's kappa,
    ROC AUC, recall and precision are printed for the test set only.

    Args:
        model: object whose ``predict(features)`` returns one score per class for
            every sample (argmax over axis 1 is taken as the predicted class).
        train_features, val_features, test_features: model inputs of each split.
        train_labels, val_labels, test_labels: integer class ids of each split.

    Note:
        The default arguments are bound to the notebook-level arrays at the moment
        this ``def`` is executed; re-run this cell after rebuilding those arrays.
    """
    def report(header, score):
        # Print a metric as a percentage with three decimals under its header line.
        print(header)
        print("{0:.3f}".format(score * 100), "%")

    train_predicted = np.argmax(model.predict(train_features), axis=1)
    # Keep the per-class test scores: ROC AUC needs them, not just the argmax.
    test_scores = model.predict(test_features)
    test_predicted = np.argmax(test_scores, axis=1)
    val_predicted = np.argmax(model.predict(val_features), axis=1)

    report("Train accuracy Score------------>", accuracy_score(train_labels, train_predicted))
    report("Val accuracy Score--------->", accuracy_score(val_labels, val_predicted))
    report("Test accuracy Score--------->", accuracy_score(test_labels, test_predicted))
    report("F1 Score--------------->", f1_score(test_labels, test_predicted, average='weighted'))
    report("Cohen Kappa Score------------->", cohen_kappa_score(test_labels, test_predicted))

    # Multiclass one-vs-rest AUC takes integer labels plus an (n_samples, n_classes)
    # score matrix. Passing the argmax class id as a single score column only ranked
    # samples by predicted id for one class and was not a meaningful AUC.
    report("ROC AUC Score------------->", roc_auc_score(test_labels, test_scores, multi_class='ovr'))

    report("Recall-------------->", recall_score(test_labels, test_predicted, average='weighted'))
    report("Precision-------------->", precision_score(test_labels, test_predicted, average='weighted'))

    cf_matrix_test = confusion_matrix(test_labels, test_predicted)
    cf_matrix_val = confusion_matrix(val_labels, val_predicted)

    # Validation (left) and test (right) confusion matrices side by side.
    plt.figure(figsize=(12, 6))
    plt.subplot(121)
    sns.heatmap(cf_matrix_val, annot=True, cmap='Blues')
    plt.title("Val Confusion matrix")

    plt.subplot(122)
    sns.heatmap(cf_matrix_test, annot=True, cmap='Blues')
    plt.title("Test Confusion matrix")

    plt.show()

# General Model Fit

In [None]:
def learning_performance_chart(title, history):
    """Draw accuracy and loss curves (train vs. validation) per epoch, side by side.

    Args:
        title: figure-level title.
        history: object whose ``history`` dict holds the 'accuracy', 'val_accuracy',
            'loss' and 'val_loss' series, one value per epoch.
    """
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    fig.suptitle(title, fontsize=12)
    fig.subplots_adjust(top=0.85, wspace=0.3)

    curves = history.history
    last_epoch = len(curves['accuracy']) + 1
    epochs = list(range(1, last_epoch))

    # One entry per panel: which series to draw and how to label them.
    panels = (
        {'train_key': 'accuracy', 'val_key': 'val_accuracy',
         'train_label': 'Train Accuracy', 'val_label': 'Validation Accuracy',
         'ylabel': 'Accuracy Value', 'title': 'Accuracy'},
        {'train_key': 'loss', 'val_key': 'val_loss',
         'train_label': 'Train Loss', 'val_label': 'Validation Loss',
         'ylabel': 'Loss Value', 'title': 'Loss'},
    )

    for ax, spec in zip(axes, panels):
        ax.plot(epochs, curves[spec['train_key']], label=spec['train_label'])
        ax.plot(epochs, curves[spec['val_key']], label=spec['val_label'])
        # A tick every fifth epoch, starting at epoch 1.
        ax.set_xticks(np.arange(1, last_epoch, 5))
        ax.set_ylabel(spec['ylabel'])
        ax.set_xlabel('Epoch')
        ax.set_title(spec['title'])
        ax.legend(loc="best")

    

In [None]:
def fit_model(model_name, base_model, train_features, train_labels, validate_it,training = False, epochs = EPOCHS, batch_size= BATCH_SIZE):
    """Put a softmax classification head on ``base_model``, train it and plot the curves.

    The assembled model is: input -> data_augmentation -> base_model ->
    (global average pooling, unless ``model_name`` starts with 'CNN') ->
    dropout(0.2) -> dense softmax with NUM_CLASSES units.

    Args:
        model_name: name used in the chart title; names starting with 'CNN' skip
            the global-average-pooling layer.
        base_model: callable Keras model/layer producing the features.
        train_features: training inputs passed to ``model.fit``.
        train_labels: one-hot training targets (the loss is categorical cross-entropy).
        validate_it: passed to ``model.fit`` as ``validation_data``.
        training: forwarded as the ``training`` flag when calling ``base_model``.
        epochs: maximum number of epochs (early stopping may end training sooner).
        batch_size: batch size for ``model.fit``.

    Returns:
        The trained ``tf.keras.Model``.

    Note:
        Inputs reach ``base_model`` exactly as they leave ``data_augmentation``; any
        model-specific input scaling has to be part of ``base_model`` itself.
    """
    inputs = tf.keras.Input(shape=INPUT_SHAPE)

    x = data_augmentation(inputs)
    x = base_model(x, training=training)

    if not model_name.startswith('CNN'):
        x = tf.keras.layers.GlobalAveragePooling2D()(x)

    x = tf.keras.layers.Dropout(0.2)(x)

    # Size the output layer from NUM_CLASSES so it always matches the one-hot
    # targets, which are built with num_classes=NUM_CLASSES.
    outputs = tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')(x)

    model = tf.keras.Model(inputs, outputs)

    # Stop once the validation loss has not improved for 5 consecutive epochs.
    es = tf.keras.callbacks.EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=5)

    model.compile(loss='categorical_crossentropy', optimizer ='adam', metrics=['accuracy'])

    print("Model Summary.")

    # summary() prints the table itself and returns None, so it is not wrapped in
    # print() (that would add a stray "None" line).
    model.summary()

    history = model.fit(x=train_features, y=train_labels, validation_data=validate_it, epochs=epochs, batch_size=batch_size, verbose=1, callbacks=[es])

    learning_performance_chart(title="{} learning performance.".format(model_name), history=history)

    return model

# MobileNet Pretrained

In [None]:
# ImageNet-pretrained MobileNet without its classification head.
mobilenet_backbone = tf.keras.applications.MobileNet(include_top=False,
                                                     weights='imagenet',
                                                     input_shape=INPUT_SHAPE)

# Freeze the pretrained weights so that only the layers added on top are trained.
mobilenet_backbone.trainable = False

# The ImageNet MobileNet weights expect pixel values scaled to [-1, 1], while the
# image arrays produced by load_images are not rescaled. Prepending the rescaling
# here keeps the model-specific preprocessing together with the model it belongs to.
base_model = tf.keras.Sequential([
    tf.keras.layers.Rescaling(1.0 / 127.5, offset=-1.0),
    mobilenet_backbone,
], name='mobilenet_with_preprocessing')

In [None]:
# Train a classification head on top of base_model; the validation pair is used by
# fit_model for early stopping on val_loss.
mobilenet = fit_model("MobileNet", base_model, train_features, train_labels_1hotenc, (val_features, val_labels_1hotenc))

In [None]:
# Report train/val/test metrics and confusion matrices using the default
# (notebook-level) feature and label arrays of get_accuracy_metrics.
print('MobileNet performance on the test set:')
get_accuracy_metrics(mobilenet)