In [None]:
import cv2
import imgaug
import numpy as np
import glob
import matplotlib.pyplot as plt
import os
from skimage.exposure import match_histograms

In [None]:
root = '../input/cervical-adenocarcinoma-silva-classification/Cervial Silva/'
label = {
    'Class A': 0, 
    'Class B': 1,
    'Class C': 2
}

#### Color Normalization 

Here we use a templated-based histogram match algorithm to implement color normalization so that the difference of color in histology image will be minimized. Histogram matching algorithm from skimage.exposure package is used. 

In [None]:
# reference image is chosen by pathologist, which is stained in good condition. 
reference = '../input/cervical-adenocarcinoma-silva-classification/Cervial Silva/Class A/E2016523-2.jpg'

test_img = ['../input/cervical-adenocarcinoma-silva-classification/Cervial Silva/Class A/E1711105-2.jpg',
            '../input/cervical-adenocarcinoma-silva-classification/Cervial Silva/Class A/S1814246-2.jpg',
            '../input/cervical-adenocarcinoma-silva-classification/Cervial Silva/Class B/E1500920-2.jpg',
            '../input/cervical-adenocarcinoma-silva-classification/Cervial Silva/Class B/S1815863-2.jpg',
            '../input/cervical-adenocarcinoma-silva-classification/Cervial Silva/Class C/E1409496-2.jpg',
            '../input/cervical-adenocarcinoma-silva-classification/Cervial Silva/Class C/S1806161-2.jpg']

plt.figure(figsize=(15,30))
for i in range(len(test_img)):
    ref = cv2.imread(reference)
    img = cv2.imread(test_img[i])
    normed = match_histograms(img, ref, multichannel=True)
    
    plt.subplot(6,3,3*i+1)
    plt.imshow(ref)
    plt.title('Reference')
    
    plt.subplot(6,3,3*i+2)
    plt.imshow(img)
    plt.title('Original')
    
    plt.subplot(6,3,3*i+3)
    plt.imshow(normed)
    plt.title('Normalized')

plt.show()


#### Patches Extraction
- Get patches (tiles) of a histology image
- Implement patch-wise training of model
- Implement patient-wise test of model (vote)

