In [1]:
import os
import random
from pathlib import Path
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from model import edsr
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.losses import categorical_crossentropy, sparse_categorical_crossentropy, mean_squared_error
from tensorflow.keras.preprocessing import *
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.python.ops import math_ops
from tensorflow.python.keras import backend as K
from tensorflow.python.framework import ops
from tensorflow.python.keras import initializers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from glob import glob
from tensorflow._api.v2.nn import depth_to_space
import matplotlib.pyplot as plt
from PIL import Image
import random

In [2]:
# 하이퍼 파라미터
batch_size = 4
epochs = 10

AUTOTUNE = tf.data.experimental.AUTOTUNE

In [3]:
# 데이터셋 경로 설정
data_dir = Path("./image_dataset")  # 이미지 파일 총 4개 HR_train, HR_valid, LR_train, LR_valid

# 이미지 파일 경로 리스트 생성
train_hr_paths = [str(p) for p in (data_dir / "DIV2K_train_HR").glob("*")]
train_lr_paths = [str(p) for p in (data_dir / "DIV2K_train_LR_bicubicX4").glob("*")]
valid_hr_paths = [str(p) for p in (data_dir / "DIV2K_valid_HR").glob("*")]
valid_lr_paths = [str(p) for p in (data_dir / "DIV2K_valid_LR_bicubic_X4").glob("*")]

# 데이터셋 생성
train_dataset = tf.data.Dataset.from_tensor_slices((train_lr_paths, train_hr_paths))
valid_dataset = tf.data.Dataset.from_tensor_slices((valid_lr_paths, valid_hr_paths))

# 이미지 읽기 함수 정의
def read_image(lr_path, hr_path):
    lr_image = tf.io.read_file(lr_path)
    lr_image = tf.image.decode_png(lr_image, channels=3)
    lr_image = tf.image.resize(lr_image, (64, 64))  # 저해상도 이미지 크기 조정
    
    hr_image = tf.io.read_file(hr_path)
    hr_image = tf.image.decode_png(hr_image, channels=3)
    hr_image = tf.image.resize(hr_image, (256, 256))  # 고해상도 이미지 크기 조정
    
    return lr_image, hr_image

