In [None]:
import os
import numpy as np
from PIL import Image

In [None]:
# Dataset root and the directory of each split (train / validate / test).
base_path = "denoising-shabby-pages"
train_path, validate_path, test_path = (
    os.path.join(base_path, split_name)
    for split_name in ("train", "validate", "test")
)

In [None]:
import os
import pandas as pd
# Builds one DataFrame per split (train, validate, test).
# Each DataFrame has two columns, "shabby_image_path" and "cleaned_image_path";
# every row holds the paths of one (shabby, cleaned) pair of the same page.
def create_dataset_dataframe(image_dir):
    """Pair every shabby image under ``image_dir`` with its cleaned counterpart.

    Pairs are matched by file name (the sample paths used further down in this
    notebook show the same file name in ``shabby/`` and ``cleaned/``), not by
    directory listing position: ``os.listdir`` returns entries in arbitrary
    order and also returns stray entries such as ``.ipynb_checkpoints``, so
    zipping two listings can silently pair unrelated pages.

    Parameters
    ----------
    image_dir : str
        Split directory containing the ``shabby`` and ``cleaned`` sub-directories.

    Returns
    -------
    pandas.DataFrame
        Columns ``shabby_image_path`` and ``cleaned_image_path``, one row per
        file name present in both sub-directories, sorted by file name, with a
        default 0..n-1 index.
    """
    shabby_dir = os.path.join(image_dir, "shabby")
    cleaned_dir = os.path.join(image_dir, "cleaned")

    def _regular_files(directory):
        # Hidden entries and sub-directories (e.g. .ipynb_checkpoints) are not images.
        return {
            name
            for name in os.listdir(directory)
            if not name.startswith(".")
            and os.path.isfile(os.path.join(directory, name))
        }

    # Only names available on both sides can form a pair; sorting makes the
    # row order reproducible across machines and runs.
    paired_filenames = sorted(_regular_files(shabby_dir) & _regular_files(cleaned_dir))

    dataset_list = [
        {
            "shabby_image_path": os.path.join(shabby_dir, filename),
            "cleaned_image_path": os.path.join(cleaned_dir, filename),
        }
        for filename in paired_filenames
    ]

    # Explicit columns keep the schema stable even when no pair was found.
    dataset_df = pd.DataFrame(dataset_list, columns=["shabby_image_path", "cleaned_image_path"])
    return dataset_df

# One path-pair DataFrame per dataset split.
train_df, validate_df, test_df = (
    create_dataset_dataframe(split_dir)
    for split_dir in (train_path, validate_path, test_path)
)

In [None]:
test_df.shape

In [None]:
test_df

In [None]:
# Record the array shape of every shabby test image and store it, as text,
# in a new 'image size' column of test_df.
import cv2
size = [cv2.imread(shabby_path).shape for shabby_path in test_df['shabby_image_path']]

test_df['image size'] = [str(shape) for shape in size]
test_df.head()

In [None]:
import matplotlib.pyplot as plt
# Bar chart of how many test images share each distinct shape string.
size_counts = test_df['image size'].value_counts()
fig = plt.figure(figsize = (100, 10))
x = size_counts.index.tolist()
y = list(size_counts)
plt.bar(x,y)
plt.title("Images vs Size")
plt.xlabel("Size of images")
plt.ylabel("No. of images")

all of the images have the same size

In [None]:
# Plot three randomly drawn training pairs: cleaned page on the left,
# its shabby version on the right.
sample = train_df.sample(3)
fig, ax = plt.subplots(len(sample),2,figsize=(30,30))
panels = (('cleaned_image_path', "Cleaned Image"), ('shabby_image_path', "Shabby Image"))
for i in range(len(sample)):
  for col, (path_column, panel_title) in enumerate(panels):
    # OpenCV decodes to BGR; matplotlib expects RGB.
    img = cv2.cvtColor(cv2.imread(sample[path_column].iloc[i]), cv2.COLOR_BGR2RGB)
    panel = ax[i][col]
    panel.imshow(img)
    panel.get_xaxis().set_visible(False)
    panel.get_yaxis().set_visible(False)
    panel.title.set_text(panel_title)

Observation: There is a significant amount of noise in the noisy (shabby) images, while the ground truth images show the corresponding clean pages free from noise.

In [None]:
from patchify import patchify, unpatchify
# Cut an image into square, non-overlapping tiles.
def patches(img,patch_size):
  """Split a 3-channel image into ``patch_size`` x ``patch_size`` tiles.

  The step equals the patch size, so tiles do not overlap. The result is
  whatever ``patchify`` returns for a 3-D window; the rest of this notebook
  indexes it as ``result[row][col][0]`` to obtain a single tile.
  """
  tile_shape = (patch_size, patch_size, 3)
  return patchify(img, tile_shape, step=patch_size)

In [None]:
# Two-row frame: row 0 holds placeholder strings ('a' / 'b'); row 1 holds the
# real image pair that the following cells read with .iloc[1].
sample = pd.DataFrame(
    {
        'cleaned_image_path': [
            'a',
            'denoising-shabby-pages/train/cleaned/0001-USPS-dmm300_608.pdf-15.png',
        ],
        'shabby_image_path': [
            'b',
            'denoising-shabby-pages/train/shabby/0001-USPS-dmm300_608.pdf-15.png',
        ],
    }
)

In [None]:
sample


In [None]:
# Cut the ground-truth (cleaned) image of the chosen sample into 100x100 tiles.
path = sample['cleaned_image_path'].iloc[1]
img = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2RGB)  # BGR -> RGB
print('Image shape: {}'.format(img.shape))

patches_gt = patches(img, patch_size=100)
print('Patch shape: {}'.format(patches_gt.shape))

In [None]:
# Cut the noisy (shabby) image of the chosen sample into 100x100 tiles.
path = sample['shabby_image_path'].iloc[1]
img = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2RGB)  # BGR -> RGB
print('Image shape: {}'.format(img.shape))

patches_nsy = patches(img, patch_size=100)
print('Patch shape: {}'.format(patches_nsy.shape))

In [None]:
# Draw the noisy tiles in the same row/column layout they were cut from.
rows, cols = patches_nsy.shape[:2]
fig, axs = plt.subplots(rows,cols,figsize=(20,10))
for i in range(rows):
  for j in range(cols):
    tile_axis = axs[i][j]
    tile_axis.imshow(patches_nsy[i][j][0])
    tile_axis.get_xaxis().set_visible(False)
    tile_axis.get_yaxis().set_visible(False)

This is what patches does. It splits the images into different patches based on a given patch size.

In [None]:
# Split every frame into model input (X: shabby paths) and target (y: cleaned paths).
shabby_columns = ["shabby_image_path"]
cleaned_columns = ["cleaned_image_path"]

X_train_dataframe, y_train_dataframe = train_df[shabby_columns], train_df[cleaned_columns]

X_validate_dataframe, y_validate_dataframe = validate_df[shabby_columns], validate_df[cleaned_columns]

X_test_dataframe, y_test_dataframe = test_df[shabby_columns], test_df[cleaned_columns]

In [None]:
X_test_dataframe

In [None]:
y_test_dataframe

In [None]:
y_train_dataframe

We will be creating patches for all the images. Experiments have shown that splitting images into patches and using these patches for training improve model performance in denoising.
In regard to that, we will resize all the images to a fixed size of 1024 x 1024 and create patches with patch size 256 x 256.
The patches are extracted from resized images and will be used for testing a denoising model.

In [None]:
# Build the 256x256 patch arrays for the TEST split (X = shabby, y = cleaned).
# Every image is resized to 1024x1024 first, so each page yields a 4x4 grid.
X_test_patches = []
y_test_patches = []
for i in range(len(X_test_dataframe)):
  shabby_path = X_test_dataframe.at[i, "shabby_image_path"]
  img_nsy = cv2.cvtColor(cv2.imread(shabby_path), cv2.COLOR_BGR2RGB)
  img_nsy = cv2.resize(img_nsy,(1024,1024))  # resize the X_test image
  patches_nsy = patches(img_nsy,256)

  cleaned_path = y_test_dataframe.at[i, "cleaned_image_path"]
  img_gt = cv2.cvtColor(cv2.imread(cleaned_path), cv2.COLOR_BGR2RGB)
  img_gt = cv2.resize(img_gt,(1024,1024))  # resize the y_test image
  patches_gt = patches(img_gt,256)

  rows, cols = patches_nsy.shape[:2]
  for j in range(rows):
    for k in range(cols):
      X_test_patches.append(patches_nsy[j, k, 0])
      y_test_patches.append(patches_gt[j, k, 0])

# From here on the *_dataframe names hold patch arrays, not DataFrames.
X_test_dataframe = np.array(X_test_patches)
y_test_dataframe = np.array(y_test_patches)

In [None]:
ls denoising-shabby-pages/train/cleaned/.ipynb_checkpoints

In [None]:
# Build the 256x256 patch arrays for the TRAIN split (X = shabby, y = cleaned).
# Every image is resized to 1024x1024 first, so each page yields a 4x4 grid.
# A pair contributes patches only when BOTH files load; otherwise the whole pair
# is skipped, so X and y stay aligned and no patches from a previous page are
# appended a second time.
X_train_patches = []
y_train_patches = []
for i in range(len(X_train_dataframe)):
  shabby_path = X_train_dataframe.at[i, "shabby_image_path"]
  cleaned_path = y_train_dataframe.at[i, "cleaned_image_path"]

  # cv2.imread returns None (instead of raising) when a path cannot be decoded.
  img_nsy = cv2.imread(shabby_path)
  img_gt = cv2.imread(cleaned_path)

  unreadable = [
      path for path, image in ((shabby_path, img_nsy), (cleaned_path, img_gt))
      if image is None
  ]
  if unreadable:
    for path in unreadable:
      print(f"Error loading image: {path}")
    continue

  img_nsy = cv2.cvtColor(img_nsy, cv2.COLOR_BGR2RGB)
  img_nsy = cv2.resize(img_nsy,(1024,1024))  # resize the X_train image
  patches_nsy = patches(img_nsy,256)

  img_gt = cv2.cvtColor(img_gt, cv2.COLOR_BGR2RGB)
  img_gt = cv2.resize(img_gt,(1024,1024))  # resize the y_train image
  patches_gt = patches(img_gt,256)

  rows = patches_nsy.shape[0]
  cols = patches_nsy.shape[1]
  for j in range(rows):
    for k in range(cols):
      X_train_patches.append(patches_nsy[j][k][0])
      y_train_patches.append(patches_gt[j][k][0])

# From here on the *_dataframe names hold patch arrays, not DataFrames.
X_train_dataframe = np.array(X_train_patches)
y_train_dataframe = np.array(y_train_patches)

In [None]:
# Build the 256x256 patch arrays for the VALIDATE split (X = shabby, y = cleaned).
# Every image is resized to 1024x1024 first, so each page yields a 4x4 grid.
X_valid_patches = []
y_valid_patches = []
for i in range(len(X_validate_dataframe)):
  shabby_path = X_validate_dataframe.at[i, "shabby_image_path"]
  img_nsy = cv2.cvtColor(cv2.imread(shabby_path), cv2.COLOR_BGR2RGB)
  img_nsy = cv2.resize(img_nsy,(1024,1024))  # resize the X_validate image
  patches_nsy = patches(img_nsy,256)

  cleaned_path = y_validate_dataframe.at[i, "cleaned_image_path"]
  img_gt = cv2.cvtColor(cv2.imread(cleaned_path), cv2.COLOR_BGR2RGB)
  img_gt = cv2.resize(img_gt,(1024,1024))  # resize the y_validate image
  patches_gt = patches(img_gt,256)

  rows, cols = patches_nsy.shape[:2]
  for j in range(rows):
    for k in range(cols):
      X_valid_patches.append(patches_nsy[j, k, 0])
      y_valid_patches.append(patches_gt[j, k, 0])

# From here on the *_dataframe names hold patch arrays, not DataFrames.
X_validate_dataframe = np.array(X_valid_patches)
y_validate_dataframe = np.array(y_valid_patches)

In [None]:
print(X_train_dataframe.shape)
print(y_train_dataframe.shape)
print(X_test_dataframe.shape)
print(y_test_dataframe.shape)
print(X_validate_dataframe.shape)
print(y_validate_dataframe.shape)

plot patches for both clean and shabby images 

In [None]:
import random
# Show up to five random training patches: ground truth on the top row,
# the matching noisy patch directly below it.
fig, axs = plt.subplots(2,5,figsize=(20,10))
# Sample from the real number of patches instead of a hard-coded upper bound,
# so the indices are always valid and every patch can be drawn.
n_train_patches = len(X_train_dataframe)
r = random.sample(range(n_train_patches), min(5, n_train_patches))

fig.suptitle('Train Image Patches',fontweight ="bold")
for i, patch_index in enumerate(r):
  axs[0][i].imshow(y_train_dataframe[patch_index])
  axs[0][i].set_title('Ground Truth Image Patches')
  axs[1][i].imshow(X_train_dataframe[patch_index])
  axs[1][i].set_title('Noisy Image Patches')

In [None]:
print("Total number of image patches on train data : ", len(X_train_dataframe))
print("Total number of image patches on test data : ", len(X_test_dataframe))
print("Total number of image patches on validate data : ", len(X_validate_dataframe))

In [None]:
# Per-image mean of each colour channel, for the cleaned (gt) and the shabby
# (nsy) test images.
mean_red_gt, mean_green_gt, mean_blue_gt = [], [], []
mean_red_nsy, mean_green_nsy, mean_blue_nsy = [], [], []

# Accumulators are listed in R, G, B order to match the channel index after
# the BGR -> RGB conversion.
channel_targets = (
    ('cleaned_image_path', (mean_red_gt, mean_green_gt, mean_blue_gt)),
    ('shabby_image_path', (mean_red_nsy, mean_green_nsy, mean_blue_nsy)),
)
for path_column, accumulators in channel_targets:
  for path in test_df[path_column]:
    img = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2RGB)
    for channel, accumulator in enumerate(accumulators):
      accumulator.append(np.mean(img[:,:,channel]))

In [None]:

def _channel_frame(values, value_column, channel):
    """Return a two-column frame: the mean values plus a constant channel label."""
    frame = pd.DataFrame()
    frame[value_column] = values
    frame['channel'] = channel
    return frame

gt_column = 'Mean Pixel on Ground Truth Images'
nsy_column = 'Mean Pixel on  Noisy Images'

red_gt = _channel_frame(mean_red_gt, gt_column, 'red')
green_gt = _channel_frame(mean_green_gt, gt_column, 'green')
blue_gt = _channel_frame(mean_blue_gt, gt_column, 'blue')

red_nsy = _channel_frame(mean_red_nsy, nsy_column, 'red')
green_nsy = _channel_frame(mean_green_nsy, nsy_column, 'green')
blue_nsy = _channel_frame(mean_blue_nsy, nsy_column, 'blue')

# Long-format tables: one row per (image, channel).
concat_gt = pd.concat([red_gt,green_gt,blue_gt],ignore_index=True)
concat_nsy = pd.concat([red_nsy,green_nsy,blue_nsy],ignore_index=True)

In [None]:
# Distribution of the per-image mean pixel value of the ground truth images:
# histogram (left) and kernel density estimate (right), one row per channel.
import seaborn as sns

fig, axes = plt.subplots(3,2,figsize=(16, 16))
fig.suptitle("Ground Truth Images", fontsize = 'x-large' , fontweight = 'bold' )
gt_channel_means = ((mean_red_gt, 'r'), (mean_green_gt, 'g'), (mean_blue_gt, 'b'))
for row, (channel_means, color) in enumerate(gt_channel_means):
  sns.histplot(channel_means,ax=axes[row][0],color=color)
  # sns.distplot is deprecated; kdeplot draws the same density curve that
  # distplot(..., hist=False) produced.
  sns.kdeplot(channel_means,ax=axes[row][1],color=color)
  axes[row][0].set_xlabel('Mean Pixels')
  axes[row][1].set_xlabel('Mean Pixels')

Observations: For most of the clean images, the mean pixel value ranges between 210 and 250. On the 0–255 scale this means most of the images are bright; only a few images have lower mean pixel values (lower brightness).

In [None]:
# Distribution of the per-image mean pixel value of the noisy images:
# histogram (left) and kernel density estimate (right), one row per channel.
fig, axes = plt.subplots(3,2,figsize=(16, 16))
fig.suptitle("Noisy Images", fontsize = 'x-large' , fontweight = 'bold' )
nsy_channel_means = ((mean_red_nsy, 'r'), (mean_green_nsy, 'g'), (mean_blue_nsy, 'b'))
for row, (channel_means, color) in enumerate(nsy_channel_means):
  sns.histplot(channel_means,ax=axes[row][0],color=color)
  # sns.distplot is deprecated; kdeplot draws the same density curve that
  # distplot(..., hist=False) produced.
  sns.kdeplot(channel_means,ax=axes[row][1],color=color)
  axes[row][0].set_xlabel('Mean Pixels')
  axes[row][1].set_xlabel('Mean Pixels')

Observations: For most of the clean images, the mean pixel value ranges between 210 and 250. On the 0–255 scale this means most of the images are bright; only a few images have lower mean pixel values (lower brightness).

In [None]:
# NOTE(review): this cell repeats the noisy-image figure of the cell above.
# Distribution of the per-image mean pixel value of the noisy images:
# histogram (left) and kernel density estimate (right), one row per channel.
fig, axes = plt.subplots(3,2,figsize=(16, 16))
fig.suptitle("Noisy Images", fontsize = 'x-large' , fontweight = 'bold' )
nsy_channel_means = ((mean_red_nsy, 'r'), (mean_green_nsy, 'g'), (mean_blue_nsy, 'b'))
for row, (channel_means, color) in enumerate(nsy_channel_means):
  sns.histplot(channel_means,ax=axes[row][0],color=color)
  # sns.distplot is deprecated; kdeplot draws the same density curve that
  # distplot(..., hist=False) produced.
  sns.kdeplot(channel_means,ax=axes[row][1],color=color)
  axes[row][0].set_xlabel('Mean Pixels')
  axes[row][1].set_xlabel('Mean Pixels')

Observations: For most of the noisy images, the mean pixel value ranges between 150 and 250. On the 0–255 scale this means most of the images have medium to high brightness; only a few images have lower mean pixel values.

# Analyzing the PSNR and SSIM values of the images
The Peak Signal-to-Noise Ratio (PSNR) and Structural Similarity Index (SSIM) are both widely used metrics for evaluating the quality of images, particularly in the context of image compression and restoration.
PSNR measures the ratio between the maximum possible power of a signal (in this case, an image) and the power of corrupting noise that affects the fidelity of its representation.
Higher PSNR values indicate higher image quality.
SSIM is designed to measure the structural similarity between two images, considering luminance, contrast, and structure. Unlike PSNR, SSIM aims to reflect perceived image quality more accurately.
 It outputs a value between -1 and 1, where 1 indicates perfect similarity.

In [None]:
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim

from tqdm import tqdm
# SSIM and PSNR of every raw training pair (cleaned image vs. its shabby
# version), both scaled to [0, 1] before comparison.
# NOTE(review): `multichannel=` was deprecated in newer scikit-image releases in
# favour of `channel_axis=` — confirm against the installed version.
SSIM, PSNR = [], []
for i in tqdm(range(len(train_df))):
  pair = train_df.iloc[i]
  img1 = cv2.imread(pair['cleaned_image_path']).astype("float32") / 255.0
  img2 = cv2.imread(pair['shabby_image_path']).astype("float32") / 255.0
  window_size = 3
  pair_ssim = ssim(
      img1,
      img2,
      multichannel=True,
      win_size=window_size,
      data_range=img2.max() - img2.min(),
  )
  SSIM.append(pair_ssim)
  PSNR.append(psnr(img1,img2))

In [None]:
# PSNR distribution of the raw training pairs: density curve first, then a histogram.
psnr_density = sns.displot(PSNR,kind='kde')
psnr_density.set(xlabel='PSNR', ylabel='Density')
ax = sns.displot(PSNR)
ax.set(xlabel='PSNR', ylabel='Count')

Observations : Majority of the clean-noisy image pairs have PSNR value between 10-15. So, a good denoising model should give PSNR value greater than 15 (approx) for majority of the images.

In [None]:

# SSIM distribution of the raw training pairs: density curve first, then a histogram.
ssim_density = sns.displot(SSIM,kind='kde')
ssim_density.set(xlabel='SSIM', ylabel='Density')
ax = sns.displot(SSIM)
ax.set(xlabel='SSIM', ylabel='Count')

Observations : Majority of the clean-noisy image pairs have SSIM value between 0.5-0.8. So, a good denoising model should give SSIM value greater than 0.8 (approx) for majority of the images.

# Denoising few image patches using NLM filter

The basic idea behind the non-local means algorithm is to exploit redundancy in natural images. Instead of averaging pixel values within a local neighborhood as traditional filtering methods do, NLM looks for similar patches throughout the entire image. It averages the pixel values of similar patches, giving more weight to patches that are more similar to the one being denoised.
Overall, the NLM denoising technique works by comparing small patches in the image to find similar patches across the entire image and averaging their pixel values to reduce noise while preserving details. The specific parameter values you choose will depend on your preferences and the characteristics of your images.


Here's a simplified explanation of how the algorithm works:

Patch Search: For each pixel in the image, a search is conducted to find similar patches. Similarity is usually measured using a distance metric, often based on the Euclidean distance between pixel values.

Patch Weighting: Once similar patches are found, their pixel values are weighted based on their similarity to the current patch. Patches that are more similar have higher weights.

Weighted Averaging: The pixel values of the similar patches are averaged, with more weight given to patches that are more similar. This averaging process reduces noise while preserving the underlying structure of the image.

Reconstruction: The weighted averages of the pixel values are used to replace the noisy pixel values in the image, resulting in a denoised version of the image.

In [None]:
import matplotlib.pyplot as plt
import random

# Denoise a few random patches with OpenCV's non-local-means (NLM) filter and
# compare noisy / ground truth / denoised side by side.
# patches_nsy, patches_gt, rows and cols are the values left in the kernel by the
# patch-extraction cells above, i.e. the patch grid of a single image pair.

num_samples = 10  # Number of random patch pairs to visualize and analyze

fig, axs = plt.subplots(num_samples, 3, figsize=(15, 5 * num_samples))

PSNR_nsy=[]
PSNR_de_nsy=[]

for i in range(num_samples):
    r = random.randint(0, rows - 1)
    c = random.randint(0, cols - 1)

    # The patches were cut from images that had already been converted to RGB,
    # so no further colour conversion is applied (a second BGR2RGB would swap
    # the channels back). ascontiguousarray gives OpenCV a plain array.
    img1 = np.ascontiguousarray(patches_nsy[r][c][0])
    img2 = np.ascontiguousarray(patches_gt[r][c][0])

    axs[i][0].imshow(img1)
    axs[i][0].set_title('Noisy Image Patch')

    axs[i][1].imshow(img2)
    axs[i][1].set_title('Ground Truth Image Patch')

    # NLM denoising: fastNlMeansDenoisingColored(src, dst, h, hColor,
    #                                            templateWindowSize, searchWindowSize)
    #   img1 : noisy input patch.
    #   None : no pre-allocated output; the result is returned.
    #   50   : h, filter strength for the luminance component. Larger values
    #          remove more noise but also more image detail.
    #   10   : hColor, the same strength parameter for the colour components.
    #   7    : templateWindowSize, size of the patch used to compute the weights.
    #   21   : searchWindowSize, size of the window searched for similar patches
    #          when computing the weighted average. Larger is slower.
    dst = cv2.fastNlMeansDenoisingColored(img1, None, 50, 10, 7, 21)
    axs[i][2].imshow(dst)
    axs[i][2].set_title('Denoised Image Patch')

    # Both PSNR values use the ground truth patch as the reference, so that
    # "before" and "after" denoising are directly comparable.
    PSNR_nsy.append(psnr(img2,img1))
    PSNR_de_nsy.append(psnr(img2,dst))
    print(f"Pair {i+1} - PSNR value between Noisy and Ground Truth patches:", PSNR_nsy[-1])
    print(f"Pair {i+1} - PSNR value between Denoised and Ground Truth patches:", PSNR_de_nsy[-1])

plt.tight_layout()
plt.show()

In [None]:
from prettytable import PrettyTable
# Per-patch PSNR gain (after minus before), shown next to the raw values.
improvement = [after - before for before, after in zip(PSNR_nsy, PSNR_de_nsy)]
table = PrettyTable()
table_columns = (
    ("PSNR before denoising", PSNR_nsy),
    ("PSNR after denoising", PSNR_de_nsy),
    ("PSNR Improvement", improvement),
)
for header, values in table_columns:
    table.add_column(header, values)
print(table)
     

Observations:
As you can see, the NLM filter is able to denoise the images to some extent. However, it smooths away many details that are present in the ground truth images, leading to a loss of important information that should have been retained. Also, when the noise is too high, NLM fails to provide good results.

Thus we can conclude, there is a need of using more advanced deep learning techniques for image denoising tasks.

Creating Dataset for modeling using custom data generators in Keras

In [None]:
print(X_train_dataframe.shape)
print(y_train_dataframe.shape)
print(X_test_dataframe.shape)
print(y_test_dataframe.shape)
print(X_validate_dataframe.shape)
print(y_validate_dataframe.shape)

In [None]:
#Normalizing the image pixels
def _scale_to_unit_range(pixels):
    """Return ``pixels`` as float32 scaled to [0, 1].

    Integer arrays (the uint8 patches produced by OpenCV) are divided by 255.
    Arrays that are already floating point are assumed to have been normalized
    by a previous run of this cell and are only cast to float32, which makes
    the cell safe to re-run without shrinking the values a second time.
    """
    if np.issubdtype(pixels.dtype, np.floating):
        return pixels.astype("float32", copy=False)
    return pixels.astype("float32") / 255.0

X_train_dataframe = _scale_to_unit_range(X_train_dataframe)
y_train_dataframe = _scale_to_unit_range(y_train_dataframe)
X_test_dataframe = _scale_to_unit_range(X_test_dataframe)
y_test_dataframe = _scale_to_unit_range(y_test_dataframe)
X_validate_dataframe = _scale_to_unit_range(X_validate_dataframe)
y_validate_dataframe = _scale_to_unit_range(y_validate_dataframe)


This code snippet defines a custom data loader class that serves batches of data and labels for training, validation, and testing. Keras `Sequence` loaders are useful when working with large datasets that cannot be loaded into memory all at once; note that in this notebook the patch arrays are already held in memory, so the loader only slices them into batches. It ensures that neural network training can iterate over the data in a batch-wise manner.

In [None]:
import tensorflow as tf
# Keras Sequence that serves (X, y) batches cut from two aligned arrays.
class Dataloder(tf.keras.utils.Sequence):
    """Batch provider over two aligned arrays of inputs and targets.

    Parameters
    ----------
    X : array-like
        Input samples; ``X[k]`` belongs with ``y[k]``.
    y : array-like
        Target samples, same length as ``X``.
    batch_size : int, default 1
        Number of samples per batch.
    shuffle : bool, default False
        When True the sample order is re-drawn at the end of every epoch and
        batches are assembled through that order. ``X`` and ``y`` must then
        support integer-array indexing (e.g. NumPy arrays).
    """

    def __init__(self, X,y,batch_size=1, shuffle=False):
        # Let the Keras base class set up whatever state it maintains.
        super().__init__()
        # The input data (features)
        self.X = X
        # The corresponding labels
        self.y = y
        self.batch_size = batch_size
        self.shuffle = shuffle
        # Order in which samples are served; permuted in on_epoch_end.
        self.indexes = np.arange(len(X))

    def __getitem__(self, i):
        """Return batch ``i`` as a ``(batch_x, batch_y)`` tuple."""
        start = i * self.batch_size
        stop = (i+1) * self.batch_size
        if self.shuffle:
            # Go through self.indexes so the permutation drawn in on_epoch_end
            # actually changes which samples end up in the batch.
            batch_indexes = self.indexes[start:stop]
            batch_x = self.X[batch_indexes]
            batch_y = self.y[batch_indexes]
        else:
            # Unshuffled order is the storage order: plain slicing is enough.
            batch_x = self.X[start:stop]
            batch_y = self.y[start:stop]

        return tuple((batch_x,batch_y))

    def __len__(self):
        """Number of full batches; a trailing partial batch is not served."""
        return len(self.indexes) // self.batch_size

    def on_epoch_end(self):
        """Draw a new sample order for the next epoch when shuffling is enabled."""
        if self.shuffle:
            self.indexes = np.random.permutation(self.indexes)

In [None]:
# One batch loader per split, all with the same batch size.
batch_size = 32
train_dataloader = Dataloder(
    X_train_dataframe, y_train_dataframe, batch_size=batch_size, shuffle=True
)
validate_dataloader = Dataloder(
    X_validate_dataframe, y_validate_dataframe, batch_size=batch_size, shuffle=True
)
test_dataloader = Dataloder(
    X_test_dataframe, y_test_dataframe, batch_size=batch_size, shuffle=True
)

In [None]:
train_dataloader[0][0].shape

Baseline Model : Autoencoder
This is a simple encoder decoder network with 3 convolutional layers followed by max pooling for encoders and 3 deconvolutional layers for decoders. The output from decoder is then given to a convolutional layer with 3 filters to maintain the similar input and output shape.

In [None]:
from tensorflow.keras.callbacks import LearningRateScheduler,ReduceLROnPlateau
from tensorflow.keras import models, layers
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, BatchNormalization, Activation, Flatten, Dense, Input, MaxPooling2D, Add, Reshape, concatenate, AveragePooling2D, Multiply, GlobalAveragePooling2D, UpSampling2D, MaxPool2D,Softmax
from tensorflow.keras.activations import softmax
from tensorflow.keras import initializers, regularizers
from tensorflow.keras.optimizers import Adam
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim

In [None]:
#https://keras.io/examples/vision/autoencoder/
# Baseline convolutional autoencoder for 256x256 RGB patches.
tf.keras.backend.clear_session()
input = Input(shape=(256, 256, 3))

# Settings shared by every ReLU convolution / transposed convolution below.
relu_conv = dict(activation="relu", kernel_initializer='he_normal', padding="same")

# Encoder: three conv + max-pool stages
x = input
for filters in (32, 64, 128):
    x = Conv2D(filters, (3, 3), **relu_conv)(x)
    x = MaxPooling2D((2, 2), padding="same")(x)

# Decoder: three stride-2 transposed convolutions, then a 3-filter sigmoid
# convolution so the output has the same shape as the input
for filters in (128, 64, 32):
    x = Conv2DTranspose(filters, (3, 3), strides=2, **relu_conv)(x)
x = Conv2D(3, (3, 3), activation="sigmoid", kernel_initializer='he_normal', padding="same")(x)

# Autoencoder
autoencoder = Model(input, x)
autoencoder.compile(
    optimizer=tf.keras.optimizers.Adam(1e-03),
    loss=tf.keras.losses.MeanSquaredError(),
)
autoencoder.summary()

In [None]:
# # Specifies the directory where TensorBoard logs will be stored. TensorBoard is a tool that helps visualize training metrics, model architectures, and more.
# log_dir = "logs/model_1"
# tensorboard = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1, write_graph=True, write_grads=True)
# # The ReduceLROnPlateau technique, which adjusts the learning rate (LR) when the model's validation loss plateaus
# reducelr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, verbose=1, patience=2)
# callback = [tensorboard, reducelr]
# autoencoder.fit(train_dataloader, shuffle=True, epochs=15, validation_data=test_dataloader, callbacks=callback)

In [None]:
# autoencoder.save('autoencoder.h5')

In [None]:

autoencoder =  tf.keras.models.load_model('autoencoder.h5')


prediction_tflite is tailored to work with quantized TFLite models, while prediction is designed for general Keras models. The choice between the two functions depends on the type of model you have trained and intend to use for denoising.

In [None]:

#Custom function to get denoised image prediction for noisy images
def prediction(img,model):
  """Denoise one image with a Keras model, patch by patch.

  ``img`` is a BGR image as returned by ``cv2.imread``. It is converted to RGB,
  resized to 1024x1024, scaled to [0, 1] and cut into a 4x4 grid of 256x256
  patches. The patches are passed to ``model.predict`` as one batch and the
  predictions are stitched back into a single 1024x1024x3 image, which is
  returned.
  """
  rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  normalized = cv2.resize(rgb,(1024,1024)).astype("float32") / 255.0

  grid = patches(normalized,256)
  # Flatten the 4x4 grid row by row into a batch of 16 patches.
  nsy = np.array([grid[i][j][0] for i in range(4) for j in range(4)])

  denoised_batch = model.predict(nsy)
  # Restore the patch-grid layout that unpatchify expects.
  denoised_grid = np.reshape(denoised_batch,(4,4,1,256,256,3))
  return unpatchify(denoised_grid, normalized.shape)


In [None]:

#Custom function to get denoised image prediction for noisy images on quantized models using tflite
def prediction_tflite(img,model):
  """Denoise one image with a TFLite interpreter, one patch at a time.

  ``img`` is a BGR image as returned by ``cv2.imread``. It is converted to RGB,
  resized to 1024x1024, scaled to [0, 1] and cut into a 4x4 grid of 256x256
  patches. Each patch is fed to ``model`` separately and the outputs are
  stitched back into a single 1024x1024x3 image, which is returned.

  NOTE(review): relies on the module-level names ``input_details`` and
  ``output_details``, which must be defined before this function is called.
  """
  rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  normalized = cv2.resize(rgb,(1024,1024)).astype("float32") / 255.0

  grid = patches(normalized,256)
  # Flatten the 4x4 grid row by row into a batch of 16 patches.
  nsy = np.array([grid[i][j][0] for i in range(4) for j in range(4)])

  pred=[]
  for patch in nsy:
    # The interpreter takes one patch with a leading batch axis per call.
    model.set_tensor(input_details[0]['index'], tf.expand_dims(patch,axis=0))
    model.invoke()
    pred.append(model.get_tensor(output_details[0]['index']))

  # Restore the patch-grid layout that unpatchify expects.
  denoised_grid = np.reshape(pred,(4,4,1,256,256,3))
  return unpatchify(denoised_grid, normalized.shape)


In [None]:
X_test_dataframe

In [None]:

#Custom function to plot/visualize noisy, ground truth and predicted images
def visualize(sample,model):
  """Plot noisy, ground truth and predicted images side by side.

  Parameters
  ----------
  sample : pandas.DataFrame
      Frame with the columns ``cleaned_image_path`` and ``shabby_image_path``;
      one figure row is drawn per frame row.
  model : Keras model
      Passed to ``prediction`` to denoise each shabby image.
  """
  def _as_display_image(bgr_image):
    # BGR -> RGB, 512x512, float in [0, 1]: the form imshow is given below.
    rgb = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
    return cv2.resize(rgb,(512,512)).astype("float32") / 255.0

  def _show(axis, image, title):
    # Draw one panel without axis ticks.
    axis.imshow(image)
    axis.get_xaxis().set_visible(False)
    axis.get_yaxis().set_visible(False)
    axis.title.set_text(title)

  # squeeze=False keeps `ax` two-dimensional even for a single-row sample, so
  # ax[i][j] works for any number of rows.
  fig,ax = plt.subplots(len(sample),3,figsize=(30,30),squeeze=False)
  for i in range(len(sample)):
    test_img_gt = _as_display_image(cv2.imread(sample['cleaned_image_path'].iloc[i]))

    # prediction() does its own BGR -> RGB conversion, so it gets the raw image.
    test_img_nsy = cv2.imread(sample['shabby_image_path'].iloc[i])
    pred_img = cv2.resize(prediction(test_img_nsy,model),(512,512))

    _show(ax[i][0], _as_display_image(test_img_nsy), "Noisy Image")
    _show(ax[i][1], test_img_gt, "Ground Truth Image")
    _show(ax[i][2], pred_img, "Predicted Image")

In [None]:
# Three fixed training pairs used to eyeball the model output.
sample = pd.DataFrame(
    {
        'cleaned_image_path': [
            'denoising-shabby-pages/train/cleaned/0001-USPS-dmm300_608.pdf-15.png',
            'denoising-shabby-pages/train/cleaned/0001-USPS-dmm300_608.pdf-18.png',
            'denoising-shabby-pages/train/cleaned/0002-HHS-ocse_eiwo_paperless_solution_presentation.pdf-01.png',
        ],
        'shabby_image_path': [
            'denoising-shabby-pages/train/shabby/0001-USPS-dmm300_608.pdf-15.png',
            'denoising-shabby-pages/train/shabby/0001-USPS-dmm300_608.pdf-18.png',
            'denoising-shabby-pages/train/shabby/0002-HHS-ocse_eiwo_paperless_solution_presentation.pdf-01.png',
        ],
    }
)

In [None]:
visualize(sample,autoencoder)


In [None]:
test_df.head(5)

In [None]:
def psnr_and_ssim(test_df, model, model_type='Normal'):
    """Average PSNR and SSIM of a denoising model over a set of image pairs.

    Parameters
    ----------
    test_df : pandas.DataFrame
        Frame with the columns ``shabby_image_path`` and ``cleaned_image_path``.
    model : Keras model or TFLite interpreter
        Model used to denoise each shabby image.
    model_type : str, default 'Normal'
        ``'Quantized'`` routes the image through ``prediction_tflite``; any
        other value uses ``prediction``.

    Returns
    -------
    tuple of float
        ``(psnr_nsy, psnr_de_nsy, ssim_nsy, ssim_de_nsy)``: mean PSNR of the
        noisy and of the denoised images against the ground truth, followed by
        the corresponding mean SSIM values. The means are taken over the pairs
        that could actually be loaded.

    Raises
    ------
    ValueError
        If no image pair could be evaluated.
    """
    psnr_nsy = 0.0
    psnr_de_nsy = 0.0
    ssim_nsy = 0.0
    ssim_de_nsy = 0.0
    evaluated = 0  # pairs that contributed to the sums above

    for i in range(len(test_df)):
        # Load the noisy image
        nsy_path = test_df['shabby_image_path'].iloc[i]
        nsy = cv2.imread(nsy_path)
        if nsy is None:
            print(f"Error loading noisy image: {nsy_path}")
            continue  # Skip this pair

        # Load the ground truth before running the model, so an unreadable
        # ground truth does not cost a prediction.
        gt_path = test_df['cleaned_image_path'].iloc[i]
        gt = cv2.imread(gt_path)
        if gt is None:
            print(f"Error loading ground truth image: {gt_path}")
            continue  # Skip this pair

        # The prediction helpers convert BGR -> RGB themselves, so they get
        # the raw image exactly as read from disk.
        if model_type == 'Quantized':
            pred = prediction_tflite(nsy, model)
        else:
            pred = prediction(nsy, model)

        # Bring ground truth AND noisy image into the same colour order (RGB)
        # as the prediction; otherwise the noisy baseline is scored with its
        # red and blue channels swapped.
        gt = cv2.cvtColor(gt, cv2.COLOR_BGR2RGB)
        nsy = cv2.cvtColor(nsy, cv2.COLOR_BGR2RGB)

        # Resizing the images
        gt = cv2.resize(gt, (1024, 1024))
        nsy = cv2.resize(nsy, (1024, 1024))

        # Normalizing the images
        gt = gt.astype("float32") / 255.0
        nsy = nsy.astype("float32") / 255.0

        # Computing PSNR and SSIM for this pair.
        # NOTE(review): `multichannel=` was deprecated in newer scikit-image
        # releases in favour of `channel_axis=` — confirm against the installed version.
        psnr_nsy += psnr(gt, nsy)
        psnr_de_nsy += psnr(gt, pred)
        ssim_nsy += ssim(gt, nsy, multichannel=True, data_range=nsy.max() - nsy.min())
        ssim_de_nsy += ssim(gt, pred, multichannel=True, data_range=pred.max() - pred.min())
        evaluated += 1

    if evaluated == 0:
        raise ValueError("psnr_and_ssim: no image pair could be evaluated")

    # Divide by the number of pairs that were scored, not by len(test_df):
    # skipped pairs must not pull the averages towards zero.
    psnr_nsy = psnr_nsy / evaluated
    psnr_de_nsy = psnr_de_nsy / evaluated
    ssim_nsy = ssim_nsy / evaluated
    ssim_de_nsy = ssim_de_nsy / evaluated
    return psnr_nsy, psnr_de_nsy, ssim_nsy, ssim_de_nsy

# Score the autoencoder on the test pairs and report the four averages.
autoencoder_metrics = psnr_and_ssim(test_df, autoencoder)
psnr_nsy, psnr_de_nsy, ssim_nsy, ssim_de_nsy = autoencoder_metrics
metric_labels = (
    'PSNR before denoising:',
    'PSNR after denoising:',
    'SSIM before denoising:',
    'SSIM after denoising:',
)
for metric_label, metric_value in zip(metric_labels, autoencoder_metrics):
    print(metric_label, metric_value)


In [None]:
# Start the comparison lists: entry 0 is the noisy baseline (no improvement or
# model size to report, hence '-'), entry 1 is the autoencoder.
model_size = round(os.path.getsize('autoencoder.h5') / (1024**2), 3)  # size in MiB

PSNR = [round(psnr_nsy,3), round(psnr_de_nsy,3)]
PSNR_imp = ['-', round(psnr_de_nsy-psnr_nsy,3)]

SSIM = [round(ssim_nsy,3), round(ssim_de_nsy,3)]
SSIM_imp = ['-', round(ssim_de_nsy-ssim_nsy,3)]

size = ['-', model_size]
     

# CBDNet


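CBDNet, or Convolutional Blind Denoising Network, has a specific architecture designed to effectively denoise images. The implementation below follows its two-stage layout: a noise estimation subnetwork followed by a non-blind denoising subnetwork that receives the estimated noise map concatenated with the input image. Here's an overview of how it is typically built: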

1.Input Layer: The network takes a noisy image as input. This image contains the noise that needs to be removed.

2. Convolutional Layers: CBDNet starts with several convolutional layers. These layers apply filters to the input image, extracting features and patterns. These filters identify both the noise and the underlying content of the image.

3. Bilinear-Interpolated Image: Before reaching the Bilinear Fusion Layer, the input image goes through a process called bilinear interpolation. In this process, the noisy input image is upscaled to a higher resolution using interpolation techniques. This means that new pixel values are estimated based on the existing pixel values in the image. This upscaled image captures the high-frequency details present in the original image, even though it still contains noise.

4. Residual Blocks: Residual blocks are often included to capture complex relationships between image features. These blocks allow the network to learn residual information, which helps in the denoising process.

5. Skip Connections: Skip connections connect layers at different depths in the network. They allow information from earlier layers to bypass some layers and directly contribute to the final output. This helps prevent the vanishing gradient problem and aids in information flow.

6. Bilinear Fusion Layer: This layer combines the bilinear-interpolated image with the output of the residual blocks. It helps in integrating the high-frequency details from the interpolated image with the denoised features.

7. Convolutional Layers (again): After the fusion, additional convolutional layers are applied. These layers further refine the denoised image by processing the fused features.

8. Output Layer: The final output is the denoised image. It should ideally resemble the clean version of the image without the noise

In [None]:

# Builds and compiles the CBDNet model for 256x256 RGB inputs.
# Adapted from the reference implementation:
#https://github.com/IDKiro/CBDNet-tensorflow/blob/dev/model.py
tf.keras.backend.clear_session()
input = Input(shape=(256, 256, 3))

# Noise estimation subnetwork: four 32-filter 3x3 ReLU convs followed by a
# 3-filter conv, so the output has the same spatial size and channel count as the input.
x = Conv2D(32, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(input)
x = Conv2D(32, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(x)
x = Conv2D(32, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(x)
x = Conv2D(32, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(x)
x = Conv2D(3, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(x)

# Non-blind denoising subnetwork: the 3-channel map from above is concatenated
# with the input image and fed through an encoder/decoder with additive skips.
x = concatenate([x,input])
conv1 = Conv2D(64, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(x)
conv2 = Conv2D(64, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv1)

# Encoder level 2: 2x average pooling, then three 128-filter convs.
pool1 = AveragePooling2D(pool_size=(2,2),padding='same')(conv2)
conv3 = Conv2D(128, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(pool1)
conv4 = Conv2D(128, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv3)
conv5 = Conv2D(128, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv4)

# Bottleneck: another 2x average pooling, then six 256-filter convs.
pool2 = AveragePooling2D(pool_size=(2,2),padding='same')(conv5)
conv6 = Conv2D(256, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(pool2)
conv7 = Conv2D(256, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv6)
conv8 = Conv2D(256, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv7)
conv9 = Conv2D(256, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv8)
conv10 = Conv2D(256, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv9)
conv11 = Conv2D(256, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv10)

# Decoder level 2: stride-2 transposed conv, then an element-wise add with the
# encoder features of the same resolution (conv5).
upsample1 = Conv2DTranspose(128, (3, 3), strides=2, activation="relu", kernel_initializer='he_normal',padding="same")(conv11)
add1 = Add()([upsample1,conv5])
conv12 = Conv2D(128, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(add1)
conv13 = Conv2D(128, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv12)
conv14 = Conv2D(128, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv13)

# Decoder level 1: same pattern with the conv2 skip.
# NOTE: the name `add1` is reused here; the earlier tensor is already consumed by conv12.
upsample2 = Conv2DTranspose(64, (3, 3), strides=2, activation="relu", kernel_initializer='he_normal',padding="same")(conv14)
add1 = Add()([upsample2,conv2])
conv15 = Conv2D(64, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(add1)
conv16 = Conv2D(64, (3, 3), activation="relu", kernel_initializer='he_normal',padding="same")(conv15)

# 1x1 projection to 3 channels (no activation) added to the input image:
# the network output is input + learned correction.
out = Conv2D(3, (1,1), kernel_initializer='he_normal',padding="same")(conv16)
out = Add()([out,input])

# Adam (learning rate 1e-3) with mean squared error loss.
CBDNet = Model(input,out)
CBDNet.compile(optimizer=tf.keras.optimizers.Adam(1e-03), loss=tf.keras.losses.MeanSquaredError())
CBDNet.summary()

In [None]:
# log_dir="logs/model_2"
# tensorboard = tf.keras.callbacks.TensorBoard(log_dir=log_dir,histogram_freq=1, write_graph=True,write_grads=True)
# reducelr = ReduceLROnPlateau(monitor='val_loss', factor=0.1,verbose=1,patience=4,min_delta=0.00001)
# callback = [tensorboard,reducelr]
# CBDNet.fit(train_dataloader,shuffle=True,epochs=30,validation_data= test_dataloader,callbacks=callback)

In [None]:
# CBDNet.save('CBDNet.h5')

In [None]:
# Replace the freshly built model with the one stored in 'CBDNet.h5'.
# The fit/save cells above are commented out, so this file must already exist on disk.
CBDNet = tf.keras.models.load_model('CBDNet.h5')


In [None]:
# Visual check of CBDNet on `sample`; `visualize` and `sample` are defined elsewhere in the notebook.
visualize(sample,CBDNet)


In [None]:

# Evaluate CBDNet on the rows of test_df. `psnr_and_ssim` (defined elsewhere in the
# notebook) returns four values, unpacked here as PSNR and SSIM before and after denoising.
psnr_nsy, psnr_de_nsy, ssim_nsy, ssim_de_nsy = psnr_and_ssim(test_df,CBDNet)
print('PSNR before denoising :', psnr_nsy)
print('PSNR after denoising :', psnr_de_nsy)
print('SSIM before denoising :', ssim_nsy)
print('SSIM after denoising :', ssim_de_nsy)


In [None]:
# Append CBDNet's results to the running comparison lists (PSNR, SSIM, PSNR_imp,
# SSIM_imp and size are created elsewhere in the notebook). Values are rounded to
# 3 decimals; the model size is the on-disk size of the .h5 file in MiB.
# NOTE(review): re-running this cell appends duplicate entries.
model_size = round((os.stat('CBDNet.h5').st_size)/(1024**2),3)
PSNR.append(round(psnr_de_nsy,3))
SSIM.append(round(ssim_de_nsy,3))
# Improvement = metric after denoising minus metric before denoising.
PSNR_imp.append(round(psnr_de_nsy-psnr_nsy,3))
SSIM_imp.append(round(ssim_de_nsy-ssim_nsy,3))
size.append(model_size)

#  PRIDNET
PRIDNet (Pyramid Real Image Denoising Network) is an advanced neural network architecture designed for image denoising. It incorporates multiple stages to effectively reduce noise while preserving image details.

Noise Estimation Stage

In this initial stage, PRIDNet estimates the noise present in the input noisy image. Accurate noise estimation is crucial for effective denoising, as it helps the network understand the characteristics of the noise that needs to be removed. PRIDNet uses a specific module to estimate the noise level, which is then used to guide subsequent denoising processes.

In [None]:
# Adapted from https://github.com/491506870/PRIDNet/blob/master/network.py
class convolutional_block1(tf.keras.layers.Layer):
    """Four consecutive 3x3 ReLU convolutions sharing one filter count.

    Args:
        filters: number of output channels used by every convolution.
        **kwargs: forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, filters, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        # Settings shared by all four convolutions.
        conv_options = dict(
            kernel_size=3,
            activation='relu',
            padding='same',
            kernel_initializer='he_normal',
        )
        self.conv1 = Conv2D(filters=self.filters, **conv_options)
        self.conv2 = Conv2D(filters=self.filters, **conv_options)
        self.conv3 = Conv2D(filters=self.filters, **conv_options)
        self.conv4 = Conv2D(filters=self.filters, **conv_options)

    def get_config(self):
        """Return the base layer config extended with ``filters``."""
        config = super().get_config().copy()
        config['filters'] = self.filters
        return config

    def call(self, X):
        """Apply conv1 through conv4 in sequence and return the result."""
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            X = conv(X)
        return X

In [None]:

class CAM(tf.keras.layers.Layer):
  """Channel re-weighting block.

  The input is globally average-pooled, passed through a 2-unit ReLU dense
  layer and a 32-unit sigmoid dense layer, and the resulting per-channel
  weights are multiplied back onto the input.
  NOTE(review): the 32-unit output presumably has to match the channel count
  of the incoming feature map - confirm before reusing with other widths.
  """

  def __init__(self, **kwargs):
    super().__init__(**kwargs)
    self.gap = GlobalAveragePooling2D()
    self.dense1 = Dense(units=2, activation='relu')
    self.dense2 = Dense(units=32, activation='sigmoid')

  def call(self, X):
    """Return ``X`` scaled by the weights computed from its pooled features."""
    channel_weights = self.dense2(self.dense1(self.gap(X)))
    return Multiply()([X, channel_weights])

Multi Stage Denoising

PRIDNet employs a multi-stage approach to progressively refine the denoising process. Each denoising stage focuses on different aspects of noise reduction. The model uses multiple convolutional blocks, residual networks, or other architectural components to iteratively enhance the denoising quality. The intermediate outputs from these stages serve as inputs for subsequent stages, allowing the model to build upon its denoising performance.

In [None]:
class convolutional_block2(tf.keras.layers.Layer):
    """Three consecutive 3x3 ReLU convolutions sharing one filter count.

    Args:
        filters: number of output channels used by every convolution.
        **kwargs: forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, filters, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        # Settings shared by all three convolutions.
        conv_options = dict(
            kernel_size=3,
            activation='relu',
            padding='same',
            kernel_initializer='he_normal',
        )
        self.conv1 = Conv2D(filters=self.filters, **conv_options)
        self.conv2 = Conv2D(filters=self.filters, **conv_options)
        self.conv3 = Conv2D(filters=self.filters, **conv_options)

    def get_config(self):
        """Return the base layer config extended with ``filters``."""
        config = super().get_config().copy()
        config['filters'] = self.filters
        return config

    def call(self, X):
        """Apply conv1 through conv3 in sequence and return the result."""
        for conv in (self.conv1, self.conv2, self.conv3):
            X = conv(X)
        return X

In [None]:
     

class pyramid(tf.keras.layers.Layer):
  """Encoder/decoder branch evaluated at a reduced resolution.

  The input is average-pooled by ``pool_size``, run through a four-level
  encoder/decoder with concatenation skips, projected to 3 channels by a
  1x1 convolution, and finally bilinearly upsampled by ``upsample_size``.

  Args:
    pool_size: window (and stride) of the initial average pooling.
    upsample_size: factor of the final bilinear upsampling.
    **kwargs: forwarded to ``tf.keras.layers.Layer``.
  """

  def __init__(self,pool_size, upsample_size, **kwargs):
    super().__init__(**kwargs)
    self.pool_size = pool_size
    self.upsample_size = upsample_size
    self.upsample = UpSampling2D(self.upsample_size, interpolation='bilinear')
    self.pool = AveragePooling2D(pool_size=(self.pool_size,self.pool_size))

    def downsample():
      # 2x max pooling used between encoder levels.
      return MaxPool2D(pool_size=[2, 2], padding='same')

    def upsample_to(filters):
      # Stride-2 transposed convolution used between decoder levels.
      return Conv2DTranspose(filters, (3, 3), strides=2,kernel_initializer='he_normal',padding="same")

    # Encoder: filter count doubles at every level.
    self.conv1 = convolutional_block1(filters=32)
    self.maxpool1 = downsample()

    self.conv2 = convolutional_block1(filters=64)
    self.maxpool2 = downsample()

    self.conv3 = convolutional_block1(filters=128)
    self.maxpool3 = downsample()

    self.conv4 = convolutional_block1(filters=256)
    self.maxpool4 = downsample()

    # Bottleneck.
    self.conv5 = convolutional_block1(filters=512)

    # Decoder: mirrors the encoder.
    self.upsample1 = upsample_to(256)
    self.conv6 = convolutional_block2(filters=256)

    self.upsample2 = upsample_to(128)
    self.conv7 = convolutional_block2(filters=128)

    self.upsample3 = upsample_to(64)
    self.conv8 = convolutional_block2(filters=64)

    self.upsample4 = upsample_to(32)
    self.conv9 = convolutional_block2(filters=32)

    # 1x1 projection to a 3-channel output.
    self.conv10 = Conv2D(filters = 3, kernel_size=1,padding='same',kernel_initializer='he_normal')

  def get_config(self):
    """Return the base layer config extended with the two size arguments."""
    config = super().get_config().copy()
    config['pool_size'] = self.pool_size
    config['upsample_size'] = self.upsample_size
    return config

  def call(self, input):
    """Run the pooled input through the encoder/decoder and upsample the result."""
    features = self.pool(input)

    # Encoder: remember each level's output for the matching decoder level.
    skips = []
    encoder_levels = (
        (self.conv1, self.maxpool1),
        (self.conv2, self.maxpool2),
        (self.conv3, self.maxpool3),
        (self.conv4, self.maxpool4),
    )
    for block, shrink in encoder_levels:
      features = block(features)
      skips.append(features)
      features = shrink(features)

    features = self.conv5(features)

    # Decoder: upsample, concatenate with the most recent unused skip, convolve.
    decoder_levels = (
        (self.upsample1, self.conv6),
        (self.upsample2, self.conv7),
        (self.upsample3, self.conv8),
        (self.upsample4, self.conv9),
    )
    for grow, block in decoder_levels:
      features = concatenate([grow(features), skips.pop()])
      features = block(features)

    return self.upsample(self.conv10(features))

Feature Fusion Stage

After the multiple denoising stages, PRIDNet incorporates a feature fusion mechanism. This stage combines the denoised features extracted from different stages, often using skip connections or concatenation. The feature fusion step aims to gather the benefits of denoising at various stages, effectively capturing diverse aspects of noise and image content.

In [None]:

class KSM(tf.keras.layers.Layer):
  """Softmax-weighted fusion of three convolution branches (3x3, 5x5, 7x7).

  Each branch produces 21 channels. Per-channel weights for the three
  branches are derived from the globally pooled sum of the branches and
  normalised with a softmax across branches; the output is the weighted
  sum of the branch outputs.
  """

  def __init__(self, **kwargs):
    super().__init__(**kwargs)
    # Settings shared by the three branches; only the kernel size differs.
    branch_options = dict(filters=21, activation='relu', padding='same', kernel_initializer='he_normal')
    self.conv1 = Conv2D(kernel_size=3, **branch_options)
    self.conv2 = Conv2D(kernel_size=5, **branch_options)
    self.conv3 = Conv2D(kernel_size=7, **branch_options)
    self.gap = GlobalAveragePooling2D()
    self.fc1 = Dense(units=2,activation='relu')
    self.fc2_1 = Dense(units=21)
    self.fc2_2 = Dense(units=21)
    self.fc2_3 = Dense(units=21)

  def call(self, input):
    """Return the attention-weighted sum of the three branch outputs."""
    branches = [self.conv1(input), self.conv2(input), self.conv3(input)]

    # Squeeze: pooled descriptor of the summed branches, kept 4-D for the dense layers.
    descriptor = self.gap(Add()(branches))
    descriptor = tf.reshape(descriptor, [-1, 1, 1, 21])
    squeezed = self.fc1(descriptor)

    # One 21-channel logit vector per branch, stacked on axis 1 and normalised there.
    logits = [self.fc2_1(squeezed), self.fc2_2(squeezed), self.fc2_3(squeezed)]
    attention = softmax(concatenate(logits, 1), axis=1)

    weighted = []
    for index, branch in enumerate(branches):
      branch_weight = tf.reshape(attention[:, index, :, :], [-1, 1, 1, 21])
      weighted.append(Multiply()([branch_weight, branch]))

    return Add()(weighted)

Model Creation

In [None]:
# Builds and compiles the PRIDNet model for 256x256 RGB inputs.
tf.keras.backend.clear_session()
input = Input(shape=(256, 256, 3))

# Stage 1: conv block -> channel attention (CAM) -> 3-channel conv, concatenated
# with the input image (6 channels in total).
C1 = convolutional_block1(filters=32)(input)
cam = CAM()(C1)
C2 = Conv2D(filters = 3, kernel_size=3,activation='relu',padding='same',kernel_initializer='he_normal')(cam)
concat1 = concatenate([C2,input])

# Stage 2: five pyramid branches working at 1x, 1/2, 1/4, 1/8 and 1/16 resolution;
# each pools by `pool_size` and upsamples back by the same factor.
p1 = pyramid(pool_size=1,upsample_size=1)(concat1)
p2 = pyramid(pool_size=2,upsample_size=2)(concat1)
p3 = pyramid(pool_size=4,upsample_size=4)(concat1)
p4 = pyramid(pool_size=8,upsample_size=8)(concat1)
p5 = pyramid(pool_size=16,upsample_size=16)(concat1)

# Stage 3: fuse the five 3-channel branch outputs with the 6-channel stage-1 tensor
# (21 channels, matching the hard-coded width inside KSM), then project to RGB.
concat2 = concatenate([p1,p2,p3,p4,p5,concat1])
ksm = KSM()(concat2)
out = Conv2D(filters = 3, kernel_size=1,padding='same',kernel_initializer='he_normal')(ksm)

# Adam (learning rate 1e-3) with mean squared error loss.
PRIDNet = Model(input,out)
PRIDNet.compile(optimizer=tf.keras.optimizers.Adam(1e-03), loss=tf.keras.losses.MeanSquaredError())
PRIDNet.summary()



In [None]:
# log_dir="logs/model_3"
# tensorboard = tf.keras.callbacks.TensorBoard(log_dir=log_dir,histogram_freq=1, write_graph=True,write_grads=True)
# reducelr = ReduceLROnPlateau(monitor='val_loss', factor=0.1,verbose=1,patience=3,min_delta=0.00001)
# callback = [tensorboard,reducelr]
# PRIDNet.fit(train_dataloader,shuffle=True,epochs=30,validation_data= test_dataloader,callbacks=callback)

In [None]:
# PRIDNet.save('PRIDNet.h5')

In [None]:
# Replace the freshly built model with the one stored in 'PRIDNet.h5'. The custom layer
# classes must be passed via custom_objects so Keras can rebuild them from the saved config.
# The fit/save cells above are commented out, so this file must already exist on disk.
PRIDNet = tf.keras.models.load_model('PRIDNet.h5',custom_objects={'convolutional_block1':convolutional_block1, 'CAM':CAM,'convolutional_block2':convolutional_block2,'pyramid':pyramid,'KSM':KSM})

In [None]:
# Visual check of PRIDNet on `sample`; `visualize` and `sample` are defined elsewhere in the notebook.
visualize(sample,PRIDNet)


In [None]:
# Evaluate PRIDNet on the rows of test_df. `psnr_and_ssim` (defined elsewhere in the
# notebook) returns four values, unpacked here as PSNR and SSIM before and after denoising.
# NOTE: this overwrites the psnr_*/ssim_* variables left by the previous model's evaluation.
psnr_nsy, psnr_de_nsy, ssim_nsy, ssim_de_nsy = psnr_and_ssim(test_df,PRIDNet)
print('PSNR before denoising :', psnr_nsy)
print('PSNR after denoising :', psnr_de_nsy)
print('SSIM before denoising :', ssim_nsy)
print('SSIM after denoising :', ssim_de_nsy)

In [None]:
# Append PRIDNet's results to the running comparison lists (PSNR, SSIM, PSNR_imp,
# SSIM_imp and size are created elsewhere in the notebook). Values are rounded to
# 3 decimals; the model size is the on-disk size of the .h5 file in MiB.
# NOTE(review): re-running this cell appends duplicate entries.
model_size = round((os.stat('PRIDNet.h5').st_size)/(1024**2),3)
PSNR.append(round(psnr_de_nsy,3))
SSIM.append(round(ssim_de_nsy,3))
# Improvement = metric after denoising minus metric before denoising.
PSNR_imp.append(round(psnr_de_nsy-psnr_nsy,3))
SSIM_imp.append(round(ssim_de_nsy-ssim_nsy,3))
size.append(model_size)

RIDNet

In [None]:
# Adapted from https://github.com/saeed-anwar/RIDNet
# The short skip connection inside the EAM block and the skip connections of the
# overall network are not part of that reference code; they are added here.
class EAM(tf.keras.layers.Layer):
  """Residual block with two dilated branches, two residual sub-blocks and channel gating.

  All convolutions use 64 filters, so the input is expected to have 64 channels
  for the residual additions to be shape-compatible.
  """

  def __init__(self,**kwargs):
    super().__init__(**kwargs)

    # Branch A: dilation rates 1 then 2.
    self.conv1 = Conv2D(64, (3,3), dilation_rate=1,padding='same',activation='relu')
    self.conv2 = Conv2D(64, (3,3), dilation_rate=2,padding='same',activation='relu')

    # Branch B: dilation rates 3 then 4.
    self.conv3 = Conv2D(64, (3,3), dilation_rate=3,padding='same',activation='relu')
    self.conv4 = Conv2D(64, (3,3), dilation_rate=4,padding='same',activation='relu')

    # Merges the concatenated branches back to 64 channels.
    self.conv5 = Conv2D(64, (3,3),padding='same',activation='relu')

    # First residual sub-block (ReLU is applied after the addition).
    self.conv6 = Conv2D(64, (3,3),padding='same',activation='relu')
    self.conv7 = Conv2D(64, (3,3),padding='same')

    # Second residual sub-block, ending in a 1x1 convolution.
    self.conv8 = Conv2D(64, (3,3),padding='same',activation='relu')
    self.conv9 = Conv2D(64, (3,3),padding='same',activation='relu')
    self.conv10 = Conv2D(64, (1,1),padding='same')

    # Channel gating computed from globally pooled features.
    self.gap = GlobalAveragePooling2D()

    self.conv11 = Conv2D(64, (3,3),padding='same',activation='relu')
    self.conv12 = Conv2D(64, (3,3),padding='same',activation='sigmoid')

  def call(self,input):
    """Return ``input`` plus the gated residual features."""
    branch_a = self.conv2(self.conv1(input))
    branch_b = self.conv4(self.conv3(input))

    merged = self.conv5(concatenate([branch_a, branch_b]))
    features = Add()([input, merged])

    residual = self.conv7(self.conv6(features))
    features = Activation('relu')(Add()([residual, features]))

    residual = self.conv10(self.conv9(self.conv8(features)))
    features = Activation('relu')(Add()([features, residual]))

    # Per-channel gate in [0, 1] derived from the pooled features.
    gate = Reshape((1,1,64))(self.gap(features))
    gate = self.conv12(self.conv11(gate))

    gated = Multiply()([gate, features])
    # Short skip connection around the whole block (not in the reference code).
    return Add()([input, gated])

In [None]:
# Builds and compiles the RIDNet model for 256x256 RGB inputs.
tf.keras.backend.clear_session()
input = Input(shape=(256, 256, 3))

# Feature extraction to 64 channels, followed by four chained EAM blocks.
conv1 = Conv2D(64, (3,3),padding='same')(input)
eam1 = EAM()(conv1)
eam2 = EAM()(eam1)
eam3 = EAM()(eam2)
eam4 = EAM()(eam3)
# Disabled experiment: long skip from the feature-extraction conv to the last EAM output.
#add = Add()([eam4,conv1])  
# Project back to 3 channels and add the input image: output = input + learned correction.
conv2 = Conv2D(3, (3,3),padding='same')(eam4)
out = Add()([conv2,input])

# Adam (learning rate 1e-3) with mean squared error loss.
RIDNet = Model(input,out)
RIDNet.compile(optimizer=tf.keras.optimizers.Adam(1e-03), loss=tf.keras.losses.MeanSquaredError())
RIDNet.summary()
     

In [None]:
# Batch generators for the three splits. `Dataloder` and the X_*/y_* dataframes are
# defined elsewhere in the notebook; each loader is created with batch size 8 and shuffling on.
batch_size=8
train_dataloader = Dataloder(X_train_dataframe,y_train_dataframe, batch_size, shuffle=True)
test_dataloader = Dataloder(X_test_dataframe,y_test_dataframe,batch_size, shuffle=True)
validate_dataloader = Dataloder(X_validate_dataframe,y_validate_dataframe,batch_size, shuffle=True)

In [None]:
# Train RIDNet for 20 epochs with TensorBoard logging and learning-rate decay:
# the rate is multiplied by 0.1 when val_loss has not improved for 4 epochs.
log_dir="logs/model_4"
# `write_grads` is intentionally not passed: the TF 2.x TensorBoard callback ignores it
# (with a warning) and newer Keras releases no longer accept it.
tensorboard = tf.keras.callbacks.TensorBoard(log_dir=log_dir,histogram_freq=1, write_graph=True)
reducelr = ReduceLROnPlateau(monitor='val_loss', factor=0.1,verbose=1,patience=4,min_delta=0.00001)
callback = [tensorboard,reducelr]
# Validate on the dedicated validation split. val_loss drives the learning-rate
# schedule, so using the test split here would leak it into training and bias the
# PSNR/SSIM figures reported on test_df afterwards.
RIDNet.fit(train_dataloader,shuffle=True,epochs=20,validation_data=validate_dataloader, callbacks=callback)

In [None]:
# Continue training from epoch 20 up to epoch 25 (5 more epochs), reusing the same
# callback objects. Validation uses the dedicated validation split rather than the
# test split, so the test data stays untouched until the final evaluation.
RIDNet.fit(train_dataloader,shuffle=True,epochs=25,initial_epoch=20,validation_data=validate_dataloader, callbacks=callback)


In [None]:
# Visual check of RIDNet on `sample`; `visualize` and `sample` are defined elsewhere in the notebook.
visualize(sample,RIDNet)


In [None]:
# Evaluate RIDNet on the rows of test_df. `psnr_and_ssim` (defined elsewhere in the
# notebook) returns four values, unpacked here as PSNR and SSIM before and after denoising.
# NOTE: this overwrites the psnr_*/ssim_* variables left by the previous model's evaluation.
psnr_nsy, psnr_de_nsy, ssim_nsy, ssim_de_nsy = psnr_and_ssim(test_df,RIDNet)
print('PSNR before denoising :', psnr_nsy)
print('PSNR after denoising :', psnr_de_nsy)
print('SSIM before denoising :', ssim_nsy)
print('SSIM after denoising :', ssim_de_nsy)

In [None]:
# Persist the trained model to 'RIDNet.h5' in the working directory (overwrites any existing file).
RIDNet.save('RIDNet.h5')


In [None]:
# Reload the model from disk; the custom EAM layer must be passed via custom_objects
# so Keras can rebuild it from the saved config.
RIDNet = tf.keras.models.load_model('RIDNet.h5',custom_objects={'EAM':EAM})


In [None]:
# Append RIDNet's results to the running comparison lists (PSNR, SSIM, PSNR_imp,
# SSIM_imp and size are created elsewhere in the notebook). Values are rounded to
# 3 decimals; the model size is the on-disk size of the .h5 file in MiB.
# NOTE(review): re-running this cell appends duplicate entries.
model_size = round((os.stat('RIDNet.h5').st_size)/(1024**2),3)
PSNR.append(round(psnr_de_nsy,3))
SSIM.append(round(ssim_de_nsy,3))
# Improvement = metric after denoising minus metric before denoising.
PSNR_imp.append(round(psnr_de_nsy-psnr_nsy,3))
SSIM_imp.append(round(ssim_de_nsy-ssim_nsy,3))
size.append(model_size)

In [None]:
import numpy as np
import cv2

def calculate_average_rmse(test_df, model, model_type='Normal'):
    """Return the mean per-image RMSE between model predictions and clean targets.

    For every row of ``test_df`` the noisy image is read from
    ``shabby_image_path``, denoised with ``model`` and compared against the
    image read from ``cleaned_image_path``. Rows whose images cannot be read
    are reported on stdout and skipped.

    Parameters
    ----------
    test_df : pandas.DataFrame
        Must contain the columns ``shabby_image_path`` and
        ``cleaned_image_path``.
    model : object
        Passed through to ``prediction`` (or ``prediction_tflite`` when
        ``model_type == 'Quantized'``); both helpers are defined elsewhere in
        the notebook.
    model_type : str, default 'Normal'
        ``'Quantized'`` selects the TFLite prediction helper; any other value
        selects the regular one.

    Returns
    -------
    float
        Average RMSE over the images that were scored, or ``nan`` when no
        image pair could be scored (empty frame or every load failed).
    """
    rmse_values = []

    for i in range(len(test_df)):
        # Load the noisy image
        nsy_path = test_df['shabby_image_path'].iloc[i]
        nsy = cv2.imread(nsy_path)
        if nsy is None:
            print(f"Error loading noisy image: {nsy_path}")
            continue  # Skip this pair

        # Load the ground truth before running inference so that an unreadable
        # pair does not cost a forward pass.
        gt_path = test_df['cleaned_image_path'].iloc[i]
        gt = cv2.imread(gt_path)
        if gt is None:
            print(f"Error loading ground truth image: {gt_path}")
            continue  # Skip this pair
        gt = cv2.cvtColor(gt, cv2.COLOR_BGR2RGB)

        # NOTE(review): nsy is handed over in OpenCV's BGR channel order while gt
        # is converted to RGB - confirm that the prediction helpers convert it.
        if model_type == 'Quantized':
            pred = prediction_tflite(nsy, model)
        else:
            pred = prediction(nsy, model)

        # Resize the predicted image to match the ground truth image dimensions
        pred = cv2.resize(pred, (gt.shape[1], gt.shape[0]))

        # Subtract in floating point: gt is uint8, and uint8 arithmetic wraps
        # modulo 256 (10 - 20 -> 246), which would inflate the error.
        # NOTE(review): assumes pred is on the same intensity scale as gt - confirm.
        diff = gt.astype(np.float64) - np.asarray(pred, dtype=np.float64)
        rmse_values.append(float(np.sqrt(np.mean(np.square(diff)))))

    if not rmse_values:
        # Avoid a ZeroDivisionError when nothing could be evaluated.
        print("No image pairs could be evaluated; returning NaN.")
        return float('nan')

    return sum(rmse_values) / len(rmse_values)


In [None]:
# Reload the saved models from their .h5 files in the working directory.
# NOTE: RIDNet is not part of this RMSE comparison.
model1 = tf.keras.models.load_model('autoencoder.h5')
model2 = tf.keras.models.load_model('CBDNet.h5')
# PRIDNet uses custom layers, which must be supplied via custom_objects when loading.
model3 = tf.keras.models.load_model('PRIDNet.h5', custom_objects={'convolutional_block1':convolutional_block1, 'CAM':CAM,'convolutional_block2':convolutional_block2,'pyramid':pyramid,'KSM':KSM})
# Average per-image RMSE of each model over the rows of test_df.
average_rmse_model1 = calculate_average_rmse(test_df,model1)
average_rmse_model2 = calculate_average_rmse(test_df,model2)
average_rmse_model3 = calculate_average_rmse(test_df,model3)

print("Average RMSE for Autoencoder (Baseline model):", average_rmse_model1)
print("Average RMSE for CBDNet Model:", average_rmse_model2)
print("Average RMSE for PRIDNet Model:", average_rmse_model3)

