# Импорт библиотек

In [None]:
!pip install tensorflow
!pip install matplotlib 
!pip install pydot 
!pip install graphviz 
!pip install tqdm

In [None]:
import numpy as np
import os

import pathlib
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from keras import layers
from PIL import Image, ImageFilter
from tqdm import tqdm

### Загрузка датасета (на примере обработанного датасета "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz")

In [None]:
# скачать датасет
# закомменть после того как скачаешь
dataset_url = "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"
archive = tf.keras.utils.get_file(origin=dataset_url, extract=True, cache_dir=os.getcwd())
data_dir = pathlib.Path(archive).with_suffix('')

In [None]:
# тоотношения тренировочной, валидационной и тестовой выборки
TRAIN_PERCENT = 0.60 
VAL_PERCENT = 0.20
TEST_PERCENT = 1 - TRAIN_PERCENT - VAL_PERCENT

if (TRAIN_PERCENT + VAL_PERCENT + TEST_PERCENT) != 1.:
    raise ValueError('Сумма процентов должна быть равна 1')

# папка в которую загрузится датасет flower_photos от tensorflow.org
PATH_TO_UNPREPARED_DATASET = 'datasets/flower_photos.tgz/flower_photos'
# папка в которой будет храниться готовый датасет после предобработки
PATH_TO_SAVE_DATASET = 'labeled_dataset'

# если поменять результат обучения измениться
# детерминирование случайных величин
SEED = 290
tf.random.set_seed(SEED)
np.random.seed(SEED)
DATASET_PATH = pathlib.Path(PATH_TO_SAVE_DATASET)
BATCH_SIZE = 32
IMAGE_HEIGHT = 256
IMAGE_WIDTH = 256

# словарь содержащий пути к фото для разных выборок
dataset = {'train':[],
           'val':[],
           'test':[]}

In [None]:
# распределяет все фото по тренировочному, валидационному и тестовому датасету
# нужно чтобы представители каждого класса из скачанного датасета присутствовали в 
# каждой выборке
for tup in os.walk(PATH_TO_UNPREPARED_DATASET):
    # tup - это кортеж 3 элементов
    # tup[0] - относительный путь до обрабатываемой папки
    # tup[1] - список всех папок в ней
    # tup[2] - список всех файлов в ней

    # если в папке есть изображения типа jpg
    if [filename for filename in tup[2] if filename.endswith('.jpg')]:
        print(f'Путь к фоторафиям: {tup[0]}. Кол-во фото в папке: {len(tup[2])}')
        # получить относительный путь ко всем фото в папке
        temp_arr = np.array([pathlib.Path(tup[0])/filename for filename in tup[2]])
        # разделить список путей к фото на 3 части для каждой выборки
        temp_train, temp_validate, temp_test = np.split(temp_arr, 
                                                        [int(temp_arr.shape[0]*TRAIN_PERCENT), 
                                                         int(temp_arr.shape[0]*(TRAIN_PERCENT+VAL_PERCENT))])
        # add ndarray to list of each dataset part
        # like dataset['train'] = list(np.ndarray(1,2,3), np.ndarray(4,5,6))
        dataset['train'].append(temp_train)
        dataset['val'].append(temp_validate)
        dataset['test'].append(temp_test)

# concatenate each numpy ndarray in each dataset part to one ndarray
for key, value in dataset.items():
    dataset[key] = np.concatenate(value)

In [None]:
# print dataset info
for key, value in dataset.items():
    print('|'*8,f'{key} выборка','|'*8)
    print(f'\tКол-во наблюдений: {value.shape[0]}')
    print(f'\Пример путей к фото (первые два):',*value[:2],sep='\n\t')
    print()

In [None]:
# blur labeled images and put all images in new folders
for key, value in dataset.items():
    # create path to save new photo
    path_to_save_blur =  pathlib.Path(PATH_TO_SAVE_DATASET)/key/'blur'
    if not os.path.exists(path_to_save_blur):
        os.makedirs(path_to_save_blur)
        
    path_to_save_sharp =  pathlib.Path(PATH_TO_SAVE_DATASET)/key/'sharp'
    if not os.path.exists(path_to_save_sharp):
        os.makedirs(path_to_save_sharp)

    for image_path in tqdm(value, desc=f'Подготовка данных для {key} выборки'):
        filename = image_path.name
        
        img = keras.utils.load_img(image_path)
        img = keras.utils.img_to_array(img)
        img = keras.layers.Resizing(IMAGE_HEIGHT, IMAGE_WIDTH)(img)
        img = keras.utils.array_to_img(img)
        img.save(path_to_save_sharp / filename)

        filename = "blur_"+filename
        
        img = img.filter(ImageFilter.BLUR)
        img.save(path_to_save_blur / filename)
        
