# Library


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision.transforms.functional as TF
import torchvision
import torchvision.utils as vutils

from torch.nn import init
import functools
from PIL import Image
import random
import os
import time
import numpy as np
import cv2
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import ImageGrid
from pathlib import Path
from natsort import natsorted
from glob import glob

from collections import OrderedDict

from tqdm.notebook import tqdm
import math

from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
import pandas as pd
import numpy as np
import os
from os.path import exists
from pathlib import Path

In [3]:
from google.colab import drive
from google.colab import auth
from google.auth import default

drive.mount('/content/drive/', force_remount=True)
os.chdir('/content/drive/MyDrive/Colab Notebooks/CPSC 480-580: Computer Vision/Final/dl_model/GT-RAIN')

Mounted at /content/drive/


In [4]:
# Plot image
def show_img(img, title='No Title', normalize_range=False, figsize=15):
  """Display an image in a square matplotlib figure.

  Args:
    img: image data handed directly to `plt.imshow`.
    title (str): text placed above the image.
    normalize_range (bool): accepted for call compatibility; this function
      does not read it.
    figsize (number): side length of the square figure; used for both the
      width and the height passed to `plt.figure`.
  """
  square = (figsize, figsize)
  plt.figure(figsize=square)
  plt.imshow(img)
  plt.title(title)
  plt.show()

# Model Definitions

In [5]:
# Main network blocks
# Code modified from: https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix

# Basic Blocks
class Identity(nn.Module):
  """Pass-through module: the output is the input, unchanged."""

  def forward(self, x):
    """Return `x` itself (the same object, no copy is made)."""
    return x

def get_norm_layer(norm_type='instance'):
  """Build a factory for the requested normalization layer
  Parameters:
      norm_type (str) -- which normalization to use: batch | instance | none
  Returns a callable that is invoked with the number of channels:
      batch    -- nn.BatchNorm2d with learnable affine parameters and running statistics
      instance -- nn.InstanceNorm2d without affine parameters and without running statistics
      none     -- an Identity module (the argument is ignored)
  Raises NotImplementedError for any other value of norm_type.
  """
  if norm_type == 'none':
    # Same call shape as the partials below, but there is nothing to configure.
    def norm_layer(x): return Identity()
    return norm_layer

  if norm_type == 'batch':
    return functools.partial(nn.BatchNorm2d, affine=True, track_running_stats=True)

  if norm_type == 'instance':
    return functools.partial(nn.InstanceNorm2d, affine=False, track_running_stats=False)

  raise NotImplementedError('normalization layer [%s] is not found' % norm_type)

class Conv2d(torch.nn.Module):
  '''
  Padding -> 2D convolution -> normalization, followed by an optional activation
  Args:
    in_channels : int
      number of input channels
    out_channels : int
      number of output channels
    kernel_size : int
      size of kernel; the block pads by kernel_size // 2 on every side
    stride : int
      stride of convolution
    activation_func : func
      activation applied to the normalized output; None disables it
    norm_layer : functools.partial
      callable that builds the normalization layer from the channel count
    use_bias : bool
      if set, the convolution has a bias term
    padding_type : str
      the name of padding layer: reflect | replicate | zero
  Raises:
    NotImplementedError if padding_type is not one of the three names above
  '''

  def __init__(
      self,
      in_channels,
      out_channels,
      kernel_size=3,
      stride=1,
      activation_func=torch.nn.LeakyReLU(negative_slope=0.10, inplace=True),
      norm_layer=nn.BatchNorm2d,
      use_bias=False,
      padding_type='reflect'):
    super().__init__()

    self.activation_func = activation_func

    # Reflect/replicate padding is a separate layer placed in front of an
    # unpadded convolution; zero padding is done by the convolution itself.
    half_kernel = kernel_size // 2
    if padding_type == 'zero':
      layers, conv_padding = [], half_kernel
    elif padding_type == 'reflect':
      layers, conv_padding = [nn.ReflectionPad2d(half_kernel)], 0
    elif padding_type == 'replicate':
      layers, conv_padding = [nn.ReplicationPad2d(half_kernel)], 0
    else:
      raise NotImplementedError('padding [%s] is not implemented' % padding_type)

    layers.append(
        nn.Conv2d(
            in_channels,
            out_channels,
            stride=stride,
            kernel_size=kernel_size,
            padding=conv_padding,
            bias=use_bias))
    layers.append(norm_layer(out_channels))

    self.conv = nn.Sequential(*layers)

  def forward(self, x):
    normalized = self.conv(x)
    if self.activation_func is None:
      return normalized
    return self.activation_func(normalized)

