# settings

In [None]:
import os, cv2
import numpy as np
import pandas as pd
import random, tqdm
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

import warnings
warnings.filterwarnings("ignore")

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import albumentations as album
from sklearn.model_selection import train_test_split

In [None]:
# !pip install -q -U segmentation-models-pytorch albumentations > /dev/null
import segmentation_models_pytorch as smp
from segmentation_models_pytorch import utils

In [None]:
train = os.listdir("../data/train")
test = os.listdir("../data/test")

In [None]:
class_names = ["Tumer", "Invasive Cancer", "else"]

class_values = [[0, 0, 0], [75, 75,75], [255, 255, 255]]

# utils

In [None]:
# Perform reverse one-hot-encoding on labels / preds
def reverse_one_hot(image):
    """
    Transform a 2D array in one-hot format (depth is num_classes),
    to a 2D array with only 1 channel, where each pixel value is
    the classified class key.
    # Arguments
        image: The one-hot format image 
        
    # Returns
        A 2D array with the same width and hieght as the input, but
        with a depth size of 1, where each pixel value is the classified 
        class key.
    """
    x = np.argmax(image, axis = -1)
    return x

# Perform colour coding on the reverse-one-hot outputs
def colour_code_segmentation(image, label_values):
    """
    Given a 1-channel array of class keys, colour code the segmentation results.
    # Arguments
        image: single channel array where each value represents the class key.
        label_values

    # Returns
        Colour coded image for segmentation visualization
    """
    colour_codes = np.array(label_values)
    x = colour_codes[image.astype(int)]

    return x

In [None]:
def get_validation_augmentation():   
    # Add sufficient padding to ensure image is divisible by 32
    test_transform = [
        album.PadIfNeeded(min_height=256, min_width=256, always_apply=True, border_mode=0),
    ]
    return album.Compose(test_transform)

def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')

def get_preprocessing(preprocessing_fn=None):
    """Construct preprocessing transform    
    Args:
        preprocessing_fn (callable): data normalization function 
            (can be specific for each pretrained neural network)
    Return:
        transform: albumentations.Compose
    """   
    _transform = []
    if preprocessing_fn:
        _transform.append(album.Lambda(image=preprocessing_fn))
    _transform.append(album.Lambda(image=to_tensor, mask=to_tensor))
        
    return album.Compose(_transform)

In [None]:
def concat_tile(im_list_2d):
    return cv2.vconcat([cv2.hconcat(im_list_h) for im_list_h in im_list_2d])


def cut_image(img, scale):
    # make image bag
    # 이미지가 너무 클 경우 gpu resource가 부족한 문제가 생김
    # overlap tile stragey를 사용
    img_bag = []
    size = img.shape
    h_num = size[1] // scale - 1
    w_num = size[2] // scale - 1
    for i in range(h_num):
        tmp = []
        for j in range(w_num):
            cropped_img = img[:, i*scale:(i+1)*scale+256, j*scale:(j+1)*scale+256]
            tmp.append(cropped_img)
        img_bag.append(tmp)


    return img_bag, h_num, w_num


def crop_image(image, target_image_dims=[256,256,3]):
   
    target_size = target_image_dims[0]
    image_size = len(image)
    padding = (image_size - target_size) // 2

    return image[
        padding:image_size - padding,
        padding:image_size - padding,
        :,
    ]

# Data Loader

In [None]:
class BuildingsDataset(torch.utils.data.Dataset):

    def __init__(
            self, 
            images_dir, 
            base_path,
            class_values=None, 
            augmentation=None, 
            preprocessing=None,
    ):
        
        self.image_paths = [os.path.join(base_path, image_id) for image_id in images_dir]
        self.image_id = images_dir
        self.class_values = class_values
        self.augmentation = augmentation
        self.preprocessing = preprocessing
    
    def __getitem__(self, i):
        
        # read images and masks
        image = cv2.cvtColor(cv2.imread(self.image_paths[i]), cv2.COLOR_BGR2RGB)
        size= image.shape
        h = 256 - size[0] % 256
        w = 256 - size[1] % 256
        image = np.pad(image, pad_width=[(128, h+128),(128, w+128),(0, 0)], mode="reflect")

        # apply augmentations
        if self.augmentation:
            sample = self.augmentation(image=image)
            image = sample['image']
        
        # apply preprocessing
        if self.preprocessing:
            sample = self.preprocessing(image=image)
            image = sample['image']
            
        return image, size, self.image_id[i]
        
    def __len__(self):
        # return length of 
        return len(self.image_paths)

