In [None]:
import tensorflow as tf
from tensorflow.keras import Input, layers
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Conv2D,BatchNormalization,LeakyReLU
from tensorflow.keras.optimizers.schedules import PiecewiseConstantDecay
from tqdm.notebook import tqdm
import cv2
import numpy as np
import matplotlib.pyplot as plt
import argparse
import random
import math
import os

In [None]:
def norm(x):
    """
    Map a tensor from the [0,1] range onto [-1,1].

    The result is clipped, so values outside [0,1] saturate at -1 or 1.
    """
    rescaled = (x - 0.5) * 2
    return tf.clip_by_value(rescaled, -1, 1)

def denorm(x):
    """
    Map a tensor from the [-1,1] range back onto [0,1].

    The result is clipped, so values outside [-1,1] saturate at 0 or 1.
    """
    rescaled = (x + 1) / 2
    return tf.clip_by_value(rescaled, 0, 1)

In [None]:
def np_to_tensor(x):
    """
    Convert an x*y*c [0,255] nparray into a 1*x*y*c float32 tf.tensor in [-1,1]
    """
    # prepend the sample axis and bring the values down to [0,1]
    batched = x[None,:,:,:]/255
    as_tensor = tf.convert_to_tensor(batched, dtype=tf.float32)
    # [0,1] -> [-1,1]
    return norm(as_tensor)

def tensor_to_np(x):
    """
    Convert the first sample of a s*x*y*c [-1,1] tensor into an x*y*c uint8 nparray in [0,255]
    """
    first_sample = denorm(x[0])  # [-1,1] -> [0,1], sample axis dropped by the indexing
    pixels = first_sample.numpy()*255
    # the uint8 cast discards the fractional part
    return pixels.astype(np.uint8)

def convert_image_np(inp):
    """
    Convert a s*x*y*c [-1,1] tensor into the x*y*c [0,1] np.array of its first sample
    """
    unit_range = denorm(inp)  # [-1,1] to [0,1]
    # indexing with [0] removes the sample dimension
    return unit_range.numpy()[0]

In [None]:
def save_as_numpy(ts, name, dir):
    """
    Write ts.numpy() to the file {dir}/{name} with np.save
    """
    target = f"{dir}/{name}"
    np.save(target, ts.numpy())

def load_from_numpy(name, dir):
    """
    Read the array stored at {dir}/{name} with np.load and return it as a tf.tensor
    """
    stored = np.load(f"{dir}/{name}")
    return tf.convert_to_tensor(stored)

In [None]:
from skimage import io as img
def read_image_np(opt):
    """
    Read the image at opt.input_dir/opt.input_name and return whatever img.imread produces
    (the np.array form of the image)
    """
    image_path = f'{opt.input_dir}/{opt.input_name}'
    return img.imread(image_path)

def read_image_tensor(opt):
    """
    Read the image at opt.input_dir/opt.input_name and return it as a [-1,1] tensor
    with a leading sample dimension (see np_to_tensor)
    """
    pixels = img.imread(f'{opt.input_dir}/{opt.input_name}')
    return np_to_tensor(pixels)

In [None]:
def save_vars(G, z, noiseamp, real, opt, scale_num):
    """
    Persist everything belonging to scale scale_num inside opt.out_:
    the generator (Gs_{n}.tf), its noise (Zs_{n}), the real image (reals_{n})
    and the noise amplitude (NoiseAmp_{n})
    """
    out_dir = opt.out_
    G.save(f"{out_dir}/Gs_{scale_num}.tf")
    for prefix, tensor in (("Zs", z), ("reals", real)):
        save_as_numpy(tensor, f"{prefix}_{scale_num}", out_dir)
    np.save(f"{out_dir}/NoiseAmp_{scale_num}", np.array(noiseamp))

def load_trained_pyramid(opt):
    """
    Load every trained scale found in the directory given by generate_dir2save(opt).

    Scales are read in order (0, 1, 2, ...) and loading stops at the first scale
    for which any of the four artifacts (Zs, reals, Gs, NoiseAmp) cannot be loaded.
    A scale is only added once all four artifacts were read, so the returned
    lists always have the same length.

    Returns:
        Gs, Zs, reals, NoiseAmp: four lists with one entry per loaded scale

    Raises:
        NameError: when the directory does not exist
    """
    model_dir = generate_dir2save(opt)
    Gs = []
    Zs = []
    reals = []
    NoiseAmp = []
    print(model_dir)
    if not os.path.exists(model_dir):
        raise NameError(f'no trained model exists in {model_dir}')

    i = 0
    while True:
        try:
            z = load_from_numpy(f"Zs_{i}.npy", model_dir)
            real = load_from_numpy(f"reals_{i}.npy", model_dir)
            model = load_model(f'{model_dir}/Gs_{i}.tf')
            noise_amp = np.load(f'{model_dir}/NoiseAmp_{i}.npy').item()
        except Exception:
            # A missing or unreadable artifact marks the end of the pyramid.
            # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
            break
        # append only after the whole scale loaded, keeping the lists aligned
        Zs.append(z)
        reals.append(real)
        Gs.append(model)
        NoiseAmp.append(noise_amp)
        i += 1
    return Gs,Zs,reals,NoiseAmp

In [None]:
def generate_dir2save(opt):
    """
    Build the output directory name for a run: opt.out/opt.input_name/ followed by a
    comma separated description of the main settings
    """
    root = f'{opt.out}/{opt.input_name}'
    settings = [
        f'layer={opt.num_layer}',
        f'additional_scale={bool(opt.additional_scale)}',
        f'iteration={opt.niter}',
        f'scale_factor={opt.scale_factor_init}',
        f'alpha={opt.alpha}',
    ]
    return root + '/' + ', '.join(settings)

In [None]:
def generate_noise(size,num_samp=1):
    """
    Draw standard normal noise of shape [num_samp, size[0], size[1], size[2]]
    (size = [width, height, channel])
    """
    dim_a, dim_b, channels = size[0], size[1], size[2]
    return tf.random.normal([num_samp, dim_a, dim_b, channels])

In [None]:
def save_networks(netG,netD,z,opt):
    """
    Store netG and netD (full models, then their weights) and z in the directory opt.outf
    """
    out_dir = opt.outf
    nets = (('netG', netG), ('netD', netD))
    # full models first, in tf format
    for tag, net in nets:
        net.save(f'{out_dir}/{tag}.tf')
    # then the weights only
    for tag, net in nets:
        net.save_weights(f'{out_dir}/{tag}weights.tf')
    save_as_numpy(z, 'z_opt', out_dir)

In [None]:
def create_reals_pyramid(real,opt):
    """
    Build the list of rescaled copies of real, one per scale 0..opt.stop_scale.
    Entry i is real resized by opt.scale_factor ** (opt.stop_scale - i), so the last
    entry uses a factor of 1.
    """
    reals = []
    for i in range(0, opt.stop_scale + 1):
        factor = math.pow(opt.scale_factor, opt.stop_scale - i)
        reals.append(imresize_scale_tensor(real, factor))
    return reals

