In [None]:
!pip install opendatasets

In [None]:
import opendatasets as op
op.download("https://www.kaggle.com/datasets/tawsifurrahman/covid19-radiography-database/")

In [None]:
import tensorflow as tf                                                               # библиотека Tensorflow
import keras                                                                          # библиотека Keras
from keras.layers import Input, Conv2D, MaxPooling2D, Activation, ReLU, Rescaling     # cлои библиотеки Keras
from keras.layers import BatchNormalization, Conv2DTranspose, Concatenate             # cлои библиотеки Keras
from keras.layers import Rescaling, Resizing                                          # cлои библиотеки Keras
from keras.models import Model, Sequential                                            # конструкторы построения моделей библиотеки Keras

from keras.optimizers import Adam                                                     # оптимизатор Adam
from keras.utils import to_categorical                                                # преобразует вектор класса (целые числа) в двоичную матрицу класса

import numpy as np                                                                    # библиотека линейной алгебры
import pandas as pd                                                                   # библиотека обработки табличных данных
import os                                                                             # библиотека работы с функциями операционной системы, в том числе с файлами
import albumentations as A                                                            # библиотека аугментации изображений (https://albumentations.ai/)

import matplotlib.pyplot as plt                                                       # библиотека для рисования графиков

import zipfile
import os
import glob
import random
import sys

import skimage.io                           #Used for imshow function
import skimage.transform                    #Used for resize function
from skimage.morphology import label        #Used for Run-Length-Encoding RLE to create final submission
from tensorflow.keras.preprocessing.image import load_img, img_to_array, ImageDataGenerator

from tensorflow.keras.layers import *
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import LearningRateScheduler, ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
%matplotlib inline

In [None]:
def load_image(image_path, target_size=(192, 192)):
    img = load_img(image_path, target_size=target_size)
    img = img_to_array(img) / 255.0
    return img

def load_mask(mask_path, target_size=(192, 192)):
    mask = load_img(mask_path, target_size=target_size, color_mode="grayscale")
    mask = img_to_array(mask) / 255.0
    return mask

def image_mask_generator(image_paths, mask_paths, batch_size):
    while True:
        idx = np.random.permutation(len(image_paths))
        image_paths = np.array(image_paths)[idx]
        mask_paths = np.array(mask_paths)[idx]

        for i in range(0, len(image_paths), batch_size):
            batch_images = image_paths[i:i + batch_size]
            batch_masks = mask_paths[i:i + batch_size]

            images = np.array([load_image(img_path) for img_path in batch_images])
            masks = np.array([load_mask(mask_path) for mask_path in batch_masks])

            yield images, masks

In [None]:
image_dir = '/content/covid19-radiography-database/COVID-19_Radiography_Dataset/Normal/images'
label_dir = '/content/covid19-radiography-database/COVID-19_Radiography_Dataset/Normal/masks'

image_paths = [os.path.join(image_dir, img) for img in os.listdir(image_dir)]
mask_paths = [os.path.join(label_dir, msk) for msk in os.listdir(label_dir)]

X_train, X_test, y_train, y_test = train_test_split(image_paths, mask_paths, test_size=0.25, random_state=42)

batch_size = 8

train_gen = image_mask_generator(X_train, y_train, batch_size=batch_size)
val_gen = image_mask_generator(X_test, y_test, batch_size=batch_size)

In [None]:
# Функция свертки
def convolution_operation(entered_input, filters=64):
    conv1 = Conv2D(filters, kernel_size=(3, 3), padding="same")(entered_input)
    batch_norm1 = BatchNormalization()(conv1)
    acti1 = ReLU()(batch_norm1)

    conv2 = Conv2D(filters, kernel_size=(3, 3), padding="same")(acti1)
    batch_norm2 = BatchNormalization()(conv2)
    acti2 = ReLU()(batch_norm2)

    return acti2

# Латеральное соединение
def dense_skip_connection(encoder_output, skip_output, filters, depth):
    for i in range(depth):
        skip_output = convolution_operation(skip_output, filters)
        skip_output = Concatenate()([skip_output, encoder_output])
    return skip_output

# Кодировщик
def encoder(entered_input, filters=64):
    encod1 = convolution_operation(entered_input, filters)
    MaxPool1 = MaxPooling2D(strides=(2, 2))(encod1)
    return encod1, MaxPool1

# Декодировщик
def decoder(entered_input, skip, filters=64, depth=2):
    Upsample = Conv2DTranspose(filters, (2, 2), strides=2, padding="same")(entered_input)
    skip = dense_skip_connection(skip, skip, filters, depth)
    Connect_Skip = Concatenate()([Upsample, skip])
    out = convolution_operation(Connect_Skip, filters)
    return out