class DeformableConv2d(nn.Module):
  '''
  2D deformable convolution class (with a learned modulation mask)
  Args:
    in_channels : int
      number of input channels
    out_channels : int
      number of output channels
    kernel_size : int
      size of kernel
    stride : int or tuple/list of two ints
      stride of convolution; a scalar is used for both spatial axes
    padding : int
      padding
    bias : bool
      if set, the main convolution has a bias term
  '''
  def __init__(
      self,
      in_channels,
      out_channels,
      kernel_size=3,
      stride=1,
      padding=1,
      bias=False):

    super(DeformableConv2d, self).__init__()

    # Normalize to a plain 2-tuple. isinstance (rather than an exact type
    # comparison) keeps tuple subclasses such as torch.Size, and lists, from
    # being wrapped a second time into a nested pair.
    if isinstance(stride, (tuple, list)):
      self.stride = tuple(stride)
    else:
      self.stride = (stride, stride)
    self.padding = padding

    # Predicts two offset values per kernel tap at every output location.
    self.offset_conv = nn.Conv2d(
        in_channels,
        2 * kernel_size * kernel_size,
        kernel_size=kernel_size,
        stride=stride,
        padding=self.padding,
        bias=True)

    # Zero weights and bias: the predicted offsets start at exactly zero.
    nn.init.constant_(self.offset_conv.weight, 0.)
    nn.init.constant_(self.offset_conv.bias, 0.)

    # Predicts one modulation scalar per kernel tap at every output location.
    self.modulator_conv = nn.Conv2d(
        in_channels,
        1 * kernel_size * kernel_size,
        kernel_size=kernel_size,
        stride=stride,
        padding=self.padding,
        bias=True)

    nn.init.constant_(self.modulator_conv.weight, 0.)
    nn.init.constant_(self.modulator_conv.bias, 0.)

    # Only used as a container for the weight/bias handed to deform_conv2d.
    self.regular_conv = nn.Conv2d(
        in_channels=in_channels,
        out_channels=out_channels,
        kernel_size=kernel_size,
        stride=stride,
        padding=self.padding,
        bias=bias)

  def forward(self, x):
    offset = self.offset_conv(x)
    # 2 * sigmoid(0) == 1, so the zero-initialized modulator starts as a
    # mask of ones.
    modulator = 2. * torch.sigmoid(self.modulator_conv(x))

    x = torchvision.ops.deform_conv2d(
        input=x,
        offset=offset,
        weight=self.regular_conv.weight,
        bias=self.regular_conv.bias,
        padding=self.padding,
        mask=modulator,
        stride=self.stride)
    return x

class UpConv2d(torch.nn.Module):
  '''
  Up-convolution (upsample + convolution) block class
  Args:
    in_channels : int
      number of input channels
    out_channels : int
      number of output channels
    kernel_size : int
      size of kernel (k x k)
    activation_func : func
      activation function after convolution
    norm_layer : functools.partial
      normalization layer
    use_bias : bool
      if set, then use bias
    padding_type : str
      the name of padding layer: reflect | replicate | zero
    interpolate_mode : str
      the mode for interpolation: bilinear | nearest
  '''
  def __init__(
      self,
      in_channels,
      out_channels,
      kernel_size=3,
      activation_func=torch.nn.LeakyReLU(negative_slope=0.10, inplace=True),
      norm_layer=nn.BatchNorm2d,
      use_bias=False,
      padding_type='reflect',
      interpolate_mode='bilinear'):

    super(UpConv2d, self).__init__()
    self.interpolate_mode = interpolate_mode

    self.conv = Conv2d(
        in_channels,
        out_channels,
        kernel_size=kernel_size,
        stride=1,
        activation_func=activation_func,
        norm_layer=norm_layer,
        use_bias=use_bias,
        padding_type=padding_type)

  def forward(self, x):
    # Double the two trailing spatial dimensions, then convolve.
    n_height, n_width = x.shape[2:4]
    shape = (int(2 * n_height), int(2 * n_width))

    # interpolate() only accepts align_corners for the interpolating modes;
    # passing it with e.g. 'nearest' raises ValueError, so leave it unset there.
    interpolating = self.interpolate_mode in ('linear', 'bilinear', 'bicubic', 'trilinear')
    upsample = torch.nn.functional.interpolate(
        x,
        size=shape,
        mode=self.interpolate_mode,
        align_corners=True if interpolating else None)
    conv = self.conv(upsample)
    return conv

