In [None]:
import random
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import cv2
import glob
import os
import time
from skimage.metrics import peak_signal_noise_ratio, structural_similarity
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import (
    Input, GlobalAveragePooling2D, Reshape, Dense, Multiply,
    Conv2D, Add, Activation, Lambda, MaxPooling2D, UpSampling2D,
    Concatenate, LayerNormalization, Dropout
)
from tensorflow.keras.models import Model
from tensorflow.keras.initializers import lecun_normal
# Function to load images
def load_images(noisy_dir, clean_dir, image_size=(128, 128), max_images=20000):
    noisy_images = []
    clean_images = []
    noisy_pattern = os.path.join(noisy_dir, '*.*')
    clean_pattern = os.path.join(clean_dir, '*.*')
    noisy_files = sorted(glob.glob(noisy_pattern))[:max_images]
    clean_files = sorted(glob.glob(clean_pattern))[:max_images]

    for noisy_path, clean_path in zip(noisy_files, clean_files):
        noisy_image = cv2.imread(noisy_path)
        if noisy_image is None:
            print(f"Warning: Unable to read noisy image file {noisy_path}")
            continue

        clean_image = cv2.imread(clean_path)
        if clean_image is None:
            print(f"Warning: Unable to read clean image file {clean_path}")
            continue

        noisy_image = cv2.resize(noisy_image, image_size)
        clean_image = cv2.resize(clean_image, image_size)

        noisy_images.append(noisy_image / 255.0)
        clean_images.append(clean_image / 255.0)

    return np.array(noisy_images), np.array(clean_images)


# # Load the images

# # Split the data into training, validation, and test sets
noisy_dir = '/Final_DN_Traning/DSENet/dataset/complete_merged_dataset/train/input/'
clean_dir = '/Final_DN_Traning/DSENet/dataset/complete_merged_dataset/train/groundtruth'
X, y = load_images(noisy_dir, clean_dir)

# Split the data into training and test sets
X_train, X_test_full, y_train, y_test_full = train_test_split(X, y, test_size=0.2, random_state=42)

# Use only the first 400 images of the test set
X_test, y_test = X_test_full[:4000], y_test_full[:4000]

# Print summary of dataset splits
print(f"Total images: {len(X)}")
print(f"Training set: {len(X_train)}")
print(f"Test set (restricted): {len(X_test)}")

In [None]:


# gpus = tf.config.list_physical_devices('GPU')
# if gpus:
#     try:
#         for gpu in gpus:
#             tf.config.experimental.set_memory_growth(gpu, True)
#     except RuntimeError as e:
#         print(e)


# Define PSNR Metric
def psnr_metric(y_true, y_pred):
    return tf.image.psnr(y_true, y_pred, max_val=1.0)

# Define SSIM Metric
def ssim_metric(y_true, y_pred):
    return tf.image.ssim(y_true, y_pred, max_val=1.0)

# Custom Loss Functions converted from PyTorch to TensorFlow
# Combined loss function
def combined_loss(y_true, y_pred):
    charbonnier = CharbonnierLoss()(y_true, y_pred)
    edge = EdgeLoss()(y_true, y_pred)
    ssim = SSIMLoss()(y_true, y_pred)
    return charbonnier + edge + ssim