print('Подготовка завершена')

In [None]:
# tf.keras.preprocessing.image_dataset_from_directory 
# автоматически делит данные на батчи и размечает данные 
# как класс blur и sharp на основании структуры переданной папки
# В нашем случае передается папка со структурой:
# train/
#     blur/
#         image1.jpg
#         image2.jpg
#     sharp/
#         image1.jpg
#         image2.jpg
# Изображения класса blur помечаются как 1, класса sharp как 0
# tf.keras.preprocessing.image_dataset_from_directory автоматом
# приводит все изображения к единому размеру (IMAGE_HEIGHT, IMAGE_WIDTH)

train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    DATASET_PATH/'train',
    labels='inferred',
    label_mode='binary',
    color_mode='rgb',
    batch_size=BATCH_SIZE,
    data_format='channels_last',
    image_size=(IMAGE_HEIGHT, IMAGE_WIDTH),
    shuffle=True,
    seed=SEED,
    verbose=True
)

val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    DATASET_PATH/'val',
    labels='inferred',
    label_mode='binary',
    color_mode='rgb',
    batch_size=BATCH_SIZE,
    data_format='channels_last',
    image_size=(IMAGE_HEIGHT, IMAGE_WIDTH),
    shuffle=True,
    seed=SEED,
    verbose=True
)

test_ds = tf.keras.preprocessing.image_dataset_from_directory(
    DATASET_PATH/'test',
    labels='inferred',
    label_mode='binary',
    color_mode='rgb',
    batch_size=BATCH_SIZE,
    data_format='channels_last',
    image_size=(IMAGE_HEIGHT, IMAGE_WIDTH),
    shuffle=True,
    seed=SEED,
    verbose=True
)

### пример 1 батча наблюдения и его маркировки

In [None]:
print(type(train_ds))
batch_shapes = [x.shape for x in list(train_ds)[0]]

# размер входного батча данных (batch_size, rows, cols, challels)
print(f'Размер входного батча данных: {batch_shapes[0]}')

# размер выходного батча данных (batch_size, num_classes)
# если классов 2 (как у нас - blur и sharp), то num_classes равен 1 (ибо 2 класса кодируются 1 числом)
print(f'Размер выходного батча данных: {batch_shapes[1]}')

# Создание модели

In [None]:
# настройка шага обучения, метрик, оптимизатора
LEARNING_RATE = 0.01

LOSS_FUNCTION = keras.losses.BinaryCrossentropy()

OPTIMIZER = keras.optimizers.SGD(LEARNING_RATE)

METRICS = [keras.metrics.BinaryAccuracy(name='BinaryAccuracy'), 
           keras.metrics.MeanSquaredError(name='MeanSquaredError'),
           keras.metrics.Precision(name='Precision'),
           keras.metrics.Recall(name='Recall'),
           keras.metrics.AUC(name='PR_AUC', curve='PR'),
           keras.metrics.AUC(name='ROC_AUC', curve='ROC')]

### Блок предобработки изображений

In [None]:
# пиксели RGB имеют занчения от 0 до 255, этот слой приводит их к диапазону от 0 до 1
rescale = tf.keras.Sequential([
  layers.Rescaling(1./255)
], name='preprocess_part')

### Блок аугментации изображений

In [None]:
# аугментации изображений для улучшения качества обучения

augment_data = tf.keras.Sequential([
  layers.RandomFlip("horizontal_and_vertical"), # отражение по вертикали или горизонтали
  layers.RandomRotation(0.5, fill_mode='reflect'), # поворот по часовой стрелке на случайный угол
  layers.RandomTranslation(0.2,0.2, fill_mode="reflect"), # сдвиг изображения по вертикали или горизонтали
  layers.RandomZoom(0.3,0.3, fill_mode="reflect") # случайное приближение изображения
], name='augment_part')

### Архитектура модели

#### Медель + слои аугментации и слои предобработки
##### Слои аугментации используются в рантайме только во време обучения

In [None]:
# можно resize_and_rescale и augment_data не делать частью модели
# а применить к датасетам train_ds, val_ds, test_ds
# вопросики в столбце Output Shape - норма (тк размер входного слоя не указан)
# размер станет известен после обучения модели