class DeformableResnetBlock(nn.Module):
  """Residual block whose two convolutions are deformable convolutions"""

  def __init__(
      self, dim, padding_type,
      norm_layer, use_dropout,
      use_bias, activation_func):
    """Initialize the deformable Resnet block
    A deformable resnet block is a conv block with a skip connection around it
    """
    super().__init__()
    self.conv_block = self.build_conv_block(
        dim, padding_type,
        norm_layer, use_dropout,
        use_bias, activation_func)

  def build_conv_block(
      self, dim, padding_type,
      norm_layer, use_dropout,
      use_bias, activation_func):
    """Assemble the residual branch.
    Parameters:
        dim (int) -- the number of channels going in and out of both conv layers.
        padding_type (str) -- the name of padding layer: reflect | replicate | zero
        norm_layer -- callable that builds a normalization layer from a channel count
        use_dropout (bool) -- if set, a Dropout(0.5) follows the first activation.
        use_bias (bool) -- if the conv layers use bias or not
        activation_func (func) -- activation module placed after the first normalization
    Returns an nn.Sequential laid out as
        [pad] conv norm activation [dropout] [pad] conv norm
    Raises NotImplementedError for an unknown padding_type.
    """
    def padded_deform_conv():
      # One "[pad] -> deformable 3x3 conv -> norm" stage. Reflect/replicate
      # padding is an explicit layer; zero padding is done inside the conv.
      if padding_type == 'zero':
        stage, conv_padding = [], 1
      elif padding_type == 'reflect':
        stage, conv_padding = [nn.ReflectionPad2d(1)], 0
      elif padding_type == 'replicate':
        stage, conv_padding = [nn.ReplicationPad2d(1)], 0
      else:
        raise NotImplementedError('padding [%s] is not implemented' % padding_type)
      stage.append(
          DeformableConv2d(dim, dim, kernel_size=3, padding=conv_padding, bias=use_bias))
      stage.append(norm_layer(dim))
      return stage

    layers = padded_deform_conv()
    layers.append(activation_func)
    if use_dropout:
      layers.append(nn.Dropout(0.5))
    layers.extend(padded_deform_conv())

    return nn.Sequential(*layers)

  def forward(self, x):
    """Return the input plus the output of the residual branch"""
    return x + self.conv_block(x)

class DecoderBlock(torch.nn.Module):
  '''
  Decoder stage: upsample, optionally concatenate a skip tensor, then convolve
  Args:
    in_channels : int
      number of input channels
    skip_channels : int
      number of skip connection channels; 0 disables the concatenation
    out_channels : int
      number of output channels
    activation_func : func
      activation function after convolution
    norm_layer : functools.partial
      normalization layer
    use_bias : bool
      if set, then use bias
    padding_type : str
      the name of padding layer: reflect | replicate | zero
    upsample_mode : str
      'transpose' selects a stride-2 transposed convolution; any other value
      is forwarded to UpConv2d as its interpolation mode (bilinear | nearest)
  '''

  def __init__(
      self,
      in_channels,
      skip_channels,
      out_channels,
      activation_func=torch.nn.LeakyReLU(negative_slope=0.10, inplace=True),
      norm_layer=nn.BatchNorm2d,
      use_bias=False,
      padding_type='reflect',
      upsample_mode='transpose'):
    super().__init__()

    self.skip_channels = skip_channels
    self.upsample_mode = upsample_mode

    # Settings shared by the interpolating upsampler and the fusion conv.
    shared = dict(
        activation_func=activation_func,
        norm_layer=norm_layer,
        use_bias=use_bias,
        padding_type=padding_type)

    # Upsampling
    if upsample_mode != 'transpose':
      self.deconv = UpConv2d(
          in_channels, out_channels,
          interpolate_mode=upsample_mode,
          **shared)
    else:
      transposed = nn.ConvTranspose2d(
          in_channels, out_channels,
          kernel_size=3, stride=2,
          padding=1, output_padding=1,
          bias=use_bias)
      self.deconv = nn.Sequential(
          transposed,
          norm_layer(out_channels),
          activation_func)

    # The fusion conv sees the upsampled features stacked with the skip tensor.
    self.conv = Conv2d(
        skip_channels + out_channels,
        out_channels,
        kernel_size=3,
        stride=1,
        **shared)

  def forward(self, x, skip=None):
    upsampled = self.deconv(x)

    # Upsampled features come first along the channel axis, skip second.
    fused = torch.cat([upsampled, skip], dim=1) if self.skip_channels > 0 else upsampled

    return self.conv(fused)

In [6]:
def init_weights(net, init_type='normal', init_gain=0.02):
  """
  Initialize network weights in place.
  Parameters:
      net (network) -- network to be initialized
      init_type (str) -- the name of an initialization method: normal | xavier | kaiming | orthogonal
      init_gain (float) -- scaling factor for normal, xavier and orthogonal.
  Raises NotImplementedError if init_type is unknown and the network contains
  at least one Conv/Linear layer with a weight.
  """
  def init_func(m):  # applied to every submodule by net.apply below
    classname = m.__class__.__name__
    is_conv_or_linear = 'Conv' in classname or 'Linear' in classname
    if hasattr(m, 'weight') and is_conv_or_linear:
      if init_type == 'normal':
        init.normal_(m.weight.data, 0.0, init_gain)
      elif init_type == 'xavier':
        init.xavier_normal_(m.weight.data, gain=init_gain)
      elif init_type == 'kaiming':
        init.kaiming_normal_(m.weight.data, a=0, mode='fan_in')
      elif init_type == 'orthogonal':
        init.orthogonal_(m.weight.data, gain=init_gain)
      else:
        raise NotImplementedError('initialization method [%s] is not implemented' % init_type)
      if hasattr(m, 'bias') and m.bias is not None:
        init.constant_(m.bias.data, 0.0)
    elif 'BatchNorm2d' in classname:
      # BatchNorm Layer's weight is not a matrix; only normal distribution applies.
      # With affine=False both weight and bias are None, so there is nothing
      # to initialize and dereferencing .data would fail.
      if getattr(m, 'weight', None) is not None:
        init.normal_(m.weight.data, 1.0, init_gain)
      if getattr(m, 'bias', None) is not None:
        init.constant_(m.bias.data, 0.0)

  print('initialize network with %s' % init_type)
  net.apply(init_func)  # apply the initialization function <init_func>