# compute ratio with model 1

In [None]:
ENCODER = 'resnet50'
ENCODER_WEIGHTS = 'imagenet'
CLASSES = class_names
ACTIVATION = 'softmax2d' # could be None for logits or 'softmax2d' for multiclass segmentation

# create segmentation model with pretrained encoder
model = smp.Unet(
    encoder_name=ENCODER, 
    encoder_weights=ENCODER_WEIGHTS, 
    classes=len(CLASSES), 
    activation=ACTIVATION,
)

preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

In [None]:
# Set device: `cuda` or `cpu`
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# load best saved model checkpoint from previous commit (if present)
if os.path.exists('./unet_best_model1.pth'):
    best_model = torch.load('./unet_best_model1.pth', map_location=DEVICE)
    print('Loaded UNet model from a previous commit.')

In [None]:
# create test dataloader (with preprocessing operation: to_tensor(...))
test_dataset = BuildingsDataset(
    test, 
    "../data/test", 
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_values=class_values,
)

test_dataloader = DataLoader(test_dataset)

# create test dataloader (with preprocessing operation: to_tensor(...))
train_dataset = BuildingsDataset(
    train, 
    "../data/train", 
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_values=class_values,
)

train_dataloader = DataLoader(train_dataset)


In [None]:
train_ratio1 = []

for idx in range(len(train_dataset)):
    print(idx)
    image, size, path = train_dataset[idx]
    image_bag, h_num, w_num = cut_image(image, 256)
    mask_bag = []
    for i in range(h_num):
        tmp = []
        for j in range(w_num):
            img = image_bag[i][j]
            x_tensor = torch.from_numpy(img).to(DEVICE).unsqueeze(0)
            # Predict test image
            pred_mask = best_model(x_tensor)
            pred_mask = pred_mask.detach().squeeze().cpu().numpy()
            pred_mask = pred_mask[:, 128:-128, 128:-128]
            # Convert pred_mask from `CHW` format to `HWC` format
            pred_mask = np.transpose(pred_mask,(1,2,0))
            # Get prediction channel corresponding to building
            pred_mask = crop_image(colour_code_segmentation(reverse_one_hot(pred_mask), class_values), target_image_dims = [size[1], size[2], size[0]])
            tmp.append(pred_mask)
        mask_bag.append(tmp)

    mask = concat_tile(mask_bag)
    mask = mask[:size[0], :size[1], :]
    
    cancer = np.sum(mask == 75)
    tumor = np.sum(mask == 0)
    train_ratio1.append([path, cancer/(cancer+tumor)])

In [None]:
train_ratio1 = pd.DataFrame(train_ratio1, columns=["path", "ratio"])
train_ratio1.to_csv("./train_ratio1.csv", index=False)

In [None]:
test_ratio1 = []
for idx in range(len(test_dataset)):
    print(idx)
    image, size, path = test_dataset[idx]
    image_bag, h_num, w_num = cut_image(image, 256)
    mask_bag = []
    for i in range(h_num):
        tmp = []
        for j in range(w_num):
            img = image_bag[i][j]
            x_tensor = torch.from_numpy(img).to(DEVICE).unsqueeze(0)
            # Predict test image
            pred_mask = best_model(x_tensor)
            pred_mask = pred_mask.detach().squeeze().cpu().numpy()
            pred_mask = pred_mask[:, 128:-128, 128:-128]
            # Convert pred_mask from `CHW` format to `HWC` format
            pred_mask = np.transpose(pred_mask,(1,2,0))
            # Get prediction channel corresponding to building
            pred_mask = crop_image(colour_code_segmentation(reverse_one_hot(pred_mask), class_values), target_image_dims = [size[1], size[2], size[0]])
            tmp.append(pred_mask)
        mask_bag.append(tmp)

    mask = concat_tile(mask_bag)
    mask = mask[:size[0], :size[1], :]

    cancer = np.sum(mask == 75)
    tumor = np.sum(mask == 0)
    test_ratio1.append([path, cancer/(cancer+tumor)])

