In [None]:
import sklearn.model_selection
import numpy as np
from numpy import expand_dims
from numpy import zeros
from numpy import ones
from numpy import random
from numpy.random import randn
from numpy.random import randint
import keras
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers import RMSprop
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Flatten
from tensorflow.keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import ReLU
from keras.layers import BatchNormalization
from keras.layers import LayerNormalization
from keras.layers import Dropout
from keras.layers import Layer
from google.colab import files
from google.colab import drive
from tensorflow.keras import backend as K

from keras import Input
from keras import Model
from keras.utils import generic_utils
from keras.initializers import RandomNormal
from keras.constraints import Constraint
from tensorflow.python.keras.layers.merge import _Merge
from keras.layers import InputSpec
import keras.backend as K
import math
import sys

import tensorflow as tf
import numpy as np

In [None]:
class SpectralNormalization(tf.keras.layers.Wrapper):
    """
    Performs spectral normalization on weights.
    This wrapper controls the Lipschitz constant of the layer by
    constraining its spectral norm, which can stabilize the training of GANs.
    See [Spectral Normalization for Generative Adversarial Networks](https://arxiv.org/abs/1802.05957).

    The wrapped layer must expose either a `kernel` or an `embeddings`
    attribute. On every training call that weight variable is divided, in
    place, by an estimate of its largest singular value obtained by power
    iteration; the running estimate vector is kept in the non-trainable
    weight `sn_u`.
    """

    def __init__(self, layer: tf.keras.layers.Layer, power_iterations: int = 1, **kwargs):
        """Wrap `layer`.

        Args:
            layer: Layer whose weight is normalized.
            power_iterations: Number of power-iteration steps run per
                normalization; must be greater than zero.
            **kwargs: Forwarded to the `Wrapper` constructor.

        Raises:
            ValueError: If `power_iterations` is zero or negative.
        """
        super().__init__(layer, **kwargs)
        if power_iterations <= 0:
            raise ValueError(
                "`power_iterations` should be greater than zero, got "
                "`power_iterations={}`".format(power_iterations)
            )
        self.power_iterations = power_iterations
        # Set here but not read anywhere else in this class.
        self._initialized = False

    def build(self, input_shape):
        """Build the wrapped layer and create the power-iteration vector `u`.

        Raises:
            AttributeError: If the wrapped layer has neither a `kernel` nor
                an `embeddings` attribute after being built.
        """
        super().build(input_shape)
        input_shape = tf.TensorShape(input_shape)
        # Leave the batch dimension free; pin every other dimension.
        self.input_spec = tf.keras.layers.InputSpec(shape=[None] + input_shape[1:])

        # Pick the weight variable to normalize.
        if hasattr(self.layer, "kernel"):
            self.w = self.layer.kernel
        elif hasattr(self.layer, "embeddings"):
            self.w = self.layer.embeddings
        else:
            raise AttributeError(
                "{} object has no attribute 'kernel' nor "
                "'embeddings'".format(type(self.layer).__name__)
            )

        self.w_shape = self.w.shape.as_list()

        # Row vector with one entry per output unit (last axis of the weight).
        self.u = self.add_weight(
            shape=(1, self.w_shape[-1]),
            initializer=tf.initializers.TruncatedNormal(stddev=0.02),
            trainable=False,
            name="sn_u",
            dtype=self.w.dtype,
        )

    def call(self, inputs, training=None):
        """Normalize the weight when training, then apply the wrapped layer.

        When `training` is None the Keras learning phase decides whether the
        normalization step runs.
        """
        if training is None:
            training = tf.keras.backend.learning_phase()

        if training:
            self.normalize_weights()

        output = self.layer(inputs)
        return output

    def compute_output_shape(self, input_shape):
        """Return the wrapped layer's output shape as a `tf.TensorShape`."""
        return tf.TensorShape(self.layer.compute_output_shape(input_shape).as_list())

    @tf.function
    def normalize_weights(self):
        """Generate spectral normalized weights.
        This method will update the value of `self.w` with the
        spectral normalized value, so that the layer is ready for `call()`.
        """

        # Collapse every axis except the last so the weight is a 2-D matrix.
        w = tf.reshape(self.w, [-1, self.w_shape[-1]])
        u = self.u

        with tf.name_scope("spectral_normalize"):
            # Power iteration: alternate between the two singular-vector
            # estimates, re-normalizing to unit length after each product.
            for _ in range(self.power_iterations):
                v = tf.math.l2_normalize(tf.matmul(u, w, transpose_b=True))
                u = tf.math.l2_normalize(tf.matmul(v, w))

            # sigma = v @ w @ u^T, the spectral-norm estimate (a 1x1 tensor).
            sigma = tf.matmul(tf.matmul(v, w), u, transpose_b=True)

            # Overwrite the layer's own variable and persist `u` so the next
            # call continues from the current estimate.
            self.w.assign(self.w / sigma)
            self.u.assign(u)

    def get_config(self):
        """Return the wrapper config extended with `power_iterations`."""
        config = {"power_iterations": self.power_iterations}
        base_config = super().get_config()
        return {**base_config, **config}

