<a href="https://colab.research.google.com/github/autoceres/VisionCERES/blob/master/DeeplabV3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/gdrive so files stored there can be read by path.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
import os
import numpy as np
import cv2
from glob import glob
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from albumentations import HorizontalFlip, ChannelShuffle, CoarseDropout, Rotate, Equalize, RandomShadow, CenterCrop, RandomBrightnessContrast
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.layers import AveragePooling2D, GlobalAveragePooling2D, UpSampling2D, Reshape, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50, ResNet101
import tensorflow as tf
from sklearn.utils import shuffle
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision
from tensorflow.keras import backend as K

In [None]:
""" Diretório """
def create_dir(path):
    """Create the directory ``path`` (including missing parents) if needed.

    Calling this for a directory that already exists is a no-op.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(path, exist_ok=True)

def load_data(path, split = 0.15):
    """Collect image/mask file paths and split them into train and test sets.

    Images and masks are paired by position after sorting, so both folders
    are expected to contain matching file names.

    Args:
        path: Base dataset directory. NOTE(review): this argument currently
            has no effect, because the folder components below are absolute
            and ``os.path.join`` drops everything before an absolute
            component. Kept as-is so existing callers keep working.
        split: Fraction of the number of images held out for the test set.

    Returns:
        ``((train_x, train_y), (test_x, test_y))`` where every element is a
        list of file paths.

    Raises:
        ValueError: Raised by ``train_test_split`` when the number of images
            and masks differ, or when the resulting test size is invalid.
    """
    X = sorted(glob(os.path.join(path, "/content/gdrive/MyDrive/TCC/dataset_tcc/images_final", "*.png")))
    Y = sorted(glob(os.path.join(path, "/content/gdrive/MyDrive/TCC/dataset_tcc/masks_final", "*.png")))

    split_size = int(len(X) * split)

    # Split both lists in a single call: the same indices are applied to
    # images and masks, and a length mismatch is reported instead of silently
    # producing mis-paired samples.
    train_x, test_x, train_y, test_y = train_test_split(X, Y, test_size = split_size, random_state = 42)

    return (train_x, train_y), (test_x, test_y)

def augment_data(images, masks, save_path, augment = True):
    """Write 512x512 (optionally augmented) image/mask pairs under ``save_path``.

    The original sample is always kept. When ``augment`` is true, five extra
    variants are produced per sample: horizontal flip, grayscale image,
    coarse dropout, ``Equalize`` and random brightness/contrast. Every
    variant is center-cropped to 512x512 (or resized when cropping fails) and
    written to ``<save_path>/image/<name>_<k>.png`` and
    ``<save_path>/mask/<name>_<k>.png``, where ``k`` is the variant index.

    Args:
        images: Sequence of image file paths.
        masks: Sequence of mask file paths, aligned with ``images``.
        save_path: Output directory; its ``image`` and ``mask``
            sub-directories must already exist.
        augment: Whether to generate the extra augmented variants.

    Raises:
        FileNotFoundError: If an image or mask cannot be read by OpenCV.
    """
    H = 512
    W = 512

    # The crop is deterministic (p = 1.0), so one instance serves every sample.
    center_crop = CenterCrop(H, W, p = 1.0)

    for x, y in tqdm(zip(images, masks), total = len(images)):
        name = x.split("/")[-1].split(".")[0]

        image_file, mask_file = x, y
        x = cv2.imread(image_file, cv2.IMREAD_COLOR)
        y = cv2.imread(mask_file, cv2.IMREAD_COLOR)
        if x is None or y is None:
            # cv2.imread reports failure by returning None rather than raising.
            raise FileNotFoundError(f"Could not read image/mask pair: {image_file} / {mask_file}")

        if augment:
            # Horizontal flip.
            augmented = HorizontalFlip(p = 1.0)(image = x, mask = y)
            x1 = augmented["image"]
            y1 = augmented["mask"]

            # Grayscale copy of the image; cv2.imread yields BGR channel
            # order, so the BGR conversion code is the correct one.
            x2 = cv2.cvtColor(x, cv2.COLOR_BGR2GRAY)
            y2 = y

            # Disabled experiments (kept out on purpose): ChannelShuffle,
            # Rotate(limit = 45) and RandomShadow.

            # Drops small boxes (up to 32x32) from the image; may simulate
            # gaps in the planting row.
            aug = CoarseDropout(p = 1, min_holes = 3, max_holes = 10, max_height = 32, max_width = 32)
            augmented = aug(image = x, mask = y)
            x4 = augmented['image']
            y4 = augmented['mask']

            # Equalize the input image.
            augmented = Equalize(p = 1.0)(image = x, mask = y)
            x5 = augmented["image"]
            y5 = augmented["mask"]

            # Random brightness/contrast; p is a probability, so it is capped
            # at 1.0 (the previous value was 1.5).
            aug = RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, brightness_by_max=True, always_apply=False, p=1.0)
            augmented = aug(image = x, mask = y)
            x6 = augmented["image"]
            y6 = augmented["mask"]

            variant_images = [x, x1, x2, x4, x5, x6]
            variant_masks = [y, y1, y2, y4, y5, y6]

        else:
            variant_images = [x]
            variant_masks = [y]

        for index, (i, m) in enumerate(zip(variant_images, variant_masks)):
            try:
                augmented = center_crop(image = i, mask = m)
                i = augmented["image"]
                m = augmented["mask"]

            except Exception:
                # Best-effort fallback kept from the original code: when the
                # crop cannot be applied (presumably because the sample is
                # smaller than 512x512 — confirm), resize instead.
                i = cv2.resize(i, (W, H))
                # Nearest-neighbour keeps mask values discrete; the default
                # bilinear interpolation would blend label values.
                m = cv2.resize(m, (W, H), interpolation = cv2.INTER_NEAREST)

            # Image and mask share the same file name in sibling folders.
            file_name = f"{name}_{index}.png"

            image_path = os.path.join(save_path, "image", file_name)
            mask_path = os.path.join(save_path, "mask", file_name)

            cv2.imwrite(image_path, i)
            cv2.imwrite(mask_path, m)


if __name__ == "__main__":
    np.random.seed(42)

    # Load the dataset file lists and split them into train/test.
    data_path = "dataset"
    (train_x, train_y), (test_x, test_y) = load_data(data_path)

    print(f"Train:\t {len(train_x)} - {len(train_y)}")
    print(f"Test:\t {len(test_x)} - {len(test_y)}")

    # Create the output folders used by the data augmentation step.
    for out_dir in (
        "new_data/train/image/",
        "new_data/train/mask/",
        "new_data/test/image/",
        "new_data/test/mask/",
    ):
        create_dir(out_dir)

    # Only the training split is augmented; the test split is just cropped/resized.
    augment_data(train_x, train_y, "new_data/train/", augment = True)
    augment_data(test_x, test_y, "new_data/test/", augment = False)
    

In [None]:
def SqueezeAndExcite(inputs, ratio = 8):
    """Re-weight the channels of ``inputs`` with a learned per-channel gate.

    The gate is computed by global average pooling followed by two bias-free
    dense layers (ReLU with ``channels // ratio`` units, then sigmoid with
    ``channels`` units) and multiplied back onto ``inputs``.

    Args:
        inputs: Feature tensor whose last dimension is the channel count.
        ratio: Reduction factor for the hidden dense layer.

    Returns:
        Tensor with the same shape as ``inputs``.
    """
    channels = inputs.shape[-1]

    gate = GlobalAveragePooling2D()(inputs)
    gate = Reshape((1, 1, channels))(gate)
    gate = Dense(channels // ratio, activation = 'relu', kernel_initializer = 'he_normal', use_bias = False)(gate)
    gate = Dense(channels, activation = 'sigmoid', kernel_initializer = 'he_normal', use_bias = False)(gate)

    return inputs * gate

def ASPP(inputs):
    """Build the pooling/dilated-convolution pyramid on top of ``inputs``.

    Five branches are computed from ``inputs`` and concatenated: an image
    pooling branch (full-size average pooling, 1x1 conv, bilinear upsampling
    back to the input size), a 1x1 conv, and three 3x3 convs with dilation
    rates 6, 12 and 18. A final 1x1 conv fuses them into 256 channels.

    Args:
        inputs: Feature tensor with statically known height and width.

    Returns:
        Fused feature tensor with 256 channels.
    """
    def conv_bn_relu(tensor, kernel_size, dilation_rate = 1):
        # 256-filter bias-free convolution followed by batch norm and ReLU.
        tensor = Conv2D(256, kernel_size, padding = "same", use_bias = False, dilation_rate = dilation_rate)(tensor)
        tensor = BatchNormalization()(tensor)
        return Activation("relu")(tensor)

    height, width = inputs.shape[1], inputs.shape[2]

    # Image pooling branch: pool over the whole map, then upsample back.
    pooled = AveragePooling2D(pool_size=(height, width))(inputs)
    pooled = conv_bn_relu(pooled, 1)
    pooled = UpSampling2D((height, width), interpolation = "bilinear")(pooled)

    # 1x1 conv branch followed by the three dilated 3x3 conv branches.
    branches = [conv_bn_relu(inputs, 1)]
    for rate in (6, 12, 18):
        branches.append(conv_bn_relu(inputs, 3, dilation_rate = rate))

    merged = Concatenate()([pooled] + branches)
    return conv_bn_relu(merged, 1)

def deeplabv3_plus(shape):
    """Build the segmentation model for inputs of the given ``shape``.

    A ResNet101 encoder (ImageNet weights) provides two feature maps: a deep
    one that goes through ``ASPP`` and a 4x upsampling, and a shallow one that
    is reduced to 48 channels. They are concatenated, refined by two 3x3
    conv blocks with ``SqueezeAndExcite`` before and after, upsampled 4x and
    mapped to a single sigmoid channel.

    Args:
        shape: Input shape passed to ``Input``, e.g. ``(512, 512, 3)``.

    Returns:
        A Keras ``Model`` mapping the input image to a one-channel sigmoid map.
    """
    inputs = Input(shape)

    # Encoder. Alternative backbone previously tried:
    # ResNet50(weights = "imagenet", include_top = False, input_tensor = inputs)
    encoder = ResNet101(weights = "imagenet", include_top = False, input_tensor = inputs)

    # NOTE(review): "conv4_block6_out" is, as far as I can tell, the last
    # conv4 block of ResNet50 but not of ResNet101 — confirm this is the
    # intended tap point for the deep features.
    deep = ASPP(encoder.get_layer("conv4_block6_out").output)
    deep = UpSampling2D((4, 4), interpolation = "bilinear")(deep)

    # Shallow features, reduced to 48 channels.
    shallow = encoder.get_layer("conv2_block2_out").output
    shallow = Conv2D(filters = 48, kernel_size = 1, padding = 'same', use_bias = False)(shallow)
    shallow = BatchNormalization()(shallow)
    shallow = Activation('relu')(shallow)

    features = Concatenate()([deep, shallow])
    features = SqueezeAndExcite(features)

    # Two identical 3x3 refinement blocks.
    for _ in range(2):
        features = Conv2D(filters = 256, kernel_size = 3, padding = 'same', use_bias = False)(features)
        features = BatchNormalization()(features)
        features = Activation('relu')(features)
    features = SqueezeAndExcite(features)

    # Upsample and project to a single-channel probability map.
    features = UpSampling2D((4, 4), interpolation = "bilinear")(features)
    features = Conv2D(1, 1)(features)
    outputs = Activation("sigmoid")(features)

    return Model(inputs, outputs)

if __name__ == "__main__":
    # Build the network once for 512x512 three-channel inputs.
    model = deeplabv3_plus(shape = (512, 512, 3))

def iou(y_true, y_pred):
    """Intersection-over-union between ``y_true`` and ``y_pred``.

    The value is computed with NumPy inside ``tf.numpy_function`` as
    ``(sum(t * p) + 1e-15) / (sum(t) + sum(p) - sum(t * p) + 1e-15)`` over
    all elements, and returned as a float32 tensor.
    """
    def _numpy_iou(truth, pred):
        overlap = (truth * pred).sum()
        combined = truth.sum() + pred.sum() - overlap
        ratio = (overlap + 1e-15) / (combined + 1e-15)
        return ratio.astype(np.float32)

    return tf.numpy_function(_numpy_iou, [y_true, y_pred], tf.float32)

# Small constant added to numerator and denominator to avoid division by zero.
smooth = 1e-15
def dice_coef(y_true, y_pred):
    """Dice coefficient between ``y_true`` and ``y_pred`` over all elements.

    Returns ``(2 * sum(t * p) + smooth) / (sum(t) + sum(p) + smooth)``.
    """
    flat_true = tf.keras.layers.Flatten()(y_true)
    flat_pred = tf.keras.layers.Flatten()(y_pred)
    overlap = tf.reduce_sum(flat_true * flat_pred)
    numerator = 2. * overlap + smooth
    denominator = tf.reduce_sum(flat_true) + tf.reduce_sum(flat_pred) + smooth
    return numerator / denominator

def dice_loss(y_true, y_pred):
    """Loss defined as one minus ``dice_coef(y_true, y_pred)``."""
    score = dice_coef(y_true, y_pred)
    return 1.0 - score

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
# Height and width (in pixels) expected for every image and mask.
H, W = 512, 512

def shuffling(x, y):
    """Shuffle ``x`` and ``y`` together with a fixed seed (42).

    Returns:
        Tuple ``(x, y)`` of the shuffled sequences, still aligned pairwise.
    """
    shuffled_x, shuffled_y = shuffle(x, y, random_state = 42)
    return shuffled_x, shuffled_y

def load_data(path):
    """Return the sorted PNG paths found in ``path/image`` and ``path/mask``.

    Images and masks are paired by position, so both folders are expected to
    hold files with matching names.

    Args:
        path: Directory containing the ``image`` and ``mask`` sub-directories.

    Returns:
        Tuple ``(x, y)`` of sorted image paths and sorted mask paths.
    """
    # "*.png" (with the dot) only matches real .png files; the previous
    # "*png" pattern also matched any name that merely ended in "png".
    x = sorted(glob(os.path.join(path, "image", "*.png")))
    y = sorted(glob(os.path.join(path, "mask", "*.png")))
    return x, y

def read_image(path):
    """Read a color image and scale it to float32 values in ``[0, 1]``.

    Args:
        path: File path as a ``bytes`` object (it is decoded to ``str`` here).

    Returns:
        float32 array with the pixel values divided by 255.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    if x is None:
        # cv2.imread returns None on failure; fail with a clear message
        # instead of an opaque TypeError on the division below.
        raise FileNotFoundError(f"Could not read image: {path}")
    x = x/255.0
    x = x.astype(np.float32)
    return x