In [None]:
test_ratio1 = pd.DataFrame(test_ratio1, columns=["path", "ratio"])
test_ratio1.to_csv("./test_ratio1.csv", index=False)

# compute ratio with model 2

In [None]:
ENCODER = 'resnet50'
ENCODER_WEIGHTS = 'imagenet'
CLASSES = class_names
ACTIVATION = 'softmax2d' # could be None for logits or 'softmax2d' for multiclass segmentation

# create segmentation model with pretrained encoder
model = smp.Unet(
    encoder_name=ENCODER, 
    encoder_weights=ENCODER_WEIGHTS, 
    classes=len(CLASSES), 
    activation=ACTIVATION,
)

preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

In [None]:
# Set device: `cuda` or `cpu`
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# load best saved model checkpoint from previous commit (if present)
if os.path.exists('./unet_best_model2.pth'):
    best_model = torch.load('./unet_best_model2.pth', map_location=DEVICE)
    print('Loaded UNet model from a previous commit.')

In [None]:
# create test dataloader (with preprocessing operation: to_tensor(...))
test_dataset = BuildingsDataset(
    test, 
    "../data/test", 
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_values=class_values,
)

test_dataloader = DataLoader(test_dataset)

# create test dataloader (with preprocessing operation: to_tensor(...))
train_dataset = BuildingsDataset(
    train, 
    "../data/train", 
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_values=class_values,
)

train_dataloader = DataLoader(train_dataset)


In [None]:
train_ratio2 = []

for idx in range(len(train_dataset)):
    print(idx)
    image, size, path = train_dataset[idx]
    image_bag, h_num, w_num = cut_image(image, 256)
    mask_bag = []
    for i in range(h_num):
        tmp = []
        for j in range(w_num):
            img = image_bag[i][j]
            x_tensor = torch.from_numpy(img).to(DEVICE).unsqueeze(0)
            # Predict test image
            pred_mask = best_model(x_tensor)
            pred_mask = pred_mask.detach().squeeze().cpu().numpy()
            pred_mask = pred_mask[:, 128:-128, 128:-128]
            # Convert pred_mask from `CHW` format to `HWC` format
            pred_mask = np.transpose(pred_mask,(1,2,0))
            # Get prediction channel corresponding to building
            pred_mask = crop_image(colour_code_segmentation(reverse_one_hot(pred_mask), class_values), target_image_dims = [size[1], size[2], size[0]])
            tmp.append(pred_mask)
        mask_bag.append(tmp)

    mask = concat_tile(mask_bag)
    mask = mask[:size[0], :size[1], :]
    
    cancer = np.sum(mask == 75)
    tumor = np.sum(mask == 0)
    train_ratio2.append([path, cancer/(cancer+tumor)])

In [None]:
train_ratio2 = pd.DataFrame(train_ratio2, columns=["path", "ratio"])
train_ratio2.to_csv("./train_ratio2.csv", index=False)

In [None]:
test_ratio2 = []
for idx in range(len(test_dataset)):
    print(idx)
    image, size, path = test_dataset[idx]
    image_bag, h_num, w_num = cut_image(image, 256)
    mask_bag = []
    for i in range(h_num):
        tmp = []
        for j in range(w_num):
            img = image_bag[i][j]
            x_tensor = torch.from_numpy(img).to(DEVICE).unsqueeze(0)
            # Predict test image
            pred_mask = best_model(x_tensor)
            pred_mask = pred_mask.detach().squeeze().cpu().numpy()
            pred_mask = pred_mask[:, 128:-128, 128:-128]
            # Convert pred_mask from `CHW` format to `HWC` format
            pred_mask = np.transpose(pred_mask,(1,2,0))
            # Get prediction channel corresponding to building
            pred_mask = crop_image(colour_code_segmentation(reverse_one_hot(pred_mask), class_values), target_image_dims = [size[1], size[2], size[0]])
            tmp.append(pred_mask)
        mask_bag.append(tmp)

    mask = concat_tile(mask_bag)
    mask = mask[:size[0], :size[1], :]

    cancer = np.sum(mask == 75)
    tumor = np.sum(mask == 0)
    test_ratio2.append([path, cancer/(cancer+tumor)])

In [None]:
test_ratio2 = pd.DataFrame(test_ratio2, columns=["path", "ratio"])
test_ratio2.to_csv("./test_ratio2.csv", index=False)