# <a id="1">Скачивание данных</a>

In [None]:
import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = 'chest-xray-pneumonia:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F17810%2F23812%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240528%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240528T113859Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D4e03d923bb73a3fc5121e6feb952c1ceb0f410d8d848ff9e75871477215107ba4f2ccf68af8c008385fc5b54b8c01111691047653532c138c19eef67c143eb0d5ff25a00d0e444df0a13be971f86968de36ab88d4bb8de172fbe89aea5a48e68263fa0cd8b2608af8386b1b53a33f108ba439491dbd585e80e5c5d02356794ea42b24120fbc7a6cd8cd8d0d1ce72b3e2e255004899fa2ca4f1fc2105b6bb5c9b45ad48b02d0231066b481bb4dc049bc7f79f3f21e29573d1805b71add3baf00fadbf386933ecdbb036f350bcd7c545c5619fa17b6f7dfc48b7221e414a9ff07d13a63e83cba0b90ffae8aef886af5416837f550ff53cb798dfdbd54cbfa6af1b,labeled-chest-xray-images:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F835414%2F1426603%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240528%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240528T113859Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D7a60effa3e413a7deb903f06129757ebb47a641bb521c1c7c0ce6a1ad2cad0ec5ba0e2df6817571c9325a03dc99222b4292d95f99f6220aa0282fe690020b48b4a85caa7bcb904e838808069363b620b6e7b3b520ed4f635530850d4026617ad6ac3158039b58961778c5a1f2d5d921478c32a6bf2973f80de001ee8bb1c6f68c6662d4167a51b3a07f99be88c8d628887bd187177590c27308f454e011bcf2edb8b7257f187c459f112134f9eb16d60b9c2c1ad89810d71b89f487b03f52363a27bdf10a808587e9599d8ccf5432f6dba3d85a78059861c06d2ce9bf3f63327d36c848c798526608e7d8e2caaebd1581447deef03b6a4c842c914bd0b625f45'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

# Download every archive listed in DATA_SOURCE_MAPPING and unpack it into
# KAGGLE_INPUT_PATH/<directory>. A failed download is reported and skipped so
# the remaining data sources are still processed.
for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    # Each entry is "<directory>:<url-encoded URL>"; split on the first colon only.
    directory, download_url_encoded = data_source_mapping.split(':', 1)
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            # The header may be missing; fall back to 0 and skip the progress bar
            # instead of crashing on int(None).
            total_bytes = int(total_length) if total_length else 0
            print(f'Downloading {directory}, {total_length} bytes compressed')
            dl = 0
            while True:
                data = fileres.read(CHUNK_SIZE)
                if not data:
                    break
                dl += len(data)
                tfile.write(data)
                done = min(50, int(50 * dl / total_bytes)) if total_bytes else 0
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
            # Push buffered bytes to disk: the tar branch reopens the file by name
            # and would otherwise miss whatever is still in the write buffer.
            tfile.flush()
            tfile.seek(0)
            if filename.endswith('.zip'):
                with ZipFile(tfile) as zfile:
                    zfile.extractall(destination_path)
            else:
                # Bind the archive to its own name; reusing `tarfile` would
                # shadow the module for the rest of the notebook.
                with tarfile.open(tfile.name) as tar_archive:
                    tar_archive.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError:
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


# <a id="2">Импорт нужных модулей и библиотек</a>

In [None]:
import pandas as pd
import matplotlib as mat
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import random
import os
import glob
import cv2
import tensorflow as tf

from numpy.random import seed
from sklearn.model_selection import train_test_split
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import callbacks
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.random import set_seed


# <a id="3">Инициализация генератора случайных чисел</a>

In [None]:
# Seed the three random number generators imported above: Python's `random`,
# NumPy (`seed`) and TensorFlow (`set_seed`).
for _seed_rng in (random.seed, set_seed, seed):
    _seed_rng(42)

# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
# setting it here presumably only affects child processes — confirm.
os.environ.update({
    'PYTHONHASHSEED': str(42),
    'TF_DETERMINISTIC_OPS': '1',
})

In [None]:
# Shared hyper-parameters used by the cells below.
SEED = 42        # random state for the data split and the generators
BATCH = 32       # batch size of the train/validation generators
IMG_SIZE = 224   # side length of the square network input, in pixels
img_size = (IMG_SIZE, IMG_SIZE)  # (height, width) pair for resizing