# 이미지 읽기 및 전처리 함수 적용
train_dataset = train_dataset.map(read_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
valid_dataset = valid_dataset.map(read_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

# 데이터셋 캐싱 및 배치 설정
train_dataset = train_dataset.cache().batch(batch_size).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
valid_dataset = valid_dataset.cache().batch(batch_size).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)


In [4]:
# 데이터셋 증강

def flip_left_right(lowres_img, highres_img):
    """Flips Images to left and right."""

    # Outputs random values from a uniform distribution in between 0 to 1
    rn = tf.random.uniform(shape=(), maxval=1)
    # If rn is less than 0.5 it returns original lowres_img and highres_img
    # If rn is greater than 0.5 it returns flipped image
    return tf.cond(
        rn < 0.5,
        lambda: (lowres_img, highres_img),
        lambda: (
            tf.image.flip_left_right(lowres_img),
            tf.image.flip_left_right(highres_img),
        ),
    )


def random_rotate(lowres_img, highres_img):
    """Rotates Images by 90 degrees."""

    # Outputs random values from uniform distribution in between 0 to 4
    rn = tf.random.uniform(shape=(), maxval=4, dtype=tf.int32)
    # Here rn signifies number of times the image(s) are rotated by 90 degrees
    return tf.image.rot90(lowres_img, rn), tf.image.rot90(highres_img, rn)


def random_crop(lowres_img, highres_img, hr_crop_size=96, scale=4):
    """Crop images.

    low resolution images: 24x24
    high resolution images: 96x96
    """
    lowres_crop_size = hr_crop_size // scale  # 96//4=24
    lowres_img_shape = tf.shape(lowres_img)[:2]  # (height,width)

    lowres_width = tf.random.uniform(
        shape=(), maxval=lowres_img_shape[1] - lowres_crop_size + 1, dtype=tf.int32
    )
    lowres_height = tf.random.uniform(
        shape=(), maxval=lowres_img_shape[0] - lowres_crop_size + 1, dtype=tf.int32
    )

    highres_width = lowres_width * scale
    highres_height = lowres_height * scale

    lowres_img_cropped = lowres_img[
        lowres_height : lowres_height + lowres_crop_size,
        lowres_width : lowres_width + lowres_crop_size,
    ]  # 24x24
    highres_img_cropped = highres_img[
        highres_height : highres_height + hr_crop_size,
        highres_width : highres_width + hr_crop_size,
    ]  # 96x96

    return lowres_img_cropped, highres_img_cropped

def dataset_object(dataset_cache, training=True):
    ds = dataset_cache
    ds = ds.map(
        lambda lowres, highres: random_crop(lowres, highres, scale=4),
        num_parallel_calls=AUTOTUNE,
    )

    if training:
        ds = ds.map(random_rotate, num_parallel_calls=AUTOTUNE)
        ds = ds.map(flip_left_right, num_parallel_calls=AUTOTUNE)
    # Batching Data
    ds = ds.batch(4)

    if training:
        # Repeating Data, so that cardinality if dataset becomes infinte
        ds = ds.repeat()
    # prefetching allows later images to be prepared while the current image is being processed
    ds = ds.prefetch(buffer_size=AUTOTUNE)
    return ds


train_ds = dataset_object(train_dataset, training=True) # 저해상도와 고해상도의 쌍으로 이루어짐.
val_ds = dataset_object(valid_dataset, training=False)

train_ds = train_ds.map(lambda lr, hr: (tf.squeeze(lr, axis=0), tf.squeeze(hr, axis=0)))
val_ds = val_ds.map(lambda lr, hr: (tf.squeeze(lr, axis=0), tf.squeeze(hr, axis=0)))

In [5]:
train_ds

<MapDataset shapes: ((None, None, None, 3), (None, None, None, 3)), types: (tf.float32, tf.float32)>

In [6]:
def PSNR(super_resolution, high_resolution):
    """Compute the peak signal-to-noise ratio, measures quality of image."""
    # Max value of pixel is 255
    psnr_value = tf.image.psnr(high_resolution, super_resolution, max_val=255)[0]
    return psnr_value

In [7]:
import datetime

# EDSR 모델 생성 (앞서 제공된 코드 사용)
model = edsr(num_filters=64, num_res_blocks=16)

# 옵티마이저 및 손실함수 설정
optim_edsr = keras.optimizers.Adam(learning_rate=1e-4)
model.compile(optimizer=optim_edsr, loss="mae", metrics=['accuracy'])

filename = './checkpoints/checkpoint-epoch-{}-batch-{}-EDSR_model.h5'.format(epochs, batch_size) # from epoch  
log_dir = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

# my custom callback
callbacks = [ 
    # tensorboardm
    # tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1, write_graph=True, write_images=True),
    
    # 개선된 validation score를 도출해낼 때마다 weight를 .h5타입으로 중간 저장
    # 모델을 저장할 때의 파라미터를 조정하는 것
    ModelCheckpoint(filepath=filename,
                        monitor='val_loss', # 모델을 저장할 때, 기준이 되는 값을 지정. 예를 들어, validation set의 loss가 가장 작을 때 저장하고 싶으면 'val_loss'를 입력
                        save_weights_only=True, # True인 경우, 모델의 weights만 저장됨. False인 경우, 모델 레이어 및 weights 모두 저장됨.
                        verbose=1,  # 1일 경우 모델이 저장 될 때, '저장되었습니다' 라고 화면에 표시되고, 0일 경우 화면에 표시되는 것 없이 그냥 바로 모델이 저장됨.
                        save_best_only=True,  # True 인 경우, monitor 되고 있는 값을 기준으로 가장 좋은 값으로 모델이 저장됨. False인 경우, 매 에폭마다 모델이 filepath{epoch}으로 저장된다. (model0, model1, model2....)
                        mode='auto')  # auto means automatically find the best
                        ]

steps_per_epoch = len(train_lr_paths) // batch_size

# 모델 학습
model.fit(train_ds, validation_data=val_ds, batch_size=batch_size, epochs=epochs, steps_per_epoch=steps_per_epoch )


Epoch 1/10


InvalidArgumentError: 2 root error(s) found.
  (0) Invalid argument:  Need minval < maxval, got 0 >= -19
	 [[{{node random_uniform_1}}]]
	 [[IteratorGetNext]]
  (1) Invalid argument:  Need minval < maxval, got 0 >= -19
	 [[{{node random_uniform_1}}]]
	 [[IteratorGetNext]]
	 [[gradient_tape/mean_absolute_error/sub/Shape_1/_6]]
0 successful operations.
0 derived errors ignored. [Op:__inference_train_function_5190]

Function call stack:
train_function -> train_function