In [None]:
def get_arguments():
    """
    Build the argparse parser holding the configuration of a run.

    Returns the parser itself (not the parsed options), so callers can add further
    arguments before calling parse_args.
    """
    parser = argparse.ArgumentParser()

    #load, input, save configurations:
    parser.add_argument('--manualSeed', type=int, help='manual seed')
    parser.add_argument('--nc_z',type=int,help='noise # channels',default=3)
    parser.add_argument('--nc_im',type=int,help='image # channels',default=3)
    parser.add_argument('--out',help='output folder',default='Output')

    #networks hyper parameters:
    parser.add_argument('--nfc', type=int, default=32)
    parser.add_argument('--min_nfc', type=int, default=32)
    parser.add_argument('--ker_size',type=int,help='kernel size',default=3)
    parser.add_argument('--num_layer',type=int,help='number of layers',default=5)
    # type=int so a value given on the command line matches the integer default
    parser.add_argument('--stride',type=int,help='stride',default=1)
    parser.add_argument('--padd_size',type=int,help='net pad size',default=0)#math.floor(opt.ker_size/2)

    #pyramid parameters:
    parser.add_argument('--scale_factor',type=float,help='pyramid scale factor',default=0.75)#pow(0.5,1/6))
    parser.add_argument('--noise_amp',type=float,help='addative noise cont weight',default=0.1)
    parser.add_argument('--min_size',type=int,help='image minimal size at the coarser scale',default=25)
    parser.add_argument('--max_size', type=int,help='image maximal size at the finest scale', default=250)

    #optimization hyper parameters:
    parser.add_argument('--niter', type=int, default=2000, help='number of epochs to train per scale')
    parser.add_argument('--gamma',type=float,help='scheduler gamma',default=0.1)
    parser.add_argument('--lr_g', type=float, default=0.0005, help='learning rate, default=0.0005')
    parser.add_argument('--lr_d', type=float, default=0.0005, help='learning rate, default=0.0005')
    parser.add_argument('--beta1', type=float, default=0.5, help='beta1 for adam. default=0.5')
    parser.add_argument('--lambda_grad',type=float, help='gradient penelty weight',default=0.1)
    parser.add_argument('--alpha',type=float, help='reconstruction loss weight',default=5)

    return parser

def post_config(opt):
    """
    Finish the configuration of opt: keep a copy of the initial hyper parameters under
    <name>_init and seed the python and tensorflow random generators with opt.manualSeed.
    Returns the same opt object.
    """
    # remember the values the run started with
    for field in ('niter', 'noise_amp', 'nfc', 'min_nfc', 'scale_factor'):
        setattr(opt, f'{field}_init', getattr(opt, field))
    seed = opt.manualSeed
    random.seed(seed)
    tf.random.set_seed(seed)
    return opt

In [None]:
def contributions(in_length, out_length, scale, kernel, kernel_width, antialiasing):
    """
    Support function of imresize_in: for ONE dimension, compute which input positions feed each
    output position (field_of_view) and the interpolation weight of each of them (weights).

    Args:
        in_length: length of the input along the dimension being resized
        out_length: length of the output along that dimension
        scale: scale factor along that dimension (out / in)
        kernel: interpolation kernel, called on an array of distances (e.g. cubic)
        kernel_width: support width of the kernel before any antialiasing stretch
        antialiasing: when true the kernel is stretched by 1/scale and rescaled by scale

    Returns:
        (weights, field_of_view): two arrays with matching shapes; per output position, the
        weights (normalized to sum to 1) and the input indices they apply to. Columns whose
        weight is zero for every output position are removed.
    """

    # When anti-aliasing is activated (default and only for downscaling) the receptive field is stretched to size of
    # 1/sf. this means filtering is more 'low-pass filter'.
    fixed_kernel = (lambda arg: scale * kernel(scale * arg)) if antialiasing else kernel
    kernel_width *= 1.0 / scale if antialiasing else 1.0

    # These are the coordinates of the output image (1-based)
    out_coordinates = np.arange(1, out_length+1)

    # These are the matching positions of the output-coordinates on the input image coordinates.
    # Best explained by example: say we have 4 horizontal pixels for HR and we downscale by SF=2 and get 2 pixels:
    # [1,2,3,4] -> [1,2]. Remember each pixel number is the middle of the pixel.
    # The scaling is done between the distances and not pixel numbers (the right boundary of pixel 4 is transformed to
    # the right boundary of pixel 2. pixel 1 in the small image matches the boundary between pixels 1 and 2 in the big
    # one and not to pixel 2. This means the position is not just multiplication of the old pos by scale-factor).
    # So if we measure distance from the left border, middle of pixel 1 is at distance d=0.5, border between 1 and 2 is
    # at d=1, and so on (d = p - 0.5).  we calculate (d_new = d_old / sf) which means:
    # (p_new-0.5 = (p_old-0.5) / sf)     ->          p_new = p_old/sf + 0.5 * (1-1/sf)
    match_coordinates = 1.0 * out_coordinates / scale + 0.5 * (1 - 1.0 / scale)

    # This is the left boundary to start multiplying the filter from, it depends on the size of the filter
    left_boundary = np.floor(match_coordinates - kernel_width / 2)

    # Kernel width needs to be enlarged because when covering has sub-pixel borders, it must 'see' the pixel centers
    # of the pixels it only covered a part from. So we add one pixel at each side to consider (weights can zeroize them)
    expanded_kernel_width = np.ceil(kernel_width) + 2

    # Determine a set of field_of_view for each output position, these are the pixels in the input image
    # that the pixel in the output image 'sees'. We get a matrix whose horizontal dim is the output pixels (big) and the
    # vertical dim is the pixels it 'sees' (kernel_size + 2). The trailing "- 1" turns the 1-based positions into
    # 0-based indices.
    # NOTE(review): near the left border left_boundary - 1 can be negative, and np.uint of a negative float is not a
    # well defined conversion - confirm the boundary behaviour is the intended one on the target platform/numpy.
    field_of_view = np.squeeze(np.uint(np.expand_dims(left_boundary, axis=1) + np.arange(expanded_kernel_width) - 1))

    # Assign weight to each pixel in the field of view. A matrix whose horizontal dim is the output pixels and the
    # vertical dim is a list of weights matching to the pixel in the field of view (that are specified in
    # 'field_of_view')
    weights = fixed_kernel(1.0 * np.expand_dims(match_coordinates, axis=1) - field_of_view - 1)

    # Normalize weights to sum up to 1. be careful from dividing by 0
    sum_weights = np.sum(weights, axis=1)
    sum_weights[sum_weights == 0] = 1.0
    weights = 1.0 * weights / np.expand_dims(sum_weights, axis=1)

    # We use this mirror structure as a trick for reflection padding at the boundaries:
    # indices are taken modulo 2*in_length and looked up in [0..in_length-1, in_length-1..0]
    mirror = np.uint(np.concatenate((np.arange(in_length), np.arange(in_length - 1, -1, step=-1))))
    field_of_view = mirror[np.mod(field_of_view, mirror.shape[0])]

    # Get rid of weights and pixel positions that are of zero weight for every output pixel
    non_zero_out_pixels = np.nonzero(np.any(weights, axis=0))
    weights = np.squeeze(weights[:, non_zero_out_pixels])
    field_of_view = np.squeeze(field_of_view[:, non_zero_out_pixels])

    # Final products are the relative positions and the matching weights, both are output_size X fixed_kernel_size
    return weights, field_of_view

