# load packages

In [0]:
!pip install tensorflow-gpu==1.15

%tensorflow_version 1.x
import os
import tensorflow as tf
from pathlib import Path
from tensorflow.data import Iterator
from tensorflow.contrib import summary

import matplotlib.pyplot as plt

import numpy as np

device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))


Collecting tensorflow-gpu==1.15
[?25l  Downloading https://files.pythonhosted.org/packages/a5/ad/933140e74973fb917a194ab814785e7c23680ca5dee6d663a509fe9579b6/tensorflow_gpu-1.15.0-cp36-cp36m-manylinux2010_x86_64.whl (411.5MB)
[K     |████████████████████████████████| 411.5MB 42kB/s 
Collecting tensorflow-estimator==1.15.1
[?25l  Downloading https://files.pythonhosted.org/packages/de/62/2ee9cd74c9fa2fa450877847ba560b260f5d0fb70ee0595203082dafcc9d/tensorflow_estimator-1.15.1-py2.py3-none-any.whl (503kB)
[K     |████████████████████████████████| 512kB 42.4MB/s 
Collecting gast==0.2.2
  Downloading https://files.pythonhosted.org/packages/4e/35/11749bf99b2d4e3cceb4d55ca22590b0d7c2c62b9de38ac4a4a7f4687421/gast-0.2.2.tar.gz
Collecting tensorboard<1.16.0,>=1.15.0
[?25l  Downloading https://files.pythonhosted.org/packages/1e/e9/d3d747a97f7188f48aa5eda486907f3b345cd409f0a0850468ba867db246/tensorboard-1.15.0-py3-none-any.whl (3.8MB)
[K     |████████████████████████████████| 3.8MB 41.7MB/s 


# load data

In [0]:
def decode_img(img):
	# processes grayscale png images
	img = tf.image.decode_png(img,channels=1)
	img = tf.image.convert_image_dtype(img,
		tf.uint8)
	return img

def process_path(file_path):
	# reads and decodes png file 
	img = tf.io.read_file(file_path)
	img = decode_img(img)
	return img

def load_set(img_path, set_type='train'):
	list_ds = tf.data.Dataset.list_files(
		str(img_path/'*.png'))
	dataset = list_ds.map(process_path,
		num_parallel_calls=
		tf.data.experimental.AUTOTUNE)
	return dataset

def load_data(dataset='mias',DIR = Path('.')):
	
	if dataset == 'mias':
		path = DIR/'mias_dataset'
	elif dataset == 'mias_augmented':
		path = DIR/'mias_dataset_augmented'
	elif dataset == 'dental':
		path = DIR/'dental_dataset'
	elif dataset == 'dental_augmented':
		path = DIR/'dental_dataset_augmented'
	elif dataset == 'both_augmented':
		path = DIR/'both_datasets_augmented'
	else:
		path = DIR/'both_datasets'

	train_path = path/'train'
	val_path = path/'val'
	test_path = path/'test'

	train_ds = load_set(train_path,
	                    set_type='train')
	val_ds = load_set(val_path,
	                  set_type='val')
	test_ds = load_set(test_path,
	                   set_type='test')

	return train_ds,val_ds,test_ds


# loss functions


In [0]:
def huber_loss(prediction, ground_truth,
                delta=1.0):
    # set a small delta value to make 
    # it behave like L1 loss
    loss = tf.keras.losses.Huber(
        delta=delta)
    return loss(prediction, ground_truth)


def l2_loss(prediction, ground_truth):

    loss = tf.nn.l2_loss(
        prediction - ground_truth)
    return loss


def ssim_loss(prediction, ground_truth, 
              max_val, filter_size=11, 
              filter_sigma=1.5, k1=0.01, 
              k2=0.03):
    # Gaussian filter of size 11x11 and 
    # width 1.5 is used. 
    # Image has to be at least 11x11 big.

    ssim_loss = tf.image.ssim(
        prediction,
        ground_truth,
        max_val,
        filter_size=filter_size,
        filter_sigma=filter_sigma,
        k1=k1,
        k2=k2)
    return 1.0-tf.math.reduce_mean(ssim_loss)

def compute_psnr(ref, target):
  # correlated with MSE / L2
    diff = target - ref
    sqr = tf.multiply(diff, diff)
    err = tf.reduce_sum(sqr)
    v = tf.reduce_prod(diff.get_shape())
    mse = err / tf.cast(v, tf.float32)+1e-12
    psnr = 10. * (tf.log(
        255.* 255./ mse)/tf.log(10.))

    return psnr


# additive noise

In [0]:
def gaussian_noise(image,proportion=0.2,
	mean=0, std=1):
    '''Apply gaussina noise of given mean,
    	standard deviation and proportion
    	or scale of the noise
    Args:
        image: Tensor [batch, height, width]
        proportion: scalar - scale of noise
        mean: scalar
        std: scalar - standard deviation
    Returns:
        Tensor of same dtype and shape as
        image
    '''

    noise = proportion*tf.random.normal(
        shape=image.get_shape().as_list(),
        mean=mean,
        stddev=std,
        dtype=tf.float32)
    image = tf.cast(image,
        tf.float32) / 255.0 + noise

    img_min = tf.math.reduce_min(image)
    img_max = tf.math.reduce_max(image)

    img_range = img_max - img_min

    image = tf.cast(
    	255*(image - img_min)/img_range,
    	tf.uint8)

    return image

def poisson_noise(image,proportion=0.2,
	lam=1):
    '''Apply poisson noise of given lambda,
    	and proportion/scale of the noise
    	Implemented as additive noise
    Args:
        image: Tensor [batch, height, width]
        proportion: scalar - scale of noise
        lam: scalar - Expected value of 
                      poisson
    Returns:
        Tensor of same dtype and shape as
        image
    '''

    noise = proportion*tf.random.poisson(
        shape=image.get_shape().as_list(),
        lam=lam, dtype=tf.float32)
    image = tf.cast(image,
        tf.float32) / 255.0 + noise

    img_min = tf.math.reduce_min(image)
    img_max = tf.math.reduce_max(image)

    img_range = img_max - img_min

    image = tf.cast(
    	255*(image - img_min)/img_range,
    	tf.uint8)

    return image

def random_noise(image,proportion=0.2,
	std_range=[1,5], lam_range=[1,5]):
	'''Randomly apply gaussian or poisson noise 
		with stdev and lambda selected randomly 
		from the range
    Args:
        image: Tensor [batch, height, width]
        proportion: scalar - scale of noise
        std_range: '2D list of int'
        lam_range: '2D list of int'
    Returns:
        Tensor of same dtype and shape as
        image
    '''

	lam = tf.cast(
		tf.random.shuffle(tf.constant(\
		list(range(lam_range[0],lam_range[1])))),
		tf.float32)

	std = tf.cast(
		tf.random.shuffle(tf.constant(\
		list(range(std_range[0],std_range[1])))),
		tf.float32)

    # conditionally add noise
	image = tf.cond(tf.random.uniform(
            shape=[],
            dtype=tf.float32) > 0.5,
        lambda: gaussian_noise(image,\
        	proportion=proportion, std=std[0]),
        lambda: poisson_noise(image,\
        	proportion=proportion, lam=lam[0]))
    
	return image

# denoising autoencoder

In [0]:
def conv_layer(input,layer_name,shape,padding='SAME',
               batch_norm=False,max_pool=False,
               activation_fn=None,is_training=False):
   '''Creates a conv/ conv+maxpool/ conv+batchnorm
      or conv+maxpool+batchnorm layer based on 
      arguments
    Args:
        input: Tensor [batch, height, width, in_channel]
        shape: array/list [filter_height, filter_width,
                            in_channel, out_channel]
        padding: string 'SAME' or 'VALID'
        activation_fn: function -None, tf.nn.relu,
                                tf.nn.tanh
    Returns:
        Tensor with out_channel channels and dimensions
        might vary with padding type
    '''
    
    with tf.variable_scope(layer_name):
      w_init = tf.keras.initializers.TruncatedNormal(
          stddev=0.1,dtype=tf.float32)
      b_init = tf.constant_initializer(0.0)

      W = tf.get_variable('w',shape=shape,
                          initializer=w_init)
      b = tf.get_variable('b',shape=[shape[3]],
                          initializer=b_init)
      
      conv_out = tf.nn.conv2d(
          input, W, strides=[1, 1, 1, 1],
          padding=padding, name='conv_2d')
      
      conv_out = tf.nn.bias_add(conv_out,b)

      if batch_norm:
        conv_out = tf.layers.batch_normalization(
            conv_out,scale=True,center=True,
            training=is_training, name='batch_norm')
      
      if max_pool:
        conv_out = tf.nn.max_pool2d(
            conv_out,ksize=[1,2,2,1],
            strides=[1,2,2,1],padding='VALID')
        
      if activation_fn != None:
        conv_out = activation_fn(conv_out)
      
      return conv_out

def deconv_layer(input,out_filters,ksize=(2,2),
                 strides=(2,2),padding='SAME'):

    deconv_out = tf.layers.conv2d_transpose(
        input, filters=out_filters,
        kernel_size=ksize, strides=strides,
        padding=padding)
    
    return deconv_out

def upsample_layer(input,ksize=(2,2),
                   interp='nearest'):
    upsample = tf.keras.layers.UpSampling2D(
        size=ksize,interpolation=interp)
    
    return upsample(input)


In [0]:
def build_DAE(x_input,k_size,num_filters,
                    activation_fn,fin_activation_fn,
                    is_training=False,
                    verbose=False,batch_norm=False):
   '''Build a denoising autoencoder network as 
      discussed by L. Gondara. 'Medical image 
      denoising using convolutional denoising 
      autoencoders.' In2016IEEE 16th International 
      Conference on Data Mining Workshops (ICDMW), 
      pages 241–246,2016
    Args:
        x_input: Tensor [batch, height, width, in_channel]
        k_size: array/list [filter_height, filter_width]
        num_filters: scalar - starting number of filters
        activation_fn: function -None, tf.nn.relu,
                                tf.nn.tanh
        fin_activation_fn: function -None, tf.nn.relu,
                                tf.nn.tanh
    Returns:
        prediction: Tensor of the same shape as x_input
    '''
  # encoder bit
  encode_conv1 = conv_layer(
      x_input,'econv1', 
      shape=[k_size, k_size, 1,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=True, activation_fn=activation_fn,
      is_training=is_training)
  
  encode_conv2 = conv_layer(
      encode_conv1,'econv2',
      shape=[k_size,k_size,num_filters,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=True, activation_fn=activation_fn,
      is_training=is_training)
  
  encode_conv3 = conv_layer(
      encode_conv2,'econv3',
      shape=[k_size,k_size,num_filters,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=False, activation_fn=activation_fn,
      is_training=is_training)
  
  
  # decoder bit
  decode_conv6 = upsample_layer(encode_conv3,
                                ksize=(2,2),
                                interp='nearest')

  decode_conv5 = conv_layer(
      decode_conv6,'dconv5',
      shape=[k_size,k_size,num_filters,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=False, activation_fn=activation_fn,
      is_training=is_training)

  decode_conv4 = upsample_layer(decode_conv5,
                                ksize=(2,2),
                                interp='nearest')

  decode_conv3 = conv_layer(
      decode_conv4,'dconv3',
      shape=[k_size,k_size,num_filters,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=False, activation_fn=activation_fn,
      is_training=is_training)

  # decode_conv2 = upsample_layer(decode_conv3,
  #                               ksize=(2,2),
  #                               interp='nearest')

  prediction = conv_layer(
      decode_conv4,'dconv1',
      shape=[k_size, k_size, num_filters, 1],
      padding='SAME', batch_norm=batch_norm,
      max_pool=False, activation_fn=fin_activation_fn,
      is_training=is_training)

  return prediction

# linear_AE_skip_connection

In [0]:
def build_AE_linear_skip(x_input,k_size,num_filters,
                         activation_fn,fin_activation_fn,
                         is_training=False,
                         verbose=False,batch_norm=False):
     '''Build a denoising autoencoder network similar 
      to Denoising Autoencoder proposed by L.Gondara.
      With added skip connections from encoder to decoder
    Args:
        x_input: Tensor [batch, height, width, in_channel]
        k_size: array/list [filter_height, filter_width]
        num_filters: scalar - starting number of filters
        activation_fn: function -None, tf.nn.relu,
                                tf.nn.tanh
        fin_activation_fn: function -None, tf.nn.relu,
                                tf.nn.tanh
    Returns:
        prediction: Tensor of the same shape as x_input
    '''
  # encoder bit
  encode_conv1 = conv_layer(
      x_input,'econv1', 
      shape=[k_size, k_size, 1,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=True, activation_fn=activation_fn,
      is_training=is_training)
  
  encode_conv2 = conv_layer(
      encode_conv1,'econv2',
      shape=[k_size,k_size,num_filters,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=True, activation_fn=activation_fn,
      is_training=is_training)
  
  encode_conv3 = conv_layer(
      encode_conv2,'econv3',
      shape=[k_size,k_size,num_filters,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=True, activation_fn=activation_fn,
      is_training=is_training)
  
  encode_conv4 = conv_layer(
      encode_conv3,'econv4',
      shape=[k_size,k_size,num_filters,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=False, activation_fn=activation_fn,
      is_training=is_training)
  
  # decoder bit
  decode_conv6 = upsample_layer(encode_conv4,
                                ksize=(2,2),
                                interp='nearest')

  decode_conv5 = conv_layer(
      tf.concat([decode_conv6,encode_conv2],axis=-1),
      'dconv5',
      shape=[k_size,k_size,2*num_filters,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=False, activation_fn=activation_fn,
      is_training=is_training)

  decode_conv4 = upsample_layer(decode_conv5,
                                ksize=(2,2),
                                interp='nearest')
  
  decode_conv3 = conv_layer(
      tf.concat([decode_conv4,encode_conv1],axis=-1),
      'dconv3',
      shape=[k_size,k_size,2*num_filters,num_filters],
      padding='SAME', batch_norm=batch_norm,
      max_pool=False, activation_fn=activation_fn,
      is_training=is_training)

  decode_conv2 = upsample_layer(decode_conv3,
                                ksize=(2,2),
                                interp='nearest')

  prediction = conv_layer(
      decode_conv2,x_input, 'dconv1',
      shape=[k_size,k_size,num_filters,1],
      padding='SAME', batch_norm=batch_norm,
      max_pool=False, activation_fn=fin_activation_fn,
      is_training=is_training)

  return prediction

# original quadratic autoencoder

In [0]:
   ''' Quadratic autoencoder code provided by
       Fenglei Fan as part of their work on 
       'Quadratic autoencoder (q-ae) for low-dose 
       ct denoising' .IEEE Transactions on 
       Medical Imaging,pages 1–1, 2019
    '''

def bias_variable(shape):
    initial = tf.constant(
        -0.02, shape=shape,dtype=tf.float32)
    return tf.Variable(initial)

def weight_variable_Wr(shape):
    initial = tf.truncated_normal(shape, stddev=0.1,dtype=tf.float32)
    return tf.Variable(initial)

def weight_variable_Wg(shape):
    initial = tf.truncated_normal(shape, stddev=0.1,dtype=tf.float32)
    return tf.Variable(initial)

def weight_variable_Wb(shape):
    initial = tf.truncated_normal(shape, stddev=0.01,dtype=tf.float32)
    return tf.Variable(initial)

def conv2d_valid(x, W):
    return tf.nn.conv2d(
        x, W, strides=[1, 1, 1, 1], 
        padding='VALID')

def conv2d_same(x, W):
    return tf.nn.conv2d(
        x, W, strides=[1, 1, 1, 1],
        padding='SAME')

def deconv2d_valid(x,W,outputshape):
    return tf.nn.conv2d_transpose(
        x,W,output_shape=outputshape,
        strides= [1,1,1,1],
        padding = 'VALID')

def deconv2d_same(x,W,outputshape):
    return tf.nn.conv2d_transpose(
        x,W,output_shape=outputshape,
        strides= [1,1,1,1],
        padding = 'SAME')

def Quad_deconv_layer_valid_linear(
    input, shape, outputshape):
    
    W_r = weight_variable_Wr(shape)
    #W_g = weight_variable_Wg(shape)
    W_g = tf.Variable(tf.constant(
        0, shape=shape,dtype=tf.float32))
    #W_b = weight_variable_Wb(shape)
    W_b = tf.Variable(tf.constant(
        0, shape=shape,dtype=tf.float32))
    
    b_r = tf.Variable(tf.constant(
        0, shape=[shape[2]],dtype=tf.float32))
    b_g = tf.Variable(tf.constant(
        1, shape=[shape[2]],dtype=tf.float32))
    c = tf.Variable(tf.constant(
        0, shape=[shape[2]],dtype=tf.float32))
    
    return (deconv2d_valid(input, W_r, outputshape)+b_r)*\
      (deconv2d_valid(input, W_g,outputshape)+b_g)+\
      deconv2d_valid(input*input, W_b,outputshape)+c

def Quad_deconv_layer_same_linear(
    input, shape, outputshape):
    
    W_r = weight_variable_Wr(shape)
    #W_g = weight_variable_Wg(shape)
    W_g = tf.Variable(tf.constant(
        0, shape=shape,dtype=tf.float32))
    #W_b = weight_variable_Wb(shape)
    W_b = tf.Variable(tf.constant(
        0, shape=shape,dtype=tf.float32))
    
    b_r = tf.Variable(tf.constant(
        0, shape=[shape[2]],dtype=tf.float32))
    b_g = tf.Variable(tf.constant(
        1, shape=[shape[2]],dtype=tf.float32))
    c = tf.Variable(tf.constant(
        0, shape=[shape[2]],dtype=tf.float32))
    
    return (deconv2d_same(input, W_r, outputshape)+b_r)*\
      (deconv2d_same(input, W_g,outputshape)+b_g)+\
      deconv2d_same(input*input, W_b,outputshape)+c

def Quad_deconv_layer_same(
    input, shape, outputshape,activation_fn):
    
    W_r = weight_variable_Wr(shape)
    #W_g = weight_variable_Wg(shape)
    W_g = tf.Variable(tf.constant(
        0, shape=shape,dtype=tf.float32))
    #W_b = weight_variable_Wb(shape)
    W_b = tf.Variable(tf.constant(
        0, shape=shape,dtype=tf.float32))
    
    b_r = tf.Variable(tf.constant(
        0, shape=[shape[2]],dtype=tf.float32))
    b_g = tf.Variable(tf.constant(
        1, shape=[shape[2]],dtype=tf.float32))
    c = tf.Variable(tf.constant(
        0, shape=[shape[2]],dtype=tf.float32))
    
    return activation_fn(
        (deconv2d_same(input, W_r, outputshape)+b_r)*\
        (deconv2d_same(input, W_g,outputshape)+b_g)+\
        deconv2d_same(input*input, W_b,outputshape)+c)


def Quad_conv_layer_valid(
    input, shape,activation_fn):
    
    W_r = weight_variable_Wr(shape)
    #W_g = weight_variable_Wg(shape)
    W_g = tf.Variable(tf.constant(
        0, shape=shape,dtype=tf.float32))
    #W_b = weight_variable_Wb(shape)
    W_b = tf.Variable(tf.constant(
        0, shape=shape,dtype=tf.float32))  
    # W_b can also be initialized by 
    # Gaussian with samll variance

    b_r = tf.Variable(tf.constant(
        0, shape=[shape[3]],dtype=tf.float32))
    b_g = tf.Variable(tf.constant(
        1, shape=[shape[3]],dtype=tf.float32))
    c = tf.Variable(tf.constant(
        0, shape=[shape[3]],dtype=tf.float32))
    
    return activation_fn(
        (conv2d_valid(input, W_r)+b_r)*\
        (conv2d_valid(input, W_g)+b_g)+\
        conv2d_valid(input*input, W_b)+c) 

def Quad_conv_layer_same(
    input, shape,activation_fn):
    
    W_r = weight_variable_Wr(shape)
    #W_g = weight_variable_Wg(shape)
    W_g = tf.Variable(tf.constant(
        0, shape=shape,dtype=tf.float32))
    #W_b = weight_variable_Wb(shape)
    W_b = tf.Variable(tf.constant(
        0, shape=shape,dtype=tf.float32))
    
    b_r = tf.Variable(tf.constant(
        0, shape=[shape[3]],dtype=tf.float32))
    b_g = tf.Variable(tf.constant(
        1, shape=[shape[3]],dtype=tf.float32))
    c = tf.Variable(tf.constant(
        0, shape=[shape[3]],dtype=tf.float32))
    
    return activation_fn(
        (conv2d_same(input, W_r)+b_r)*\
        (conv2d_same(input, W_g)+b_g)+\
        conv2d_same(input*input, W_b)+c) 


In [0]:
def encoder_QAE_original(x_input,k_size,num_filters,
            activation_fn,is_training=False,
						verbose=False):

	encode_conv1 = Quad_conv_layer_same(
			x_input,
			shape=[k_size, k_size,1,num_filters],
			activation_fn=activation_fn)
 
	encode_conv2 = Quad_conv_layer_same(
			encode_conv1,
			shape=[k_size,k_size,num_filters,num_filters],
			activation_fn=activation_fn)
 
	encode_conv3 = Quad_conv_layer_same(
			encode_conv2,
			shape=[k_size,k_size,num_filters,num_filters],
			activation_fn=activation_fn)
 
	encode_conv4 = Quad_conv_layer_same(
			encode_conv3,
			shape=[k_size,k_size,num_filters,num_filters],
			activation_fn=activation_fn)
 
	encode_conv5 = Quad_conv_layer_valid(
			encode_conv4,
			shape=[k_size,k_size,num_filters,num_filters],
			activation_fn=activation_fn)

	layer_dict = {'encode_1':encode_conv1,
		'encode_2':encode_conv2,
		'encode_3':encode_conv3,
		'encode_4':encode_conv4,
		'encode_5':encode_conv5}

	return encode_conv5, layer_dict


def decoder_QAE_original(d_input,layer_dict,k_size,
            num_filters,activation_fn,
						is_training=False,verbose=False):
	
	decode_conv4 = activation_fn(
		Quad_deconv_layer_valid_linear(
			d_input,
			shape=[k_size,k_size,num_filters,num_filters],
			outputshape=tf.shape(
				layer_dict['encode_4']))+
		layer_dict['encode_4'])
 
	decode_conv3 = Quad_deconv_layer_same(
		decode_conv4,
		shape=[k_size,k_size,num_filters,num_filters],
		outputshape=tf.shape(
			layer_dict['encode_3']),
			activation_fn=activation_fn)
 
	decode_conv2 = activation_fn(
		Quad_deconv_layer_same_linear(
			decode_conv3,
			shape=[k_size,k_size,num_filters,num_filters],
			outputshape=tf.shape(
				layer_dict['encode_2']))+
		layer_dict['encode_2'])
 
	decode_conv1 = Quad_deconv_layer_same(
		decode_conv2,
		shape=[k_size,k_size,num_filters,num_filters],
		outputshape=tf.shape(
			layer_dict['encode_1']),
			activation_fn=activation_fn)

	return decode_conv1


def build_QAE_original(x_input,k_size,num_filters,
              activation_fn,fin_activation_fn,
							batch_norm=False,
							is_training=False,
							verbose=False):
	
	encoder_out, layer_dict = encoder_QAE_original(
			x_input=x_input,k_size=k_size,
			num_filters=num_filters,
			activation_fn=activation_fn,
			is_training=is_training, verbose=verbose)

	decoder_out = decoder_QAE_original(
			d_input=encoder_out,layer_dict=layer_dict,
			k_size=k_size,num_filters=num_filters,
			activation_fn=activation_fn,
			is_training=is_training, verbose=verbose)

	prediction = fin_activation_fn(
			Quad_deconv_layer_same_linear(
					decoder_out,
					shape=[k_size, k_size,1,num_filters],
					outputshape=tf.shape(x_input))+x_input)

	return prediction

# quadratic autoencoder (QAE)

In [0]:
def Quad_conv_same(input,layer_name,shape,
                   activation_fn,
                   max_pool=False,
                   batch_norm=False,
                   is_training=False):
    '''Creates a quadratic conv/ conv+maxpool/ 
      conv+batchnorm or conv+maxpool+batchnorm 
      layer based on arguments
      quad_conv = (conv(input,w_r)+b_r)*
                  (conv(input,w_g)+b_g)+
                  (conv(input,w_b)+c)
    Args:
        input: Tensor [batch, height, width, in_channel]
        shape: array/list [filter_height, filter_width,
                            in_channel, out_channel]
        padding: string 'SAME' or 'VALID'
        activation_fn: function -None, tf.nn.relu,
                                tf.nn.tanh
    Returns:
        Tensor with out_channel channels and dimensions
        might vary with padding type
    '''
  
    with tf.variable_scope(layer_name):
      w_init = tf.keras.initializers.TruncatedNormal(
          stddev=0.1,dtype=tf.float32)
      # w_init = tf.keras.initializers.glorot_normal

      W_r = tf.get_variable(
          'w_r',shape=shape,initializer=w_init)
      b_r = tf.get_variable(
          'b_r',shape=[shape[3]],
          initializer=tf.constant_initializer(0.0))
      
      W_g = tf.get_variable(
          'w_g',shape=shape,
          initializer=tf.constant_initializer(0.0))
      b_g = tf.get_variable(
          'b_g',shape=[shape[3]],
          initializer=tf.constant_initializer(1.0))
      
      W_b = tf.get_variable(
          'w_b',shape=shape,
          initializer=tf.constant_initializer(0.0))
      c = tf.get_variable(
          'c',shape=[shape[3]],
          initializer=tf.constant_initializer(0.0))
      
      conv_1 = tf.nn.conv2d(
          input, W_r, strides=[1, 1, 1, 1],
          padding='SAME',name='conv_2d_1')
      conv_1 = tf.nn.bias_add(conv_1,b_r)

      conv_2 = tf.nn.conv2d(
          input, W_g, strides=[1, 1, 1, 1],
          padding='SAME',name='conv_2d_2')
      conv_2 = tf.nn.bias_add(conv_2,b_g)

      conv_3 = tf.nn.conv2d(
          input, W_b, strides=[1, 1, 1, 1],
          padding='SAME',name='conv_2d_3')
      conv_3 = tf.nn.bias_add(conv_3,c)

      quad_conv_out = conv_1 * conv_2 + conv_3

      if max_pool:
        quad_conv_out = tf.nn.max_pool2d(
            quad_conv_out,ksize=[1,2,2,1],
            strides=[1,2,2,1],padding='VALID')
        
      if batch_norm:
        quad_conv_out = tf.layers.batch_normalization(
            quad_conv_out, scale=True,
            center=True, training=is_training)

      return activation_fn(quad_conv_out) 


In [0]:
def encoder_QAE(x_input,k_size,
                    num_filters,
                    activation_fn,
                    is_training=False,
                    verbose=False,
                    batch_norm=False):
	# encoder chunck
	encode_conv1 = Quad_conv_same(
      x_input,'econv1',
      shape=[k_size,k_size,1,num_filters],
      activation_fn=activation_fn,max_pool=True,
      batch_norm=batch_norm,is_training=is_training)
 
	encode_conv2 = Quad_conv_same(
      encode_conv1,'econv2',
      shape=[k_size,k_size,num_filters,2*num_filters],
      activation_fn=activation_fn,
      max_pool=True,batch_norm=batch_norm,
      is_training=is_training)
 
	encode_conv3 = Quad_conv_same(
      encode_conv2,'econv3',
      shape=[k_size,k_size,2*num_filters,4*num_filters],
      activation_fn=activation_fn,
      max_pool=True,batch_norm=batch_norm,
      is_training=is_training)

	layer_dict = {'encode_1':encode_conv1,
		'encode_2':encode_conv2,
		'encode_3':encode_conv3}

	return encode_conv3, layer_dict

def decoder_QAE(d_input,layer_dict,
                       k_size,num_filters,
                       activation_fn,
                       is_training=False,
                       verbose=False,
                       batch_norm=False):
  
  upsample_3 = upsample_layer(d_input,
                              ksize=(2,2),
                              interp='nearest')

  decode_conv3 = Quad_conv_same(
      upsample_3,'dconv3',
      shape=[k_size,k_size,4*num_filters,2*num_filters],
      activation_fn=activation_fn,
      max_pool=False,batch_norm=batch_norm,
      is_training=is_training)
  
  upsample_2 = upsample_layer(decode_conv3,
                              ksize=(2,2),
                              interp='nearest')
  
  decode_conv2 = Quad_conv_same(
      upsample_2,'dconv2',
      shape=[k_size,k_size,2*num_filters,num_filters],
      activation_fn=activation_fn,
      max_pool=False,batch_norm=batch_norm,
      is_training=is_training)
  
  upsample_1 = upsample_layer(decode_conv2,
                              ksize=(2,2),
                              interp='nearest')

  return upsample_1



def build_QAE(x_input,k_size,
              num_filters,activation_fn,
              fin_activation_fn,
              is_training=False,
              verbose=False,
              batch_norm=False):
  
  '''Build a channel doubling quadratic 
    autoencoder
    Args:
        x_input: Tensor [batch, height, width, in_channel]
        k_size: array/list [filter_height, filter_width]
        num_filters: scalar - starting number of filters
        activation_fn: function -None, tf.nn.relu,
                                tf.nn.tanh
        fin_activation_fn: function -None, tf.nn.relu,
                                tf.nn.tanh
    Returns:
        prediction: Tensor of the same shape as x_input
    '''
  encoder_out, layer_dict = encoder_QAE(
      x_input=x_input,k_size=k_size,
      num_filters=num_filters,
      activation_fn=activation_fn,
      is_training=is_training, verbose=verbose,
      batch_norm=batch_norm)
  
  decoder_out = decoder_QAE(
      d_input=encoder_out,layer_dict=layer_dict,
      k_size=k_size,num_filters=num_filters,
      activation_fn=activation_fn,
      is_training=is_training,
      verbose=verbose, batch_norm=batch_norm)
  
  prediction = Quad_conv_same(
      decoder_out,'dconv1',
      shape=[k_size,k_size,num_filters,1],
      activation_fn=fin_activation_fn,
      max_pool=False,batch_norm=batch_norm,
      is_training=is_training)
   
  return prediction
	

# linear autoencoder (QAE architecture)

In [0]:
def encoder_linear(x_input,k_size,
                    num_filters,
                    activation_fn,
                    is_training=False,
                    verbose=False,
                    batch_norm=False):
	
  encode_conv1 = conv_layer(
      x_input,'econv1',
      shape=[k_size,k_size,1,num_filters],
      padding='SAME',batch_norm=batch_norm,
      max_pool=True, activation_fn=activation_fn,
      is_training=is_training)
  
  encode_conv2 = conv_layer(
      encode_conv1,'econv2',
      shape=[k_size,k_size,num_filters,2*num_filters],
      padding='SAME',batch_norm=batch_norm,
      max_pool=True,activation_fn=activation_fn,
      is_training=is_training)
  
  encode_conv3 = conv_layer(
      encode_conv2,'econv3',
      shape=[k_size,k_size,2*num_filters,4*num_filters],
      padding='SAME',batch_norm=batch_norm,
      max_pool=True,activation_fn=activation_fn,
      is_training=is_training)

  layer_dict = {'encode_1':encode_conv1,
               'encode_2':encode_conv2,
               'encode_3':encode_conv3}

  return encode_conv3, layer_dict

def decoder_linear(d_input,layer_dict,
                   k_size,num_filters,
                   activation_fn,
                   is_training=False,
                   verbose=False,
                   batch_norm=False):

  upsample_3 = upsample_layer(d_input,
                              ksize=(2,2),
                              interp='nearest')

  decode_conv3 = conv_layer(
      upsample_3,'dconv3',
      shape=[k_size,k_size,4*num_filters,2*num_filters],
      activation_fn=activation_fn,
      max_pool=False,batch_norm=batch_norm,
      is_training=is_training)
  
  upsample_2 = upsample_layer(decode_conv3,
                              ksize=(2,2),
                              interp='nearest')
  
  decode_conv2 = conv_layer(
      upsample_2,'dconv2',
      shape=[k_size,k_size,2*num_filters,num_filters],
      activation_fn=activation_fn,
      max_pool=False,batch_norm=batch_norm,
      is_training=is_training)
  
  upsample_1 = upsample_layer(decode_conv2,
                              ksize=(2,2),
                              interp='nearest')

  return upsample_1



def build_LAE(x_input,k_size,
                     num_filters,
                     activation_fn,
                     fin_activation_fn,
                     is_training=False,
                     verbose=False,
                     batch_norm=False):
  '''Build a channel doubling linear neuron autoencoder
    with the same architecture as QAE
    Args:
        x_input: Tensor [batch, height, width, in_channel]
        k_size: array/list [filter_height, filter_width]
        num_filters: scalar - starting number of filters
        activation_fn: function -None, tf.nn.relu,
                                tf.nn.tanh
        fin_activation_fn: function -None, tf.nn.relu,
                                tf.nn.tanh
    Returns:
        prediction: Tensor of the same shape as x_input
    '''
  encoder_out, layer_dict = encoder_linear(
      x_input=x_input,k_size=k_size,
      num_filters=num_filters,
      activation_fn=activation_fn,
      is_training=is_training,
      verbose=verbose, batch_norm=batch_norm)
  
  decoder_out = decoder_linear(
      d_input=encoder_out,
      layer_dict=layer_dict,
      k_size=k_size, num_filters=num_filters,
      activation_fn=activation_fn,
      is_training=is_training,
      verbose=verbose, batch_norm=batch_norm)
  
  prediction = conv_layer(
      decoder_out,'dconv1',
      shape=[k_size,k_size,num_filters,1],
      activation_fn=fin_activation_fn,
      max_pool=False,batch_norm=batch_norm,
      is_training=is_training)
   
  return prediction
	

# Training Script

In [0]:
class AttrDict(dict):
	# Helper class for argument management
    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(
						*args, **kwargs)
        self.__dict__ = self

def train(args):

	# Determine dataset size
	if 'augmented' in args.dataset:
		train_size = 750
	else:
		train_size = 300

	# Per epoch and total iterations
	epoch_iter = train_size//args.batch_size
	iterations = args.epochs*epoch_iter

	# Path to this .ipynb document as well as data
	# current_dir = Path('.')/'drive'/'My Drive'/'JEB1433_Project'
	current_dir = Path('.')

	# Checkpoint save paths
	if args.save_model and args.exp_name is 'grid_search':
		expt_folder = current_dir/args.exp_name
		expt_folder.mkdir(parents=True,exist_ok=True)
		
		sub_folder= '_lr_{0:.5f}_bs_{1:2d}'.format(
				args.learn_rate,args.batch_size)
		ckpt_folder = expt_folder/sub_folder
		ckpt_folder.mkdir(parents=True,exist_ok=True)
	
	elif args.save_model and args.exp_name is not None:
		ckpt_folder = current_dir/args.exp_name
		ckpt_folder.mkdir(parents=True,exist_ok=True)
	
	else:
		ckpt_folder = current_dir

	ckpt_path = str(ckpt_folder.absolute())+'/'

	#load train and test datasets
	data_dir = current_dir/'data'
	train_true,val_true,test_true=load_data(
			dataset=args.dataset,DIR=data_dir)
	
	if args.verbose:
		print(args.dataset+' datasets loaded...')

	# repeat, batch and prefetch all datasets
	train_true = train_true.repeat().batch(
			args.batch_size)
	train_true = train_true.prefetch(
			buffer_size=tf.data.experimental.AUTOTUNE)
 
	val_true = val_true.repeat().batch(
			args.batch_size)
	val_true = val_true.prefetch(
			buffer_size=tf.data.experimental.AUTOTUNE)
	
	test_true = test_true.batch(
			args.batch_size)
	test_true = test_true.prefetch(
			buffer_size =tf.data.experimental.AUTOTUNE)
	
	if args.verbose:
		print('Batches of {} created...'.format(args.batch_size))
 
	#create iterators for all datasets
	iter_train_handle = train_true.make_one_shot_iterator().string_handle()
	iter_val_handle = val_true.make_one_shot_iterator().string_handle()
	iter_test_handle = test_true.make_one_shot_iterator().string_handle()

	handle = tf.placeholder(tf.string, shape=[])

	iterator = Iterator.from_string_handle(handle,
		tf.compat.v1.data.get_output_types(train_true),
		tf.compat.v1.data.get_output_shapes(train_true))

	next_batch = iterator.get_next()


	#load selected noise type for image preprocessing
	noise_args = {'proportion':args.noise_proportion}
	if args.noise_type == 'random':
		noise_fn = random_noise
		noise_args['std_range'] = args.noise_std_range
		noise_args['lam_range'] = args.noise_lam_range
	elif args.noise_type == 'poisson':
		noise_fn = poisson_noise
		noise_args['lam'] = args.noise_lam
	else:
		noise_fn = gaussian_noise
		noise_args['mean'] = args.noise_mean
		noise_args['std'] = args.noise_std
	
	if args.verbose:
		print('Noise type: '+args.noise_type+\
		      '. Parameters',noise_args)

	#add noise to images and cast to float for training
	true_img = tf.placeholder(tf.uint8, 
		shape=[args.batch_size, 64, 64, 1])
	noised_img = noise_fn(**noise_args,
		image=true_img)
	model_input = tf.cast(noised_img,
		dtype=tf.float32)
	
	# model output scaling factor based on final activation
	output_scale_factor = 1.0
	output_shift = 0.0
	if args.final_activation=='tanh':
		fin_activation_fn = tf.keras.activations.tanh
		output_scale_factor = 127.5
		output_shift = 1.0
	elif args.final_activation=='sigmoid':
		fin_activation_fn = tf.keras.activations.sigmoid
		output_scale_factor = 255.0
	else:
		fin_activation_fn = tf.nn.relu

	if args.activation=='tanh':
		activation_fn = tf.keras.activations.tanh
	elif args.activation=='sigmoid':
		activation_fn = tf.keras.activations.sigmoid
	else:
		activation_fn = tf.nn.relu
	
	if args.verbose:
		print('Model activation function:',activation_fn.__name__)
		print('Model output activation function:',fin_activation_fn.__name__)
 
	#SELECT MODEL TO LOAD
	with tf.device('/device:GPU:0'):
		if args.model=='DAE':
			model = build_DAE
		elif args.model=='LAE':
			model = build_LAE
		elif args.model=='QAE_original':
			model = build_QAE_original
			args.batch_norm = False
		elif args.model=='QAE':
			model = build_QAE
		else:
			print('selected model does not exist')
			return		

		with tf.variable_scope(args.model):
			denoised_train_img = model(
					model_input,k_size=args.kernel_size,
					activation_fn=activation_fn,
					fin_activation_fn=fin_activation_fn,
					num_filters=args.num_filters,
					batch_norm=args.batch_norm,
					is_training=True)
		with tf.variable_scope(args.model,reuse=tf.AUTO_REUSE):
			denoised_val_img = model(
					model_input,k_size=args.kernel_size,
					activation_fn=activation_fn,
					fin_activation_fn=fin_activation_fn,
					num_filters=args.num_filters,
					batch_norm=args.batch_norm,
					is_training=False)

	if args.verbose:
		print('Model loaded: '+args.model)
	

	#load selected loss functions, selection of l2 and ssim		
	train_l2_loss, train_ssim_loss = 0.0, 0.0
	val_l2_loss, val_ssim_loss = 0.0, 0.0
	l2_loss_weight, ssim_loss_weight = 0.0, 0.0

	scaled_denoised_train_image = output_scale_factor*(
					output_shift+denoised_train_img)

	scaled_denoised_val_image = output_scale_factor*(
					output_shift+denoised_val_img)

	if 'l2' in args.loss_type:
		l2_loss_weight = 1.0
		train_l2_loss = l2_loss(
				tf.cast(true_img,dtype=tf.float32),
				scaled_denoised_train_image)
		val_l2_loss = l2_loss(
				tf.cast(true_img,dtype=tf.float32),
				scaled_denoised_val_image)	
	if 'ssim' in args.loss_type:
		ssim_loss_weight = 1.0
		train_ssim_loss = ssim_loss(
				tf.cast(true_img,dtype=tf.float32),
				scaled_denoised_train_image,max_val=255)
		val_ssim_loss = ssim_loss(
				tf.cast(true_img,dtype=tf.float32),
				scaled_denoised_val_image,max_val=255)
		
	
	l2_loss_weight = l2_loss_weight/(
			len(args.loss_type)*args.batch_size*2048*255*255)
	ssim_loss_weight = ssim_loss_weight/\
		len(args.loss_type)

	total_train_loss = l2_loss_weight*train_l2_loss+\
		ssim_loss_weight*train_ssim_loss

	total_val_loss = l2_loss_weight*val_l2_loss+\
		ssim_loss_weight*val_ssim_loss

	if args.verbose:
		print('Training losses: ',args.loss_type)
	
	# Evaluation Metrics

	noisy_l2_loss = l2_loss(
			tf.cast(true_img,dtype=tf.float32),
			tf.cast(noised_img,dtype=tf.float32))
	noisy_ssim_loss = ssim_loss(
			tf.cast(true_img,dtype=tf.float32),
			tf.cast(noised_img,dtype=tf.float32),
			max_val=255)
	noisy_metric_psnr = compute_psnr(
			tf.cast(true_img,dtype=tf.float32),
			tf.cast(noised_img,dtype=tf.float32))

	test_metric_l2 = l2_loss(
			tf.cast(true_img,dtype=tf.float32),
			scaled_denoised_val_image)
	test_metric_ssim = 1.0 - ssim_loss(
			tf.cast(true_img,dtype=tf.float32),
			scaled_denoised_val_image,max_val=255)
	test_metric_psnr = compute_psnr(
			tf.cast(true_img,dtype=tf.float32),
			scaled_denoised_val_image)
 
	total_noisy_loss = l2_loss_weight*noisy_l2_loss+\
		ssim_loss_weight*noisy_ssim_loss

	#load optimizer for loss minimization
	if args.optimizer=='adam':
		optimizer = tf.train.AdamOptimizer(
				learning_rate=args.learn_rate).minimize(
				total_train_loss)
	elif args.optimizer=='gd':
		optimizer = tf.train.GradientDescentOptimizer(
				learning_rate=args.learn_rate).minimize(
				total_train_loss)
	elif args.optimizer=='momentum':
		optimizer = tf.train.MomentumOptimizer(
				learning_rate=args.learn_rate,
				momentum=0.9).minimize(total_train_loss)
	else:
		print('Selected optimizer not available')
		return
	
	if args.verbose:
		print('Training optimzer:',args.optimizer)	

	# tf summaries to keep track of losses
	tf.summary.scalar('train_l2_loss',train_l2_loss)
	tf.summary.scalar('train_ssim_loss',train_ssim_loss)
	tf.summary.scalar('total_train_loss',total_train_loss)
	tf.summary.scalar('val_l2_loss',val_l2_loss)
	tf.summary.scalar('val_ssim_loss',val_ssim_loss)
	tf.summary.scalar('total_val_loss',total_val_loss)
	merged_summary = tf.summary.merge_all()
	

	with tf.Session().as_default() as sess:
		if args.exp_name is not None:
			train_writer = tf.summary.FileWriter(
					current_dir/args.exp_name)
		else:
			train_writer = tf.summary.FileWriter(
					current_dir/'train_data')

		# initialize all global and local variables
		init_vars = tf.group(
				tf.global_variables_initializer(),
				tf.local_variables_initializer())

		saver = tf.train.Saver()
		
		if args.verbose:
			train_params_count = np.sum(
					[np.prod(v.get_shape().as_list())
		 			for v in tf.trainable_variables()])
			print('Trainable parameters: {}'.\
			      format(train_params_count))
			# for v in tf.trainable_variables():
			# 	print(v.name+'. shape:{}'.format(v.shape))
			# return
			print('')
	 
		global_step = tf.train.get_global_step()

		handle_train,handle_val,handle_test = sess.run(
			[iter_train_handle, iter_val_handle,
			 iter_test_handle])

		sess.run(init_vars)

		train_loss, val_loss = [], []

		for step in range(iterations+1):
			train_true_img = sess.run(next_batch,
				feed_dict={handle: handle_train})

			val_true_img = sess.run(next_batch,
				feed_dict={handle: handle_val})

			_,t_loss,train_noise_loss = sess.run(
					[optimizer,total_train_loss,total_noisy_loss],
					feed_dict={true_img:train_true_img})
			
			v_loss,val_noise_loss = sess.run(
					[total_val_loss,total_noisy_loss],
					feed_dict = {true_img:val_true_img})
			
			if step%epoch_iter == 0:
				train_loss.append(t_loss)
				val_loss.append(v_loss)
			# train_writer.add_summary(t_summ,step)

			if args.verbose and step%epoch_iter == 0:
				print('Epoch:{0:3d}/{1}, '\
				      'Training Loss {2:.5f},...'\
							'Noise Loss {3:.5f}'.format(
									step//epoch_iter,args.epochs,
									t_loss,train_noise_loss))
				print('Epoch:{0:3d}/{1}, '\
				      'Validation Loss {2:.5f},...'\
							'Noise Loss {3:.5f}'.format(
									step//epoch_iter,args.epochs,
									v_loss,val_noise_loss))
		
			if args.visualize and step%(args.viz_every_epoch*epoch_iter)==0:
				input_image = sess.run(
						model_input,
						feed_dict={true_img:val_true_img})
				output_image = sess.run(
						denoised_val_img,
						feed_dict={true_img:val_true_img})
				fig, axes = plt.subplots(
						nrows=1,ncols=3,figsize=(10,10))
				axes[0].axis('off')
				axes[1].axis('off')
				axes[2].axis('off')
				axes[0].imshow(val_true_img[0,:,:,0], cmap='gray')
				axes[1].imshow(input_image[0,:,:,0], cmap='gray')
				axes[2].imshow(output_image[0,:,:,0], cmap='gray')
				plt.draw()
		if args.plot:
			fig = plt.figure(frameon=True,figsize=(8,6))
			plt.plot(train_loss,label='Train loss')
			plt.plot(val_loss,label='Validation loss')
			plt.gca().tick_params(axis='both', which='major', labelsize=14)
			plt.xlabel('Epochs',fontsize=16)
			plt.ylabel('Total Loss',fontsize=16)
			plt.legend(loc=1,fontsize=14)
			plt.savefig(args.model+'_'+str(args.epochs)+\
			            '_train_plot.png',format='png',
			            bbox_inches='tight')#,pad_inches = 0)
		
		if args.save_model:
			saver.save(sess,ckpt_path)

		if args.run_inference:	
			test_summary = {}

			test_l2, test_ssim, test_psnr = [], [], []
			noisy_l2, noisy_ssim, noisy_psnr = [], [], []

			if args.verbose:
				print('Starting test procedure')
			for t_step in range(15):

				test_true_img = sess.run(
						next_batch,feed_dict={handle: handle_test})
				
				t_l2,t_ssim,t_psnr,in_l2,in_dssim,in_psnr = sess.run(
						[test_metric_l2,test_metric_ssim,
						test_metric_psnr,noisy_l2_loss,
						noisy_ssim_loss,noisy_metric_psnr],
						feed_dict = {true_img:test_true_img})

				if args.visualize and t_step==10:
					input_image = sess.run(
							model_input,
							feed_dict={true_img:test_true_img})
					output_image = sess.run(
							denoised_val_img,
							feed_dict={true_img:test_true_img})
					
					fig, axes = plt.subplots(
							nrows=1,ncols=3,figsize=(10,10))
					axes[0].axis('off')
					axes[1].axis('off')
					axes[2].axis('off')
					axes[0].imshow(test_true_img[0,:,:,0], cmap='gray')
					axes[1].imshow(input_image[0,:,:,0], cmap='gray')
					axes[2].imshow(output_image[0,:,:,0], cmap='gray')
					plt.draw()
					plt.savefig(args.model+'_'+str(args.epochs)+\
											'_test_images.png',format='png',
											bbox_inches='tight',pad_inches = 0)
				
				test_l2.append(t_l2)
				test_ssim.append(t_ssim)
				test_psnr.append(t_psnr)

				noisy_l2.append(in_l2)
				noisy_ssim.append(1.0-in_dssim)
				noisy_psnr.append(in_psnr)

			test_summary['test_l2']=np.mean(test_l2)
			test_summary['test_ssim']=np.mean(test_ssim)
			test_summary['test_psnr']=np.mean(test_psnr)
			test_summary['noisy_l2']=np.mean(noisy_l2)
			test_summary['noisy_ssim']=np.mean(noisy_ssim)
			test_summary['noisy_psnr']=np.mean(noisy_psnr)

			return test_summary
		else:
			return train_loss,val_loss
	

# run training script

In [0]:
args = AttrDict()
model_list = ['QAE','QAE_original',\
              'LAE','DAE']
args_dict = { 
              'model':model_list[2],
              'kernel_size':3, 
              'num_filters':16, 
              'learn_rate':6e-4,
              'optimizer': 'adam',
              'activation':'tanh',
              'final_activation':'tanh',
              'batch_norm':False, 
              'batch_size':10, 
              'epochs':100,
              'plot':True,
              'visualize': True,
              'viz_every_epoch':10, 
              'noise_type':'gaussian',
              'noise_proportion':0.2,
              'noise_mean':0,
              'noise_std':1,
              'noise_lam':1,
              'noise_std_range':[1,5],
              'noise_lam_range':[1,5],
              'loss_type':['l2'],
              'dataset':'dental_augmented',
              'verbose':True,
              'exp_name': 'test',
              'save_model': False,
              'run_inference': True,
             
}
args.update(args_dict)
# tf.reset_default_graph()
# test_results = train(args)
# plt.show()

In [0]:
def test_losses():
  models = ['QAE','LAE']
  losses = ['l2','ssim']
  args.run_inference = True
  runs = 5
  summary = {}
  args.verbose=False
  args.plot=False
  args.visualize=False

  result_summ = {}
  for mod in models:
    args.model=mod
    for loss in losses:
      args.loss_type=[loss]
      noise_psnr, noise_ssim = 0.0, 0.0 
      test_psnr, test_ssim = 0.0, 0.0,
      for i in range(runs):
        tf.reset_default_graph()
        results = train(args)
        noise_psnr = noise_psnr + results['noisy_psnr']
        noise_ssim = noise_ssim + results['noisy_ssim']
        test_psnr = test_psnr + results['test_psnr']
        test_ssim = test_ssim + results['test_ssim']
      result_summ[mod+'_'+loss] = [test_psnr/runs, test_ssim/runs]
      result_summ[mod+'_'+loss+'_noise'] = [noise_psnr/runs, noise_ssim/runs]
      print('Done with '+mod+'_'+loss)
  return result_summ

test_outputs = test_losses()

Done with QAE_l2
Done with QAE_ssim
Done with LAE_l2
Done with LAE_ssim


In [0]:
test_outputs

{'LAE_l2': [18.676711654663087, 0.5734446048736572],
 'LAE_l2_noise': [14.406198883056641, 0.3587961260477702],
 'LAE_ssim': [17.16096725463867, 0.6681626319885254],
 'LAE_ssim_noise': [14.408073997497558, 0.3585657143592834],
 'QAE_l2': [20.20344772338867, 0.6476222395896911],
 'QAE_l2_noise': [14.386047554016113, 0.35867895126342775],
 'QAE_ssim': [18.381641387939453, 0.7258187294006347],
 'QAE_ssim_noise': [14.38495750427246, 0.35857723553975424]}

In [0]:
def run_simple_models():
  models = ['DAE','QAE','LAE','QAE']
  runs = 5
  summary = {}
  args.verbose=False
  args.plot=False
  args.visualize=False
  args.loss_type=['l2']

  for j,model in enumerate(models):
    args.model=model
    if model=='DAE':
      args.num_filters = 64
      args.epochs = 100
    elif j==3:
      args.num_filters = 16
      args.epochs = 50
    else:
      args.num_filters = 8
      args.epochs = 100
    results = 0.0
    for i in range(runs):
      tf.reset_default_graph()
      results = results + np.fromiter(train(args).values(),dtype='float32')
    print('Done with model: '+model)
    summary[model+str(j)]= results.reshape([2,3])/runs
  
  return summary

simple_test_summary = run_simple_models()

Done with model: DAE
Done with model: QAE
Done with model: LAE
Done with model: QAE


In [0]:
simple_test_summary

{'DAE0': array([[1.4028989e+07, 6.4528197e-01, 1.9823662e+01],
        [4.8286948e+07, 3.5890284e-01, 1.4412954e+01]], dtype=float32),
 'LAE2': array([[2.4052856e+07, 5.2313924e-01, 1.7467979e+01],
        [4.8263640e+07, 3.5902062e-01, 1.4413872e+01]], dtype=float32),
 'QAE1': array([[1.5090723e+07, 6.1836278e-01, 1.9567976e+01],
        [4.8546536e+07, 3.5876462e-01, 1.4391904e+01]], dtype=float32),
 'QAE3': array([[1.4678182e+07, 6.2120342e-01, 1.9637081e+01],
        [4.8086248e+07, 3.5889530e-01, 1.4430010e+01]], dtype=float32)}

In [0]:
def grid_search():
  args.plot = False
  args.visualize = False
  args.epochs = 100
  args.verbose = False
  args.experiment_name = 'grid_search'

  l_rates = [1e-3,6e-4,3e-4,1e-4,6e-5,3e-5,1e-5]
  batch_sizes = [10,16,20,32]

  avg_loss = {}

  for alpha in l_rates:
    args.learn_rate = alpha
    for b_size in batch_sizes:
      args.batch_size = b_size

      tf.reset_default_graph()
      t_loss, v_loss = train(args)

      idx = 'lr_'+str(alpha)+':b_'+str(b_size)

      avg_loss[idx] = [t_loss,v_loss]
      print('Loss for learn_rate {0:.5f} and batch size {1:2d}, : {2:.6f},{3:.6f}'.format(
          alpha,b_size,np.mean(t_loss[-10:]),np.mean(v_loss[-10:])))
  return avg_loss
