In [42]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Dropout, Input, MaxPooling2D, BatchNormalization, Conv2DTranspose
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras import Model, regularizers
from tensorflow.keras import layers, models
from tensorflow.keras.utils import plot_model
from time import time
import csv
import random
import datetime

from matplotlib import pyplot as plt
plt.rcParams['figure.figsize'] = [15, 10]

from helper import load_and_preprocess_image, generator_from_csv

# Set the seeds for reproducibility
from numpy.random import seed
from tensorflow.random import set_seed
seed_value = 1234578790
seed(seed_value)
set_seed(seed_value)

In [43]:
# CSV paths
TRAIN_CSV = 'annotations_train.csv'
VAL_CSV   = 'annotations_val.csv'
TEST_CSV  = 'annotations_test.csv'

In [44]:
# Hyperparameters
IMG_HEIGHT = 256
IMG_WIDTH  = 256
CHANNELS   = 3
BATCH_SIZE = 8
EPOCHS     = 10

In [45]:
# Data generators for train, val, test sets
train_gen = generator_from_csv(TRAIN_CSV, batch_size=BATCH_SIZE, shuffle=True)
val_gen   = generator_from_csv(VAL_CSV,   batch_size=BATCH_SIZE, shuffle=False)
test_gen  = generator_from_csv(TEST_CSV,  batch_size=BATCH_SIZE, shuffle=False)

# Let's guess how many lines are in each CSV
train_lines = sum(1 for _ in open(TRAIN_CSV)) - 1  # minus header
val_lines   = sum(1 for _ in open(VAL_CSV))   - 1
test_lines  = sum(1 for _ in open(TEST_CSV))  - 1

# Steps per epoch
steps_per_epoch      = train_lines // BATCH_SIZE
validation_steps     = val_lines   // BATCH_SIZE
test_steps           = test_lines  // BATCH_SIZE

In [6]:
# Define callbacks using mse
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5,
                              patience=5, min_lr=1e-6, verbose=1)
early_stop = EarlyStopping(monitor='val_loss', patience=10,
                           restore_best_weights=True, verbose=1)
checkpoint = ModelCheckpoint('unet_best_model.h5', monitor='val_loss',
                             save_best_only=True, verbose=1)

In [48]:
log_dir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

In [7]:
def build_unet(input_shape=(256, 256, 3)):
    inputs = layers.Input(shape=input_shape)
    
    # Encoder
    c1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(c1)
    p1 = layers.MaxPooling2D((2, 2))(c1)
    
    c2 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(c2)
    p2 = layers.MaxPooling2D((2, 2))(c2)
    
    c3 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(p2)
    c3 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(c3)
    
    # Decoder
    u4 = layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c3)
    u4 = layers.concatenate([u4, c2])
    c4 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(u4)
    c4 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(c4)
    
    u5 = layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c4)
    u5 = layers.concatenate([u5, c1])
    c5 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(u5)
    c5 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(c5)
    
    outputs = layers.Conv2D(3, (1, 1), activation='sigmoid')(c5)
    
    model = models.Model(inputs, outputs)
    return model


In [None]:
model = build_dncnn(input_shape=(IMG_HEIGHT, IMG_WIDTH, CHANNELS))
model.compile(optimizer='adam', loss='mae', metrics=['accuracy', 'mae'])
model.summary()

In [None]:
plot_model(model, show_shapes=True)

In [None]:
# Fit the model
history = model.fit(
    x=train_gen,
    epochs=EPOCHS,
    steps_per_epoch=steps_per_epoch,
    validation_data=val_gen,
    validation_steps=validation_steps,
    callbacks=[reduce_lr, early_stop, checkpoint, tensorboard_callback]
)

In [None]:
def plot_history(history):
    h = history.history
    epochs = range(len(h['loss']))

    plt.subplot(121), plt.plot(epochs, h['loss'], '.-', epochs, h['val_loss'], '.-')
    plt.grid(True), plt.xlabel('epochs'), plt.ylabel('loss')
    plt.legend(['Train', 'Validation'])
    plt.subplot(122), plt.plot(epochs, h['accuracy'], '.-',
                               epochs, h['val_accuracy'], '.-')
    plt.grid(True), plt.xlabel('epochs'), plt.ylabel('Accuracy')
    plt.legend(['Train', 'Validation'])
        
    print('Train Acc     ', h['accuracy'][-1])
    print('Validation Acc', h['val_accuracy'][-1])
    
plot_history(history)

In [55]:
def mse(img1, img2):
    return np.mean((img1 - img2)**2)

def psnr(img1, img2):
    # Assumes images in [0..1], so max_val = 1.0
    mse_val = mse(img1, img2)
    if mse_val == 0:
        return 100  # max PSNR if identical
    return 10 * np.log10(1.0 / mse_val)

# Compare model output vs ground truth
model_psnr = psnr(predicts[idx], original_batch[idx])
print("Model PSNR:", model_psnr)

# Compare Gaussian vs ground truth
gaussian_psnr = psnr(gaussian_img, original_batch[idx])
print("Gaussian PSNR:", gaussian_psnr)


Model PSNR: 36.702814020810976
Gaussian PSNR: 32.64933119450204


In [60]:
model.evaluate(val_gen, steps=200)



[0.023345565423369408, 0.8799682855606079, 0.023345565423369408]

In [None]:
history.history