def init_net(net, init_type='normal', init_gain=0.02, gpu_ids=None):
  """Initialize a network: 1. register CPU/GPU device (with multi-GPU support); 2. initialize the network weights
  Parameters:
          net (network) -- the network to be initialized
          init_type (str) -- the name of an initialization method: normal | xavier | kaiming | orthogonal
          init_gain (float) -- scaling factor for normal, xavier and orthogonal.
          gpu_ids (int list) -- which GPUs the network runs on: e.g., 0,1,2.
                                None (the default) or an empty list leaves the network where it is.
  Return an initialized network. When gpu_ids is non-empty the returned object
  is a DataParallel wrapper, so callers must use the return value.
  """
  # None instead of a shared mutable [] default; both mean "no GPUs requested".
  gpu_ids = [] if gpu_ids is None else gpu_ids

  if len(gpu_ids) > 0:
    assert(torch.cuda.is_available())
    net.to(gpu_ids[0])
    net = torch.nn.DataParallel(net, gpu_ids)    # multi-GPUs
  init_weights(net, init_type, init_gain=init_gain)

  # Zero for deform convs: init_weights just re-randomized every conv weight,
  # including the offset/modulator predictors, so reset those to zero.
  key_name_list = ['offset', 'modulator']
  for cur_name, parameters in net.named_parameters():
    if any(key_name in cur_name for key_name in key_name_list):
      nn.init.constant_(parameters, 0.)
  return net

class ResNetModified(nn.Module):
  """
  Encoder / deformable-ResNet bottleneck / decoder generator with skip connections.
  """

  def __init__(
      self,
      input_nc,
      output_nc,
      ngf=64,
      norm_layer=nn.BatchNorm2d,
      activation_func=torch.nn.LeakyReLU(negative_slope=0.10, inplace=True),
      use_dropout=False,
      n_blocks=6,
      padding_type='reflect',
      upsample_mode='bilinear'):
    """Construct the generator
    Parameters:
      input_nc (int) -- the number of channels in input images
      output_nc (int) -- the number of channels in output images
      ngf (int) -- the number of filters at full resolution (doubled by each downsampling)
      norm_layer -- normalization layer
      activation_func -- activation module shared by all blocks
      use_dropout (bool) -- if use dropout layers
      n_blocks (int) -- the number of ResNet blocks
      padding_type (str) -- the name of padding layer in conv layers: reflect | replicate | zero
      upsample_mode (str) -- mode for upsampling: transpose | bilinear
    """
    assert(n_blocks >= 0)
    super().__init__()

    # Convolutions carry a bias only when the normalization is InstanceNorm2d;
    # a functools.partial is unwrapped to find the underlying layer class.
    norm_class = norm_layer.func if type(norm_layer) is functools.partial else norm_layer
    use_bias = norm_class == nn.InstanceNorm2d

    # Keyword arguments repeated by every encoder convolution / decoder block.
    conv_kwargs = dict(
        padding_type=padding_type,
        norm_layer=norm_layer,
        activation_func=activation_func,
        use_bias=use_bias)
    decoder_kwargs = dict(
        use_bias=use_bias,
        activation_func=activation_func,
        norm_layer=norm_layer,
        padding_type=padding_type,
        upsample_mode=upsample_mode)

    # Initial Convolution (full resolution): 7x7 followed by 3x3
    stem_7x7 = Conv2d(in_channels=input_nc, out_channels=ngf, kernel_size=7, **conv_kwargs)
    stem_3x3 = Conv2d(in_channels=ngf, out_channels=ngf, kernel_size=3, **conv_kwargs)
    self.initial_conv = nn.Sequential(stem_7x7, stem_3x3)

    # Downsample Blocks: each one halves the resolution and doubles the channels
    n_downsampling = 2
    width = ngf * 2 ** 0
    self.downsample_1 = Conv2d(
        in_channels=width, out_channels=width * 2,
        kernel_size=3, stride=2, **conv_kwargs)

    width = ngf * 2 ** 1
    self.downsample_2 = Conv2d(
        in_channels=width, out_channels=width * 2,
        kernel_size=3, stride=2, **conv_kwargs)

    # Residual Blocks at the lowest resolution
    width = ngf * 2 ** n_downsampling
    self.residual_blocks = nn.Sequential(*[
        DeformableResnetBlock(
            width,
            padding_type=padding_type,
            norm_layer=norm_layer,
            use_dropout=use_dropout,
            use_bias=use_bias, activation_func=activation_func)
        for _ in range(n_blocks)])

    # Upsampling: each decoder block halves the channels; its skip input has
    # the same channel count as its output
    width = ngf * 2 ** (n_downsampling - 0)
    half_width = int(width / 2)
    self.upsample_2 = DecoderBlock(width, half_width, half_width, **decoder_kwargs)

    width = ngf * 2 ** (n_downsampling - 1)
    half_width = int(width / 2)
    self.upsample_1 = DecoderBlock(width, half_width, half_width, **decoder_kwargs)

    # Output Convolution; Tanh bounds the result to [-1, 1]
    self.output_conv_naive = nn.Sequential(
        nn.ReflectionPad2d(1),
        nn.Conv2d(ngf, output_nc, kernel_size=3, padding=0),
        nn.Tanh())

    # NOTE: a feature projection of the residual output (for the rain robust
    # loss) is intentionally not built here; forward returns the image only.

  def forward(self, input):
    """Run the generator and return a 1-tuple holding the output image"""

    # Encoder
    full_res = self.initial_conv(input)
    half_res = self.downsample_1(full_res)
    quarter_res = self.downsample_2(half_res)

    # Bottleneck
    bottleneck = self.residual_blocks(quarter_res)

    # Decoder: each stage is fused with the encoder output of matching resolution
    decoded_half = self.upsample_2(bottleneck, half_res)
    decoded_full = self.upsample_1(decoded_half, full_res)
    final_out = self.output_conv_naive(decoded_full)

    # Kept as a tuple: callers index the result with [0]
    return (final_out,)