class SelfAttention(Layer):
    """Self-attention over the spatial positions of a rank-4 feature map.

    Three 1x1 convolutions project the input into `f`, `g` (each with
    `ch // 8` filters) and `h` (`ch` filters). The softmax of `g . f^T` over
    all flattened spatial positions weights `h`, and the result is added back
    to the input scaled by the learned scalar `gamma`, which starts at zero.

    Args:
        ch: Number of channels of the input feature map.
        **kwargs: Forwarded to the base `Layer` constructor.
    """

    def __init__(self, ch, **kwargs):
        super(SelfAttention, self).__init__(**kwargs)
        self.channels = ch
        self.filters_f_g = self.channels // 8
        self.filters_h = self.channels

    def build(self, input_shape):
        """Create `gamma` and the three 1x1 convolution kernels."""
        kernel_shape_f_g = (1, 1) + (self.channels, self.filters_f_g)
        kernel_shape_h = (1, 1) + (self.channels, self.filters_h)

        # gamma starts at zero, so the layer initially returns its input.
        self.gamma = self.add_weight(name='gamma', shape=[1], initializer='zeros', trainable=True)
        self.kernel_f = self.add_weight(shape=kernel_shape_f_g,
                                        initializer='glorot_uniform',
                                        name='kernel_f',
                                        trainable=True)
        self.kernel_g = self.add_weight(shape=kernel_shape_f_g,
                                        initializer='glorot_uniform',
                                        name='kernel_g',
                                        trainable=True)
        self.kernel_h = self.add_weight(shape=kernel_shape_h,
                                        initializer='glorot_uniform',
                                        name='kernel_h',
                                        trainable=True)

        # The base-class build() marks the layer as built.
        super(SelfAttention, self).build(input_shape)
        # Require rank-4 input whose last axis matches the build-time size.
        self.input_spec = InputSpec(ndim=4,
                                    axes={3: input_shape[-1]})

    def call(self, x):
        """Return `gamma * attention(x) + x`, with the same shape as `x`."""
        def hw_flatten(t):
            # Merge the two spatial axes into one: [bs, h, w, c] -> [bs, h*w, c].
            return K.reshape(t, shape=[K.shape(t)[0], K.shape(t)[1]*K.shape(t)[2], K.shape(t)[3]])

        f = K.conv2d(x,
                     kernel=self.kernel_f,
                     strides=(1, 1), padding='same')  # [bs, h, w, c']
        g = K.conv2d(x,
                     kernel=self.kernel_g,
                     strides=(1, 1), padding='same')  # [bs, h, w, c']
        h = K.conv2d(x,
                     kernel=self.kernel_h,
                     strides=(1, 1), padding='same')  # [bs, h, w, c]

        s = K.batch_dot(hw_flatten(g), K.permute_dimensions(hw_flatten(f), (0, 2, 1)))  # [bs, N, N]

        beta = K.softmax(s, axis=-1)  # attention map

        o = K.batch_dot(beta, hw_flatten(h))  # [bs, N, C]

        o = K.reshape(o, shape=K.shape(x))  # [bs, h, w, C]
        x = self.gamma * o + x

        return x

    def compute_output_shape(self, input_shape):
        """The layer preserves the input shape."""
        return input_shape

    def get_config(self):
        """Return the layer config including the `ch` constructor argument.

        Without this override a model containing the layer cannot be
        serialized and rebuilt, because `ch` would be missing from the config.
        """
        base_config = super(SelfAttention, self).get_config()
        return {**base_config, "ch": self.channels}
  
