Failas skirtas susipažinti su darbe sudaryto neuroninio tinklo struktūra bei atlikti apmokytų tinklo modelių silpno apšvietimo vaizdų korekciją.

Žemiau pateiktame kode:
*   Duomenų rinkinių LOL, LoLEG ir LoLEG_triukšmas (LoLEG_noise) paruošimas.
*   Pateiktas sudarytas neuroninis tinklas ir jo paruošimas mokymui.
*   Sudaryto neuroninio tinklo apmokytų modelių užkrovimas į sistemą.
*   Užkrauto modelio silpno apšvietimo vaizdo korekcijos vykdymas.
*   Užkrauto modelio duomenų rinkinių metrikų rezultatų lyginimas.


Apmokyto tinklo modelių sąrašas:


1.   Sudaryto tinklo be praretinimo:
  *   Apmokytas su LOL duomenų rinkiniu: ***EGmodel_LOL_combined_losses.h5***
  *   Apmokytas su LoLEG duomenų rinkiniu: ***EGmodel_LoLEG_combined_losses.h5***
  *   Apmokytas su LoLEG_triukšmas (LoLEG_noise) duomenų rinkiniu: ***EGmodel_LoLEG_noise_combined_losses.h5***
2.   Sudaryto tinklo su praretinimu:
  *   Apmokytas su LOL duomenų rinkiniu: ***EGmodel_LOL_combined_losses_dilation_2.h5***
  *   Apmokytas su LoLEG duomenų rinkiniu: ***EGmodel_LoLEG_combined_losses_dilation_2.h5***
  *   Apmokytas su LoLEG_triukšmas (LoLEG_noise) duomenų rinkiniu: ***EGmodel_LoLEG_noise_combined_losses_dilation_2.h5***


Kodo vykdymui reikalingų bibliotekų įkėlimas.

In [None]:
import locale
locale.getpreferredencoding = lambda: "UTF-8"

!pip install pyiqa

import os
import time
import pyiqa
import random
import shutil
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.io import read_file
from tensorflow.image import decode_png
from tensorflow.data import Dataset
from torchvision.transforms.functional import to_tensor
from glob import glob
from PIL import Image, ImageFont, ImageOps
from keras.models import Model
from keras.layers import Conv2D, Input, UpSampling2D, Concatenate, LeakyReLU, Multiply, Add, BatchNormalization, GlobalAveragePooling2D, Dense, Reshape
from keras.losses import MeanSquaredError
from keras.optimizers import Adam
from keras.applications.vgg19 import VGG19
from keras.utils.layer_utils import count_params

Tinklo bei duomenų rinkinių konfigūracija.

In [None]:
DATASET_IMAGE_WIDTH = 600
DATASET_IMAGE_HEIGHT = 400
IMAGE_SIZE_PER_AXIS = 128
IMAGE_SIZE = (IMAGE_SIZE_PER_AXIS, IMAGE_SIZE_PER_AXIS)
IMAGE_CHANNEL_NUMBER = 3
RGB_MAX = 255
BATCH_SIZE = 8
NUMBER_OF_EPOCHS = 50
INPUT = Input((None, None, IMAGE_CHANNEL_NUMBER))
USE_DILATION = False
DILATION_RATE = 2
CHARBONNIER_LOSS_RATIO = 0.6
PERCEPTUAL_LOSS_RATIO = 0.6
SSIM_LOSS_RATIO = 0.6

PATH_TO_DATASETS = "/content/"
PATH_TO_MODELS = "/content/"
LOLEG_NOISE_DATASET_NAME = "LoLEG_noise"
LOLEG_DATASET_NAME = "LoLEG"
LOL_DATASET_NAME = "LOL"
PRIMARY_DATASET_NAME = LOLEG_NOISE_DATASET_NAME
SECONDARY_DATASET_NAME = LOLEG_DATASET_NAME
TERTIARY_DATASET_NAME = LOL_DATASET_NAME

Duomenų rinkinių užkrovimas į sistemą ir paruošimas neuroninio tinklo mokymui.

In [None]:
train_primary_dataset_low_light_paths = sorted(glob(PATH_TO_DATASETS + PRIMARY_DATASET_NAME + "/our485/low/*"))
train_primary_dataset_high_light_paths = sorted(glob(PATH_TO_DATASETS + PRIMARY_DATASET_NAME + "/our485/high/*"))
validate_primary_dataset_low_light_paths = sorted(glob(PATH_TO_DATASETS + PRIMARY_DATASET_NAME + "/eval15/low/*"))
validate_primary_dataset_path_high_light_paths = sorted(glob(PATH_TO_DATASETS + PRIMARY_DATASET_NAME + "/eval15/high/*"))