In [None]:
# Модель U-Net++
def U_NetPP(img_size, num_classes, deep_supervision=True):
    inputs = Input(img_size)

    # Кодировщик
    skip1, encoder_1 = encoder(inputs, 64)
    skip2, encoder_2 = encoder(encoder_1, 64*2)
    skip3, encoder_3 = encoder(encoder_2, 64*4)
    skip4, encoder_4 = encoder(encoder_3, 64*8)

    # Латентное пространство
    conv_block = convolution_operation(encoder_4, 64*16)

    # Декодировщик с плотными скип-соединениями (dense skip connections)
    decoder_1 = decoder(conv_block, skip4, 64*8, depth=3)
    decoder_2 = decoder(decoder_1, skip3, 64*4, depth=2)
    decoder_3 = decoder(decoder_2, skip2, 64*2, depth=1)
    decoder_4 = decoder(decoder_3, skip1, 64, depth=1)

    # Глубокая супервизия
    if deep_supervision:
        output1 = Conv2D(num_classes, (1, 1), padding="same", activation="sigmoid")(decoder_1)
        output2 = Conv2D(num_classes, (1, 1), padding="same", activation="sigmoid")(decoder_2)
        output3 = Conv2D(num_classes, (1, 1), padding="same", activation="sigmoid")(decoder_3)
        output4 = Conv2D(num_classes, (1, 1), padding="same", activation="sigmoid")(decoder_4)

        # Приводим все выходы к одному размеру
        output1 = UpSampling2D(size=(8, 8))(output1)
        output2 = UpSampling2D(size=(4, 4))(output2)
        output3 = UpSampling2D(size=(2, 2))(output3)

        # Среднее объединение выходов при супервизиях
        outputs = tf.keras.layers.Average()([output1, output2, output3, output4])
    else:
        # Один финальный выход (без супервизии)
        outputs = Conv2D(num_classes, (1, 1), padding="same", activation="sigmoid")(decoder_4)

    model = Model(inputs, outputs)
    return model

In [None]:
input_shape = (192, 192, 3)
num_classes = 1


model = U_NetPP(input_shape, num_classes, deep_supervision=True)

model.compile(
    optimizer='adam',
    loss="binary_crossentropy",
    metrics=['accuracy']
)

callbacks = [
    tf.keras.callbacks.ModelCheckpoint("unetpp_deep_supervision.keras", monitor='val_loss', save_best_only=True),
    tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
]


epochs = 3

history = model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=epochs,
    steps_per_epoch=len(X_train) // batch_size,
    validation_steps=len(X_test) // batch_size,
    callbacks=callbacks
)

In [None]:
acc = history.history['accuracy']                              # данные о точности на обучающей выборке
val_acc = history.history['val_accuracy']                      # данные о точности на проверочной выборке
loss = history.history['loss']                                             # данные об ошибке на обучающей выборке
val_loss = history.history['val_loss']                                     # данные об ошибке на проверочной выборке
epochs = range(1, len(acc) + 1)                                            # массив со значениями для оси абсцисс (Х)
plt.plot(epochs, acc, 'r', label='Точность на обучающей выборке')          #  строим график точность на обучающей выборке
plt.plot(epochs, val_acc, 'bo', label='Точность на проверочной выборке')   #  строим график точность на проверочной выборке
plt.title('График точности на обучающей и проверочной выборках')           #  заголовок графика
plt.legend()                                                               #  легенда графика
plt.figure()                                                               #  создаем новую фигуру (полотно для графика)
plt.plot(epochs, loss, 'r', label='Потери на обучающей выборке')           #  строим график потерь (ошибки) на обучающей выборке
plt.plot(epochs, val_loss, 'bo', label='Потери на валидационной выборке')  #  строим график потерь на проверочной выборке
plt.title('График потерь на обучающей и проверочной выборках')             #  заголовок графика
plt.legend()                                                               #  легенда графика
plt.show()

In [None]:
def display_multiple_results(images, true_masks, pred_masks, num_images=10):
    plt.figure(figsize=(15, num_images * 5))

    for i in range(num_images):
        # Входное изображение
        plt.subplot(num_images, 3, i * 3 + 1)
        plt.title(f'Входное изображение {i+1}')
        plt.imshow(images[i])
        plt.axis('off')

        # Оригинальная маска
        plt.subplot(num_images, 3, i * 3 + 2)
        plt.title(f'Оригинальная маска {i+1}')
        plt.imshow(images[i])
        plt.imshow(true_masks[i], alpha=0.8)
        plt.axis('off')

        # Предсказанная маска
        plt.subplot(num_images, 3, i * 3 + 3)
        plt.title(f'Предсказанная маска {i+1}')
        plt.imshow(images[i])
        plt.imshow(pred_masks[i], alpha=0.8)
        plt.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
num_images = 10
sample_images = [load_image(X_test[i]) for i in range(num_images)]
sample_masks = [load_mask(y_test[i]) for i in range(num_images)]
predicted_masks = [model.predict(sample_images[i][np.newaxis, ...])[0] for i in range(num_images)]
display_multiple_results(sample_images, sample_masks, predicted_masks, num_images=num_images)