In [3]:
import os
import glob
import cv2

import pandas as pd
import numpy as np
import requests

import tensorflow as tf

from skimage import measure
from skimage.io import imread, imsave, imshow
from skimage.transform import resize
from skimage.morphology import dilation, disk
from skimage.draw import polygon, polygon_perimeter

from livelossplot.tf_keras import PlotLossesCallback
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input

from tensorflow.keras.layers import AveragePooling2D, GlobalAveragePooling2D, UpSampling2D, Reshape, Dense

from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
#Здесь мы загрузили все пути к изображениям и маскам в переменные X и Y соответственно.
def load_data(path, split=0.1, val_split=0.1):
    X = sorted(glob(os.path.join(path, "images", "*.jpg")))
    Y = sorted(glob(os.path.join(path, "masks", "*.png")))

    split_size = int(len(X) * split)
    train_x, test_x = train_test_split(X, test_size=split_size, random_state=42)
    train_y, test_y = train_test_split(Y, test_size=split_size, random_state=42)

    val_split_size = int(len(train_x) * val_split)
    train_x, val_x = train_test_split(train_x, test_size=val_split_size, random_state=42)
    train_y, val_y = train_test_split(train_y, test_size=val_split_size, random_state=42)

    return (train_x, train_y), (val_x, val_y), (test_x, test_y)