In [None]:
def get_tiles(img, stride=(256,256), size=(256, 256)): 
    tile_list = []
    dim = ((img.shape[0]-size[0])//stride[0] + 1, (img.shape[1]-size[1])//stride[1] + 1)
    for y in range(dim[0]):
        for x in range(dim[1]):
            tile = np.zeros_like(size + (3,))
            tile = img[y*stride[0]:y*stride[0]+size[0], x*stride[1]:x*stride[1]+size[1], :]
            tile_list.append(tile)
    return tile_list, dim

def show_tiles(tiles_list, dim):
    f, ax = plt.subplots(dim[0], dim[1], figsize=(10,7.5), constrained_layout=True)
    for i in range(dim[0]):
        for j in range(dim[1]):
            ax[i, j].axis('off')
            ax[i, j].imshow(tiles_list[i*dim[1]+j])
#     f.subplots_adjust(wspace=0.01, hspace=0.01)
#     f.tight_layout()


tiles_list, dim = get_tiles(cv2.imread(reference), stride=(512, 512), size=(512, 512))
show_tiles(tiles_list, dim)

In [None]:
from sklearn.model_selection import train_test_split

def prepare_data():
    ref = cv2.imread(reference)
    dataset = []
    y_label = []
    for k, v in label.items():
        patients = glob.glob(os.path.join(root, k, '*-2.*')) # find all 100x images
        for p in patients:
            img = cv2.imread(p)
            img = match_histograms(img, ref, multichannel=True)
            dataset.append(img)
            y_label.append(v)
    
    dataset = np.array(dataset)
    y_label = np.array(y_label)
    
    print("Class A samples in total: ", np.sum(y_label == 0))
    print("Class B samples in total: ", np.sum(y_label == 1))
    print("Class C samples in total: ", np.sum(y_label == 2))
    
    return dataset, y_label
    


In [None]:
dataset, y_label = prepare_data()

In [None]:
import imgaug as ia
import imgaug.augmenters as iaa

def augment(img, aug=30):
    augmented = []

    # original image
    augmented.append(img)
    
    # imgaug 
    seq = iaa.Sequential([
        iaa.Sometimes(
            0.4,
            iaa.Fliplr(0.5)
        ), # horizontal flips
        
        iaa.Sometimes(
            0.2,
            iaa.Crop(percent=(0, 0.1))
        ), # random crops
        
        # Small gaussian blur with random sigma between 0 and 0.5.
        # But we only blur about 50% of all images.
        iaa.Sometimes(
            0.4,
            iaa.GaussianBlur(sigma=(0, 0.5))
        ),
        # Strengthen or weaken the contrast in each image.
        iaa.Sometimes(
            0.2,
            iaa.LinearContrast((0.75, 1.5))
        ),
        iaa.Sometimes(
            0.2,
            iaa.ElasticTransformation(alpha=90, sigma=9)
        ),
        # Add gaussian noise.
        # For 50% of all images, we sample the noise once per pixel.
        # For the other 50% of all images, we sample the noise per pixel AND
        # channel. This can change the color (not only brightness) of the
        # pixels.
        iaa.Sometimes(
            0.4,
            iaa.AdditiveGaussianNoise(loc=0, scale=(0.0, 0.05*255), per_channel=0.5)
        ),
        # Make some images brighter and some darker.
        # In 20% of all cases, we sample the multiplier once per channel,
        # which can end up changing the color of the images.
        iaa.Sometimes(
            0.2,
            iaa.Multiply((0.8, 1.2), per_channel=0.2)
        ),
        # Apply affine transformations to each image.
        # Scale/zoom them, translate/move them, rotate them and shear them.
        iaa.Sometimes(
            0.4,
            iaa.Affine(
                scale={"x": (0.8, 1.2), "y": (0.8, 1.2)},
                translate_percent={"x": (-0.2, 0.2), "y": (-0.2, 0.2)},
                rotate=(-25, 25),
                shear=(-8, 8)
            )
        )
    ], random_order=True) # apply augmenters in random order
    
    for _ in range(aug):
        img_aug = seq(image=img)
        augmented.append(img_aug)
    
    return augmented

In [None]:
randomize = np.arange(len(dataset))
np.random.shuffle(randomize)
dataset = dataset[randomize]
y_label = y_label[randomize]

patches_train = []
patches_label = []
x_test = []
y_test = []
num_test = 7
num = {
    0: 0,
    1: 0,
    2: 0
}

from progressbar import ProgressBar
pb = ProgressBar()
for i in pb(range(len(dataset))):
    if num[y_label[i]] < num_test:
        x_test.append(dataset[i])
        y_test.append(y_label[i])
        num[y_label[i]] = num[y_label[i]] + 1
    else:
        aug = 2
        if y_label[i] == 0 or y_label[i] == 1:
            aug = 20
        stride = (512, 512)
        size = (512, 512)
        tiles, _ = get_tiles(dataset[i], stride=stride, size=size)
        for t in tiles:
            augmented = augment(t, aug=aug)
            for a in augmented:
                patches_train.append(cv2.resize(a, (256,256)))
                patches_label.append(y_label[i])

patches_train = np.array(patches_train)
patches_label = np.array(patches_label)
x_test = np.array(x_test)
y_test = np.array(y_test)

rd = np.arange(len(patches_train))
np.random.shuffle(rd)
patches_train = patches_train[rd]
patches_label = patches_label[rd]

In [None]:
print('number of patches: ', len(patches_train))
print("Class A samples in total: ", np.sum(patches_label == 0))
print("Class B samples in total: ", np.sum(patches_label == 1))
print("Class C samples in total: ", np.sum(patches_label == 2))

#### ResNet Model Transfer Learning

In [None]:
import tensorflow as tf

In [None]:
IMG_SHAPE = (256, 256, 3)
base_model = tf.keras.applications.ResNet50(input_shape=IMG_SHAPE,
                                               include_top=False,
                                               weights='imagenet')

In [None]:
base_model.trainable = True
print("Number of layers in the base model: ", len(base_model.layers))
fine_tune_at = 300
for layer in base_model.layers[:fine_tune_at]:
  layer.trainable =  False

In [None]:
model = tf.keras.Sequential([
    base_model,
    tf.keras.layers.GlobalAveragePooling2D(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(3, activation='sigmoid')
])

model.summary()

In [None]:
base_learning_rate = 0.001
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

In [None]:
history = model.fit(x=patches_train, y=patches_label, validation_split=0.3, 
                    shuffle=True, epochs=20, batch_size=16)

In [None]:
def predict(x,model):
    y_predict = []
    for img in x:
        tiles, _ = get_tiles(img, size=(512,512), stride=(512,512))
        tiles = [ cv2.resize(t, (256,256)) for t in tiles]
        vote = model.predict(np.array(tiles))
        vote = np.argmax(vote, axis=1)
        y_predict.append(np.argmax(np.bincount(np.array(vote))))
    
    return y_predict

In [None]:
y_pred = predict(x_test, model)

In [None]:
from sklearn.metrics import confusion_matrix

try:
    print((y_test == np.array(y_pred)).sum() / len(y_pred))
except:
    print('Woops')

print(y_test)
print(np.array(y_pred))

cm = confusion_matrix(y_test, y_pred)
print(cm)