model = tf.keras.Sequential([
  # Add the preprocessing layers you created earlier.
    rescale,
    augment_data,
    layers.Conv2D(32, (3,3), 1, activation='relu',),
    layers.MaxPooling2D(),
    layers.Conv2D(32, (3,3), 1, activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(32, (3,3), 1, activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(16, (3,3), 1, activation='relu'),
    layers.MaxPooling2D(),
    layers.Flatten(),
    layers.Dense(64, activation='selu'),
    layers.Dropout(0.5),
    layers.Dense(32, activation='selu'),
    layers.Dropout(0.5),
    layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer=OPTIMIZER, loss=LOSS_FUNCTION, metrics=METRICS)

# model summary
print(model.summary())

### Пример предобработанного и аугментированного изображения

In [None]:
FONT_SIZE = 15
fig, axes = plt.subplots(1,2)

fig.set_figwidth(22)
fig.set_figheight(8)

# read image
img = list(train_ds)[0][0][0]
axes[0].imshow(tf.keras.utils.array_to_img(img))
axes[0].set_title(f"Пример размытого изображения", fontsize=FONT_SIZE, pad=15)

# augmentate image
img = augment_data(tf.expand_dims(img, axis=0))[0]
axes[1].imshow(tf.keras.utils.array_to_img(img))
axes[1].set_title(f"Пример аугментированного размытого изображения", fontsize=FONT_SIZE, pad=15)

plt.show()

### Обучение модели

#### Колбэк функции для вызова в процессе обучения

In [None]:
callback_list = [] # массив колбэков до подачи в колбек "callbacklist"

# если модель плохо учится - остановит обучение
callback_list.append(keras.callbacks.EarlyStopping(
            monitor = 'val_loss', 
            min_delta = 0.0001, 
            patience = 3,
            restore_best_weights = True
            ))

# если модель не учится - уменьшит шаг LEARNING_RATE
callback_list.append(keras.callbacks.ReduceLROnPlateau(
            monitor = 'loss', 
            factor = 0.2, 
            patience = 2, 
            verbose = 1,
            mode = 'auto', 
            min_delta = 0.001, 
            cooldown = 2, 
            min_lr = 0
            ))

#### Обучение

In [None]:
history = model.fit(train_ds,
                    batch_size = BATCH_SIZE, 
                    epochs = 30, 
                    verbose = 1, 
                    validation_data = val_ds, 
                    callbacks = callback_list)

In [None]:
# итоговый размер модели
print(model.summary())

In [None]:
FONT_SIZE = 15
for key in [k for k in history.history.keys() if not k.startswith('val')]:
    fig, ax = plt.subplots()

    fig.set_figwidth(12)
    fig.set_figheight(8)
    
    plt.plot(history.history[key], 
             label='Train dataset',  linewidth=1.5, color='blue')
    if key != 'learning_rate':
        plt.plot(history.history[f'val_{key}'], linestyle = '--', 
             label='Validation dataset',  linewidth=3, color='red')
    
    ax.set_xlabel('Epoch number', fontsize=FONT_SIZE)
    ax.set_ylabel(f'{key} value', fontsize=FONT_SIZE)
    ax.set_title(f"Learning process {key} plot", fontsize=FONT_SIZE, pad=15)
    
    ax.patch.set_alpha(0)
    
    #  Устанавливаем форматирование делений:
    ax.tick_params(axis='both', which='both', labelsize = FONT_SIZE)
    
    # Вывод и настройка сетки
    ax.minorticks_on()
    ax.grid(which='major', linewidth=2)
    ax.grid(which='minor', color = 'gray', linestyle = ':')
    
    ax.legend(fontsize = FONT_SIZE, facecolor = "white")
    
    plt.show()

In [None]:
# тестирование модели и итоговые значения метрик на всех выборках
train_res = model.evaluate(train_ds, batch_size = BATCH_SIZE, verbose=0, return_dict=True)
val_res = model.evaluate(val_ds, batch_size = BATCH_SIZE, verbose=0, return_dict=True)
test_res = model.evaluate(test_ds, batch_size = BATCH_SIZE, verbose=0, return_dict=True)

In [None]:
print('|'*8,'train','|'*8)#,'\n',train_res,'\n')
print(*[(key,value) for (key,value) in train_res.items()],sep='\n')
print()
print('|'*8,'val','|'*8)#,'\n',train_res,'\n')
print(*[(key,value) for (key,value) in val_res.items()],sep='\n')
print()
print('|'*8,'test','|'*8)#,'\n',train_res,'\n')
print(*[(key,value) for (key,value) in test_res.items()],sep='\n')
print()

In [None]:
# сохранения модели по желанию
#model.save(f'model.keras')