In [None]:
data_path = "/content/drive/MyDrive/dataset2"
(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_data(data_path)

In [None]:
def unet_model(image_size, output_classes):

    #Входной слой
    input_layer = tf.keras.layers.Input(shape=(H, W, 3))
    conv_1 = tf.keras.layers.Conv2D(64, 4,
                                    activation=tf.keras.layers.LeakyReLU(),
                                    strides=2, padding='same',
                                    kernel_initializer='glorot_normal',
                                    use_bias=False)(input_layer)
    #Сворачиваем
    conv_1_1 = tf.keras.layers.Conv2D(128, 4,
                                      activation=tf.keras.layers.LeakyReLU(),
                                      strides=2,
                                      padding='same',
                                      kernel_initializer='glorot_normal',# иинициализация ксавьера
                                      use_bias=False)(conv_1)
    batch_norm_1 = tf.keras.layers.BatchNormalization()(conv_1_1)

    #2
    conv_2 = tf.keras.layers.Conv2D(256, 4,
                                    activation=tf.keras.layers.LeakyReLU(),
                                    strides=2,
                                    padding='same',
                                    kernel_initializer='glorot_normal',
                                    use_bias=False)(batch_norm_1)
    batch_norm_2 = tf.keras.layers.BatchNormalization()(conv_2)

    #3
    conv_3 = tf.keras.layers.Conv2D(512, 4,
                                    activation=tf.keras.layers.LeakyReLU(),
                                    strides=2,
                                    padding='same',
                                    kernel_initializer='glorot_normal',
                                    use_bias=False)(batch_norm_2)
    batch_norm_3 = tf.keras.layers.BatchNormalization()(conv_3)

    #4
    conv_4 = tf.keras.layers.Conv2D(512, 4,
                                    activation=tf.keras.layers.LeakyReLU(),
                                    strides=2,
                                    padding='same',
                                    kernel_initializer='glorot_normal',
                                    use_bias=False)(batch_norm_3)
    batch_norm_4 = tf.keras.layers.BatchNormalization()(conv_4)

    #5
    conv_5 = tf.keras.layers.Conv2D(512, 4,
                                    activation=tf.keras.layers.LeakyReLU(),
                                    strides=2,
                                    padding='same',
                                    kernel_initializer='glorot_normal',
                                    use_bias=False)(batch_norm_4)
    batch_norm_5 = tf.keras.layers.BatchNormalization()(conv_5)

    #6
    conv_6 = tf.keras.layers.Conv2D(512, 4,
                                    activation=tf.keras.layers.LeakyReLU(),
                                    strides=2,
                                    padding='same',
                                    kernel_initializer='glorot_normal',
                                    use_bias=False)(batch_norm_5)
#Разворачиваем
    #1
    up_1 = tf.keras.layers.Concatenate()([tf.keras.layers.Conv2DTranspose(512, 4, activation='relu', strides=2,
                                                                          padding='same',
                                                                          kernel_initializer='glorot_normal',
                                                                          use_bias=False)(conv_6), conv_5])
    batch_up_1 = tf.keras.layers.BatchNormalization()(up_1)

    #Добавим Dropout от переобучения
    batch_up_1 = tf.keras.layers.Dropout(0.25)(batch_up_1)

    #2
    up_2 = tf.keras.layers.Concatenate()([tf.keras.layers.Conv2DTranspose(512, 4, activation='relu', strides=2,
                                                                          padding='same',
                                                                          kernel_initializer='glorot_normal',
                                                                          use_bias=False)(batch_up_1), conv_4])
    batch_up_2 = tf.keras.layers.BatchNormalization()(up_2)
    batch_up_2 = tf.keras.layers.Dropout(0.25)(batch_up_2)




    #3
    up_3 = tf.keras.layers.Concatenate()([tf.keras.layers.Conv2DTranspose(512, 4, activation='relu', strides=2,
                                                                          padding='same',
                                                                          kernel_initializer='glorot_normal',
                                                                          use_bias=False)(batch_up_2), conv_3])
    batch_up_3 = tf.keras.layers.BatchNormalization()(up_3)
    batch_up_3 = tf.keras.layers.Dropout(0.25)(batch_up_3)




    #4
    up_4 = tf.keras.layers.Concatenate()([tf.keras.layers.Conv2DTranspose(256, 4, activation='relu', strides=2,
                                                                          padding='same',
                                                                          kernel_initializer='glorot_normal',
                                                                          use_bias=False)(batch_up_3), conv_2])
    batch_up_4 = tf.keras.layers.BatchNormalization()(up_4)


    #5
    up_5 = tf.keras.layers.Concatenate()([tf.keras.layers.Conv2DTranspose(128, 4, activation='relu', strides=2,
                                                                          padding='same',
                                                                          kernel_initializer='glorot_normal',
                                                                          use_bias=False)(batch_up_4), conv_1_1])
    batch_up_5 = tf.keras.layers.BatchNormalization()(up_5)


    #6
    up_6 = tf.keras.layers.Concatenate()([tf.keras.layers.Conv2DTranspose(64, 4, activation='relu', strides=2,
                                                                          padding='same',
                                                                          kernel_initializer='glorot_normal',
                                                                          use_bias=False)(batch_up_5), conv_1])
    batch_up_6 = tf.keras.layers.BatchNormalization()(up_6)


    #Выходной слой
    output_layer = tf.keras.layers.Conv2DTranspose(output_classes, 4, activation='sigmoid', strides=2,
                                                   padding='same',
                                                   kernel_initializer='glorot_normal')(batch_up_6)

    model = tf.keras.Model(inputs=input_layer, outputs=output_layer)
    return model



In [18]:
def dice_loss(y_true, y_pred, smooth=1):
    intersection = tf.reduce_sum(y_true * y_pred)
    dice_coef = (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)
    return 1 - dice_coef

def dice_coef(y_true, y_pred, smooth=1):
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth)

def iou(y_true, y_pred, smooth=1):
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    union = tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) - intersection
    return (intersection + smooth) / (union + smooth)

In [19]:
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

def shuffling(x, y):
    x, y = shuffle(x, y, random_state=42)
    return x, y

In [21]:
def read_image(path):
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (256, 256))
    x = x / 255.0  # Нормализация
    x = x.astype(np.float32)
    return x

def read_mask(path):
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    x = cv2.resize(x, (256, 256))
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=-1)  # Добавляем размерность
    return x