In [None]:
def resize_along_dim(im, dim, weights, field_of_view):
    """
    Support function of imresize_in: resize im along axis dim using the per-output
    weights and input indices (field_of_view) produced by contributions
    """
    # Bring the axis to resize to the front so that plain indexing acts on it
    swapped = np.swapaxes(im, dim, 0)

    # Transposed weights get one trailing singleton axis per remaining image axis, so they
    # broadcast against swapped[field_of_view.T] (bsxfun style)
    taps_first = weights.T
    broadcast_shape = list(taps_first.shape) + [1] * (np.ndim(im) - 1)
    taps_first = np.reshape(taps_first, broadcast_shape)

    # Fancy indexing gathers, for every output position, the input slices it depends on; this
    # adds one leading axis (the taps). Weighting and summing over that axis gives the output.
    gathered = swapped[field_of_view.T]
    resized = np.sum(gathered * taps_first, axis=0)

    # Put the resized axis back where it came from
    return np.swapaxes(resized, dim, 0)

In [None]:
# interpolation methods. x is the distance from the left pixel center
def cubic(x):
    """
    Piecewise cubic interpolation kernel evaluated at distance x (sign is ignored):
    one polynomial for |x| <= 1, another for 1 < |x| <= 2, zero beyond 2
    """
    dist = np.abs(x)
    dist_sq = dist ** 2
    dist_cu = dist ** 3
    near_poly = 1.5*dist_cu - 2.5*dist_sq + 1
    far_poly = -0.5*dist_cu + 2.5*dist_sq - 4*dist + 2
    near_mask = dist <= 1
    far_mask = (1 < dist) & (dist <= 2)
    return near_poly * near_mask + far_poly * far_mask

In [None]:
def imresize_in(im, scale_factor=None):
    """
    Resize the array im by scale_factor along its first two axes with cubic interpolation;
    any further axis keeps its length. The output length of a resized axis is
    ceil(length * scale_factor).
    """
    input_shape = im.shape

    # One scale per axis: the requested factor on the first two axes, 1 on the others
    per_dim_scale = [scale_factor, scale_factor] + [1] * (len(input_shape) - 2)

    # Output shape follows from the scale factors
    output_shape = np.uint(np.ceil(np.array(input_shape) * np.array(per_dim_scale)))

    # Cubic interpolation and its matching kernel support
    method, kernel_width = cubic, 4.0

    # Antialiasing is only used when downscaling
    antialiasing = per_dim_scale[0] < 1

    # Resize one axis at a time, visiting the axes in increasing order of scale
    out_im = np.copy(im)
    for dim in np.argsort(np.array(per_dim_scale)).tolist():
        # A scale of exactly 1 leaves the axis as it is
        if per_dim_scale[dim] == 1.0:
            continue

        # Which input positions contribute to each output position along this axis, and how much
        weights, field_of_view = contributions(im.shape[dim], output_shape[dim], per_dim_scale[dim],
                                               method, kernel_width, antialiasing)

        # Apply them to the partially resized image
        out_im = resize_along_dim(out_im, dim, weights, field_of_view)

    return out_im

In [None]:
def imresize_scale_tensor(im,scale_factor): # original imresize
    """
    Tensor wrapper around imresize_in: the [-1,1] tensor goes through a [0,255] uint8
    nparray (tensor_to_np), is resized by scale_factor, and comes back as a [-1,1] tensor
    (np_to_tensor)
    """
    as_array = tensor_to_np(im)
    resized = imresize_in(as_array, scale_factor=scale_factor)
    return np_to_tensor(resized)

In [None]:
def zero_pad(tensor, pad_width):
    """
    Enlarge the two spatial axes of tensor (the third and second from last) by 2 * pad_width
    each through tf.image.resize_with_crop_or_pad, i.e. pad_width zeros on every side.
    Can be applied to both 3D and 4D tensor
    """
    rank = len(tensor.shape)
    rows = tensor.shape[rank - 3]
    cols = tensor.shape[rank - 2]
    target_rows = rows + 2 * pad_width
    target_cols = cols + 2 * pad_width
    return tf.image.resize_with_crop_or_pad(tensor, target_rows, target_cols)

In [None]:
def generator(opt, pad_noise):
    """
    Build the generator of one scale as a Sequential stack of opt.num_layer convolutions.

    The input is the current real image size (opt.real_x, opt.real_y) enlarged by pad_noise on
    every side, with opt.nc_im channels. Every convolution uses "valid" padding and stride 1.
    All but the last are followed by batch normalization and LeakyReLU(0.2); the last one maps
    to opt.nc_im channels with a tanh activation.
    """
    def new_init():
        # a separate initializer instance for every convolution
        return tf.random_normal_initializer(mean=0.0, stddev=0.02, seed=None)

    padded_input = (opt.real_x+2*pad_noise, opt.real_y+2*pad_noise, opt.nc_im)
    # the filter count is halved per body layer but never goes below opt.min_nfc
    body_filters = [max(int(opt.nfc/pow(2,(i+1))), opt.min_nfc) for i in range(opt.num_layer - 2)]

    model = tf.keras.Sequential()

    # head
    model.add(layers.Conv2D(filters = max(opt.nfc,opt.min_nfc),
                            input_shape = padded_input,
                            padding = "valid",
                            kernel_size = opt.ker_size,
                            strides = 1,
                            kernel_initializer = new_init()))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(alpha = 0.2))

    # body
    for n_filters in body_filters:
        model.add(layers.Conv2D(filters = n_filters,
                                padding = "valid",
                                kernel_size = opt.ker_size,
                                strides = 1,
                                kernel_initializer = new_init()))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU(alpha = 0.2))

    # tail
    model.add(layers.Conv2D(filters = opt.nc_im,
                            padding = "valid",
                            kernel_size = opt.ker_size,
                            strides = 1,
                            activation = 'tanh',
                            kernel_initializer = new_init()))
    return model

# class generator(tf.keras.Model):
#     def __init__(self,opt):
#         super(generator, self).__init__()
#         N = opt.nfc
#         self.conv_first = layers.Conv2D(
#                                 filters = max(N,opt.min_nfc),#output
#                                 input_shape= (opt.real_x, opt.real_y, opt.nc_im),
#                                 padding = "valid",
#                                 kernel_size = opt.ker_size,
#                                 strides = 1,
#                                 kernel_initializer = tf.random_normal_initializer(mean=0.0, stddev=0.02, seed=None)
#                                )
#         self.batch_first = layers.BatchNormalization()
#         self.relu_first = layers.LeakyReLU(alpha = 0.2)
#         self.body_layers = []
#         for i in range(opt.num_layer - 2):
#             N = int(opt.nfc/pow(2,(i+1)))
#             self.body_layers.append(layers.Conv2D(filters = max(N,opt.min_nfc), 
#                                                   padding = "valid",
#                                                   kernel_size = opt.ker_size,
#                                                   strides = 1,
#                                                   kernel_initializer = tf.random_normal_initializer(mean=0.0, stddev=0.02, seed=None)))
#             self.body_layers.append(layers.BatchNormalization())
#             self.body_layers.append(layers.LeakyReLU(alpha = 0.2))
#         self.end_layer = layers.Conv2D(filters = opt.nc_im, 
#                                   padding = "valid",
#                                   kernel_size = opt.ker_size,
#                                   strides = 1,
#                                   activation='tanh',
#                                   kernel_initializer = tf.random_normal_initializer(mean=0.0, stddev=0.02, seed=None))
        