def input_shapes(model, prefix):
    """Return the per-sample shapes of the selected inputs of *model*.

    Only inputs whose ``name`` starts with *prefix* are considered. The
    leading (batch) dimension of each shape is dropped and the remainder is
    returned as a plain tuple, in the order of ``model.inputs``.
    """
    return [
        tuple(tensor.shape[1:])
        for tensor in model.inputs
        if tensor.name.startswith(prefix)
    ]

class NoiseGenerator(object):
    """Endless iterator yielding batches of Gaussian noise.

    Each step returns a list with one ``float32`` array per entry of
    ``noise_shapes``; every array has shape ``(batch_size,) + shape``. Draws
    come from a private ``RandomState`` seeded with ``random_seed``.
    """

    def __init__(self, noise_shapes, batch_size=512, random_seed=None):
        self.noise_shapes = noise_shapes
        self.batch_size = batch_size
        self.prng = np.random.RandomState(seed=random_seed)

    def __iter__(self):
        return self

    def __next__(self, mean=0.0, std=1.0):
        """Draw one batch; ``mean``/``std`` shift and scale the unit normal."""
        batch = []
        for sample_shape in self.noise_shapes:
            full_shape = (self.batch_size,) + sample_shape
            draw = self.prng.randn(*full_shape).astype(np.float32)
            # In-place updates keep the array in float32.
            if std != 1.0:
                draw *= std
            if mean != 0.0:
                draw += mean
            batch.append(draw)
        return batch


def wasserstein_loss(y_true, y_pred):
    """Wasserstein critic loss: mean over the last axis of ``y_true * y_pred``.

    The targets act as signs, so the sign of ``y_true`` decides whether the
    critic score is minimized or maximized.
    """
    signed_scores = y_true * y_pred
    return K.mean(signed_scores, axis=-1)

class RandomWeightedAverage(_Merge):
    """Merge layer returning a random per-sample blend of its two inputs.

    For each sample one weight is drawn uniformly from [0, 1) and the output
    is ``x * w + y * (1 - w)``.
    """

    def build(self, input_shape):
        super(RandomWeightedAverage, self).build(input_shape)
        if len(input_shape) != 2:
            raise ValueError('A `RandomWeightedAverage` layer should be '
                             'called on exactly 2 inputs')

    def _merge_function(self, inputs):
        if len(inputs) != 2:
            raise ValueError('A `RandomWeightedAverage` layer should be '
                             'called on exactly 2 inputs')

        first, second = inputs
        # One random weight per sample (only the batch dimension is used).
        batch_dims = K.shape(first)[:1]
        mix = K.random_uniform(batch_dims, 0, 1)
        # Append singleton axes so the weights broadcast over every
        # non-batch axis of the inputs.
        trailing_axes = len(K.int_shape(first)) - 1
        for _ in range(trailing_axes):
            mix = K.expand_dims(mix, -1)
        return first * mix + second * (1 - mix)


class Nontrainable(object):
    """Context manager that freezes a model inside a ``with`` block.

    On entry the model's current ``trainable`` flag is remembered and set to
    ``False``; on exit the remembered value is restored. Exceptions raised
    inside the block are not suppressed.
    """

    def __init__(self, model):
        self.model = model

    def __enter__(self):
        frozen = self.model
        self.trainable_status = frozen.trainable
        frozen.trainable = False
        return frozen

    def __exit__(self, type, value, traceback):
        # Restore whatever the flag was before entering.
        self.model.trainable = self.trainable_status

class GradientPenalty(Layer):
    """Layer computing the WGAN-GP gradient-norm deviation.

    `call` expects `[real_image, generated_image, disc]`, where `disc` is the
    critic callable. It blends the two image batches with
    `RandomWeightedAverage`, differentiates the critic output with respect to
    the blend and returns `||grad||_2 - 1` per sample with shape
    `(batch, 1)`. The value is therefore zero when the gradient norm is
    exactly one.
    """

    def call(self, inputs):
        real_image, generated_image, disc = inputs
        avg_image = RandomWeightedAverage()([real_image, generated_image])

        # The blend is not a variable, so it must be watched explicitly.
        with tf.GradientTape() as tape:
            tape.watch(avg_image)
            disc_avg = disc(avg_image)

        grad = tape.gradient(disc_avg, [avg_image])[0]
        if grad is None:
            # tape.gradient yields None when the critic output does not
            # depend on the blended input; fail with a clear message.
            raise ValueError(
                "GradientPenalty: critic output has no gradient with "
                "respect to the interpolated input"
            )

        # L2 norm over all non-batch axes, kept as a column vector.
        grad_norm = K.sqrt(
            K.sum(K.batch_flatten(K.square(grad)), axis=1, keepdims=True)
        )
        return grad_norm - 1
