In [1]:
from natsort import natsorted
import os
import re
from glob import glob
import json
import numpy as np
import matplotlib.pyplot as plt
import seaborn
import networkx as nx


import imageio
import cv2
import skimage
from skimage import img_as_float32, img_as_ubyte, img_as_uint
from skimage.feature import canny
from skimage.color import rgb2gray, rgb2hsv, gray2rgb, rgba2rgb
from sklearn.model_selection import train_test_split
from functools import partial
from tqdm.notebook import tqdm

import tensorflow as tf
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import VGG16
from tensorflow.keras import metrics

# caching with sane defaults
from cachier import cachier

In [2]:
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 

In [3]:
tqdm = partial(tqdm, position=0, leave=True)


cachier = partial(cachier, pickle_reload=False, cache_dir="data/cache")

### Read and Scale data

In [4]:
############################## Stuff for loading and rescaling the puzzle pieces nicely ################################
SIZE = (768, 1024)

DATA_PATH_PAIRS = list(
    zip(
        natsorted(
            glob(
                f"puzzle_corners_{SIZE[1]}x{SIZE[0]}/images-{SIZE[1]}x{SIZE[0]}/*.png"
            )
        ),
        natsorted(
            glob(
                f"puzzle_corners_{SIZE[1]}x{SIZE[0]}/masks-{SIZE[1]}x{SIZE[0]}/*.png"
            )
        ),
    )
)
DATA_IMGS = np.array(
    [
        img_as_float32(imageio.imread(img_path))
        for img_path, _ in tqdm(DATA_PATH_PAIRS, "Loading Images")
    ]
)
DATA_MSKS = np.array(
    [
        img_as_float32(imageio.imread(msk_path))
        for _, msk_path in tqdm(DATA_PATH_PAIRS, "Loading Masks")
    ]
)

SCALE = 0.25
MATCH_IMGS = np.array([
    cv2.resize(img, None, fx=SCALE, fy=SCALE)
    for img in tqdm(DATA_IMGS, "Resizing Images")
])
MATCH_MSKS = np.array([
    np.expand_dims(cv2.resize(img, None, fx=SCALE, fy=SCALE), axis=2)
    for img in tqdm(DATA_MSKS, "Resizing Masks")
])

Loading Images:   0%|          | 0/48 [00:00<?, ?it/s]

Loading Masks:   0%|          | 0/48 [00:00<?, ?it/s]

Resizing Images:   0%|          | 0/48 [00:00<?, ?it/s]

Resizing Masks:   0%|          | 0/48 [00:00<?, ?it/s]

In [5]:
def plot_img(img):
    plt.imshow(img, cmap='gray')
    plt.show

# Data augmentations

In [6]:
def train_val_test_split(X, y, train_count=34, val_count=7, test_count=7, random_state=42):
    """ Split data into train, validation and test sets"""
    # Obtain percentage split values
    total_data_size = X.shape[0]
    train_size = train_count / total_data_size
    val_size = val_count / total_data_size
    test_size = val_count / total_data_size
   
    X_train, X_rem, y_train, y_rem = train_test_split(X, y, train_size=train_size, random_state=random_state)
    val_size = val_size/(val_size+test_size)
    X_val, X_test, y_val, y_test = train_test_split(X_rem, y_rem, train_size=val_size, random_state=random_state)
    
    print(f"Training images: {X_train.shape[0]}")
    print(f"Validation images: {X_val.shape[0]}")
    print(f"Test images: {X_test.shape[0]}")
    
    return X_train, X_val, X_test, y_train, y_val, y_test

In [7]:
X_train, X_val, X_test, y_train, y_val, y_test = train_val_test_split(MATCH_IMGS, MATCH_MSKS)

Training images: 34
Validation images: 7
Test images: 7