train_secondary_dataset_low_light_paths = sorted(glob(PATH_TO_DATASETS + SECONDARY_DATASET_NAME + "/our485/low/*"))
train_secondary_dataset_high_light_paths = sorted(glob(PATH_TO_DATASETS + SECONDARY_DATASET_NAME + "/our485/high/*"))
validate_secondary_dataset_low_light_paths = sorted(glob(PATH_TO_DATASETS + SECONDARY_DATASET_NAME + "/eval15/low/*"))
validate_secondary_dataset_high_light_paths = sorted(glob(PATH_TO_DATASETS + SECONDARY_DATASET_NAME + "/eval15/high/*"))

train_tertiary_dataset_low_light_paths = sorted(glob(PATH_TO_DATASETS + TERTIARY_DATASET_NAME + "/our485/low/*"))
train_tertiary_dataset_high_light_paths = sorted(glob(PATH_TO_DATASETS + TERTIARY_DATASET_NAME + "/our485/high/*"))
validate_tertiary_dataset_low_light_paths = sorted(glob(PATH_TO_DATASETS + TERTIARY_DATASET_NAME + "/eval15/low/*"))
validate_tertiary_dataset_high_light_paths = sorted(glob(PATH_TO_DATASETS + TERTIARY_DATASET_NAME + "/eval15/high/*"))

def load_image(image_path):
  loaded_image = read_file(image_path)
  decoded_image = decode_png(loaded_image, channels=IMAGE_CHANNEL_NUMBER)
  decoded_image.set_shape([None, None, IMAGE_CHANNEL_NUMBER])
  normalized_image = tf.cast(decoded_image, dtype=tf.float32) / RGB_MAX

  return normalized_image

def get_random_crop_by_axis(available_crop_axis_value):
  return tf.random.uniform(shape=(), minval=0, maxval=available_crop_axis_value, dtype=tf.int32)

def load_low_high_light_images(low_light_image_source_path, high_light_image_source_path):
  low_light_image = load_image(low_light_image_source_path)
  high_light_image = load_image(high_light_image_source_path)

  available_crop_area = (DATASET_IMAGE_WIDTH - IMAGE_SIZE_PER_AXIS + 1, DATASET_IMAGE_HEIGHT - IMAGE_SIZE_PER_AXIS + 1)
  random_width_crop = get_random_crop_by_axis(available_crop_area[0])
  random_height_crop = get_random_crop_by_axis(available_crop_area[1])

  low_light_image = low_light_image[random_height_crop:random_height_crop + IMAGE_SIZE_PER_AXIS, random_width_crop:random_width_crop + IMAGE_SIZE_PER_AXIS]
  high_light_image = high_light_image[random_height_crop:random_height_crop + IMAGE_SIZE_PER_AXIS, random_width_crop:random_width_crop + IMAGE_SIZE_PER_AXIS]

  return low_light_image, high_light_image

def load_dataset_in_batches(low_light_image_paths, high_light_image_paths):
  dataset = Dataset.from_tensor_slices((low_light_image_paths, high_light_image_paths))
  dataset = dataset.map(load_low_high_light_images, num_parallel_calls=tf.data.AUTOTUNE)

  return dataset.batch(BATCH_SIZE, drop_remainder=True)

train_primary_dataset = load_dataset_in_batches(train_primary_dataset_low_light_paths, train_primary_dataset_high_light_paths)
validate_primary_dataset = load_dataset_in_batches(validate_primary_dataset_low_light_paths, validate_primary_dataset_path_high_light_paths)

train_secondary_dataset = load_dataset_in_batches(train_secondary_dataset_low_light_paths, train_secondary_dataset_high_light_paths)
validate_secondary_dataset = load_dataset_in_batches(validate_secondary_dataset_low_light_paths, validate_secondary_dataset_high_light_paths)

train_tertiary_dataset = load_dataset_in_batches(train_tertiary_dataset_low_light_paths, train_tertiary_dataset_high_light_paths)
validate_tertiary_dataset = load_dataset_in_batches(validate_tertiary_dataset_low_light_paths, validate_tertiary_dataset_high_light_paths)

Neuroninio tinklo paruošimas.

In [None]:
# Metrikos

def peak_signal_noise_ratio(ground_truth, predicted):
  ground_truth = tf.image.convert_image_dtype(ground_truth, tf.float32)
  predicted = tf.image.convert_image_dtype(predicted, tf.float32)

  return tf.image.psnr(ground_truth, predicted, max_val=1.0)