#     def call(self, x):
#         x = self.conv_first(x)
#         x = self.batch_first(x)
#         x = self.relu_first(x)
#         for i in self.body_layers:
#             x = i(x)
#         x = self.end_layer(x)
#         #if the shape is different, choose y in the "middle"
#         return x

In [None]:
def discriminator(opt):
    """
    Build the discriminator of one scale as a Sequential stack of opt.num_layer convolutions.

    The input has the size of the current real image (opt.real_x, opt.real_y, opt.nc_im).
    Every convolution uses "valid" padding and stride 1. All but the last are followed by batch
    normalization and LeakyReLU(0.2); the last one has a single filter and no activation.
    """
    def new_init():
        # a separate initializer instance for every convolution
        return tf.random_normal_initializer(mean=0.0, stddev=0.02, seed=None)

    # the filter count is halved per body layer but never goes below opt.min_nfc
    body_filters = [max(int(opt.nfc/pow(2,(i+1))), opt.min_nfc) for i in range(opt.num_layer - 2)]

    model = tf.keras.Sequential()

    # head
    model.add(layers.Conv2D(filters = max(opt.nfc,opt.min_nfc),
                            input_shape = (opt.real_x, opt.real_y, opt.nc_im),
                            padding = "valid",
                            kernel_size = opt.ker_size,
                            strides = 1,
                            bias_initializer = 'zeros',
                            kernel_initializer = new_init()))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(alpha = 0.2))

    # body
    for n_filters in body_filters:
        model.add(layers.Conv2D(filters = n_filters,
                                padding = "valid",
                                kernel_size = opt.ker_size,
                                strides = 1,
                                kernel_initializer = new_init()))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU(alpha = 0.2))

    # tail: one output channel, no activation
    model.add(layers.Conv2D(filters = 1,
                            padding = "valid",
                            kernel_size = opt.ker_size,
                            strides = 1,
                            kernel_initializer = new_init()))
    return model


# class discriminator(tf.keras.Model):
#     def __init__(self,opt):
#         super(discriminator, self).__init__()
#         N = opt.nfc
#         #self.input_layer = Input(shape=(opt.real_x, opt.real_y, opt.nc_im))
#         self.conv_first = layers.Conv2D(filters = max(N,opt.min_nfc),#output
#                                         input_shape= (opt.real_x, opt.real_y, opt.nc_im),
#                                         padding = "valid",
#                                         kernel_size = opt.ker_size,
#                                         strides = 1,
#                                         bias_initializer='zeros',
#                                         kernel_initializer = tf.random_normal_initializer(mean=0.0, stddev=0.02, seed=None)
#                                         )
#         self.batch_first = layers.BatchNormalization()
#         self.relu_first = layers.LeakyReLU(alpha = 0.2)
#         self.body_layers = []
#         for i in range(opt.num_layer - 2):
#             N = int(opt.nfc/pow(2,(i+1)))
#             self.body_layers.append(layers.Conv2D(filters = max(N,opt.min_nfc), 
#                                     padding = "valid",
#                                     kernel_size = opt.ker_size,
#                                     strides = 1,
#                                     kernel_initializer = tf.random_normal_initializer(mean=0.0, stddev=0.02, seed=None)))
#             self.body_layers.append(layers.BatchNormalization())
#             self.body_layers.append(layers.LeakyReLU(alpha = 0.2))

#         self.conv_last = layers.Conv2D(filters = 1, 
#                           padding = "valid",
#                           kernel_size = opt.ker_size,
#                           strides = 1,
#                           activation='tanh',
#                           kernel_initializer = tf.random_normal_initializer(mean=0.0, stddev=0.02, seed=None))

        
#     def call(self, x):
        
#         x = self.conv_first(x)
#         x = self.batch_first(x)
#         x = self.relu_first(x)
#         for i in self.body_layers:
#             x = i(x)
#         x = self.conv_last(x)

#         return x

In [None]:
def init_models(opt, pad_noise):
    """
    Create a fresh (generator, discriminator) pair for the scale described by opt.
    pad_noise is only needed by the generator, whose input is padded by it.
    """
    # generator first, then discriminator
    return generator(opt, pad_noise), discriminator(opt)

In [None]:
def G_process(netG, image_input, prev_image):
    """
    Run netG (kept a plain sequential model) on image_input and add prev_image to its output.

    prev_image is cropped by the same margin on both spatial axes so that it matches the
    generator output; the margin is derived from the size difference along axis 1.
    The input is passed through tf.stop_gradient and netG is called with training=False.
    """
    residual = netG(tf.stop_gradient(image_input), training=False)
    margin = int((prev_image.shape[1]-residual.shape[1])/2)
    row_end = prev_image.shape[1]-margin
    col_end = prev_image.shape[2]-margin
    cropped_prev = prev_image[:,margin:row_end,margin:col_end,:]
    return residual + cropped_prev

In [None]:
def calc_gradient_penalty(netD, real_data, fake_data, LAMBDA):
    """
    Gradient penalty used to improve the performance of Wasserstein GANs
    (from https://arxiv.org/pdf/1704.00028.pdf): LAMBDA * mean((||grad D(x_hat)|| - 1)^2)
    where x_hat is a random interpolation between real_data and fake_data.
    """
    # a single uniform [0,1) mixing coefficient, expanded to the shape of the data
    mix = tf.broadcast_to(tf.random.uniform(shape = [1,1]), real_data.shape)

    # linear interpolation between real and fake, wrapped in a trainable variable
    blended = mix * real_data + ((1 - mix) * fake_data)
    interpolates = tf.Variable(blended, trainable=True)

    with tf.GradientTape() as penalty_tape:
        penalty_tape.watch(interpolates)
        disc_interpolates = netD(interpolates)

    # gradient of the discriminator output w.r.t. the interpolated input
    gradients = penalty_tape.gradient(target=disc_interpolates,
                                      sources=interpolates,
                                      output_gradients=tf.ones_like(disc_interpolates))

    # NOTE(review): the norm is taken over axis 1 only; if the data is laid out as
    # (sample, x, y, channel) that is a spatial axis rather than the channel axis - confirm intended.
    deviation = tf.norm(gradients, axis=1) - 1
    return LAMBDA * tf.math.reduce_mean(deviation ** 2)