import tensorflow as tf
from tensorflow.keras.models import load_model
# Custom Loss Functions converted from PyTorch to TensorFlow
def gaussian(window_size, sigma):
    gauss = tf.exp(-tf.square(tf.range(window_size, dtype=tf.float32) - window_size // 2) / (2 * sigma**2))
    return gauss / tf.reduce_sum(gauss)

def get_gaussian_kernel(ksize, sigma):
    if not isinstance(ksize, int) or ksize % 2 == 0 or ksize <= 0:
        raise TypeError(f"ksize must be an odd positive integer. Got {ksize}")
    return gaussian(ksize, sigma)

def get_gaussian_kernel2d(ksize, sigma):
    if not isinstance(ksize, tuple) or len(ksize) != 2:
        raise TypeError(f"ksize must be a tuple of length two. Got {ksize}")
    if not isinstance(sigma, tuple) or len(sigma) != 2:
        raise TypeError(f"sigma must be a tuple of length two. Got {sigma}")
    
    kernel_x = get_gaussian_kernel(ksize[0], sigma[0])
    kernel_y = get_gaussian_kernel(ksize[1], sigma[1])
    
    kernel_2d = tf.tensordot(kernel_x, kernel_y, axes=0)
    return kernel_2d
# Define ENL Metric
def enl_metric(image):
    """
    Calculate the Enhanced Non-Local (ENL) metric.
    ENL = (mean_intensity^2) / variance
    """
    image = tf.cast(image, tf.float32)
    mean_intensity = tf.reduce_mean(image)
    variance = tf.math.reduce_variance(image)
    return tf.math.divide_no_nan(mean_intensity ** 2, variance)

# Define FOM Metric
def fom_metric(clean_image, denoised_image):
    """
    Calculate the Figure of Merit (FOM).
    FOM = SSIM(clean_image, denoised_image) * PSNR(clean_image, denoised_image)
    """
    ssim_value = tf.image.ssim(clean_image, denoised_image, max_val=1.0)
    psnr_value = tf.image.psnr(clean_image, denoised_image, max_val=1.0)
    return ssim_value * psnr_value

# PSNR Loss
class PSNRLoss(tf.keras.losses.Loss):
    def __init__(self, loss_weight=1.0, toY=False):
        super(PSNRLoss, self).__init__()
        self.loss_weight = loss_weight
        self.scale = 10 / tf.math.log(10.0)
        self.toY = toY
        self.coef = tf.constant([65.481, 128.553, 24.966], shape=(1, 1, 1, 3))
    
    def call(self, y_true, y_pred):
        if self.toY:
            y_true = tf.reduce_sum(y_true * self.coef, axis=-1, keepdims=True) + 16
            y_pred = tf.reduce_sum(y_pred * self.coef, axis=-1, keepdims=True) + 16
            
            y_true = y_true / 255.0
            y_pred = y_pred / 255.0
        
        mse_loss = tf.reduce_mean(tf.square(y_pred - y_true), axis=[1, 2, 3])
        loss = -self.loss_weight * self.scale * tf.reduce_mean(tf.math.log(mse_loss + 1e-8))
        return loss

# SSIM Loss
class SSIMLoss(tf.keras.losses.Loss):
    def __init__(self, window_size=11, max_val=1.0):
        super(SSIMLoss, self).__init__()
        self.window_size = window_size
        self.max_val = max_val
        self.window = get_gaussian_kernel2d((window_size, window_size), (1.5, 1.5))
        self.padding = (window_size - 1) // 2

        self.C1 = (0.01 * self.max_val) ** 2
        self.C2 = (0.03 * self.max_val) ** 2
    
    def filter2D(self, img, kernel):
        kernel = kernel[:, :, tf.newaxis, tf.newaxis]
        kernel = tf.tile(kernel, [1, 1, img.shape[-1], 1])
        return tf.nn.depthwise_conv2d(img, kernel, strides=[1, 1, 1, 1], padding='SAME')
    
    def call(self, y_true, y_pred):
        kernel = self.window
        mu1 = self.filter2D(y_true, kernel)
        mu2 = self.filter2D(y_pred, kernel)

        mu1_sq = tf.square(mu1)
        mu2_sq = tf.square(mu2)
        mu1_mu2 = mu1 * mu2

        sigma1_sq = self.filter2D(y_true * y_true, kernel) - mu1_sq
        sigma2_sq = self.filter2D(y_pred * y_pred, kernel) - mu2_sq
        sigma12 = self.filter2D(y_true * y_pred, kernel) - mu1_mu2

        ssim_map = ((2 * mu1_mu2 + self.C1) * (2 * sigma12 + self.C2)) / \
                   ((mu1_sq + mu2_sq + self.C1) * (sigma1_sq + sigma2_sq + self.C2))

        return tf.reduce_mean((1.0 - ssim_map) / 2.0)

# Charbonnier Loss
class CharbonnierLoss(tf.keras.losses.Loss):
    def __init__(self, eps=1e-3):
        super(CharbonnierLoss, self).__init__()
        self.eps = eps
    
    def call(self, y_true, y_pred):
        diff = y_pred - y_true
        loss = tf.reduce_mean(tf.sqrt(tf.square(diff) + self.eps**2))
        return loss

# Edge Loss
class EdgeLoss(tf.keras.losses.Loss):
    def __init__(self):
        super(EdgeLoss, self).__init__()
        k = tf.constant([[0.05, 0.25, 0.4, 0.25, 0.05]], dtype=tf.float32)
        self.kernel = tf.matmul(k, k, transpose_b=True)
        self.kernel = self.kernel[:, :, tf.newaxis, tf.newaxis]
        self.loss_fn = CharbonnierLoss()

    def conv_gauss(self, img):
        n_channels = img.shape[-1]
        kernel = tf.tile(self.kernel, [1, 1, n_channels, 1])
        return tf.nn.depthwise_conv2d(img, kernel, strides=[1, 1, 1, 1], padding='SAME')

    def laplacian_kernel(self, img):
        filtered = self.conv_gauss(img)
        downsampled = filtered[:, ::2, ::2, :]
        upsampled = tf.image.resize(downsampled, filtered.shape[1:3])
        filtered_up = self.conv_gauss(upsampled)
        return img - filtered_up

    def call(self, y_true, y_pred):
        return self.loss_fn(self.laplacian_kernel(y_true), self.laplacian_kernel(y_pred))
# Define or import all your custom classes and functions
# Custom Layers
class SwinTransformerBlock(tf.keras.layers.Layer):
    def __init__(self, dim, num_heads, window_size=7, mlp_ratio=4., dropout=0.0, **kwargs):
        super(SwinTransformerBlock, self).__init__(**kwargs)
        self.dim = dim
        self.num_heads = num_heads
        self.window_size = window_size
        self.mlp_ratio = mlp_ratio
        self.dropout = dropout
        
        # Simplified attention mechanism
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.attention = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=dim)
        self.projection = Dense(dim)  # Align attention output back to `dim`
        
        # Reduced MLP layer
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.mlp_dense = Dense(int(dim * mlp_ratio), activation='relu')  # Reduced to single layer MLP
        self.mlp_output_projection = Dense(dim)  # Project back to `dim`
        
        self.dropout_layer = Dropout(rate=dropout)
        

# Now, load the model using custom_objects
model_save_path = '/Final_DN_Traning/DSENet/DN_model/HSENet.keras'
model = load_model(
    model_save_path,
    custom_objects={
        'SwinTransformerBlock': SwinTransformerBlock,
        'PSNRLoss': PSNRLoss,
        'SSIMLoss': SSIMLoss,
        'CharbonnierLoss': CharbonnierLoss,
        'EdgeLoss': EdgeLoss,
        'psnr_metric': psnr_metric,
        'ssim_metric': ssim_metric,
        'combined_loss': combined_loss
    }
)

# Now you can use your loaded model for inference or further training

# Function to load images
def load_images(noisy_dir, clean_dir, image_size=(128, 128), max_images=20000):
    noisy_images = []
    clean_images = []
    noisy_pattern = os.path.join(noisy_dir, '*.*')
    clean_pattern = os.path.join(clean_dir, '*.*')
    noisy_files = sorted(glob.glob(noisy_pattern))[:max_images]
    clean_files = sorted(glob.glob(clean_pattern))[:max_images]

    for noisy_path, clean_path in zip(noisy_files, clean_files):
        noisy_image = cv2.imread(noisy_path)
        if noisy_image is None:
            print(f"Warning: Unable to read noisy image file {noisy_path}")
            continue

        clean_image = cv2.imread(clean_path)
        if clean_image is None:
            print(f"Warning: Unable to read clean image file {clean_path}")
            continue

        noisy_image = cv2.resize(noisy_image, image_size)
        clean_image = cv2.resize(clean_image, image_size)

        noisy_images.append(noisy_image / 255.0)
        clean_images.append(clean_image / 255.0)

    return np.array(noisy_images), np.array(clean_images)

# Generate denoised images using the model
start_time = time.time()
try:
    denoised_images_test = model.predict(X_test)
except ValueError as e:
    print(f"Error during prediction: {e}")
    print("Resizing test images to match model input size...")
    X_test_resized = np.array([cv2.resize(img, (128, 128)) for img in X_test])
    denoised_images_test = model.predict(X_test_resized)
end_time = time.time()

denosising_time = end_time - start_time
print(f"Time taken for denoising {len(X_test)} patches of size 128x128: {denosising_time:.4f} seconds")

# Create output directories to save images
output_dir = '/Final_DN_Traning/DSENet/DN_model/HSENet_v8_merged_Denoised_400_images_same2'
os.makedirs(output_dir, exist_ok=True)

# Save all images (noisy, clean, and denoised) into the output directory
for idx in range(len(X_test)):
    noisy_image_path = os.path.join(output_dir, f'noisy_image_{idx}.png')
    clean_image_path = os.path.join(output_dir, f'clean_image_{idx}.png')
    denoised_image_path = os.path.join(output_dir, f'denoised_image_{idx}.png')

    # Convert RGB to BGR before saving to maintain original colors
    cv2.imwrite(noisy_image_path, cv2.cvtColor((X_test[idx] * 255).astype(np.uint8), cv2.COLOR_RGB2BGR))
    cv2.imwrite(clean_image_path, cv2.cvtColor((y_test[idx] * 255).astype(np.uint8), cv2.COLOR_RGB2BGR))
    cv2.imwrite(denoised_image_path, cv2.cvtColor((denoised_images_test[idx] * 255).astype(np.uint8), cv2.COLOR_RGB2BGR))

# Calculate and display mean PSNR and SSIM for all denoised images
psnr_values = []
ssim_values = []
for idx in range(len(X_test)):
    psnr_value = peak_signal_noise_ratio(y_test[idx], denoised_images_test[idx], data_range=1.0)
    ssim_value = structural_similarity(y_test[idx], denoised_images_test[idx], channel_axis=-1, data_range=1.0, win_size=7)
    psnr_values.append(psnr_value)
    ssim_values.append(ssim_value)

mean_psnr = np.mean(psnr_values)
mean_ssim = np.mean(ssim_values)
print(f"Mean PSNR: {mean_psnr:.4f}")
print(f"Mean SSIM: {mean_ssim:.4f}")

# Plot random results
def plot_random_results(noisy_images, clean_images, denoised_images, n=5):
    """
    Plot random results from noisy, clean, and denoised images.
    
    Parameters:
        noisy_images (numpy array): Array of noisy images.
        clean_images (numpy array): Array of clean images.
        denoised_images (numpy array): Array of denoised images.
        n (int): Number of random images to plot.
    """
    # Select random indices from the test set
    indices = random.sample(range(len(noisy_images)), n)

    plt.figure(figsize=(15, 10))
    for i, idx in enumerate(indices):
        # Plot Noisy Image
        plt.subplot(n, 3, i * 3 + 1)
        plt.imshow(noisy_images[idx])
        plt.title('Noisy Image')
        plt.axis('off')

        # Plot Clean Image
        plt.subplot(n, 3, i * 3 + 2)
        plt.imshow(clean_images[idx])
        plt.title('Clean Image')
        plt.axis('off')

        # Plot Denoised Image
        plt.subplot(n, 3, i * 3 + 3)
        plt.imshow(denoised_images[idx])
        plt.title('Denoised Image')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Call the function to visualize random results
plot_random_results(X_test, y_test, denoised_images_test, n=5)
