# Import Library

In [1]:
import numpy as np
import math
import natsort
import cv2
import os
import matplotlib.pyplot as plt
from google.colab import drive
from glob import glob

from PIL import Image

import tensorflow as tf
from tensorflow import keras

import tensorflow_datasets as tfds

from tensorflow.keras.utils import *
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *

In [2]:
drive.mount('/content/drive')

Mounted at /content/drive


# Data Loder

In [3]:
def sortlist(filelist):
    filelist = natsort.natsorted(filelist)
    return filelist

In [4]:
dir_path = '/content/drive/MyDrive/Cloud_data/cloud_train'
input_images = glob(os.path.join(dir_path, "patch_img", "*png"))
input_images = sortlist(input_images)

In [23]:
class CloudGenerator(Sequence):
    def __init__(self,
                 dir_path,
                 batch_size = 16,
                 img_size = (1024,1024,3),
                 output_size = (1024,1024),
                 is_train = True):
        
        self.dir_path = dir_path
        self.batch_size = batch_size
        self.img_size = img_size
        self.output_size = output_size
        self.is_train = is_train

        self.data = self.load_dataset()
    
    def load_dataset(self):
        input_images = glob(os.path.join(self.dir_path,"patch_img","*png"))
        label_images = glob(os.path.join(self.dir_path,"patch_label","*png"))
        input_images = sortlist(input_images)
        label_images = sortlist(label_images)
        
        assert len(input_images) == len(label_images)
        data = [ _ for _ in zip(input_images, label_images)]
        
        train_percent = int(len(data) * 0.8)

        if self.is_train:
            return data[:train_percent]
        return data[train_percent:]

    def __getitem__(self, index):
        batch_data = self.data[
                           index*self.batch_size:
                           (index + 1)*self.batch_size
                           ]
        inputs = np.zeros([self.batch_size, *self.img_size])
        outputs = np.zeros([self.batch_size, *self.output_size])
            
        for i, data in enumerate(batch_data):
            input_img_path, output_path = data
            _input = cv2.imread(input_img_path)
            _output = cv2.imread(output_path)
            _output = (_output==50).astype(np.uint8)*1
            #data = {
            #    "image": _input,
            #    "mask": _output,
            #    }
            #augmented = self.augmentation(**data)
            #inputs[i] = augmented["image"]/255
            #outputs[i] = augmented["mask"]
            _input = _input/255
            
            return _input, _output

    def __len__(self):
        return math.ceil(len(self.data) / self.batch_size)
    
    def on_epoch_end(self):
        self.indexes = np.arange(len(self.data))
        if self.is_train == True :
            np.random.shuffle(self.indexes)
            return self.indexes

In [24]:
train_generator = CloudGenerator(
    dir_path,
    is_train=True
)

test_generator = CloudGenerator(
    dir_path,
    is_train=False
)

# Build U-net Model