In [None]:
def adjust_scales2image(real_,opt):
    """
    Derive the pyramid settings from the size of real_ (a s*x*y*c tensor) and store them on opt:
    num_scales, stop_scale, scale1 and scale_factor. Nothing is returned.

    Reads opt.min_size, opt.max_size, opt.scale_factor_init, opt.scale_plus1 and
    opt.additional_scale.
    """
    short_side = min(real_.shape[1], real_.shape[2])
    long_side = max([real_.shape[1], real_.shape[2]])

    #number of scales needed to get from the short side down to min_size, plus the optional extras
    base_scales = math.ceil((math.log(math.pow(opt.min_size / (short_side), 1), opt.scale_factor_init)))
    opt.num_scales = base_scales + 1 * opt.scale_plus1 + 1 * opt.additional_scale

    #scales that are not needed because the image is first limited to max_size
    scales_above_max = math.ceil(math.log(min([opt.max_size, long_side]) / long_side, opt.scale_factor_init))
    opt.stop_scale = opt.num_scales - scales_above_max

    #factor that brings the long side down to max_size (never upscales)
    opt.scale1 = min(opt.max_size / long_side, 1)
    real_shape = imresize_scale_tensor(real_, opt.scale1).shape

    #per-scale factor so that stop_scale steps lead from the resized image to min_size
    opt.scale_factor = math.pow(opt.min_size/(min(real_shape[1],real_shape[2])),1/(opt.stop_scale))

In [None]:
def draw_concat(Gs,Zs,reals,NoiseAmp,in_s,mode,pad_noise,pad_image,opt):
    """
    Pass in_s through every already trained scale in Gs and return the upsampled result,
    cut to the size of the next real image (see Figure 4 in the article).

    mode selects the noise injected at every scale:
      'rand' - fresh normal noise (one channel broadcast to 3 at the first scale,
               opt.nc_z channels afterwards), zero padded by pad_noise
      'fix'  - the stored Z_opt of that scale
    With no trained scale, or any other mode, in_s is returned as it is.
    """
    G_z = in_s
    if len(Gs) == 0 or mode not in ('rand', 'fix'):
        return G_z

    #real_next (taken from reals[1:]) is the real image of the following scale
    pyramid = zip(Gs,Zs,reals,reals[1:],NoiseAmp)
    for level, (G, Z_opt, real_curr, real_next, noise_amp) in enumerate(pyramid):
        if mode == 'rand':
            #noise is 2 * pad_noise smaller than Z_opt on each spatial axis
            noise_rows = Z_opt.shape[1] - 2 * pad_noise
            noise_cols = Z_opt.shape[2] - 2 * pad_noise
            if level == 0:
                #first scale: one noise channel repeated over 3 channels
                z = generate_noise([noise_rows, noise_cols, 1])
                z = tf.broadcast_to(z, (1,z.shape[1], z.shape[2],3))
            else:
                z = generate_noise([noise_rows, noise_cols, opt.nc_z])
            #zero pad it back to the size of Z_opt
            z = zero_pad(z, pad_noise)
        else:
            #fix mode: reuse the stored noise
            z = Z_opt

        #cut the running image to the current real image size, then give it the image pad
        G_z = G_z[:,0:real_curr.shape[1],0:real_curr.shape[2],:]
        G_z = zero_pad(G_z, pad_image)

        #scaled noise plus image goes through the generator, whose output is added to the image
        z_in = noise_amp*z+G_z
        G_z = G_process(G,z_in, G_z)

        #scale up by 1/opt.scale_factor and cut to the next real image size (rounding may overshoot)
        G_z = imresize_scale_tensor(G_z,1/opt.scale_factor)
        G_z = G_z[:,0:real_next.shape[1],0:real_next.shape[2],:]
    return G_z