In [8]:
def apply_image_aug(aug_func, X_train, X_val, y_train, y_val):
    
    X_train = np.append(X_train, aug_func(X_train[:34]).numpy(), 0)
    y_train = np.append(y_train, aug_func(y_train[:34]).numpy(), 0)
    
    X_val = np.append(X_val, aug_func(X_val[:7]).numpy(), 0)
    y_val = np.append(y_val, aug_func(y_val[:7]).numpy(), 0)
    
    return X_train, X_val, y_train, y_val
    

In [9]:
def add_data_augmentations(X_train, X_val, y_train, y_val):
    # Spatial augmentations
    
    ## Flipping
    
    X_train, X_val, y_train, y_val = apply_image_aug(tf.image.flip_left_right, X_train, X_val, y_train, y_val)
    X_train, X_val, y_train, y_val = apply_image_aug(tf.image.flip_up_down, X_train, X_val, y_train, y_val)    
    
    ## Rotation
    
    # Pixel augmentations
    
    ## Brightness
    
    ## Contrast
    
    ## Saturation
    
    ## Hue
    
    
    return X_train, X_val, y_train, y_val

In [10]:
print(f"Training images: {X_train.shape[0]}")
print(f"Validation images: {X_val.shape[0]}")
print(f"Test images: {X_test.shape[0]}")


Training images: 34
Validation images: 7
Test images: 7


In [11]:
X_train, X_val, y_train, y_val = add_data_augmentations(X_train, X_val, y_train, y_val)

In [12]:
print(f"Training images: {X_train.shape[0]}")
print(f"Validation images: {X_val.shape[0]}")
print(f"Test images: {X_test.shape[0]}")

Training images: 102
Validation images: 21
Test images: 7


In [13]:
def plot_all_data(X, y):
    assert X.shape[0] == y.shape[0]
    print(X.shape[0])
    for i in range(X.shape[0]):
        fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(10, 10))
        ax = axes.flatten()
        ax[0].imshow(X[i])
        ax[0].axis("off")
        ax[0].title.set_text(f'{i%(X.shape[0]/3)}')

        ax[1].imshow(y[i], cmap="gray")
        ax[1].axis("off")
        ax[1].title.set_text(f'{i%(X.shape[0]/3)}')
        plt.tight_layout()
        plt.show()

In [14]:
# plot_all_data(X_val, y_val)

## Evaluation metrics

In [28]:
# Accuracy
from sklearn.metrics import accuracy_score
def pixel_accuracy(y_true, y_pred):
    return accuracy_score(y_true, y_pred)


# Dice

# IoU

# map

In [15]:
def conv_block(input, num_filters):
    x = Conv2D(num_filters, 3, padding="same")(input)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, 3, padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    return x

In [16]:
def decoder_block(input, skip_features, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input)
    x = Concatenate()([x, skip_features])
    x = conv_block(x, num_filters)
    return x

In [17]:
def build_vgg16_unet(input_shape):
    """ Input """
    inputs = Input(input_shape)

    """ Pre-trained VGG16 Model """
    vgg16 = VGG16(include_top=False, weights="imagenet", input_tensor=inputs)

    """ Encoder """
    s1 = vgg16.get_layer("block1_conv2").output         ## (512 x 512)
    s2 = vgg16.get_layer("block2_conv2").output         ## (256 x 256)
    s3 = vgg16.get_layer("block3_conv3").output         ## (128 x 128)
    s4 = vgg16.get_layer("block4_conv3").output         ## (64 x 64)

    """ Bridge """
    b1 = vgg16.get_layer("block5_conv3").output         ## (32 x 32)

    """ Decoder """
    d1 = decoder_block(b1, s4, 512)                     ## (64 x 64)
    d2 = decoder_block(d1, s3, 256)                     ## (128 x 128)
    d3 = decoder_block(d2, s2, 128)                     ## (256 x 256)
    d4 = decoder_block(d3, s1, 64)                      ## (512 x 512)

    """ Output """
    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(d4)

    model = Model(inputs, outputs, name="VGG16_U-Net")
    return model


In [18]:
input_shape = X_train[0].shape
model = build_vgg16_unet(input_shape)

