In [164]:
import numpy as np
import cv2
import matplotlib.pyplot as plt

from pathlib import Path
from typing import List, Tuple
from tensorflow import Tensor, random, reduce_sum, constant, numpy_function, float32, data, Variable, transpose
from tensorflow.keras.layers import Dense, Multiply, Conv2D, GlobalAveragePooling2D, BatchNormalization, UpSampling2D, Input, MaxPooling2D, AveragePooling2D, Flatten, Layer, Conv1D, LayerNormalization, DepthwiseConv2D
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.backend import relu, sigmoid, concatenate, reshape
from tensorflow.keras.applications import VGG19
from tensorflow.keras.optimizers import Nadam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.callbacks import History
from sklearn.utils import shuffle
from tensorflow.keras.metrics import Recall, Precision
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, CSVLogger
from math import log

In [165]:
class Select_Layer(Layer):
    """
    Learnable weighted blend of two tensors.

    Holds one trainable value ``weight`` of shape ``[1]``, initialised uniformly
    in [0, 1), and returns ``inputs[0] * weight + (1 - weight) * inputs[1]``.
    Nothing constrains ``weight`` to stay inside [0, 1] during training.

    :param kwargs: standard Keras ``Layer`` keyword arguments (``name``,
        ``trainable``, ``dtype`` ...), forwarded to the base class
    """

    def __init__(self, **kwargs):
        # Keras hands the saved layer config (name, trainable, dtype, ...) back to
        # __init__ when a model is deserialised; rejecting those keywords made the
        # layer impossible to rebuild from a saved model.
        super(Select_Layer, self).__init__(**kwargs)
        self.weight = Variable(random.uniform([1], 0, 1), trainable=True, name="se-W")

    def call(self, inputs, *args, **kwargs):
        """
        Blend the two entries of ``inputs``.

        :param inputs: sequence whose items 0 and 1 are the tensors to mix
        :return: ``inputs[0] * weight + (1 - weight) * inputs[1]``
        """
        first, second = inputs[0], inputs[1]
        return first * self.weight + (1. - self.weight) * second