In [None]:
def train_single_scale(reals,Gs,Zs,in_s,NoiseAmp,opt, nfc_prev, scale_num):
    """
    Train the generator/discriminator pair of one pyramid scale.

    The scale being trained is ``len(Gs)``: its real image is ``reals[len(Gs)]``.
    The already trained scales (``Gs``, ``Zs``, ``NoiseAmp``) are only read,
    through ``draw_concat``.

    Parameters
    ----------
    reals : sequence of tensors
        Real-image pyramid; ``shape[1]`` and ``shape[2]`` of the current entry
        are read as its spatial size.
    Gs, Zs, NoiseAmp : list
        Generators, fixed noise maps and noise amplitudes of the scales that
        are already trained (``Gs == []`` marks the first scale).
    in_s : tensor
        Input handed to ``draw_concat``; replaced by an all-zero tensor on the
        first scale.
    opt : namespace
        Options. Reads ``ker_size``, ``num_layer``, ``nc_z``, ``alpha``,
        ``nfc``, ``out_``, ``outf``, ``lr_d``, ``lr_g``, ``gamma``, ``beta1``,
        ``niter``, ``noise_amp_init`` and ``lambda_grad``; writes ``real_x``,
        ``real_y`` and ``noise_amp``.
    nfc_prev : int
        ``opt.nfc`` of the previous scale. When it equals the current
        ``opt.nfc`` the previous scale's weights are loaded as a warm start.
    scale_num : int
        Index of this scale, used to locate the previous scale's weight files.

    Returns
    -------
    tuple
        ``(z_opt, in_s, netG)``: the reconstruction noise of this scale, the
        (possibly reset) ``in_s`` and the trained generator.
    """
    is_first_scale = (Gs == [])

    #get the corresponding real image and its spatial shape
    real = reals[len(Gs)]
    opt.real_x = real.shape[1]
    opt.real_y = real.shape[2]
    #noise and image are padded by the same width, derived from ker_size and num_layer
    pad_noise = int(((opt.ker_size - 1) * opt.num_layer) / 2)
    pad_image = int(((opt.ker_size - 1) * opt.num_layer) / 2)
    z_opt = tf.fill([1,opt.real_x + 2 * pad_noise,opt.real_y + 2 * pad_noise,opt.nc_z], 0.)
    alpha = opt.alpha
    #stateless loss object, shared by the RMSE estimate and the reconstruction loss
    mse = tf.keras.losses.MeanSquaredError()

    #init the discriminator and generator
    netG,netD = init_models(opt, pad_noise)
    #if the scales are not "far" from each other, warm-start with the last model
    if nfc_prev == opt.nfc:
        netG.load_weights(f'{opt.out_}/{scale_num-1}/netGweights.tf')
        netD.load_weights(f'{opt.out_}/{scale_num-1}/netDweights.tf')

    #setup optimizer & scheduler
    #NOTE(review): Keras evaluates the schedule at the optimizer's own step count and each
    #optimizer is stepped 3 times per epoch below, so the boundary at 1600 is reached after
    #roughly 534 epochs -- confirm this is the intended decay point.
    decay_lr_D = PiecewiseConstantDecay(boundaries=[1600],values=[opt.lr_d, opt.lr_d*opt.gamma])
    decay_lr_G = PiecewiseConstantDecay(boundaries=[1600],values=[opt.lr_g, opt.lr_g*opt.gamma])
    optimizerD = tf.keras.optimizers.Adam(learning_rate=decay_lr_D, beta_1=opt.beta1, beta_2=0.999)
    optimizerG = tf.keras.optimizers.Adam(learning_rate=decay_lr_G, beta_1=opt.beta1, beta_2=0.999)

    #build the models by running one dummy batch through them
    noise = zero_pad(tf.fill([1,opt.real_x, opt.real_y, opt.nc_z], 1.0), pad_noise)
    fake = netG.predict(tf.stop_gradient(noise))
    netD.predict(tf.stop_gradient(fake))
    del noise, fake

    #for niter's loop
    for epoch in tqdm(range(opt.niter), desc = f"scale {len(Gs)}", leave = False):
        if is_first_scale:
            #first scale: single-channel noise broadcast to 3 channels, then padded
            #(z_opt is redrawn every epoch on this scale)
            z_opt = generate_noise([opt.real_x,opt.real_y,1])
            z_opt = zero_pad(tf.broadcast_to(z_opt,(1,opt.real_x,opt.real_y,3)), pad_noise)
            noise_ = generate_noise([opt.real_x,opt.real_y,1])
            noise_ = zero_pad(tf.broadcast_to(noise_,(1,opt.real_x,opt.real_y,3)), pad_noise)
        else:
            #later scales: nc_z-channel noise, then padded
            noise_ = generate_noise([opt.real_x, opt.real_y, opt.nc_z])
            noise_ = zero_pad(noise_, pad_noise)

        #Update the discriminator (WGAN-GP): minimize -D(x) + D(G(z)) + gradient penalty
        for j in range(3):
            #for the first step in this scale
            if j == 0 and epoch == 0:
                #if it's the first scale (very first step)
                if is_first_scale:
                    #set prev, z_prev and in_s as all 0
                    prev = tf.fill((1,opt.real_x,opt.real_y,opt.nc_z), 0.)
                    z_prev = tf.fill((1,opt.real_x,opt.real_y,opt.nc_z), 0.)
                    in_s = prev

                    #pad prev as an image, pad z_prev as a noise
                    prev = zero_pad(prev, pad_image)
                    z_prev = zero_pad(z_prev, pad_noise)

                    #set amplify coefficient as 1
                    opt.noise_amp = 1

                #if the first step but not the first scale
                else:
                    #draw the random-noise concatenation, pad it as an image
                    prev = draw_concat(Gs,Zs,reals,NoiseAmp,in_s,'rand',pad_noise,pad_image,opt)
                    prev = zero_pad(prev, pad_image)

                    #draw the fixed-noise concatenation
                    z_prev = draw_concat(Gs,Zs,reals,NoiseAmp,in_s,'fix',pad_noise,pad_image,opt)

                    #the RMSE between the real image and the fixed-noise concatenation sets the amplify coefficient
                    RMSE = tf.math.sqrt(mse(real, z_prev))
                    opt.noise_amp = opt.noise_amp_init*RMSE

                    #pad the fixed-noise concatenation only after the RMSE is calculated
                    z_prev = zero_pad(z_prev,pad_image)

            #if it's not the first step in this scale
            else:
                #draw random noise concatenation, pad it as an image
                prev = draw_concat(Gs,Zs,reals,NoiseAmp,in_s,'rand',pad_noise,pad_image,opt)
                prev = zero_pad(prev, pad_image)

            if is_first_scale:
                #first scale: the generator input is the fresh noise itself
                noise = noise_
            else:
                #otherwise add the scaled new noise to the output accumulated from previous scales
                noise = opt.noise_amp*noise_+prev

            with tf.GradientTape(watch_accessed_variables=False) as disc_tape:
                disc_tape.watch(netD.trainable_variables)

                #critic score of the real image
                output_real = netD(real, training=True)
                errD_real = -tf.reduce_mean(output_real)

                #fake image = generator output + centre crop of prev matching the output size
                fake = netG(tf.stop_gradient(noise), training=False)
                width = int((prev.shape[1]-fake.shape[1])/2)
                fake = fake + prev[:,width:(prev.shape[1]-width),width:(prev.shape[2]-width),:]

                #critic score of the fake image
                output_fake = netD(tf.stop_gradient(fake), training=True)
                errD_fake = tf.reduce_mean(output_fake)

                #calculate the gradient_penalty
                gradient_penalty = calc_gradient_penalty(netD, real, fake, opt.lambda_grad)
                errD = errD_real + errD_fake + gradient_penalty

            gradients_of_discriminator = disc_tape.gradient(errD, netD.trainable_variables)
            optimizerD.apply_gradients(zip(gradients_of_discriminator, netD.trainable_variables))

        #Update the generator (WGAN): minimize -D(G(z)) plus the weighted reconstruction loss.
        #The adversarial loss is recorded once per epoch on a persistent tape and its gradient
        #is reused in each of the 3 generator steps.
        with tf.GradientTape(watch_accessed_variables=False, persistent=True) as extra_tape:
            extra_tape.watch(netG.trainable_variables)
            fake = netG(tf.stop_gradient(noise),training=True)
            width = int((prev.shape[1]-fake.shape[1])/2)
            fake = fake + prev[:,width:(prev.shape[1]-width),width:(prev.shape[2]-width),:]
            D_output = netD(fake, training=False)
            errG = -tf.reduce_mean(D_output)

        for j in range(3):

            with tf.GradientTape(watch_accessed_variables=False) as gen_tape:
                gen_tape.watch(netG.trainable_variables)

                #scale the reconstruction noise and add the fixed-noise output of previous scales
                Z_opt = opt.noise_amp*z_opt+z_prev

                #reconstruct: generator output + centre crop of z_prev matching the output size
                result = netG(tf.stop_gradient(Z_opt), training=True)
                width = int((z_prev.shape[1]-result.shape[1])/2)
                result = result + z_prev[:,width:(z_prev.shape[1]-width),width:(z_prev.shape[2]-width),:]
                #rec loss is the weighted MSE of real image and reconstructed image
                rec_loss = alpha*mse(result,real)

            #update
            extra_gradient = extra_tape.gradient(errG, netG.trainable_variables)
            gradient_of_generator = gen_tape.gradient(rec_loss, netG.trainable_variables)
            #Sum the two gradients variable by variable. Both are Python lists, so `+` would
            #concatenate them and zip() would then silently drop the adversarial half.
            combined_gradient = [
                g_adv if g_rec is None else (g_rec if g_adv is None else g_rec + g_adv)
                for g_rec, g_adv in zip(gradient_of_generator, extra_gradient)
            ]
            optimizerG.apply_gradients(zip(combined_gradient, netG.trainable_variables))

        del extra_tape
        #periodically (and on the last epoch) save a random sample and the reconstruction
        if epoch % 250 == 0 or epoch == (opt.niter-1):
            plt.imsave(f'{opt.outf}/fake_sample.png', convert_image_np(tf.stop_gradient(fake)))
            plt.imsave(f'{opt.outf}/G(z_opt).png',
                       convert_image_np(tf.stop_gradient(G_process(netG, tf.stop_gradient(Z_opt), z_prev))), vmin=0, vmax=1)

    save_networks(netG,netD,z_opt,opt)
    return z_opt,in_s,netG

In [None]:
def freeze(model):
    """
    Mark every layer in ``model.layers`` as non-trainable and return the same model object.
    """
    for sub_layer in model.layers:
        setattr(sub_layer, "trainable", False)
    return model