In [22]:
model.compile(optimizer=Adam(learning_rate=1e-3), loss='binary_crossentropy', metrics=['accuracy', tf.keras.metrics.MeanIoU(num_classes=2)])

In [23]:
y_train = np.squeeze(y_train)
y_test = np.squeeze(y_test)

In [24]:
history = model.fit(X_train, y_train, 
                    batch_size=2, 
                    verbose=1, 
                    epochs=100, 
                    validation_data=(X_test, y_test), 
                    shuffle=False)

Epoch 1/100
Epoch 2/100
Epoch 3/100

KeyboardInterrupt: 

In [15]:
from keras.models import load_model
model = load_model("test.hdf5", compile=False)

In [24]:
def predict_batch(model, X):
    y = model.predict(X)
    # y = (y[:,:,:,0] > 0.5).astype(np.uint8)
    return y

In [25]:
y_test_pred = predict_batch(model, X_test)

In [26]:
y_test.shape

(7, 192, 256, 1)

In [27]:
def dice_coe(y_true,y_pred, loss_type='jaccard', smooth=1.):

    y_true_f = tf.reshape(y_true,[-1])
    y_pred_f = tf.reshape(y_pred,[-1])

    intersection = tf.reduce_sum(y_true_f * y_pred_f)

    if loss_type == 'jaccard':
        union = tf.reduce_sum(tf.square(y_pred_f)) + tf.reduce_sum(tf.square(y_true_f))

    elif loss_type == 'sorensen':
        union = tf.reduce_sum(y_pred_f) + tf.reduce_sum(y_true_f)

    else:
        raise ValueError("Unknown `loss_type`: %s" % loss_type)

    return (2. * intersection + smooth) / (union + smooth)

In [43]:
dice(y_test, y_test_pred)

(7, 192, 256, 1)


0.4116958686384441

In [44]:
iou_scores = []
for i in range(len(y_test)):
    iou_scores.append(dice(y_test[i], y_test_pred[i]))

(192, 256, 1)
(192, 256, 1)
(192, 256, 1)
(192, 256, 1)
(192, 256, 1)
(192, 256, 1)
(192, 256, 1)


In [47]:
np.array(iou_scores).mean()

0.4111153193676747

In [39]:
def dice(im1, im2):
    im1 = np.asarray(im1).astype(bool)
    im2 = np.asarray(im2).astype(bool)
    
    # print(im1.shape)
    
    im_sum = im1.sum() + im2.sum()

    # Compute Dice coefficient
    intersection = np.logical_and(im1, im2)

    return 2. * intersection.sum() / im_sum

In [None]:
def pixel_accuracy(im1, im2):
    return accuracy_score()


In [51]:
from sklearn.metrics import accuracy_score
def pixel_accuracy(y_true, y_pred):
    y_true = np.asarray(y_true).astype(bool)
    y_pred = np.asarray(y_pred).astype(bool)
    return accuracy_score(y_true, y_pred)


In [52]:
pixel_accuracy(y_test[0], y_test_pred[0])

ValueError: unknown is not supported

In [59]:
def iou(y_true, y_pred):
    y_true = np.asarray(y_true).astype(bool)
    y_pred = np.asarray(y_pred).astype(bool)
    intersection = np.logical_and(y_true, y_pred)
    union = np.logical_or(y_true, y_pred)
    iou_score = np.sum(intersection) / np.sum(union)
    return iou_score

In [58]:
iou(y_test, y_test_pred)

0.2592046828497024

In [60]:
iou(y_test, y_test_pred)

0.2592046828497024

In [64]:
def accuracy(y_true, y_pred):
    y_true = np.asarray(y_true).astype(bool)
    y_pred = np.asarray(y_pred).astype(bool)
    # intersection = np.logical_and(y_true, y_pred)
    # union = np.logical_or(y_true, y_pred)
    # iou_score = np.sum(intersection) / np.sum(union)
    accuracy_score(y_true, y_pred)
    return iou_score

In [65]:
accuracy(y_test, y_test_pred)

ValueError: unknown is not supported