In [None]:
def tf_parse(x, y):
    
    def f(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    image, mask = tf.numpy_function(f, [x, y], [tf.float32, tf.float32]) #применяет функцию ф для х и у, возвращая результат преобразованный в tf.float32
    image.set_shape([256, 256, 3])
    mask.set_shape([256, 256, 1])
    return image, mask

In [None]:
#настройка датасета
def tf_dataset(X, Y, batch_size=32):
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    dataset = dataset.shuffle(buffer_size=5000) #перемешивание
    dataset = dataset.map(tf_parse) #применение tf_parse
    dataset = dataset.batch(batch_size) #группирует элементы набора данных в батчи 
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE) #данные готовятся зараннее, пока модель обучается на текущем батче
    return dataset

In [None]:
if __name__ == "__main__":
    H = 256
    W = 256
    np.random.seed(42)
    tf.random.set_seed(42)

    create_dir("files_Unet")

    batch_size = 100
    lr = 1e-4
    num_epochs = 10
    model_path = os.path.join("files_Unet", "model.keras")
    csv_path = os.path.join("files_Unet", "data.csv")

    dataset_path = "/content/drive/MyDrive/dataset2"
    train_path = os.path.join(dataset_path, "train")
    valid_path = os.path.join(dataset_path, "test")

    train_x, train_y = shuffling(train_x, train_y)

    print(f"Train: {len(train_x)} - {len(train_y)}")
    print(f"Valid: {len(valid_x)} - {len(valid_y)}")

    train_dataset = tf_dataset(train_x, train_y)
    valid_dataset = tf_dataset(valid_x, valid_y)

    output_classes = 1  # Human
    model = unet_model((H, W, 3), output_classes)
    model.compile(loss=dice_loss, optimizer=Adam(lr), metrics=[dice_coef, iou, Recall(), Precision()])

    callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True, monitor='val_dice_coef', mode='max'),
        ReduceLROnPlateau(monitor='val_dice_coef', factor=0.1, patience=5, min_lr=1e-7, verbose=1, mode='max'),
        CSVLogger(csv_path),
        TensorBoard(),
        EarlyStopping(monitor='val_dice_coef', patience=20, restore_best_weights=False, mode='max'),
    ]

    model.fit(
        train_dataset.repeat(),
        epochs=num_epochs,
        steps_per_epoch=max(1, len(train_x) // batch_size),
        validation_data=valid_dataset.repeat(),
        validation_steps=max(1, len(valid_x) // batch_size),
        callbacks=callbacks
    )

Train: 4600 - 4600
Valid: 511 - 511
Epoch 1/10
[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21s/step - dice_coef: 0.4189 - iou: 0.2687 - loss: 0.5811 - precision: 0.3541 - recall: 0.6853 
Epoch 1: val_dice_coef improved from -inf to 0.30869, saving model to files_Unet/model.keras
[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1104s[0m 23s/step - dice_coef: 0.4204 - iou: 0.2699 - loss: 0.5796 - precision: 0.3556 - recall: 0.6873 - val_dice_coef: 0.3087 - val_iou: 0.1828 - val_loss: 0.6913 - val_precision: 0.0453 - val_recall: 2.4065e-05 - learning_rate: 1.0000e-04
Epoch 2/10
[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19s/step - dice_coef: 0.5831 - iou: 0.4131 - loss: 0.4169 - precision: 0.5119 - recall: 0.8323 
Epoch 2: val_dice_coef did not improve from 0.30869
[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m872s[0m 19s/step - dice_coef: 0.5836 - iou: 0.4136 - loss: 0.4164 - precision: 0.5123 - recall: 0.8322 - val_dice_coef:

In [27]:
save_path = os.path.join("/content/drive/MyDrive/models", "model_unet.keras")
model.save(save_path)
print(f"Model saved to {save_path}")

Model saved to /content/drive/MyDrive/models/model_unet.keras


In [None]:
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import LeakyReLU

model = load_model(save_path, custom_objects={'LeakyReLU': LeakyReLU}, compile=False)
model.summary()