def structural_similarity_index(ground_truth, predicted):
  return tf.reduce_mean(tf.image.ssim(ground_truth, predicted, max_val=1.0))

# Nuostoliai

vgg19 = VGG19(include_top=False, weights="imagenet", input_shape=INPUT.shape[1:])
vgg19 = Model(inputs=vgg19.input, outputs=vgg19.get_layer("block3_conv3").output)
vgg19.trainable = False

def charbonnier_loss(ground_truth, predicted):
  return tf.reduce_mean(tf.sqrt(tf.square(ground_truth - predicted) + tf.square(1e-3)))

def perceptual_loss(ground_truth, predicted):
  ground_truth = vgg19(ground_truth)
  predicted = vgg19(predicted)

  return MeanSquaredError()(ground_truth, predicted)

def calculate_combined_loss(ground_truth, predicted):
  charbonnier_loss_value = CHARBONNIER_LOSS_RATIO * charbonnier_loss(ground_truth, predicted)
  perceptual_loss_value = PERCEPTUAL_LOSS_RATIO * perceptual_loss(ground_truth, predicted)
  ssim_loss_value = SSIM_LOSS_RATIO * (1 - structural_similarity_index(ground_truth, predicted))

  return charbonnier_loss_value + perceptual_loss_value + ssim_loss_value

In [None]:
# Neuroninis tinklas

def choose_dilation(input, total_filters):
  if USE_DILATION:
    convolution1 = Conv2D(filters=total_filters, kernel_size=(3,3), activation=LeakyReLU(alpha=0.3), dilation_rate=DILATION_RATE, padding="same")(input)
    return Conv2D(filters=total_filters, kernel_size=(3,3), activation=LeakyReLU(alpha=0.3), dilation_rate=DILATION_RATE, padding="same")(convolution1)
  else:
    convolution1 = Conv2D(filters=total_filters, kernel_size=(3,3), activation=LeakyReLU(alpha=0.3), padding="same")(input)
    return Conv2D(filters=total_filters, kernel_size=(3,3), activation=LeakyReLU(alpha=0.3), padding="same")(convolution1)