class GTRainModel(nn.Module):
  def __init__(
      self,
      ngf=64,
      n_blocks=9,
      norm_layer_type='batch',
      activation_func=torch.nn.LeakyReLU(negative_slope=0.10, inplace=True),
      upsample_mode='bilinear',
      init_type='kaiming'):
    """
    GT-Rain Model: a 3-channel in / 3-channel out ResNetModified with reflect padding
    Parameters:
      ngf (int) -- the number of conv filters
      n_blocks (int) -- the number of deformable ResNet blocks
      norm_layer_type (str) -- 'batch', 'instance'
      activation_func (func) -- activation functions
      upsample_mode (str) -- 'transpose', 'bilinear'
      init_type (str) -- None, 'normal', 'xavier', 'kaiming', 'orthogonal';
                         a falsy value skips weight initialization
    """
    super().__init__()

    norm_layer = get_norm_layer(norm_layer_type)
    self.resnet = ResNetModified(
      3, 3,
      ngf=ngf,
      norm_layer=norm_layer,
      activation_func=activation_func,
      use_dropout=False,
      n_blocks=n_blocks,
      padding_type='reflect',
      upsample_mode=upsample_mode)

    # Initialization (done in place on the wrapped network)
    if init_type:
      init_net(self.resnet, init_type=init_type)

  def forward(self, x):
    """Return whatever the wrapped network returns for `x`"""
    return self.resnet(x)

# Parameters

In [14]:
# Run configuration shared by the cells below.
params = dict(
  load_checkpoint='./model/model_checkpoint.pth',  # Checkpoint file read with torch.load
  input_path='../../data/rain/GT-RAIN_test',  # Rainy inputs (globbed by the test cells)
  gt_path='/path/to/gt',  # Ground truth for the generic test cell; a falsy value disables its metrics
  save_path='./outputs',  # Where derained images are written
  init_type='normal',  # Initialization type
  norm_layer_type='batch',  # Normalization type
  activation_func=torch.nn.LeakyReLU(negative_slope=0.10, inplace=True),  # Activation function
  upsample_mode='bilinear',  # Mode for upsampling
  ngf=64,
  n_blocks=9,
)

# Make sure the output directory exists before anything is saved into it.
output_dir = params['save_path']
os.makedirs(output_dir, exist_ok=True)

# Load Model

In [11]:
# Build the network from the subset of `params` that GTRainModel accepts
model_kwargs = {
  key: params[key]
  for key in (
    'ngf',
    'n_blocks',
    'norm_layer_type',
    'activation_func',
    'upsample_mode',
    'init_type')}
model = GTRainModel(**model_kwargs)

print(model)
# Move the model to the GPU; as the cell's last expression this also echoes it
model.cuda()