def read_mask(path):
    """Read a mask as a single-channel float32 array with a trailing axis.

    Pixel values are not rescaled.

    Args:
        path: File path as a ``bytes`` object (it is decoded to ``str`` here).

    Returns:
        float32 array of the grayscale mask with one extra last dimension.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    if x is None:
        # cv2.imread returns None on failure; fail with a clear message
        # instead of an opaque AttributeError on the cast below.
        raise FileNotFoundError(f"Could not read mask: {path}")
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis = -1)
    return x

def tf_parse(x, y):
    """Load one image/mask pair from their paths as float32 tensors.

    The files are read with ``read_image`` / ``read_mask`` through
    ``tf.numpy_function``; the static shapes ``[H, W, 3]`` and ``[H, W, 1]``
    are then set explicitly on the results.
    """
    def _load_pair(image_path, mask_path):
        return read_image(image_path), read_mask(mask_path)

    image, mask = tf.numpy_function(_load_pair, [x, y], [tf.float32, tf.float32])
    image.set_shape([H, W, 3])
    mask.set_shape([H, W, 1])
    return image, mask

def tf_dataset(X, Y, batch = 2):
    """Build a batched ``tf.data`` pipeline from image paths and mask paths.

    Args:
        X: Image file paths.
        Y: Mask file paths, aligned with ``X``.
        batch: Batch size.

    Returns:
        Dataset yielding parsed ``(image, mask)`` batches, prefetching 10.
    """
    return (
        tf.data.Dataset.from_tensor_slices((X, Y))
        .map(tf_parse)
        .batch(batch)
        .prefetch(10)
    )


if __name__ == "__main__":
    # Fix the NumPy and TensorFlow seeds.
    np.random.seed(42)
    tf.random.set_seed(42)

    # Folder that receives the checkpoint and the training log.
    create_dir("files")

    # Hyperparameters
    batch_size = 8
    lr = 1e-4
    num_epochs = 20
    model_path = os.path.join("files", "model.h5")
    csv_path = os.path.join("files", "data.csv")

    # Dataset: the "test" split is used as validation data during training.
    dataset_path = "new_data"
    train_path = os.path.join(dataset_path, "train")
    valid_path = os.path.join(dataset_path, "test")

    train_x, train_y = shuffling(*load_data(train_path))
    valid_x, valid_y = load_data(valid_path)

    print(f"Train: {len(train_x)} - {len(train_y)}")
    print(f"Valid: {len(valid_x)} - {len(valid_y)}")

    train_dataset = tf_dataset(train_x, train_y, batch = batch_size)
    valid_dataset = tf_dataset(valid_x, valid_y, batch = batch_size)

    # Model
    model = deeplabv3_plus((H, W, 3))
    optimizer = Adam(lr)
    metrics = [dice_coef, iou, Recall(), Precision()]
    model.compile(loss = dice_loss, optimizer = optimizer, metrics = metrics)

    # NOTE(review): EarlyStopping patience (20) equals num_epochs (20), so it
    # appears unable to trigger with these settings — confirm the intent.
    callbacks = [
        ModelCheckpoint(model_path, verbose = 1, save_best_only = True),
        ReduceLROnPlateau(monitor = 'val_loss', factor = 0.1, patience = 5, min_lr = 1e-7, verbose = 1),
        CSVLogger(csv_path),
        TensorBoard(),
        EarlyStopping(monitor = 'val_loss', patience = 20, restore_best_weights = False),
    ]

    model.fit(
        train_dataset,
        validation_data = valid_dataset,
        epochs = num_epochs,
        callbacks = callbacks,
    )


In [None]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import numpy as np
import cv2
import pandas as pd
from glob import glob
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score, recall_score

# Height and width (in pixels) expected for every image and mask.
H, W = 512, 512

def iou(y_true, y_pred):
    """Intersection-over-union between ``y_true`` and ``y_pred``.

    The value is computed with NumPy inside ``tf.numpy_function`` as
    ``(sum(t * p) + 1e-15) / (sum(t) + sum(p) - sum(t * p) + 1e-15)`` over
    all elements, and returned as a float32 tensor.
    """
    def _numpy_iou(truth, pred):
        overlap = (truth * pred).sum()
        combined = truth.sum() + pred.sum() - overlap
        ratio = (overlap + 1e-15) / (combined + 1e-15)
        return ratio.astype(np.float32)

    return tf.numpy_function(_numpy_iou, [y_true, y_pred], tf.float32)

# Small constant added to numerator and denominator to avoid division by zero.
smooth = 1e-15
def dice_coef(y_true, y_pred):
    """Dice coefficient between ``y_true`` and ``y_pred`` over all elements.

    Returns ``(2 * sum(t * p) + smooth) / (sum(t) + sum(p) + smooth)``.
    """
    flat_true = tf.keras.layers.Flatten()(y_true)
    flat_pred = tf.keras.layers.Flatten()(y_pred)
    overlap = tf.reduce_sum(flat_true * flat_pred)
    numerator = 2. * overlap + smooth
    denominator = tf.reduce_sum(flat_true) + tf.reduce_sum(flat_pred) + smooth
    return numerator / denominator

def dice_loss(y_true, y_pred):
    """Loss defined as one minus ``dice_coef(y_true, y_pred)``."""
    score = dice_coef(y_true, y_pred)
    return 1.0 - score


def save_results(image, mask, y_pred, save_image_path):
    """Save a side-by-side panel: image | mask | prediction | masked image.

    The four pictures are separated by 10-pixel-wide grey bars and written
    to ``save_image_path`` with ``cv2.imwrite``.

    Args:
        image: Color image array (height, width, 3).
        mask: Two-dimensional ground-truth mask; it is multiplied by 255 for
            display, so values are assumed to be 0/1 — TODO confirm.
        y_pred: Two-dimensional predicted mask; also multiplied by 255 for
            display and used to mask ``image``.
        save_image_path: Destination file path.
    """
    # Grey separator bar. Its height is taken from the image rather than the
    # module-level H, so the function also works for other image sizes.
    line = np.ones((image.shape[0], 10, 3)) * 128

    # Expand the single-channel masks to three channels so they can be
    # concatenated with the color image.
    mask = np.repeat(np.expand_dims(mask, axis = -1), 3, axis = -1)
    mask = mask * 255

    y_pred = np.repeat(np.expand_dims(y_pred, axis = -1), 3, axis = -1)

    # Keep only the pixels selected by the prediction.
    masked_image = image * y_pred
    y_pred = y_pred * 255

    cat_images = np.concatenate([image, line, mask, line, y_pred, line, masked_image], axis = 1)
    cv2.imwrite(save_image_path, cat_images)


def save_results_pred(y_pred, save_image_path):
    """Write the predicted mask as a three-channel image scaled by 255.

    Args:
        y_pred: Predicted mask array; it is replicated on a new last axis.
        save_image_path: Destination file path.
    """
    three_channel = np.stack([y_pred, y_pred, y_pred], axis = -1)
    cv2.imwrite(save_image_path, three_channel * 255)

def save_results_masked(image, y_pred, save_image_path):
    """Write ``image`` multiplied element-wise by the predicted mask.

    Args:
        image: Color image array.
        y_pred: Predicted mask array; it is replicated on a new last axis
            before the multiplication.
        save_image_path: Destination file path.
    """
    three_channel = np.stack([y_pred, y_pred, y_pred], axis = -1)
    cv2.imwrite(save_image_path, image * three_channel)

if __name__ == "__main__":
    np.random.seed(42)
    tf.random.set_seed(42)

    create_dir("results_pred")
    create_dir("results_masked")
    create_dir("results")
    # score.csv is written into "files" at the end; make sure it exists.
    create_dir("files")

    # Load the trained model; the custom loss/metrics must be registered.
    with CustomObjectScope({'iou': iou, 'dice_coef': dice_coef, 'dice_loss': dice_loss}):
        model = tf.keras.models.load_model("files/model.h5")

    # Load the test file lists.
    dataset_path = "new_data"
    valid_path = os.path.join(dataset_path, "test")
    test_x, test_y = load_data(valid_path)
    print(f"Test: {len(test_x)} - {len(test_y)}")

    # Prediction and evaluation, one image at a time.
    SCORE = []
    for x, y in tqdm(zip(test_x, test_y), total = len(test_x)):

        name = x.split("/")[-1].split(".")[0]

        image = cv2.imread(x, cv2.IMREAD_COLOR)
        x = image/255.0
        x = np.expand_dims(x, axis = 0)

        mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)

        # Predict and threshold the sigmoid output at 0.5.
        y_pred = model.predict(x)[0]
        y_pred = np.squeeze(y_pred, axis = -1)
        y_pred = y_pred > 0.5
        y_pred = y_pred.astype(np.int32)

        # Save the comparison panel, the raw prediction and the masked image.
        save_image_path = f"results/{name}.png"
        save_results(image, mask, y_pred, save_image_path)
        save_image_path_pred = f"results_pred/{name}.png"
        save_image_path_msk = f"results_masked/{name}.png"
        save_results_pred(y_pred, save_image_path_pred)
        save_results_masked(image, y_pred, save_image_path_msk)

        mask = mask.flatten()
        y_pred = y_pred.flatten()

        # Per-image metrics for the foreground class (label 1).
        # average="micro" over both labels made F1, recall and precision all
        # collapse to plain accuracy; average="binary" reports the values for
        # the positive class. Assumes mask pixels are 0/1 — TODO confirm.
        acc_value = accuracy_score(mask, y_pred)
        f1_value = f1_score(mask, y_pred, average = "binary")
        jac_value = jaccard_score(mask, y_pred, average = "binary")
        recall_value = recall_score(mask, y_pred, average = "binary")
        precision_value = precision_score(mask, y_pred, average = "binary")
        SCORE.append([name, acc_value, f1_value, jac_value, recall_value, precision_value])

    # Mean of every metric over all test images (the name column is skipped).
    score = [s[1:] for s in SCORE]
    score = np.mean(score, axis = 0)
    print(f"Accuracy: {score[0]:0.5f}")
    print(f"F1: {score[1]:0.5f}")
    print(f"Jaccard: {score[2]:0.5f}")
    print(f"Recall: {score[3]:0.5f}")
    print(f"Precision: {score[4]:0.5f}")

    df = pd.DataFrame(SCORE, columns = ["Image", "Accuracy", "F1", "Jaccard", "Recall", "Precision"])
    df.to_csv("files/score.csv")

In [None]:
!zip -r /content/results_pred.zip /content/results_pred

In [None]:
from google.colab import files
# NOTE(review): no visible cell creates /content/files.zip (the zip cell above
# only writes /content/results_pred.zip) — confirm the archive exists first.
files.download('/content/files.zip')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>