def block(input, total_filters):
  convolution1 = choose_dilation(input, total_filters)
  normalized = BatchNormalization()(convolution1)

  convolution2 = Conv2D(filters=total_filters, kernel_size=(3,3), activation=LeakyReLU(alpha=0.3), padding="same")(normalized)
  convolution3 = Conv2D(filters=1, kernel_size=(1,1), activation=LeakyReLU(alpha=0.3), padding="same")(convolution2)
  local_features = Multiply()([normalized, convolution3])
  local_features = Add()([normalized, local_features])

  global_average_pooling = GlobalAveragePooling2D()(normalized)
  dense1 = Dense(units=total_filters // 16, activation=LeakyReLU(alpha=0.3))(global_average_pooling)
  dense2 = Dense(units=total_filters, activation="sigmoid")(dense1)
  global_features = Reshape((1, 1, total_filters))(dense2)
  global_features = Multiply()([normalized, global_features])

  return Add()([local_features, global_features])

def EG_model(input):
  block1 = block(input, 64)
  convolution1 = Conv2D(filters=64, kernel_size=(2,2), strides=(2,2), padding="same")(block1)
  block2 = block(convolution1, 128)
  convolution2 = Conv2D(filters=128, kernel_size=(2,2), strides=(2,2), padding="same")(block2)
  block3 = block(convolution2, 256)

  up_sampled1 = UpSampling2D()(block3)
  fusion1 = Concatenate()([up_sampled1, block2])
  block4 = block(fusion1, 128)
  up_sampled2 = UpSampling2D()(block4)
  fusion2 = Concatenate()([up_sampled2, block1])
  block5 = block(fusion2, 64)

  aggregated_layers = Conv2D(filters=IMAGE_CHANNEL_NUMBER, kernel_size=(1,1), activation="sigmoid")(block5)

  return Model(inputs=[input], outputs=[aggregated_layers])

model = EG_model(INPUT)

model_trainable_parameters = count_params(model.trainable_weights)
model_non_trainable_parameters = count_params(model.non_trainable_weights)

print("Modelio parametrų skaičius: {:,}".format(model_trainable_parameters + model_non_trainable_parameters))
print("Mokymui skirtų parametrų skaičius: {:,}".format(model_trainable_parameters))
print("Mokymui neskirtų parametrų skaičius: {:,}".format(model_non_trainable_parameters))

Modelio parametrų skaičius: 2,941,680
Mokymui skirtų parametrų skaičius: 2,940,400
Mokymui neskirtų parametrų skaičius: 1,280


Neuroninio tinklo mokymas.

In [None]:
model.compile(
    optimizer=Adam(learning_rate=1e-4),
    loss=calculate_combined_loss,
    metrics=[peak_signal_noise_ratio, structural_similarity_index]
)

history = model.fit(
    train_primary_dataset,
    validation_data=validate_primary_dataset,
    epochs=NUMBER_OF_EPOCHS
)

plt.plot(history.history["loss"], label="Mokymas")
plt.plot(history.history["val_loss"], label="Validavimas")
plt.xlabel("Epochos")
plt.ylabel("Nuostoliai")
plt.title("Mokymo ir validavimo nuostoliai epochų metu", fontsize=14)
plt.legend()
plt.grid()
plt.show()


plt.plot(history.history["peak_signal_noise_ratio"], label="Mokymas")
plt.plot(history.history["val_peak_signal_noise_ratio"], label="Validavimas")
plt.xlabel("Epochos")
plt.ylabel("PSNR")
plt.title("Mokymo ir validavimo PSNR įvertis epochų metu", fontsize=14)
plt.legend()
plt.grid()
plt.show()

plt.plot(history.history["structural_similarity_index"], label="Mokymas")
plt.plot(history.history["val_structural_similarity_index"], label="Validavimas")
plt.xlabel("Epochos")
plt.ylabel("SSIM")
plt.title("Mokymo ir validavimo SSIM įvertis epochų metu", fontsize=14)
plt.legend()
plt.grid()
plt.show()

Modelių užkrovimo pasirinkimai:
1.   Sudaryto tinklo be praretinimo:
  *   (1) Apmokytas su LOL duomenų rinkiniu: ***EGmodel_LOL_combined_losses.h5***
  *   (2) Apmokytas su LoLEG duomenų rinkiniu: ***EGmodel_LoLEG_combined_losses.h5***
  *   (3) Apmokytas su LoLEG_triukšmas (LoLEG_noise) duomenų rinkiniu: ***EGmodel_LoLEG_noise_combined_losses.h5***
2.   Sudaryto tinklo su praretinimu:
  *   (4) Apmokytas su LOL duomenų rinkiniu: ***EGmodel_LOL_combined_losses_dilation_2.h5***
  *   (5) Apmokytas su LoLEG duomenų rinkiniu: ***EGmodel_LoLEG_combined_losses_dilation_2.h5***
  *   (6) Apmokytas su LoLEG_triukšmas (LoLEG_noise) duomenų rinkiniu: ***EGmodel_LoLEG_noise_combined_losses_dilation_2.h5***

In [None]:
def get_chosen_model_path(index):
  model_paths = ["EGmodel_LOL_combined_losses.h5", "EGmodel_LoLEG_combined_losses.h5", "EGmodel_LoLEG_noise_combined_losses.h5",
                 "EGmodel_LOL_combined_losses_dilation_2.h5", "EGmodel_LoLEG_combined_losses_dilation_2.h5", "EGmodel_LoLEG_noise_combined_losses_dilation_2.h5"]

  return model_paths[index - 1]

In [None]:
model_choise = 6

custom_objects = {'calculate_combined_loss': calculate_combined_loss, 'peak_signal_noise_ratio': peak_signal_noise_ratio, 'structural_similarity_index': structural_similarity_index}
reconstructed_model = keras.models.load_model(PATH_TO_MODELS + get_chosen_model_path(model_choise), custom_objects=custom_objects)

Korekcijos vykdymas su užkrautu tinklo modeliu ir duomenų rinkiniais.

In [None]:
def print_image(image):
  _ = plt.imshow(image)
  plt.axis("off")
  plt.show()

def process_image(image):
  image = tf.keras.preprocessing.image.img_to_array(image)
  image = image[:,:,:] / RGB_MAX
  single_batch_image = np.expand_dims(image, axis=0)

  start_time = time.perf_counter()
  correction_result = reconstructed_model(single_batch_image)

  duration = time.perf_counter() - start_time
  correction_times.append(duration)

  correction_result = correction_result[0] * RGB_MAX
  correction_result = Image.fromarray(np.uint8(correction_result))

  return correction_result

In [None]:
# PRIMARY_DATASET

images_to_correct = 5
correction_times = []
images_data = random.sample(train_primary_dataset_low_light_paths + validate_primary_dataset_low_light_paths, images_to_correct)

for low_light_image_path in images_data:
  low_light_image = Image.open(low_light_image_path)
  high_light_image = Image.open(low_light_image_path.replace("low", "high"))
  correction_result = process_image(low_light_image)
  for single_image in [low_light_image, high_light_image, correction_result]:
    print_image(single_image)

correction_average = sum(correction_times)/len(correction_times)
print("Korekcijos vykdymo trukmės vidurkis: {0}".format(correction_average))

In [None]:
# SECONDARY_DATASET

images_to_correct = 5
correction_times = []
images_data = random.sample(train_secondary_dataset_low_light_paths + validate_secondary_dataset_low_light_paths, images_to_correct)

for low_light_image_path in images_data:
  low_light_image = Image.open(low_light_image_path)
  high_light_image = Image.open(low_light_image_path.replace("low", "high"))
  correction_result = process_image(low_light_image)
  for single_image in [low_light_image, high_light_image, correction_result]:
    print_image(single_image)

correction_average = sum(correction_times)/len(correction_times)
print("Korekcijos vykdymo trukmės vidurkis: {0}".format(correction_average))

In [None]:
# TERTIARY_DATASET

images_to_correct = 5
correction_times = []
images_data = random.sample(train_tertiary_dataset_low_light_paths + validate_tertiary_dataset_low_light_paths, images_to_correct)

for low_light_image_path in images_data:
  low_light_image = Image.open(low_light_image_path)
  high_light_image = Image.open(low_light_image_path.replace("low", "high"))
  correction_result = process_image(low_light_image)
  for single_image in [low_light_image, high_light_image, correction_result]:
    print_image(single_image)

correction_average = sum(correction_times)/len(correction_times)
print("Korekcijos vykdymo trukmės vidurkis: {0}".format(correction_average))

Korekcijos vykdymas su įkeltu vaizdu.

In [None]:
uploaded_image_path = "/img1.png"
correction_times = []

low_light_image = Image.open(uploaded_image_path)
correction_result = process_image(low_light_image)
for single_image in [low_light_image, correction_result]:
  print_image(single_image)

correction_average = sum(correction_times)/len(correction_times)
print("Korekcijos vykdymo trukmė: {0}".format(correction_average))

Duomenų rinkinių įvertinimas metrikomis PSNR, SSIM, BRISQUE, NIQE.

In [None]:
def evaluate_dataset(image_paths):
  niqe_metric = pyiqa.create_metric('niqe')
  brisque_metric = pyiqa.create_metric('brisque')
  brisque_values = []
  niqe_values = []
  psnr_values = []
  ssim_values = []

  for low_light_image_path in random.sample(image_paths, len(image_paths)):
      low_light_image = Image.open(low_light_image_path)
      high_light_image = Image.open(low_light_image_path.replace("low", "high"))

      correction_image = process_image(low_light_image)

      correction_image_for_metrics = to_tensor(correction_image).unsqueeze(0)
      high_light_image_for_metrics = to_tensor(high_light_image).unsqueeze(0)

      brisque_values.append(brisque_metric(correction_image_for_metrics).item())
      niqe_values.append(niqe_metric(correction_image_for_metrics).item())

      single_psnr_value = peak_signal_noise_ratio(high_light_image_for_metrics, correction_image_for_metrics)
      ssim_correction_image = correction_image_for_metrics.permute(0, 2, 3, 1)
      ssim_correction_image_expected = to_tensor(high_light_image).unsqueeze(0).permute(0, 2, 3, 1)
      single_ssim_value = structural_similarity_index(ssim_correction_image, ssim_correction_image_expected)
      psnr_values.append(single_psnr_value)
      ssim_values.append(single_ssim_value)

  correction_average = sum(correction_times)/len(correction_times)
  brisque_average = sum(brisque_values)/len(brisque_values)
  niqe_average = sum(niqe_values)/len(niqe_values)
  psnr_average = sum(psnr_values)/len(psnr_values)
  ssim_average = sum(ssim_values)/len(ssim_values)

  print('[VIDUTINIS] Korekcijos vykdymo trukmė: {0} | BRISQUE: {1} | NIQE: {2} | PSNR: {3} | SSIM: {4}'.format(correction_average, brisque_average, niqe_average, psnr_average, ssim_average))

In [None]:
combined_primary_dataset_images_paths = train_primary_dataset_low_light_paths + validate_primary_dataset_low_light_paths
combined_scondary_dataset_images_paths = train_secondary_dataset_low_light_paths + validate_secondary_dataset_low_light_paths
combined_tertiary_dataset_images_paths = train_tertiary_dataset_low_light_paths + validate_tertiary_dataset_low_light_paths

correction_times = []
evaluate_dataset(combined_primary_dataset_images_paths)
correction_times = []
evaluate_dataset(combined_scondary_dataset_images_paths)
correction_times = []
evaluate_dataset(combined_tertiary_dataset_images_paths)