initialize network with normal
GTRainModel(
  (resnet): ResNetModified(
    (initial_conv): Sequential(
      (0): Conv2d(
        (activation_func): LeakyReLU(negative_slope=0.1, inplace=True)
        (conv): Sequential(
          (0): ReflectionPad2d((3, 3, 3, 3))
          (1): Conv2d(3, 64, kernel_size=(7, 7), stride=(1, 1), bias=False)
          (2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
      (1): Conv2d(
        (activation_func): LeakyReLU(negative_slope=0.1, inplace=True)
        (conv): Sequential(
          (0): ReflectionPad2d((1, 1, 1, 1))
          (1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), bias=False)
          (2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
    )
    (downsample_1): Conv2d(
      (activation_func): LeakyReLU(negative_slope=0.1, inplace=True)
      (conv): Sequential(
        (0): ReflectionPad2d((1, 1, 1, 1))
        (1): Conv2d

GTRainModel(
  (resnet): ResNetModified(
    (initial_conv): Sequential(
      (0): Conv2d(
        (activation_func): LeakyReLU(negative_slope=0.1, inplace=True)
        (conv): Sequential(
          (0): ReflectionPad2d((3, 3, 3, 3))
          (1): Conv2d(3, 64, kernel_size=(7, 7), stride=(1, 1), bias=False)
          (2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
      (1): Conv2d(
        (activation_func): LeakyReLU(negative_slope=0.1, inplace=True)
        (conv): Sequential(
          (0): ReflectionPad2d((1, 1, 1, 1))
          (1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), bias=False)
          (2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
    )
    (downsample_1): Conv2d(
      (activation_func): LeakyReLU(negative_slope=0.1, inplace=True)
      (conv): Sequential(
        (0): ReflectionPad2d((1, 1, 1, 1))
        (1): Conv2d(64, 128, kernel_size=(3, 3), s

In [12]:
# Load model weights
print('Loading weights:', params['load_checkpoint'])
# Deserialize onto the CPU: the file then loads even if the device it was saved
# from is not present, and no second copy of the weights is kept on the GPU.
# load_state_dict copies the values into the model's existing parameters.
checkpoint = torch.load(params['load_checkpoint'], map_location=torch.device('cpu'))
# strict=True: every key must match the model exactly, otherwise this raises.
model.load_state_dict(checkpoint['state_dict'], strict=True)
# Switch to inference mode (BatchNorm uses its running statistics).
model.eval()

Loading weights: ./model/model_checkpoint.pth


GTRainModel(
  (resnet): ResNetModified(
    (initial_conv): Sequential(
      (0): Conv2d(
        (activation_func): LeakyReLU(negative_slope=0.1, inplace=True)
        (conv): Sequential(
          (0): ReflectionPad2d((3, 3, 3, 3))
          (1): Conv2d(3, 64, kernel_size=(7, 7), stride=(1, 1), bias=False)
          (2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
      (1): Conv2d(
        (activation_func): LeakyReLU(negative_slope=0.1, inplace=True)
        (conv): Sequential(
          (0): ReflectionPad2d((1, 1, 1, 1))
          (1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), bias=False)
          (2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
    )
    (downsample_1): Conv2d(
      (activation_func): LeakyReLU(negative_slope=0.1, inplace=True)
      (conv): Sequential(
        (0): ReflectionPad2d((1, 1, 1, 1))
        (1): Conv2d(64, 128, kernel_size=(3, 3), s

# Testing Code

In [None]:
# Section for running with generic test sets
# params['input_path'] and params['gt_path'] are handed to glob unchanged, so
# they must be glob patterns that match the image files themselves.
rainy_img_paths = natsorted(glob(params['input_path']))
num_paths = len(rainy_img_paths)

has_gt = bool(params['gt_path'])
if has_gt:
  total_PSNR_input = 0
  total_SSIM_input = 0
  total_PSNR_output = 0
  total_SSIM_output = 0
  clean_img_paths = natsorted(glob(params['gt_path']))
  # Inputs and ground truth are paired by sorted position, so the two lists
  # must line up one-to-one.
  if len(clean_img_paths) != num_paths:
    raise ValueError(
        f"Found {num_paths} input images but {len(clean_img_paths)} ground-truth images")

for i, rainy_img_path in enumerate(tqdm(rainy_img_paths)):
  # File name without its extension, whatever the extension length is.
  filename = Path(rainy_img_path).stem
  img = np.array(Image.open(rainy_img_path), dtype=np.float32)
  img *= 1/255
  height, width = img.shape[:2]

  # Crop so both sides are multiples of 4 (the network downsamples by 2 twice).
  img = img[:height-height%4,:width-width%4,:]

  # HWC in [0, 1] -> 1xCxHxW in [-1, 1] on the GPU
  input_tensor = torch.from_numpy(img).permute((2, 0, 1)) * 2 - 1
  input_tensor = torch.unsqueeze(input_tensor, 0).cuda()

  # Inference only: skip building the autograd graph.
  with torch.no_grad():
    # The model returns a tuple; element 0 is the image in [-1, 1].
    output = (model(input_tensor)[0] * 0.5 + 0.5).squeeze(0).permute((1, 2, 0))
  output = output.cpu().numpy()

  if has_gt:
    gt_img = np.array(Image.open(clean_img_paths[i]), dtype=np.float32)
    gt_img *= 1/255
    gt_img = gt_img[:height-height%4,:width-width%4,:]
    # channel_axis=-1 replaces the deprecated multichannel=True.
    # NOTE(review): no data_range is passed, so skimage derives it from the
    # float dtype; newer scikit-image releases may require it explicitly --
    # confirm before changing, since it alters the reported numbers.
    total_PSNR_input += psnr(gt_img, img)
    total_SSIM_input += ssim(gt_img, img, channel_axis=-1)
    total_PSNR_output += psnr(gt_img, output)
    total_SSIM_output += ssim(gt_img, output, channel_axis=-1)

  # USE THIS BLOCK TO SAVE
  im = Image.fromarray((output*255).astype(np.uint8))
  im.save(f"{params['save_path']}/{filename}.png")

if has_gt:
  if num_paths:
    print(f"PSNR Input: {total_PSNR_input/num_paths}")
    print(f"SSIM Input: {total_SSIM_input/num_paths}")
    print(f"PSNR Output: {total_PSNR_output/num_paths}")
    print(f"SSIM Output: {total_SSIM_output/num_paths}")
  else:
    print(f"No input images matched: {params['input_path']}")

In [16]:
# Section for running on GT-RAIN test set
def _safe_mean(total, count):
    # Average that yields NaN instead of raising when nothing was accumulated.
    return total / count if count else float('nan')

total_PSNR_input = 0
total_SSIM_input = 0
total_PSNR_output = 0
total_SSIM_output = 0

rain_acc_total_PSNR_input = 0
rain_acc_total_SSIM_input = 0
rain_acc_total_PSNR_output = 0
rain_acc_total_SSIM_output = 0
rain_acc_num_scenes = 0

dense_streak_total_PSNR_input = 0
dense_streak_total_SSIM_input = 0
dense_streak_total_PSNR_output = 0
dense_streak_total_SSIM_output = 0
dense_streak_num_scenes = 0

# Number of scenes that were actually evaluated (skipped entries are excluded).
num_scenes = 0

scene_paths = natsorted(glob(f"{params['input_path']}/*"))

for scene_path in tqdm(scene_paths):
    scene_name = Path(scene_path).name
    clean_img_matches = glob(scene_path + '/*C-000.png')
    rainy_img_paths = natsorted(glob(scene_path + '/*R-*.png'))
    if not clean_img_matches or not rainy_img_paths:
        # Stray files or incomplete scenes would otherwise fail with an
        # IndexError / ZeroDivisionError below.
        print(f"Skipping {scene_name}: no clean frame or no rainy frames found")
        continue
    clean_img_path = clean_img_matches[0]
    num_frames = len(rainy_img_paths)

    # Every rainy frame of a scene is compared against the same clean frame,
    # so decode and rescale it once per scene.
    gt_full = np.array(Image.open(clean_img_path), dtype=np.float32)
    gt_full *= 1 / 255

    os.makedirs(params['save_path']+ '/' + scene_name, exist_ok=True)

    scene_PSNR_input = 0
    scene_SSIM_input = 0
    scene_PSNR_output = 0
    scene_SSIM_output = 0

    for rainy_img_path in tqdm(rainy_img_paths):
        # File name without its extension, whatever the extension length is.
        filename = Path(rainy_img_path).stem
        img = np.array(Image.open(rainy_img_path), dtype=np.float32)
        img *= 1 / 255
        height, width = img.shape[:2]

        # Crop both images so each side is a multiple of 4 (the network
        # downsamples by 2 twice).
        img = img[:height - height % 4, :width - width % 4, :]
        gt_img = gt_full[:height - height % 4, :width - width % 4, :]

        # HWC in [0, 1] -> 1xCxHxW in [-1, 1] on the GPU
        input_tensor = torch.from_numpy(img).permute((2, 0, 1)) * 2 - 1
        input_tensor = torch.unsqueeze(input_tensor, 0).cuda()

        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            # The model returns a tuple; element 0 is the image in [-1, 1].
            output = (model(input_tensor)[0] * 0.5 + 0.5).squeeze(0).permute((1, 2, 0))
        output = output.cpu().numpy()

        # USE THIS BLOCK TO SAVE
        im = Image.fromarray((output * 255).astype(np.uint8))
        im.save(f"{params['save_path']}/{scene_name}/{filename}.png")

        # channel_axis=-1 replaces the deprecated multichannel=True.
        # NOTE(review): no data_range is passed, so skimage derives it from the
        # float dtype; newer scikit-image releases may require it explicitly --
        # confirm before changing, since it alters the reported numbers.
        scene_PSNR_input += psnr(gt_img, img)
        scene_SSIM_input += ssim(gt_img, img, channel_axis=-1)
        scene_PSNR_output += psnr(gt_img, output)
        scene_SSIM_output += ssim(gt_img, output, channel_axis=-1)

    # Per-scene means; every scene carries equal weight in the totals below.
    scene_mean_PSNR_input = scene_PSNR_input / num_frames
    scene_mean_SSIM_input = scene_SSIM_input / num_frames
    scene_mean_PSNR_output = scene_PSNR_output / num_frames
    scene_mean_SSIM_output = scene_SSIM_output / num_frames

    print(f"Scene: {scene_name}")
    print(f"Scene PSNR Input: {scene_mean_PSNR_input}")
    print(f"Scene SSIM Input: {scene_mean_SSIM_input}")
    print(f"Scene PSNR Output: {scene_mean_PSNR_output}")
    print(f"Scene SSIM Output: {scene_mean_SSIM_output}")

    total_PSNR_input += scene_mean_PSNR_input
    total_SSIM_input += scene_mean_SSIM_input
    total_PSNR_output += scene_mean_PSNR_output
    total_SSIM_output += scene_mean_SSIM_output
    num_scenes += 1

    # The three scenes named here are reported as "rain accumulation"; every
    # other scene is reported as "dense streak".
    if scene_name in ["Oinari_0-0", "M1135_0-0", "Table_Rock_0-0"]:
        rain_acc_total_PSNR_input += scene_mean_PSNR_input
        rain_acc_total_SSIM_input += scene_mean_SSIM_input
        rain_acc_total_PSNR_output += scene_mean_PSNR_output
        rain_acc_total_SSIM_output += scene_mean_SSIM_output
        rain_acc_num_scenes += 1
    else:
        dense_streak_total_PSNR_input += scene_mean_PSNR_input
        dense_streak_total_SSIM_input += scene_mean_SSIM_input
        dense_streak_total_PSNR_output += scene_mean_PSNR_output
        dense_streak_total_SSIM_output += scene_mean_SSIM_output
        dense_streak_num_scenes += 1

print(f"Total PSNR Input: {_safe_mean(total_PSNR_input, num_scenes)}")
print(f"Total SSIM Input: {_safe_mean(total_SSIM_input, num_scenes)}")
print(f"Total PSNR Output: {_safe_mean(total_PSNR_output, num_scenes)}")
print(f"Total SSIM Output: {_safe_mean(total_SSIM_output, num_scenes)}")

print(f"rain accumulation Total PSNR Input: {_safe_mean(rain_acc_total_PSNR_input, rain_acc_num_scenes)}")
print(f"rain accumulation Total SSIM Input: {_safe_mean(rain_acc_total_SSIM_input, rain_acc_num_scenes)}")
print(f"rain accumulation Total PSNR Output: {_safe_mean(rain_acc_total_PSNR_output, rain_acc_num_scenes)}")
print(f"rain accumulation Total SSIM Output: {_safe_mean(rain_acc_total_SSIM_output, rain_acc_num_scenes)}")

print(f"dense streak Total PSNR Input: {_safe_mean(dense_streak_total_PSNR_input, dense_streak_num_scenes)}")
print(f"dense streak Total SSIM Input: {_safe_mean(dense_streak_total_SSIM_input, dense_streak_num_scenes)}")
print(f"dense streak Total PSNR Output: {_safe_mean(dense_streak_total_PSNR_output, dense_streak_num_scenes)}")
print(f"dense streak Total SSIM Output: {_safe_mean(dense_streak_total_SSIM_output, dense_streak_num_scenes)}")

  0%|          | 0/7 [00:00<?, ?it/s]

  0%|          | 0/300 [00:00<?, ?it/s]

  scene_SSIM_input += ssim(gt_img, img, multichannel=True)
  scene_SSIM_output += ssim(gt_img, output, multichannel=True)


Scene: Gurutto_0-0
Scene PSNR Input: 23.60950765841992
Scene SSIM Input: 0.8380728183190028
Scene PSNR Output: 25.50575434811268
Scene SSIM Output: 0.8521561815341314


  0%|          | 0/300 [00:00<?, ?it/s]

Scene: M1135_0-0
Scene PSNR Input: 20.05757617949582
Scene SSIM Input: 0.7491096985340119
Scene PSNR Output: 24.017451437237643
Scene SSIM Output: 0.7891694058974584


  0%|          | 0/300 [00:00<?, ?it/s]

Scene: Oinari_0-0
Scene PSNR Input: 19.964709196310626
Scene SSIM Input: 0.6833025070031484
Scene PSNR Output: 21.888833244974762
Scene SSIM Output: 0.784458329876264


  0%|          | 0/300 [00:00<?, ?it/s]

Scene: Oinari_1-1
Scene PSNR Input: 16.340532017570467
Scene SSIM Input: 0.5764776265621185
Scene PSNR Output: 18.858074345046255
Scene SSIM Output: 0.6167993277311326


  0%|          | 0/300 [00:00<?, ?it/s]

Scene: Table_Rock_0-0
Scene PSNR Input: 22.59963590501475
Scene SSIM Input: 0.8792925266424815
Scene PSNR Output: 28.437692094501127
Scene SSIM Output: 0.9100381143887838


  0%|          | 0/300 [00:00<?, ?it/s]

Scene: Winter_Garden_0-1
Scene PSNR Input: 16.995620433847723
Scene SSIM Input: 0.5754281296332677
Scene PSNR Output: 19.560499289603907
Scene SSIM Output: 0.6192726192871729


  0%|          | 0/300 [00:00<?, ?it/s]

Scene: Winter_Garden_0-4
Scene PSNR Input: 16.89636563239744
Scene SSIM Input: 0.5234880097707113
Scene PSNR Output: 19.44240435367706
Scene SSIM Output: 0.541059419910113
Total PSNR Input: 19.49484957472239
Total SSIM Input: 0.6893101880663917
Total PSNR Output: 22.530101301879064
Total SSIM Output: 0.7304219140892937
rain accumulation Total PSNR Input: 20.87397376027373
rain accumulation Total SSIM Input: 0.7705682440598807
rain accumulation Total PSNR Output: 24.781325592237845
rain accumulation Total SSIM Output: 0.8278886167208355
dense streak Total PSNR Input: 18.460506435558884
dense streak Total SSIM Input: 0.6283666460712751
dense streak Total PSNR Output: 20.841683084109977
dense streak Total SSIM Output: 0.6573218871156375