In [None]:
def train(real,opt,Gs,Zs,NoiseAmp):
    """
    Train all scales from 0 to ``opt.stop_scale`` one after another.

    ``Gs``, ``Zs`` and ``NoiseAmp`` are extended in place with one entry per
    scale (frozen generator, reconstruction noise, noise amplitude). Each
    scale gets its own directory ``{opt.out_}/{scale_num}`` that receives
    ``real_scale.png`` plus whatever ``train_single_scale`` and ``save_vars``
    write.

    Parameters
    ----------
    real : tensor
        Input image; resized by ``opt.scale1`` before the pyramid is built.
    opt : namespace
        Options. Reads ``scale1``, ``stop_scale``, ``input_name``,
        ``nfc_init`` and ``min_nfc_init``; writes ``out_``, ``outf``, ``nfc``
        and ``min_nfc`` (and whatever ``train_single_scale`` writes).
    Gs, Zs, NoiseAmp : list
        Output lists, appended to in place.

    Returns
    -------
    None
    """
    in_s = 0
    nfc_prev = 0
    #resize the input real and generate the pyramid
    real = imresize_scale_tensor(real,opt.scale1)
    reals = create_reals_pyramid(real,opt)
    #parent directory
    opt.out_ = generate_dir2save(opt)

    #in each scale
    for scale_num in tqdm(range(opt.stop_scale+1), desc = opt.input_name, leave = True):
        #filter counts double every 4 scales and are capped at 128
        width_factor = pow(2, math.floor(scale_num / 4))
        opt.nfc = min(opt.nfc_init * width_factor, 128)
        opt.min_nfc = min(opt.min_nfc_init * width_factor, 128)

        #directory for each scale; an existing directory is fine, any other
        #failure (e.g. permissions) is raised here instead of being swallowed
        opt.outf = f'{opt.out_}/{scale_num}'
        os.makedirs(opt.outf, exist_ok=True)

        #in the scale specific directory save the real_scale.png
        plt.imsave(f'{opt.outf}/real_scale.png', convert_image_np(reals[scale_num]), vmin=0, vmax=1)
        #train this scale and return the generator
        z_curr,in_s,G_curr = train_single_scale(reals,Gs,Zs,in_s,NoiseAmp,opt, nfc_prev, scale_num)

        G_curr = freeze(G_curr)
        #save them to each list
        Gs.append(G_curr)#model
        Zs.append(z_curr)#tensor
        NoiseAmp.append(opt.noise_amp)#tensor

        #for future use, save all variables and models to the parent directory of the model
        #which is easier to load in one time
        save_vars(G_curr, Zs[-1], NoiseAmp[-1], reals[scale_num], opt, scale_num)
        nfc_prev = opt.nfc
        del G_curr
    return

In [None]:
def upsampling(im,sx,sy):
    """
    Resize ``im`` to ``round(sx)`` x ``round(sy)`` with ``tf.image.resize``
    and its default interpolation method (bilinear).
    """
    target_size = [round(sx), round(sy)]
    return tf.image.resize(im, size = target_size)

def generate_in2coarsest(reals,scale_v,scale_h,opt):
    """
    Build the input ``in_s`` for generation starting at ``opt.gen_start_scale``.

    The real image of the start scale is resized by the factors ``scale_v``
    (axis 1) and ``scale_h`` (axis 2). Tensors in this file are laid out as
    sample*x*y*channel, so the spatial axes are ``shape[1]`` and ``shape[2]``.

    Returns an all-zero tensor of the resized shape when
    ``opt.gen_start_scale == 0``, otherwise the resized image itself.
    """
    #pick the real image of the start scale
    real = reals[opt.gen_start_scale]
    #resize it bilinearly along the two spatial axes
    real_down = upsampling(real, scale_v * real.shape[1], scale_h * real.shape[2])
    #for fresh start
    if opt.gen_start_scale == 0:
        #generate from 0
        in_s = tf.fill(real_down.shape, 0.)
    else:
        #otherwise start from real_down (resized to its own spatial size)
        in_s = upsampling(real_down, real_down.shape[1], real_down.shape[2])
    return in_s

In [None]:
def SinGAN_generate(Gs,Zs,reals,NoiseAmp,opt,in_s = None,gen_start_scale=0,num_samples=50,output_image = False):
    """
    Draw ``num_samples`` random samples by running them through every scale.

    Parameters
    ----------
    Gs, Zs, NoiseAmp : list
        Per-scale generators, fixed noise maps and noise amplitudes.
    reals : sequence of tensors
        Real-image pyramid; used for the shape of the default ``in_s``, for
        cropping upscaled images and to detect the last scale
        (``n == len(reals)-1``).
    opt : namespace
        Options; reads ``ker_size``, ``num_layer``, ``nc_z`` and
        ``scale_factor``.
    in_s : tensor, optional
        Input of the first scale. Defaults to zeros shaped like ``reals[0]``.
    gen_start_scale : int
        Scales below this index use the stored ``Z_opt`` instead of fresh noise.
    num_samples : int
        Number of samples to generate.
    output_image : bool
        When true, each final-scale sample is also written to
        ``<generate_dir2save(opt)>/image_generated/<i>.png``.

    Returns
    -------
    list
        One array per sample, as returned by ``convert_image_np`` for the
        output of the last scale.
    """
    # make in_s a 0 tensor with reals[0] shape
    if in_s is None:
        in_s = tf.fill(reals[0].shape, 0.)
    #generate a pad width ((ker_size-1)*num_layer)/2
    pad = int(((opt.ker_size-1)*opt.num_layer)/2)
    last_scale = len(reals)-1
    output_list = []
    images_cur = []

    #for each scale
    for n,(G,Z_opt,noise_amp) in enumerate(zip(Gs,Zs,NoiseAmp)):
        #the shape inside padding
        real_x = Z_opt.shape[1]-pad*2
        real_y = Z_opt.shape[2]-pad*2

        #the images of the previous scale become the inputs of this one
        images_prev,images_cur = images_cur,[]

        #resolve and create the output directory once per final scale rather than once
        #per sample; an existing directory is fine, other errors are raised
        is_last_scale = (n == last_scale)
        directory = None
        if is_last_scale and output_image and num_samples > 0:
            directory = os.path.join(generate_dir2save(opt), "image_generated")#modified
            os.makedirs(directory, exist_ok=True)

        #for the number of samples
        for i in range(0,num_samples):
            #noise is always drawn, even when Z_opt replaces it below, so the
            #sequence of random draws does not depend on gen_start_scale
            if n == 0:
                #generate a single channel noise, broadcast to 3 channels, then pad it with 0
                z_curr = generate_noise(size = [real_x,real_y,1])
                z_curr = tf.broadcast_to(z_curr,[1,z_curr.shape[1],z_curr.shape[2],3])
                z_curr = zero_pad(z_curr,pad)
            else:
                #generate noise with defined shape, then pad it
                z_curr = generate_noise(size = [real_x, real_y, opt.nc_z])
                z_curr = zero_pad(z_curr,pad)

            #if there is no previous image (first scale)
            if not images_prev:
                #start from the padded in_s
                I_prev = zero_pad(in_s, pad)
            else:
                #get the last image, resize it by 1/scale_factor
                #cut into image shape (in case there's a rounding issue)
                I_prev = images_prev[i]
                I_prev = imresize_scale_tensor(I_prev,1/opt.scale_factor)
                I_prev = I_prev[:, 0:reals[n].shape[1], 0:reals[n].shape[2],:]
                #pad, then cut into noise shape (similarly, a safeguard)
                I_prev = zero_pad(I_prev,pad)
                I_prev = I_prev[:,0:z_curr.shape[1],0:z_curr.shape[2],:]

            #below the start scale use the stored noise (human face generating)
            if n < gen_start_scale:
                z_curr = Z_opt

            # amplify the z by the param, add the previous image
            z_in = noise_amp*(z_curr)+I_prev
            I_curr = G_process(G,z_in,I_prev)
            #for the last scale
            if is_last_scale:
                output_graph = convert_image_np(tf.stop_gradient(I_curr))
                output_list.append(output_graph)
                if output_image:
                    #save the new generated image
                    plt.imsave(f'{directory}/{i}.png', output_graph, vmin=0,vmax=1)
            # keep the generated image as input for the next scale
            images_cur.append(I_curr)
    return output_list