def channel_shuffle(input_tensor: Tensor, group_num: int = 2) -> Tensor:
    """
    Interleave the channels of a 4-D feature map across ``group_num`` groups.

    The channel axis is split into ``(group_num, channel // group_num)``, the two
    new axes are swapped, and the result is flattened back to ``channel``.

    References: https://blog.csdn.net/baidu_23388287/article/details/94456951
                https://blog.csdn.net/qq_36758914/article/details/106967780

    :param input_tensor: input feature map; its static shape is unpacked as
        (batch, height, width, channel)
    :param group_num: number of channel groups
    :return: tensor of the same shape with shuffled channels
    :raises ValueError: if ``group_num`` is not positive, or the channel count is
        unknown or not a multiple of ``group_num``
    """
    _, high, width, channel = input_tensor.get_shape()
    # The floor division below is only meaningful when it is exact, so fail early
    # with a readable message instead of an opaque reshape error.
    if group_num < 1:
        raise ValueError(f"group_num must be a positive integer, got {group_num}")
    if channel is None or channel % group_num != 0:
        raise ValueError(
            f"channel count ({channel}) must be known and divisible by group_num ({group_num})"
        )
    grouped = reshape(input_tensor, [-1, high, width, group_num, channel // group_num])
    swapped = transpose(grouped, [0, 1, 2, 4, 3])
    return reshape(swapped, [-1, high, width, channel])


def sECA_block(inputs: Tensor, gamma: int=2, b: int = 1) -> Tensor:
    """
    Channel gate that blends a channel-shuffled branch with a plain branch.

    Each branch global-average-pools ``inputs`` (keeping the spatial axes),
    applies a Conv1D with as many filters as channels, then a sigmoid. The first
    branch shuffles the pooled channels before the convolution. A Select_Layer
    mixes the two gates and the result is multiplied onto ``inputs``.

    :param inputs: 4-D feature map; the last axis is the channel count
    :param gamma: divisor used in the kernel-size formula
    :param b: offset used in the kernel-size formula
    :return: ``inputs`` scaled by the blended gate
    """
    _, _, _, channels = inputs.get_shape()
    # kernel size = int(|(log2(C) + b) / gamma|), bumped by one when that is even
    raw_k = int(abs((log(channels, 2) + b) / gamma))
    kernel_size = raw_k + 1 if raw_k % 2 == 0 else raw_k

    # NOTE(review): the pooled tensors keep their two size-1 spatial axes, so Conv1D
    # appears to see a length-1 sequence with C input channels rather than sliding
    # along the channel axis as in ECA - confirm this is intended.
    shuffled_gate = GlobalAveragePooling2D(keepdims=True)(inputs)
    shuffled_gate = channel_shuffle(shuffled_gate)
    shuffled_gate = Conv1D(filters=shuffled_gate.shape[-1],
                           kernel_size=kernel_size,
                           padding="same")(shuffled_gate)
    shuffled_gate = sigmoid(shuffled_gate)

    plain_gate = GlobalAveragePooling2D(keepdims=True)(inputs)
    plain_gate = Conv1D(filters=plain_gate.shape[-1],
                        kernel_size=kernel_size,
                        padding="same")(plain_gate)
    plain_gate = sigmoid(plain_gate)

    # Learnable mix of the two gates (using plain_gate alone was the earlier variant).
    gate = Select_Layer()((shuffled_gate, plain_gate))
    return Multiply()([inputs, gate])

def squeeze_excite_block(inputs: Tensor, ratio: int=8) -> Tensor:
    """
    Squeeze-and-excitation gate; channels are read from the last axis of ``inputs``.

    :param inputs: input feature map
    :param ratio: reduction ratio; the channel count is floor-divided by it for
        the bottleneck Dense layer
    :return: ``inputs`` multiplied by the learned per-channel gate
    """
    channels = inputs.shape[-1]

    gate = GlobalAveragePooling2D(keepdims=True)(inputs)
    # Bottleneck: C -> C // ratio -> C, both without bias.
    gate = Dense(channels // ratio, kernel_initializer="he_normal", use_bias=False)(gate)
    gate = relu(gate)
    gate = Dense(channels, kernel_initializer="he_normal", use_bias=False)(gate)
    gate = sigmoid(gate)

    return Multiply()([inputs, gate])

In [166]:
def conv_block(inputs: Tensor, filters: int) -> Tensor:
    """
    Convolution block followed by the sECA channel-attention block.

    Layout: 1x1 conv -> 7x7 depthwise conv -> layer norm -> 1x1 conv expanding to
    ``4 * filters`` -> ReLU -> 1x1 conv back to ``filters`` -> sECA_block.
    (An earlier variant used two 3x3 Conv-BatchNorm-ReLU stages and
    squeeze_excite_block instead.)

    :param inputs: input feature map
    :param filters: number of output channels
    :return: attention-weighted feature map with ``filters`` channels
    """
    expanded = 4 * filters
    pointwise = (1, 1)

    x = Conv2D(filters=filters, kernel_size=pointwise, padding="same")(inputs)
    x = DepthwiseConv2D(kernel_size=(7, 7), padding="same")(x)
    x = LayerNormalization()(x)
    x = Conv2D(filters=expanded, kernel_size=pointwise, padding="same")(x)
    x = relu(x)
    x = Conv2D(filters=filters, kernel_size=pointwise, padding="same")(x)

    return sECA_block(x)

In [167]:
def encoder_1(inputs: Tensor) -> Tuple[Tensor, List[Tensor]]:
    """
    Encoder of the first U-Net, built on a VGG19 backbone.

    :param inputs: input tensor handed to VGG19 as ``input_tensor``
    :return: output of ``block5_conv4`` and the list of skip tensors taken from
        block1_conv2, block2_conv2, block3_conv4 and block4_conv4 (in that order)
    """
    # weights are left at the VGG19 default
    backbone = VGG19(include_top=False, input_tensor=inputs)
    skip_layer_names = ("block1_conv2",
                        "block2_conv2",
                        "block3_conv4",
                        "block4_conv4")
    skip_connections = [backbone.get_layer(name).output for name in skip_layer_names]
    bottleneck = backbone.get_layer("block5_conv4").output
    return bottleneck, skip_connections

In [168]:
def decoder_1(inputs: Tensor, skip_connections: List[Tensor]) -> Tensor:
    """
    Decoder of the first U-Net.

    Runs four stages of bilinear upsampling, concatenation with a skip tensor and
    conv_block, using 256, 128, 64 and 32 filters.

    :param inputs: bottleneck tensor to decode
    :param skip_connections: encoder skip tensors; REVERSED IN PLACE by this call
    :return: decoded feature map
    """
    # In-place reversal so that index 0 pairs with the first upsampling stage.
    # The caller's list is modified as well: doubleU_net later passes the same list
    # to decoder_2, which indexes it without reversing, so this side effect is
    # relied upon.
    skip_connections.reverse()

    stage_filters = (256, 128, 64, 32)
    x = inputs
    for stage, filters in enumerate(stage_filters):
        skip = skip_connections[stage]
        x = UpSampling2D(interpolation="bilinear")(x)  # default size
        x = concatenate((x, skip))
        x = conv_block(x, filters)
    return x

In [169]:
def encoder_2(inputs: Tensor) -> Tuple[Tensor, List[Tensor]]:
    """
    Encoder of the second U-Net.

    Applies conv_block with 32, 64, 128 and 256 filters, storing each block's
    output as a skip tensor before max-pooling it.

    :param inputs: input tensor
    :return: the final pooled tensor and the list of pre-pooling skip tensors
    """
    skips = []
    x = inputs
    for filters in (32, 64, 128, 256):
        x = conv_block(x, filters)
        skips.append(x)
        x = MaxPooling2D()(x)  # default pool_size
    return x, skips

In [170]:
def decoder_2(inputs: Tensor,
              skip_connection_1: List[Tensor],
              skip_connection_2: List[Tensor]) -> Tensor:
    """
    Decoder of the second U-Net.

    Runs four stages of bilinear upsampling, concatenation with one skip tensor
    from each U-Net and conv_block, using 256, 128, 64 and 32 filters.

    :param inputs: bottleneck tensor to decode
    :param skip_connection_1: skips of the first U-Net, used exactly as given:
        index 0 is concatenated at the first upsampling stage
    :param skip_connection_2: skips of the second U-Net in encoder order; they are
        consumed last-to-first and the list itself is left unmodified
    :return: decoded feature map
    """
    stage_filters = [256, 128, 64, 32]
    # Work on a reversed copy instead of reversing the caller's list in place, so
    # calling this function has no side effect on its arguments.
    own_skips = list(reversed(skip_connection_2))
    x = inputs

    for num, filters in enumerate(stage_filters):
        x = UpSampling2D(interpolation="bilinear")(x)  # default size
        x = concatenate((x, skip_connection_1[num], own_skips[num]))
        x = conv_block(x, filters)
    return x

In [171]:
def output_block(inputs: Tensor) -> Tensor:
    """
    Output head: a single-filter 1x1 convolution followed by a sigmoid.

    :param inputs: input feature map
    :return: one-channel tensor with values squashed by the sigmoid
    """
    logits = Conv2D(filters=1, kernel_size=(1, 1), padding="same")(inputs)
    return sigmoid(logits)

In [172]:
def as_pp(inputs: Tensor, filters: int) -> Tensor:
    """
    ASPP (atrous spatial pyramid pooling) module.

    Concatenates an image-level pooling branch with four convolution branches
    (1x1, and 3x3 with dilation 6, 12 and 18), then fuses them with a 1x1 conv.

    :param inputs: input feature map; axes 1 and 2 of its static shape are used as
        the pooling and upsampling size
    :param filters: number of filters of every convolution in the module
    :return: fused feature map with ``filters`` channels
    """
    shape = inputs.get_shape()
    height, width = shape[1], shape[2]

    # Image-level branch: pool the whole map to 1x1, project, then upsample back.
    pooled = AveragePooling2D(pool_size=(height, width))(inputs)
    pooled = Conv2D(filters=filters, kernel_size=1, padding="same")(pooled)
    pooled = BatchNormalization()(pooled)
    pooled = relu(pooled)
    pooled = UpSampling2D((height, width), interpolation='bilinear')(pooled)

    branches = [pooled]

    # (kernel_size, dilation_rate) of each parallel convolution branch
    branch_specs = ((1, 1), (3, 6), (3, 12), (3, 18))
    for kernel_size, dilation_rate in branch_specs:
        branch = Conv2D(filters=filters, kernel_size=kernel_size,
                        dilation_rate=dilation_rate, padding="same",
                        use_bias=False)(inputs)
        branch = BatchNormalization()(branch)
        branch = relu(branch)
        branches.append(branch)

    fused = concatenate(branches)
    fused = Conv2D(filters=filters, kernel_size=1, dilation_rate=1,
                   padding="same", use_bias=False)(fused)
    fused = BatchNormalization()(fused)
    return relu(fused)


In [173]:
def doubleU_net(input_shape: Tuple[int, int, int]) -> Model:
    """
    Assemble the two stacked U-Nets into one Keras model.

    The first network's mask is multiplied onto the input image and fed to the
    second network; both masks are concatenated as the model output.

    :param input_shape: shape of one input sample, passed to ``Input``
    :return: model whose output is ``concatenate((outputs1, outputs2))``
    """
    inputs = Input(shape=input_shape)

    # First U-Net. The bottleneck is downsampled 16x; skip_c1 holds the features
    # at 1x, 2x, 4x and 8x downsampling.
    bottleneck_1, skip_c1 = encoder_1(inputs)
    bottleneck_1 = as_pp(bottleneck_1, 64)
    # decoder_1 reverses skip_c1 in place; decoder_2 below receives it in that
    # reversed order.
    decoded_1 = decoder_1(bottleneck_1, skip_c1)
    outputs1 = output_block(decoded_1)

    # Second U-Net operates on the input gated by the first mask.
    gated_inputs = outputs1 * inputs
    bottleneck_2, skip_c2 = encoder_2(gated_inputs)
    bottleneck_2 = as_pp(bottleneck_2, 64)
    decoded_2 = decoder_2(bottleneck_2, skip_c1, skip_c2)
    outputs2 = output_block(decoded_2)

    outputs = concatenate((outputs1, outputs2))
    return Model(inputs=inputs, outputs=outputs)

In [174]:
# Build the network for inputs of shape (288, 384, 3) and print its layer table.
model = doubleU_net((288, 384, 3))
model.summary()

Model: "model_6"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_8 (InputLayer)            [(None, 288, 384, 3) 0                                            
__________________________________________________________________________________________________
block1_conv1 (Conv2D)           (None, 288, 384, 64) 1792        input_8[0][0]                    
__________________________________________________________________________________________________
block1_conv2 (Conv2D)           (None, 288, 384, 64) 36928       block1_conv1[0][0]               
__________________________________________________________________________________________________
block1_pool (MaxPooling2D)      (None, 144, 192, 64) 0           block1_conv2[0][0]               
____________________________________________________________________________________________

In [175]:
def dice_coef(y_true: Tensor, y_pred: Tensor) -> Tensor:
    """
    Smoothed Dice coefficient summed over the whole batch.

    :param y_true: ground-truth tensor
    :param y_pred: predicted tensor
    :return: ``(2 * sum(y_true * y_pred) + s) / (sum(y_true) + sum(y_pred) + s)``
        with ``s = 1e-5``
    """
    smooth = 1e-5
    flat_true = Flatten()(y_true)
    flat_pred = Flatten()(y_pred)
    overlap = reduce_sum(flat_true * flat_pred)
    numerator = 2. * overlap + smooth
    denominator = reduce_sum(flat_true) + reduce_sum(flat_pred) + smooth
    return numerator / denominator

In [176]:
def dice_loss(y_true: Tensor, y_pred: Tensor) -> Tensor:
    """
    Dice loss, i.e. one minus the Dice coefficient.

    :param y_true: ground-truth tensor
    :param y_pred: predicted tensor
    :return: ``1 - dice_coef(y_true, y_pred)``
    """
    one = constant(1.)
    coefficient = dice_coef(y_true, y_pred)
    return one - coefficient

In [177]:
def iou(y_ture: Tensor, y_pred: Tensor):
    """
    Smoothed intersection-over-union, computed with NumPy over the whole batch.

    :param y_ture: ground-truth tensor
    :param y_pred: predicted tensor
    :return: float32 scalar ``(intersection + s) / (union + s)`` with ``s = 1e-5``
    """
    smooth = 1e-5

    def _iou_numpy(truth: np.ndarray, pred: np.ndarray):
        overlap = (truth * pred).sum()
        union = truth.sum() + pred.sum() - overlap
        ratio = (overlap + smooth) / (union + smooth)
        return ratio.astype(np.float32)

    # numpy_function wraps the NumPy routine so it can be called on tensors.
    return numpy_function(_iou_numpy, [y_ture, y_pred], float32)

In [178]:
def read_img(img_path: bytes) -> np.ndarray:
    """
    Load an image as a float32 RGB array of shape (288, 384, 3) scaled to [0, 1].

    :param img_path: path of the image file; bytes (as delivered by
        ``numpy_function``) or str
    :return: array of shape (288, 384, 3), dtype float32
    :raises FileNotFoundError: if OpenCV cannot load the file
    """
    target_h, target_w = 288, 384
    if isinstance(img_path, bytes):
        img_path = img_path.decode()

    bgr = cv2.imread(img_path, cv2.IMREAD_COLOR)
    if bgr is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"cv2.imread could not load image: {img_path}")
    img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

    # img.shape is (height, width, channels) whereas cv2.resize takes (width, height).
    # The previous check compared against (384, 288, 3), so correctly sized images
    # were resized needlessly and 384x288 portrait images were returned unresized.
    if img.shape != (target_h, target_w, 3):
        img = cv2.resize(img, (target_w, target_h))
    return (img / 255.).astype(np.float32)

def read_mask(mask_path: bytes) -> np.ndarray:
    """
    Load a mask as a float32 array of shape (288, 384, 1) scaled to [0, 1].

    :param mask_path: path of the mask file; bytes (as delivered by
        ``numpy_function``) or str
    :return: array of shape (288, 384, 1), dtype float32
    :raises FileNotFoundError: if OpenCV cannot load the file
    """
    target_h, target_w = 288, 384
    if isinstance(mask_path, bytes):
        mask_path = mask_path.decode()

    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"cv2.imread could not load mask: {mask_path}")

    # mask.shape is (height, width) whereas cv2.resize takes (width, height). The
    # previous check compared against (384, 288), so correctly sized masks were
    # resized needlessly and 384x288 portrait masks were returned unresized.
    if mask.shape != (target_h, target_w):
        mask = cv2.resize(mask, (target_w, target_h))
    mask = (mask / 255.).astype(np.float32)
    # Append a trailing channel axis: (H, W) -> (H, W, 1).
    return np.expand_dims(mask, axis=-1)

def deal_img_mask(img_path: str, mask_path: str) -> Tuple[Tensor, Tensor]:
    """
    Load one (image, mask) pair through ``numpy_function`` and fix their static shapes.

    The single-channel mask is duplicated along the last axis so that it has two
    channels.

    :param img_path: path of the image file
    :param mask_path: path of the mask file
    :return: image of shape (288, 384, 3) and mask of shape (288, 384, 2), both float32
    """
    def _load_pair(raw_img_path, raw_mask_path):
        image = read_img(raw_img_path)
        single = read_mask(raw_mask_path)
        doubled = np.concatenate([single, single], axis=-1)
        return image, doubled

    img, mask = numpy_function(_load_pair, [img_path, mask_path], [float32, float32])
    # Static shapes are declared explicitly after numpy_function.
    img.set_shape([288, 384, 3])
    mask.set_shape([288, 384, 2])
    return img, mask

def get_dataset(img_list: List[str], mask_list: List[str], batch: int=8):
    """
    Build an endlessly repeating, shuffled and batched pipeline of (image, mask) pairs.

    :param img_list: image file paths
    :param mask_list: mask file paths, aligned index-by-index with ``img_list``
    :param batch: batch size
    :return: ``tf.data`` dataset yielding (image batch, mask batch) tuples
    :raises ValueError: if the lists are empty or differ in length
    """
    sample_count = len(img_list)
    if sample_count == 0:
        raise ValueError("img_list is empty; nothing to build a dataset from")
    if sample_count != len(mask_list):
        raise ValueError(
            f"img_list ({sample_count}) and mask_list ({len(mask_list)}) differ in length"
        )

    dataset = data.Dataset.from_tensor_slices((img_list, mask_list))
    # Shuffling happens on the path strings (before map), so a buffer covering the
    # whole list is cheap and gives a uniform shuffle; the former buffer of 32 only
    # permuted samples locally.
    dataset = dataset.shuffle(buffer_size=sample_count)
    dataset = dataset.map(map_func=deal_img_mask, num_parallel_calls=2)
    # Repeats forever: callers must pass explicit step counts to fit/evaluate.
    dataset = dataset.repeat()
    dataset = dataset.batch(batch)
    return dataset


In [179]:
def get_path_list(img_path: str, mask_path: str):
    """
    List the files of an image folder and a mask folder and shuffle them in unison.

    Each folder is globbed with ``*.*`` and sorted by file stem before shuffling,
    so pairs stay aligned only if both folders sort into the same order.

    :param img_path: folder containing the images
    :param mask_path: folder containing the masks
    :return: result of ``shuffle(image_paths, mask_paths, random_state=42)``
    """
    def _sorted_files(folder: str):
        entries = sorted(Path(folder).glob('*.*'), key=lambda entry: entry.stem)
        return list(map(str, entries))

    image_files = _sorted_files(img_path)
    mask_files = _sorted_files(mask_path)
    return shuffle(image_files, mask_files, random_state=42)

# a, b = get_path_list('data/test/x', "data/test/y")
# print(a, b)

In [180]:
# a = cv2.cvtColor(cv2.imread("PNG/Original/1.png", cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB)
# print(a.shape)
# b = cv2.imread("PNG/Ground Truth/1.png", cv2.IMREAD_GRAYSCALE)
# print(b.shape)
# _ = plt.figure()
# plt.subplot(2, 1, 1)
# plt.imshow(a)
# plt.subplot(2, 1, 2)
# plt.imshow(b)
# plt.show()

In [181]:
# Training callbacks, all driven by the validation loss.
# NOTE(review): the callbacks write into "model_check_point/" and "csv_log/";
# nothing in this notebook creates those folders - confirm they exist before training.
callbacks = [
    # Save weights only, one file per epoch (epoch number and val_loss in the name).
    ModelCheckpoint("model_check_point/weights_{epoch:03d}-{val_loss:.4f}.h5", save_weights_only=True),
    # Multiply the learning rate by 0.1 after 20 epochs without val_loss improvement.
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=20),
    # Log the per-epoch values to a CSV file.
    CSVLogger("csv_log/data.csv"),
    # Stop after 50 epochs without val_loss improvement; the best weights are NOT restored.
    EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=False)
    ]