############################## Normalizing real data ################################

#source: https://stackoverflow.com/questions/58646790/python-scaling-with-4d-data

from sklearn.preprocessing import MinMaxScaler

def scaling(data):
    """Min-max scale channel 0 of a 4-D array and return a new array.

    Channel 0 is flattened to one row per sample, so `MinMaxScaler` rescales
    every (row, column) position independently across the samples. The
    result has the same shape and dtype as `data`; only channel 0 is filled,
    any other channel is left at zero.

    The array dimensions are read from `data` itself, so the function is not
    tied to one particular dataset size.

    Args:
        data: Array indexed as (sample, row, column, channel).

    Returns:
        Array shaped like `data` holding the scaled channel 0.
    """
    n_samples, n_rows, n_cols = data.shape[:3]
    X_transformed = np.zeros_like(data)
    mmx = MinMaxScaler()
    # One row vector per sample so each position becomes one feature.
    flat = data[:, :, :, 0].reshape(n_samples, n_rows * n_cols)
    transformed = mmx.fit_transform(flat)
    # Restore the (sample, row, column) layout and store it in channel 0.
    X_transformed[:, :, :, 0] = transformed.reshape(n_samples, n_rows, n_cols)
    return X_transformed


def compute_output_shape(self, input_shapes):
    """Return ``(batch, 1)``, taking ``batch`` from the second input shape.

    NOTE(review): this is a module-level function that takes ``self``; it
    looks like a layer method that ended up outside its class - confirm.
    """
    batch_dim = input_shapes[1][0]
    return (batch_dim, 1)

def load_real_samples(path="/content/drive/MyDrive/FTCP_rep.npy"):
    """Load the training array from `path`, add a channel axis and scale it.

    Args:
        path: Location of the `.npy` file. Defaults to the Google Drive path
            used by this notebook, so existing zero-argument calls behave as
            before.

    Returns:
        The `float32` array returned by `scaling`, with a trailing channel
        axis of size 1.
    """
    # NOTE(security): allow_pickle=True lets np.load unpickle arbitrary
    # objects, which can execute code. Only load files from a trusted source.
    raw = np.load(path, allow_pickle=True)
    raw = expand_dims(raw, axis=-1)
    return scaling(raw.astype('float32'))

# Load and scale the training data once, at module level. The network
# builders further down read `input_dim` and `channel_dim` as globals.
X_train = load_real_samples() 
# Sizes of axis 1 and axis 2 of the scaled array (axis 0 indexes samples).
input_dim = X_train.shape[1]
channel_dim = X_train.shape[2]

def generate_real_samples(dataset,n_samples):
    """Pick ``n_samples`` random rows of ``dataset`` (with replacement).

    Returns the selected samples together with a ``(n_samples, 1)`` array of
    ``-1`` labels. A function with the same name and body is defined again
    later in this file.
    """
    chosen = randint(0, dataset.shape[0], n_samples)
    labels = -ones((n_samples, 1))
    return dataset[chosen], labels
    
def generate_latent_points(latent_dim,n_samples):
    """Draw standard-normal latent vectors shaped ``(n_samples, latent_dim)``.

    A function with the same name and body is defined again later in this
    file.
    """
    flat_draw = randn(latent_dim * n_samples)
    return flat_draw.reshape(n_samples, latent_dim)

def generate_fake_samples(generator,latent_dim,n_samples):
    """Run ``generator.predict`` on fresh latent points.

    Returns the generated samples and a ``(n_samples, 1)`` array of ``+1``
    labels.
    """
    latent_batch = generate_latent_points(latent_dim, n_samples)
    fakes = generator.predict(latent_batch)
    labels = ones((n_samples, 1))
    return fakes, labels