# <a id="4">Генерация датасетов</a>

In [None]:
def get_df(dir: str) -> pd.DataFrame:
    """Build a table of image paths and class labels from a directory tree.

    Every sub-directory of ``dir`` is treated as one class; each entry inside
    it becomes a row labelled with the sub-directory's name.

    Args:
        dir: Directory whose immediate sub-directories are the class folders.

    Returns:
        A DataFrame with the columns ``image`` (file path) and ``class``
        (name of the containing sub-directory). It is empty when ``dir``
        contains no class folders.
    """
    filepaths = []
    labels = []

    # Sorted listings make the row order independent of the file system, so
    # the seeded split applied later is reproducible.
    for fold in sorted(os.listdir(dir)):
        foldpath = os.path.join(dir, fold)
        # Skip stray files that sit next to the class folders.
        if not os.path.isdir(foldpath):
            continue
        for file in sorted(os.listdir(foldpath)):
            filepaths.append(os.path.join(foldpath, file))
            labels.append(fold)

    # Combine the paths and their labels into one dataframe.
    return pd.DataFrame({'image': filepaths, 'class': labels})

In [None]:
# Paths to the training and test image directories.
# NOTE(review): these are relative paths, while the download cell extracts the
# archives under KAGGLE_INPUT_PATH ('/kaggle/input') and links it at '../input'
# — confirm that 'input/...' resolves to that data from the working directory.
train_data_dir = 'input/labeled-chest-xray-images/chest_xray/train'
test_data_dir = 'input/labeled-chest-xray-images/chest_xray/test'

In [None]:
# Build the (image path, class label) tables for the train and test splits.
train_df, test_df = (
    get_df(_split_dir) for _split_dir in (train_data_dir, test_data_dir)
)

In [None]:
# Preview the first 12 training images together with their class labels.
print('Train Set')
plt.figure(figsize=(12, 12))

# Take the rows by position: label-based lookups such as train_df['image'][i]
# fail once the index is no longer 0..n-1 (e.g. after the train/validation split).
_preview = train_df.head(12)

for i, (image_path, label) in enumerate(zip(_preview['image'], _preview['class'])):
    plt.subplot(3, 4, i + 1)
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread signals an unreadable file by returning None.
        raise FileNotFoundError(f'Could not read image: {image_path}')
    img = cv2.resize(img, img_size)
    # OpenCV loads images as BGR, whereas matplotlib expects RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    plt.imshow(img)
    plt.title(label)
    plt.axis("off")

plt.tight_layout()
plt.show()

In [None]:
# Hold out 20% of the training data as a validation set, keeping the class
# proportions the same in both parts.
_class_labels = train_df['class']
train_df, val_df = train_test_split(
    train_df,
    test_size=0.20,
    random_state=SEED,
    stratify=_class_labels,
)

In [None]:
# Image generators: both rescale pixel values by 1/255; only the training
# generator additionally applies random augmentation.
# NOTE(review): Keras documents rotation_range in degrees, so 0.1 is a nearly
# invisible rotation — confirm this is the intended amount.
_AUGMENTATION = dict(
    rotation_range=0.1,
    zoom_range=0.1,
    width_shift_range=0.1,
    height_shift_range=0.1,
)
train_datagen = ImageDataGenerator(rescale=1/255., **_AUGMENTATION)
val_datagen = ImageDataGenerator(rescale=1/255.)

# Options shared by every dataframe iterator created below.
_FLOW_OPTIONS = dict(
    x_col='image',
    y_col='class',
    color_mode="rgb",
    target_size=img_size,
    class_mode='binary',
)

# Apply the generators to the datasets.
ds_train = train_datagen.flow_from_dataframe(
    train_df, batch_size=BATCH, shuffle=True, seed=SEED, **_FLOW_OPTIONS)

ds_val = val_datagen.flow_from_dataframe(
    val_df, batch_size=BATCH, shuffle=True, seed=SEED, **_FLOW_OPTIONS)

# The test iterator yields one image at a time in a fixed order.
ds_test = val_datagen.flow_from_dataframe(
    test_df, batch_size=1, shuffle=False, **_FLOW_OPTIONS)


# <a id="5">Fine tuning</a>