In [182]:
# Training hyper-parameters and the metrics reported by fit/evaluate.
batch_size = 2
epochs = 300
lr = 1e-5
# dice_coef and iou are the custom functions defined above; Recall and Precision
# are Keras metric objects created with their default settings.
metrics = [dice_coef,
           iou,
           Recall(),
           Precision()]

In [183]:
# Collect the shuffled (image, mask) path lists of the training and validation
# splits and wrap each in a repeating, batched tf.data pipeline.
train_x, train_y = get_path_list("data/train/x", "data/train/y")
valid_x, valid_y = get_path_list("data/valid/x", "data/valid/y")
train_dataset = get_dataset(train_x, train_y, batch=batch_size)
valid_dataset = get_dataset(valid_x, valid_y, batch=batch_size)

In [184]:
# Compile with binary cross-entropy and Nadam; dice_loss defined above is not used here.
model.compile(loss=BinaryCrossentropy(), optimizer=Nadam(learning_rate=lr), metrics=metrics)

In [185]:
# Number of full batches per split (floor division); the following cell adds one
# step when a trailing partial batch remains.
train_steps = (len(train_x) // batch_size)
valid_steps = (len(valid_x) // batch_size)

In [186]:
# Round up: one extra step when the sample count is not a multiple of batch_size.
if len(train_x) % batch_size != 0:
    train_steps += 1

if len(valid_x) % batch_size != 0:
    valid_steps += 1

In [187]:
# Train the model. Step counts are passed explicitly because get_dataset() repeats
# the data indefinitely.
# NOTE(review): per the Keras docs `shuffle` is ignored for tf.data.Dataset inputs -
# confirm whether shuffle=False can simply be dropped.
history = model.fit(train_dataset,
                    epochs=epochs,
                    validation_data=valid_dataset,
                    steps_per_epoch=train_steps,
                    validation_steps=valid_steps,
                    callbacks=callbacks,
                    shuffle=False)


Epoch 1/300
  10/6600 [..............................] - ETA: 1:03:38 - loss: 0.6717 - dice_coef: 0.1215 - iou: 0.0651 - recall_6: 0.2243 - precision_6: 0.0518

KeyboardInterrupt: 

In [None]:
def show_history(model_history: "History", save_pic: str=None):
    """
    Plot every curve recorded in a Keras training history on one figure.

    Sets the global matplotlib style to "ggplot" and the default figure size to
    14x14 inches before plotting.

    :param model_history: object whose ``history`` attribute maps a series name to
        its list of per-epoch values
    :param save_pic: optional file path; when given, the figure is saved there
        before being shown
    """
    plt.style.use("ggplot")
    plt.rcParams["figure.figsize"] = (14.0, 14.0)
    _ = plt.figure()
    for name, values in model_history.history.items():
        # Use each series' own length for the x axis: training can end before the
        # configured epoch count (early stopping, manual interrupt), and a fixed
        # 0..299 range then no longer matches the recorded values.
        plt.plot(np.arange(len(values)), values, label=name)
    plt.title("loss|miou|iou|recall|precision")
    plt.xlabel("Epoch #")
    plt.ylabel("Loss/Miou/Iou/Recall/Precision")
    plt.legend()

    if save_pic:
        plt.savefig(save_pic)
    plt.show()

In [None]:
# Plot every recorded training curve and also save the figure to test.jpg.
show_history(model_history=history, save_pic='test.jpg')

In [None]:
# Display one test image without axes; the image is converted from OpenCV's BGR
# channel order to RGB before being handed to matplotlib.
a = cv2.cvtColor(cv2.imread('test/Original/1.tif'), cv2.COLOR_BGR2RGB)
plt.figure()
plt.axis("off")
plt.imshow(a)
plt.show()

In [None]:
# def change_file_name(file_path: str):
#     def change(name: Path):
#         temp_parent = name.parent
#         temp_suffix = name.suffix
#         temp_name = name.stem.replace("p", '')
#         name.rename(rf"{temp_parent}\{temp_name}{temp_suffix}")
#     return map(change, Path(file_path).glob('*.*'))
#
# _ = [i for i in change_file_name("test/Ground Truth")]

# Build the test-split pipeline with the same loader and batch size used for training.
test_x, test_y = get_path_list("data/test/x", "data/test/y")
test_dataset = get_dataset(test_x, test_y, batch=batch_size)

In [None]:
# Number of evaluation steps = ceil(len(test_x) / batch_size).
test_steps = (len(test_x) // batch_size)

if len(test_x) % batch_size != 0:
    test_steps += 1

# The dataset repeats indefinitely, so the step count bounds the evaluation.
# NOTE(review): because get_dataset() repeats before batching, a final partial
# batch is presumably topped up with samples from the next pass - confirm this is
# acceptable for the reported metrics.
h = model.evaluate(test_dataset, steps=test_steps)

In [None]:
# Show the values returned by model.evaluate above.
print(h)

In [None]:
# Load a previously saved model; the custom metric functions must be supplied so
# Keras can resolve them by name.
# NOTE(review): models built by doubleU_net contain Select_Layer (via sECA_block),
# which is not listed in custom_objects here - confirm whether this file needs it.
new_model = load_model("CVC-612model.h5", custom_objects={'dice_coef': dice_coef,
                                                          'iou': iou})
new_model.summary()

In [None]:
def __read_img(img_path: bytes) -> np.ndarray:
    """
    Load an image as a float32 RGB array of shape (288, 384, 3) scaled to [0, 1].

    :param img_path: path of the image file; bytes (as delivered by
        ``numpy_function``) or str
    :return: array of shape (288, 384, 3), dtype float32
    :raises FileNotFoundError: if OpenCV cannot load the file
    """
    target_h, target_w = 288, 384
    if isinstance(img_path, bytes):
        img_path = img_path.decode()

    bgr = cv2.imread(img_path, cv2.IMREAD_COLOR)
    if bgr is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"cv2.imread could not load image: {img_path}")
    img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

    # img.shape is (height, width, channels) whereas cv2.resize takes (width, height).
    # The previous check compared against (384, 288, 3), so correctly sized images
    # were resized needlessly and 384x288 portrait images were returned unresized.
    if img.shape != (target_h, target_w, 3):
        img = cv2.resize(img, (target_w, target_h))
    return (img / 255.).astype(np.float32)

def __read_mask(mask_path: bytes) -> np.ndarray:
    """
    Load a mask as a float32 array of shape (288, 384, 1) scaled to [0, 1].

    :param mask_path: path of the mask file; bytes (as delivered by
        ``numpy_function``) or str
    :return: array of shape (288, 384, 1), dtype float32
    :raises FileNotFoundError: if OpenCV cannot load the file
    """
    target_h, target_w = 288, 384
    if isinstance(mask_path, bytes):
        mask_path = mask_path.decode()

    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"cv2.imread could not load mask: {mask_path}")

    # mask.shape is (height, width) whereas cv2.resize takes (width, height). The
    # previous check compared against (384, 288), so correctly sized masks were
    # resized needlessly and 384x288 portrait masks were returned unresized.
    if mask.shape != (target_h, target_w):
        mask = cv2.resize(mask, (target_w, target_h))
    mask = (mask / 255.).astype(np.float32)
    # Append a trailing channel axis: (H, W) -> (H, W, 1).
    return np.expand_dims(mask, axis=-1)
def __deal_img_mask(img_path: str, mask_path: str) -> Tuple[Tensor, Tensor]:
    """
    Load one (image, mask) pair through ``numpy_function`` and fix their static shapes.

    The single-channel mask is duplicated along the last axis so that it has two
    channels.

    :param img_path: path of the image file
    :param mask_path: path of the mask file
    :return: image of shape (288, 384, 3) and mask of shape (288, 384, 2), both float32
    """
    def _load_pair(raw_img_path, raw_mask_path):
        image = __read_img(raw_img_path)
        single = __read_mask(raw_mask_path)
        doubled = np.concatenate([single, single], axis=-1)
        return image, doubled

    img, mask = numpy_function(_load_pair, [img_path, mask_path], [float32, float32])
    # Static shapes are declared explicitly after numpy_function.
    img.set_shape([288, 384, 3])
    mask.set_shape([288, 384, 2])
    return img, mask

def __get_dataset(img_list: List[str], mask_list: List[str], batch: int=8):
    """
    Build an endlessly repeating, shuffled and batched pipeline of (image, mask) pairs.

    :param img_list: image file paths
    :param mask_list: mask file paths, aligned index-by-index with ``img_list``
    :param batch: batch size
    :return: ``tf.data`` dataset yielding (image batch, mask batch) tuples
    :raises ValueError: if the lists are empty or differ in length
    """
    sample_count = len(img_list)
    if sample_count == 0:
        raise ValueError("img_list is empty; nothing to build a dataset from")
    if sample_count != len(mask_list):
        raise ValueError(
            f"img_list ({sample_count}) and mask_list ({len(mask_list)}) differ in length"
        )

    dataset = data.Dataset.from_tensor_slices((img_list, mask_list))
    # Shuffling happens on the path strings (before map), so a buffer covering the
    # whole list is cheap and gives a uniform shuffle; the former buffer of 32 only
    # permuted samples locally.
    dataset = dataset.shuffle(buffer_size=sample_count)
    dataset = dataset.map(map_func=__deal_img_mask, num_parallel_calls=2)
    # Repeats forever: callers must pass an explicit step count to evaluate.
    dataset = dataset.repeat()
    dataset = dataset.batch(batch)
    return dataset


In [None]:
# Rebuild the test pipeline with the loader defined in the cell above, using batches of 16.
test_x, test_y = get_path_list("data/test/x", "data/test/y")
test_dataset = __get_dataset(test_x, test_y, batch=16)

In [None]:
# Number of evaluation steps = ceil(len(test_x) / 16); the literal 16 must match
# the batch size used to build test_dataset in the previous cell.
test_steps = (len(test_x) // 16)

if len(test_x) % 16 != 0:
    test_steps += 1

# Evaluate the reloaded model; the repeating dataset is bounded by the step count.
h = new_model.evaluate(test_dataset, steps=test_steps)