In [None]:
############################### Defining discriminator function ###############################
import tensorflow as tf
from keras import backend as K
from keras.layers import Input, Dense, Lambda, Conv1D, Conv2DTranspose, \
    LeakyReLU, Activation, Flatten, Reshape, BatchNormalization
from keras import layers
from keras.models import Model
# Module-level hyper-parameters.
# `latent_dim`: the functions in the visible file receive the latent size as
# a parameter, so this global does not appear to be read by them.
latent_dim = 512
# Widest convolution in the networks; read as a global by define_generator
# (define_critic sets its own local copy).
max_filters = 128
# Kernel sizes per stage; define_critic uses its own local copy.
filter_size = [5,3,3]
# Strides per stage; define_generator reads strides[2] and strides[1].
strides = [2,2,1]

def define_critic(in_shape):
    """Build the critic: three spectrally-normalized Conv1D stages + dense head.

    Each stage is ``SpectralNormalization(Conv1D)`` -> ``LayerNormalization``
    -> ``LeakyReLU(0.2)``. The flattened features pass through a 1024-unit
    ReLU layer and a single linear output. The output shape of every layer
    is printed before the model is returned.

    NOTE: ``in_shape`` is not read; the input shape comes from the
    module-level ``input_dim`` and ``channel_dim``.
    """
    max_filters = 128
    filter_size = [5, 3, 3]
    strides = [2, 2, 1]
    # Filter count grows stage by stage: 32, 64, 128.
    conv_filters = [max_filters // 4, max_filters // 2, max_filters]

    critic_inputs = Input(shape=(input_dim, channel_dim,), name="input")
    features = critic_inputs
    for n_filters, kernel, stride in zip(conv_filters, filter_size, strides):
        features = SpectralNormalization(
            Conv1D(n_filters, kernel, strides=stride, padding='SAME')
        )(features)
        features = LayerNormalization()(features)
        features = LeakyReLU(0.2)(features)

    features = Flatten()(features)
    features = Dense(1024, activation="relu")(features)
    out = Dense(1, activation='linear')(features)

    model = Model(inputs=critic_inputs, outputs=out)
    for layer in model.layers:
        print(layer.output_shape)
    return model

############################### Defining generator function ###############################

def generate_real_samples(dataset,n_samples):
    """Sample ``n_samples`` rows of ``dataset`` at random, with replacement.

    Returns ``(samples, labels)`` where ``labels`` is a ``(n_samples, 1)``
    array filled with ``-1``. This re-defines, with the same body, the
    function of the same name that appears earlier in this file.
    """
    row_ids = randint(0, dataset.shape[0], n_samples)
    targets = -ones((n_samples, 1))
    return dataset[row_ids], targets
    
def generate_latent_points(latent_dim,n_samples):
    """Return an ``(n_samples, latent_dim)`` array of standard-normal draws.

    This re-defines, with the same body, the function of the same name that
    appears earlier in this file.
    """
    total = latent_dim * n_samples
    return randn(total).reshape(n_samples, latent_dim)
    
# Draws 10 latent vectors of length 512 when the module runs.
# NOTE(review): this module-level `noise` does not appear to be read anywhere
# else in the visible file (WGANGP.build uses its own local `noise`) - confirm
# whether it is still needed.
noise = generate_latent_points(512,10)

def define_generator(latent_dim):
    """Build the generator mapping a latent vector to a sigmoid-bounded sample.

    A spectrally-normalized dense layer is reshaped to
    ``(map_size, 1, max_filters)`` and passed through three
    spectrally-normalized ``Conv2DTranspose`` stages, each followed by
    ``LayerNormalization``. The singleton axis 2 is squeezed out at the end.
    Intermediate tensor shapes and every layer's output shape are printed.

    Reads the module-level ``max_filters``, ``strides`` and ``channel_dim``.
    """
    def sn_then_ln(layer, tensor):
        # Spectral-normalized layer followed by layer normalization.
        tensor = SpectralNormalization(layer)(tensor)
        return LayerNormalization()(tensor)

    latent_inputs = Input(shape=(latent_dim,), name="noise_input")
    map_size = 59

    # Dense projection, then reshape into a (map_size, 1, max_filters) map.
    net = sn_then_ln(
        Dense(max_filters*map_size, activation='LeakyReLU'), latent_inputs
    )
    net = Reshape((map_size, 1, max_filters))(net)
    print(net.shape)

    # Stage 1: halve the filters.
    net = sn_then_ln(
        Conv2DTranspose(max_filters//2, (3, 1), strides=(strides[2], 1),
                        padding='SAME'),
        net,
    )
    print(net.shape)
    net = Activation('LeakyReLU')(net)

    # Stage 2: quarter of the filters, VALID padding.
    net = sn_then_ln(
        Conv2DTranspose(max_filters//4, (3, 1), strides=(strides[1], 1),
                        padding='VALID'),
        net,
    )
    print("x", net.shape)
    net = Activation('LeakyReLU')(net)

    # Stage 3: project to channel_dim channels; sigmoid bounds the output.
    net = sn_then_ln(
        Conv2DTranspose(channel_dim, (3, 1), strides=(2, 1),
                        padding='SAME'),
        net,
    )
    print(net.shape)
    net = Activation('sigmoid')(net)

    # Drop the singleton axis introduced by the Reshape above.
    out = Lambda(lambda t: K.squeeze(t, axis=2))(net)
    model = Model(inputs=latent_inputs, outputs=out)
    for layer in model.layers:
        print(layer.output_shape)
    return model

In [None]:
class WGANGP(object):
    """Wasserstein GAN with gradient penalty built from a generator and critic.

    Two compiled Keras models are created by `build`:

    * `disc_trainer` - inputs `[real_image, noise]`, outputs the critic
      score on real data, the critic score on generated data and the
      `GradientPenalty` value; compiled while the generator is frozen.
    * `gen_trainer` - input `noise`, output the critic score on generated
      data; compiled while the critic is frozen.

    Args:
        gen: Generator model; its inputs named `noise_input*` are used.
        disc: Critic model; its first input named `input*` is used.
        lr_gen: Learning rate of the generator's Adam optimizer.
        lr_disc: Learning rate of the critic's Adam optimizer.
    """

    def __init__(self, gen, disc, lr_gen=0.0001, lr_disc=0.0001):
        self.gen = gen
        self.disc = disc
        self.lr_gen = lr_gen
        self.lr_disc = lr_disc
        self.build()

    def build(self):
        """Create the optimizers and compile the two training models."""
        tens_shape = input_shapes(self.disc, "input")[0]
        noise_shapes = input_shapes(self.gen, "noise_input")

        self.opt_disc = Adam(self.lr_disc, beta_1=0.0, beta_2=0.9)
        self.opt_gen = Adam(self.lr_gen, beta_1=0.0, beta_2=0.9)

        # Critic trainer: the generator is frozen while this is compiled.
        with Nontrainable(self.gen):
            real_image = Input(shape=tens_shape)
            noise = [Input(shape=s) for s in noise_shapes]

            print("real_image", real_image.shape)
            disc_real = self.disc(real_image)
            print("disc_real", disc_real.shape)
            generated_image = self.gen(noise)
            print("generated_image", generated_image.shape)
            disc_fake = self.disc(generated_image)

            gp = GradientPenalty()([real_image, generated_image, self.disc])

            self.disc_trainer = Model(
                inputs=[real_image, noise],
                outputs=[disc_real, disc_fake, gp]
            )
            # The gradient-penalty term is weighted 10x in the total loss.
            self.disc_trainer.compile(
                optimizer=self.opt_disc,
                loss=[wasserstein_loss, wasserstein_loss, 'mse'],
                loss_weights=[1.0, 1.0, 10.0]
            )

        # Generator trainer: the critic is frozen while this is compiled.
        with Nontrainable(self.disc):
            noise = [Input(shape=s) for s in noise_shapes]

            generated_image = self.gen(noise)
            disc_fake = self.disc(generated_image)

            self.gen_trainer = Model(
                inputs=noise,
                outputs=disc_fake
            )
            self.gen_trainer.compile(optimizer=self.opt_gen,
                                     loss=wasserstein_loss)

    def fit_generator(self, noise_gen, dataset, latent_dim, n_epochs = 10, n_batch = 256,n_critic = 5):
        """Train critic and generator alternately, saving after every epoch.

        Every step runs `n_critic` critic updates followed by one generator
        update. After each epoch both models and the loss histories are
        written to the fixed Google Drive paths below.

        Args:
            noise_gen: Iterator yielding a list of noise arrays per `next()`.
                Its batch size has to equal `n_batch`, because the target
                arrays are built with `n_batch` rows.
            dataset: Array of real samples indexed along axis 0.
            latent_dim: Not used by this method; kept so existing calls
                keep working.
            n_epochs: Number of passes over the data.
            n_batch: Samples per training batch.
            n_critic: Critic updates per generator update; at least 1.

        Raises:
            ValueError: If `n_critic` is smaller than 1.
        """
        if n_critic < 1:
            # Without a critic update there is no critic loss to report.
            raise ValueError(
                "n_critic must be at least 1, got {}".format(n_critic)
            )

        bat_per_epoch = dataset.shape[0] // n_batch
        disc_out_shape = (n_batch, self.disc.output_shape[1])
        real_target = -np.ones(disc_out_shape, dtype=np.float32)
        fake_target = -real_target
        # GradientPenalty already returns ||grad|| - 1, so the MSE target
        # must be 0 to obtain the (||grad|| - 1)^2 penalty. A target of 1
        # would drive the gradient norm toward 2 instead.
        gp_target = np.zeros_like(real_target)

        gen_history = []
        # One history per value returned by disc_trainer.train_on_batch.
        disc_histories = [[], [], [], []]

        for epoch in range(n_epochs):
            print("Epoch {}/{}".format(epoch+1, n_epochs))
            # The progress bar counts samples, not batches.
            progbar = generic_utils.Progbar(bat_per_epoch*n_batch)
            for step in range(bat_per_epoch):

                # Train the critic with the generator frozen.
                with Nontrainable(self.gen):
                    for repeat in range(n_critic):
                        real_batch, _ = generate_real_samples(dataset, n_batch)
                        noise_batch = next(noise_gen)
                        disc_loss = self.disc_trainer.train_on_batch(
                            [real_batch]+noise_batch,
                            [real_target, fake_target, gp_target]
                        )

                # Train the generator with the critic frozen.
                with Nontrainable(self.disc):
                    noise_batch = next(noise_gen)
                    gen_loss = self.gen_trainer.train_on_batch(
                        noise_batch, real_target)

                # Only the last critic update of the step is recorded.
                losses = []
                for i, dl in enumerate(disc_loss):
                    losses.append(("D{}".format(i), dl))
                    if i < len(disc_histories):
                        disc_histories[i].append(dl)
                losses.append(("G0", gen_loss))
                gen_history.append(gen_loss)
                progbar.add(n_batch, values=losses)

            # Checkpoint the models and the full loss histories every epoch.
            self.disc.save("/content/drive/MyDrive/WGAN_2/WGANGPDisc_1")
            self.gen.save("/content/drive/MyDrive/WGAN_2/WGANGPNorm_1")
            np.savetxt("/content/drive/MyDrive/WGAN_2/WGPDNorm0Loss_1", disc_histories[0])
            np.savetxt("/content/drive/MyDrive/WGAN_2/WGPDNorm1Loss_1", disc_histories[1])
            np.savetxt("/content/drive/MyDrive/WGAN_2/WGPDNorm2Loss_1", disc_histories[2])
            np.savetxt("/content/drive/MyDrive/WGAN_2/WGPDNorm3Loss_1", disc_histories[3])
            np.savetxt("/content/drive/MyDrive/WGAN_2/WGPNormGloss_1", gen_history)

# ---- Training script -------------------------------------------------------
noise_dim = 512
# The noise generator and the trainer must use the same batch size, because
# fit_generator builds its target arrays with n_batch rows.
batch_size = 512

# The argument of define_critic is not read by the function; the critic's
# input shape comes from the module-level input_dim / channel_dim.
critic = define_critic(7)
generator = define_generator(noise_dim)
gan = WGANGP(generator, critic)

# Reuse the array loaded into X_train above instead of reading and rescaling
# the same file a second time.
dataset = X_train
noise_gen = NoiseGenerator([(noise_dim,)], batch_size=batch_size)
gan.fit_generator(noise_gen, dataset, noise_dim, n_epochs = 100, n_batch = batch_size, n_critic = 5)