In [32]:
def build_unet_model(input_shape=(1024, 1024, 3)):
    inputs = Input(input_shape)

    #Contracting Path
    conv1 = Conv2D(64, 3, activation='relu', padding='same',kernel_initializer='he_normal')(inputs)
    conv1 = Conv2D(64, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same',kernel_initializer='he_normal')(pool1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same',kernel_initializer='he_normal')(pool2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same',kernel_initializer='he_normal')(pool3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv4)
    drop4 = Dropout(0.5)(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    conv5 = Conv2D(1024, 3, activation='relu', padding='same',kernel_initializer='he_normal')(pool4)  
    conv5 = Conv2D(1024, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv5)

    # #Expanding Path
    drop5 = Dropout(0.5)(conv5)
    up6 = Conv2DTranspose(512, 2, activation='relu', strides=(2,2), kernel_initializer='he_normal')(drop5)
    merge6 = concatenate([drop4,up6], axis = 3)
    conv6 = Conv2D(512, 3, activation='relu', padding='same',kernel_initializer='he_normal')(merge6)
    conv6 = Conv2D(512, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv6)
    up7 = Conv2DTranspose(256, 2, activation='relu', strides=(2,2), kernel_initializer='he_normal')(conv6)
    merge7 = concatenate([conv3,up7], axis = 3)
    conv7 = Conv2D(256, 3, activation='relu', padding='same',kernel_initializer='he_normal')(merge7)
    conv7 = Conv2D(256, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv7)
    up8 = Conv2DTranspose(128, 2, activation='relu', strides=(2,2), kernel_initializer='he_normal')(conv7)
    merge8 = concatenate([conv2,up8], axis = 3)
    conv8 = Conv2D(128, 3, activation='relu', padding='same',kernel_initializer='he_normal')(merge8)
    conv8 = Conv2D(128, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv8)
    up9 = Conv2DTranspose(64, 2, activation='relu', strides=(2,2), kernel_initializer='he_normal')(conv8)
    merge9 = concatenate([conv1,up9], axis = 3)
    conv9 = Conv2D(64, 3, activation='relu', padding='same',kernel_initializer='he_normal')(merge9)
    conv9 = Conv2D(64, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv9)  
    conv9 = Conv2D(2, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv9)     
    conv10 = Conv2D(1, 1, activation='sigmoid')(conv9)

    model = Model(inputs = inputs, outputs = conv10)
    return model

In [33]:
unet_model = build_unet_model()
unet_model.summary()

Model: "model_9"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_12 (InputLayer)          [(None, 1000, 1000,  0           []                               
                                 3)]                                                              
                                                                                                  
 conv2d_172 (Conv2D)            (None, 1000, 1000,   1792        ['input_12[0][0]']               
                                64)                                                               
                                                                                                  
 conv2d_173 (Conv2D)            (None, 1000, 1000,   36928       ['conv2d_172[0][0]']             
                                64)                                                         

In [31]:
print(len(train_generator))

163


In [34]:
unet_model_path = '/content/drive/MyDrive/Cloud_data/cloud_model/seg_unet_model.h5'

unet_model = build_unet_model()
unet_model.compile(optimizer = Adam(lr = 1e-4), loss = 'binary_crossentropy')
unet_model.fit_generator(
     generator=train_generator,
     validation_data=test_generator,
     steps_per_epoch=len(train_generator),
     epochs=100,
 )

unet_model.save(unet_model_path)  #학습한 모델을 저장해 주세요.

  super(Adam, self).__init__(name, **kwargs)
  if __name__ == '__main__':


Epoch 1/100


ValueError: ignored

# Load U-netModel

In [None]:
Unet_model = tf.keras.models.load_model(unet_model_path)

In [None]:
def calculate_iou_score(target, prediction):
    intersection = np.logical_and(target, prediction)
    union = np.logical_or(target, prediction)
    iou_score = float(np.sum(intersection)) / float(np.sum(union))
    print('IoU : %f' % iou_score )
    return iou_score

In [None]:
def get_output(model, image_path, output_path, label_path):
    origin_img = cv2.imread(image_path)
    output = model(np.expand_dims(origin_img/255, axis=0))
    output = (output[0].numpy()>=0.5).astype(np.uint8).squeeze(-1)*255  #0.5라는 threshold를 변경하면 도로인식 결과범위가 달라집니다.
    prediction = output/255   # 도로로 판단한 영역
    
    output = Image.fromarray(output)
    background = Image.fromarray(origin_img).convert('RGBA')
    output = output.resize((origin_img.shape[1], origin_img.shape[0])).convert('RGBA')
    output = Image.blend(background, output, alpha=0.5)
    output.show()   # 도로로 판단한 영역을 시각화!
     
    if label_path:   
        label_img = cv2.imread(label_path)
        target = (label_img == 50).astype(np.uint8)*1   # 라벨에서 도로로 기재된 영역

        return output, prediction, target
    else:
        return output, prediction, _

In [None]:
 #dir_path = '/content/drive/MyDrive/Cloud_data/cloud_train'
 
 # 완성한 뒤에는 시각화한 결과를 눈으로 확인해봅시다!
i = 1    # i값을 바꾸면 테스트용 파일이 달라집니다. 
output, prediction, target = get_output(
     Unet_model, 
     image_path=dir_path + f'/image_2/00{str(i).zfill(4)}_10.png',
     output_path=dir_path + f'/result_{str(i).zfill(3)}.png',
     label_path=dir_path + f'/semantic/00{str(i).zfill(4)}_10.png'
 )

calculate_iou_score(target, prediction)