# Copied from the MNIST script

In [None]:
#This file is the final version of the MNIST data processing.
#Note: it is used for the modified version of SinGAN under directory SinGAN/.., not the official version --Alex

import os
import time
from tqdm import tqdm_notebook


def train_model(input_name, layer_number = 6, random_seed = 1, epochs = 2000, Scale_plus1 = True, additional_scale = False):
    """
    Train one model on ``input/<input_name>`` unless its output directory already exists.

    The options namespace is built by parsing a synthetic argument list with the
    parser from ``get_arguments`` and passing the result through ``post_config``.
    Both the noise and the image channel counts are fixed to 3. Returns None.
    """
    #fixed settings
    input_root = os.path.join(os.getcwd(), "input")
    output_root = os.path.join(os.getcwd(), "output")
    n_channels = 3 #SinGAN still expects 3 channels even for 1-channel data
    start_scale = 0

    #extend the base parser with the extra options used by this notebook
    parser_train = get_arguments()
    for flag in ('--input_dir', '--input_name', '--mode'):
        parser_train.add_argument(flag)
    #newly added, have change in functions.adjust_scales2image --yihao
    parser_train.add_argument('--scale_plus1', type=int, default = 1)
    parser_train.add_argument('--additional_scale', type=int, default = 0)
    parser_train.add_argument("--gen_start_scale", type=int)

    cli_args = [
        "--input_dir", input_root,
        "--input_name", input_name,
        "--mode", "train",
        "--manualSeed", str(random_seed),
        "--out", output_root,
        "--gen_start_scale", str(start_scale),
        "--num_layer", str(layer_number),
        "--nc_z", str(n_channels),
        "--nc_im", str(n_channels),
        "--niter", str(epochs),
        "--scale_plus1", str(int(Scale_plus1)),
        "--additional_scale", str(int(additional_scale)),
    ]
    opt_train = post_config(parser_train.parse_args(cli_args))

    #containers filled by train()
    Gs, Zs, NoiseAmp = [], [], []

    #an existing output directory means the model was already trained: report and stop
    dir2save = generate_dir2save(opt_train)
    if os.path.exists(dir2save):
        print(f'layer={opt_train.num_layer}, iteration={opt_train.niter}, scale_factor={opt_train.scale_factor_init}, alpha={opt_train.alpha} model for {opt_train.input_name} already exist')
        return

    try:
        os.makedirs(dir2save)
    except OSError:
        pass
    #read the image and decide the scales
    real = read_image_tensor(opt_train)
    adjust_scales2image(real, opt_train)
    #train, timing the run
    start = time.time()
    train(real, opt_train, Gs, Zs, NoiseAmp)
    end = time.time()
    print(f"input = {opt_train.input_name} finished, traing time {end - start}s, for n_iter = {opt_train.niter}, layer_number = {opt_train.num_layer}")
    
def generate_data(class_label = 4, epochs = 2000, generate_size = 50, random_seed = 1, sample_size = 5, start_scale = 0, output_image = False):
    '''
    Return generated images in np.array form.

    For each of the ``sample_size`` inputs ``MNIST_{class_label}_input_{i}.png``
    the trained pyramid is loaded with ``load_trained_pyramid`` and sampled
    with ``SinGAN_generate``. The samples of all models are pooled, shuffled
    along the first axis and cut to at most ``generate_size`` entries.

    Parameters
    ----------
    class_label : int
        Label used in the input file names.
    epochs : int
        Passed as ``--niter`` (presumably must match the value used for
        training so the same model directory is found -- verify against
        ``generate_dir2save``).
    generate_size : int
        Upper bound on the number of returned images.
    random_seed : int
        Passed as ``--manualSeed``.
    sample_size : int
        Number of trained models (input images) to sample from.
    start_scale : int
        Generation start scale; written to ``opt.gen_start_scale`` and
        forwarded to ``SinGAN_generate``.
    output_image : bool
        Forwarded to ``SinGAN_generate`` to also save the images to disk.
    '''
    #fixed settings
    input_root = os.path.join(os.getcwd(), "input")
    output_root = os.path.join(os.getcwd(), "output")
    n_channels = 3
    input_names = [f"MNIST_{class_label}_input_{i}.png" for i in range(sample_size)]
    layer_number = 6
    scale_plus1 = True
    additional_scale = 0

    #samples drawn from each model
    #NOTE(review): dividing by sample_size+1 yields fewer than generate_size images in total
    #(45 for the defaults) -- presumably one slot per model is reserved for the real image; confirm.
    samples_per_model = int(np.ceil(generate_size/(sample_size+1)))

    # the list takes all the result
    output = []
    parser_generate = get_arguments()
    parser_generate.add_argument('--input_dir')
    parser_generate.add_argument('--input_name')
    parser_generate.add_argument('--mode')
    parser_generate.add_argument('--gen_start_scale', type=int)
    parser_generate.add_argument('--scale_plus1', type=int)
    parser_generate.add_argument('--additional_scale', type=int)
    # for all the trained samples
    for i in tqdm(range(sample_size), desc = "loading", leave = False):
        #run standard procedure, create a namespace
        opt_generate = parser_generate.parse_args(["--input_dir", input_root,
                                                   "--input_name", input_names[i],
                                                   "--mode", "random_samples",
                                                   "--manualSeed", str(random_seed),
                                                   "--gen_start_scale", str(start_scale),
                                                   "--out", output_root,
                                                   "--num_layer", str(layer_number),
                                                   "--nc_z", str(n_channels),
                                                   "--nc_im", str(n_channels),
                                                   "--niter", str(epochs),
                                                   "--scale_plus1", str(int(scale_plus1)),
                                                   "--additional_scale", str(int(additional_scale))
                                                  ])
        opt_generate = post_config(opt_generate)
        #read the file
        real = read_image_tensor(opt_generate)
        #adjust scales, write into opt_generate
        adjust_scales2image(real, opt_generate)
        #load the model
        Gs, Zs, reals, NoiseAmp = load_trained_pyramid(opt_generate)
        #generate the output; start_scale is forwarded so that it actually takes effect
        #(it used to be hard-coded to 0 here)
        list_output = SinGAN_generate(Gs, Zs,  reals, NoiseAmp, opt_generate, output_image = output_image,
                                      gen_start_scale = start_scale,
                                      num_samples = samples_per_model)
        output += list_output
    #change list into np.array
    output = np.array(output)
    #shuffle it by the first axis
    np.random.shuffle(output)
    return output[:generate_size]