In [None]:
 def get_pretrained(base_model):
    inputs = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3))

    x = base_model(inputs)

    x = layers.BatchNormalization()(x)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.1)(x)

    output = layers.Dense(1, activation='sigmoid')(x)

    model = keras.Model(inputs=[inputs], outputs=output)

    return model

In [None]:
def _plot_learning_curve(history, metric, title, ylabel):
    """Plot the train and validation values of ``metric`` for every epoch."""
    fig, ax = plt.subplots(figsize=(20, 8))
    sns.lineplot(x=history.epoch, y=history.history[metric])
    sns.lineplot(x=history.epoch, y=history.history[f'val_{metric}'])
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel('Epoch')
    ax.legend(['train', 'val'], loc='best')
    plt.show()


def fine_tuning(model_arch):
    """Fine-tune one pretrained architecture and report its test performance.

    Builds ``model_arch`` with ImageNet weights and no top, freezes all but
    its last 13 layers, trains for 10 epochs on ``ds_train`` (validating on
    ``ds_val``), then evaluates on ``ds_test``: classification report,
    confusion matrix, ROC curve, learning curves, test loss and accuracy.
    The trained model is saved to an ``.h5`` file at the end.

    Args:
        model_arch: Constructor from ``keras.applications`` accepting the
            ``weights``, ``input_shape`` and ``include_top`` arguments.
    """
    base_model = model_arch(
        weights='imagenet',
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
        include_top=False)

    # Train only the last 13 layers of the base; freeze everything before them.
    base_model.trainable = True
    for layer in base_model.layers[:-13]:
        layer.trainable = False

    model_pretrained = get_pretrained(base_model)
    model_pretrained.compile(loss='binary_crossentropy',
                             optimizer=keras.optimizers.Adam(learning_rate=3e-5),
                             metrics=['binary_accuracy'])
    model_pretrained.summary()

    # The generator already defines the batch size; Keras rejects an explicit
    # batch_size for generator input.
    history = model_pretrained.fit(ds_train,
                                   epochs=10,
                                   validation_data=ds_val)

    # ds_test is created with shuffle=False, so its `classes` attribute lines
    # up with the prediction order.
    ds_test.reset()
    y_true = np.asarray(ds_test.classes)
    predictions = model_pretrained.predict(ds_test, steps=len(ds_test), verbose=0).ravel()
    pred_labels = np.where(predictions > 0.5, 1, 0)
    print(metrics.classification_report(y_true, pred_labels, labels=[0, 1]))

    conf_matrix = metrics.confusion_matrix(y_true, pred_labels)
    sns.heatmap(conf_matrix, annot=True, fmt="d")
    plt.xlabel("Predicted Label", fontsize=12)
    plt.ylabel("True Label", fontsize=12)
    plt.show()

    roc_auc = metrics.roc_auc_score(y_true, predictions)
    print('ROC_AUC: ', roc_auc)
    fpr, tpr, thresholds = metrics.roc_curve(y_true, predictions)
    plt.plot(fpr, tpr, label='ROC_AUC = %0.3f' % roc_auc)
    plt.xlabel("False Positive Rate", fontsize=12)
    plt.ylabel("True Positive Rate", fontsize=12)
    plt.legend(loc="lower right")
    plt.show()

    _plot_learning_curve(history, 'binary_accuracy', 'Learning Curve (Accuracy)', 'Accuracy')

    # Restart the iterator so evaluation covers the test set from its beginning.
    ds_test.reset()
    score = model_pretrained.evaluate(ds_test, steps=len(ds_test), verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    _plot_learning_curve(history, 'loss', 'Learning Curve (Loss)', 'Loss')

    # NOTE(review): the leading '/' writes the file to the file-system root —
    # confirm that this location is intended and writable.
    model_pretrained.save(f'/{base_model.name}_{score[1]}acc.h5')

In [None]:
# Fine-tune each candidate backbone in turn, clearing the Keras session
# before every run.
_ARCHITECTURES = (
    keras.applications.VGG16,
    keras.applications.VGG19,
    keras.applications.ResNet50V2,
)
for arch in _ARCHITECTURES:
    keras.backend.clear_session()
    fine_tuning(arch)

# <a id="6">K-Fold кросс-валидация</a>

In [None]:
# Set up the training callbacks.

# Stop when val_loss has not improved for 5 epochs and roll back to the best weights.
early_stopping = callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

# Multiply the learning rate by 0.2 when val_loss has not improved for 3 epochs.
plateau = callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=3,
    min_delta=1e-7,  # was misspelled `min_delt`, which is not a ReduceLROnPlateau argument
    cooldown=0,
    verbose=1,
)

In [None]:
from sklearn.model_selection import KFold

# 10-fold cross-validation of the fine-tuned VGG16 model. Each fold trains on
# 9/10 of train_df, validates on the remaining 1/10, and is scored on the
# fixed test set. The per-fold ROC curves are averaged on a common FPR grid.
tprs = []
roc_aucs = []
base_fpr = np.linspace(0, 1, 101)

plt.figure(figsize=(5, 5))
plt.axes().set_aspect('equal', 'datalim')

kf = KFold(n_splits=10, shuffle=True, random_state=SEED)

# ds_test is created with shuffle=False, so its `classes` attribute lines up
# with the prediction order.
y_test = np.asarray(ds_test.classes)

for train_index, val_index in kf.split(train_df):
    # KFold yields positional indices, while train_df keeps the shuffled
    # labels from the earlier split — select rows by position.
    fold_train_df = train_df.iloc[train_index]
    fold_val_df = train_df.iloc[val_index]

    # Fold-specific names keep the notebook-level ds_train/ds_val intact.
    ds_fold_train = train_datagen.flow_from_dataframe(fold_train_df,
                                                      x_col='image',
                                                      y_col='class',
                                                      color_mode="rgb",
                                                      target_size=img_size,
                                                      class_mode="binary",
                                                      batch_size=BATCH,
                                                      shuffle=True,
                                                      seed=SEED)

    ds_fold_val = val_datagen.flow_from_dataframe(fold_val_df,
                                                  x_col='image',
                                                  y_col='class',
                                                  color_mode="rgb",
                                                  target_size=img_size,
                                                  class_mode='binary',
                                                  batch_size=BATCH,
                                                  shuffle=True,
                                                  seed=SEED)

    keras.backend.clear_session()
    # get_pretrained() applies its argument to the input tensor, so it needs
    # an instantiated base model. The setup mirrors fine_tuning(): ImageNet
    # weights, no top, only the last 13 layers trainable.
    base_model = keras.applications.VGG16(weights='imagenet',
                                          input_shape=(IMG_SIZE, IMG_SIZE, 3),
                                          include_top=False)
    base_model.trainable = True
    for layer in base_model.layers[:-13]:
        layer.trainable = False

    model = get_pretrained(base_model)
    model.compile(loss='binary_crossentropy',
                  optimizer=keras.optimizers.Adam(learning_rate=3e-5),
                  metrics=['binary_accuracy'])
    # Batch size and steps per epoch come from the iterators themselves;
    # Keras rejects an explicit batch_size for generator input.
    model.fit(ds_fold_train,
              epochs=50,
              validation_data=ds_fold_val,
              callbacks=[early_stopping, plateau])

    ds_test.reset()
    predictions = model.predict(ds_test, steps=len(ds_test), verbose=0).ravel()
    fpr, tpr, _ = metrics.roc_curve(y_test, predictions)
    roc_aucs.append(metrics.auc(fpr, tpr))
    plt.plot(fpr, tpr, 'b', alpha=0.15)
    # Resample this fold's curve onto the shared FPR grid so folds can be averaged.
    tpr = np.interp(base_fpr, fpr, tpr)
    tpr[0] = 0.0
    tprs.append(tpr)

tprs = np.array(tprs)
mean_tprs = tprs.mean(axis=0)

# The standard error of the mean is taken across folds, so divide by the
# number of folds rather than the number of grid points.
sd = tprs.std(axis=0)
se = sd / np.sqrt(len(tprs))
tprs_upper = np.minimum(mean_tprs + 1.96 * se, 1)
tprs_lower = np.maximum(mean_tprs - 1.96 * se, 0)
mean_auc = np.array(roc_aucs).mean()

plt.plot(base_fpr, mean_tprs, 'b')
plt.fill_between(base_fpr, tprs_lower, tprs_upper, color='grey', alpha=0.3)

plt.plot([0, 1], [0, 1], 'r--')
plt.xlim([-0.01, 1.01])
plt.ylim([-0.01, 1.01])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.show()
print(mean_auc)

# <a id="1"></a>