In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import callbacks
import tensorflow.keras.backend as K
from tensorflow.keras import initializers, layers
import h5py
from tensorflow.keras import layers, models, optimizers
from matplotlib import pyplot as plt
from PIL import Image
import time
import math
import argparse
import pandas
import os
print(tf.__version__)

2.9.2


In [None]:
with np.load('/rbf_mix_length_scales_samp_per_scale_50_size_2000_v_10.npz') as data:
    x_orig = data['inputs']
    y_orig = data['outputs']
x_data = np.expand_dims(x_orig,axis=3)/(np.max(x_orig))
y_data = np.expand_dims(y_orig,axis=3)

In [None]:
train_size = 1000
test_size = 500
val_size = 500

In [None]:
tf.random.set_seed(456)
dataset = tf.data.Dataset.from_tensor_slices((x_data, y_data))
dataset = dataset.shuffle(buffer_size=train_size)

In [None]:
train_dataset = dataset.take(train_size)
val_dataset = dataset.skip(train_size)
test_dataset = test_dataset.skip(test_size)
val_dataset = test_dataset.take(val_size)

In [None]:
def tupleunpack(dataset,number):
  x1 = np.zeros((number,32,32,1))
  y  = np.zeros((number,32,32,1))
  for steps,(t1,t2) in enumerate(dataset):
    x1[steps] = t1
    y[steps] = t2
  return x1,y

In [None]:
x_train,y_train = tupleunpack(train_dataset,train_size)
x_test,y_test = tupleunpack(test_dataset,test_size)
x_val,y_val = tupleunpack(val_dataset,val_size)

# Capsule Layers 


In [None]:
def squash(vectors, axis=-1):
    s_squared_norm = tf.reduce_sum(tf.square(vectors), axis, keepdims=True)
    scale = s_squared_norm / (1 + s_squared_norm) / tf.sqrt(s_squared_norm + K.epsilon())
    return scale * vectors

In [None]:
class CapsuleLayer(layers.Layer):
    def __init__(self, num_capsule, dim_capsule, routings=3,
                 kernel_initializer='glorot_uniform',
                 **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsule = num_capsule
        self.dim_capsule = dim_capsule
        self.routings = routings
        self.kernel_initializer = initializers.get(kernel_initializer)

    def build(self, input_shape):
        assert len(input_shape) >= 3, 
        self.input_num_capsule = input_shape[1]
        self.input_dim_capsule = input_shape[2]

        self.W = self.add_weight(shape=[self.input_num_capsule, self.num_capsule,
                                        self.dim_capsule, self.input_dim_capsule],
                                 initializer=self.kernel_initializer,
                                 name='W')

        self.built = True

    def call(self, inputs, training=None):
        primaryout_expand = tf.expand_dims(tf.expand_dims(inputs, -1),2)
        primaryout_tiled = tf.tile(primaryout_expand, [1, 1, self.num_capsule, 1, 1],name="caps1_output_tiled")

        inputs_hat = tf.map_fn(lambda x: tf.matmul(self.W, x), elems=primaryout_tiled)
        inputs_hat_stopped = K.stop_gradient(inputs_hat)
     
        b = tf.zeros(shape=[K.shape(inputs_hat)[0], self.input_num_capsule, self.num_capsule,1,1])
        assert self.routings > 0, 'The routings should be > 0.'
        for i in range(self.routings):

            c = tf.nn.softmax(b, axis=2)

            predictions = tf.multiply(c,inputs_hat , name="weighted_predictions")
            w_sum = tf.reduce_sum(predictions, axis=1, keepdims=True, name="weighted_sum")
            outputs = squash(w_sum,axis=-2)
            if i < self.routings - 1:

                tiled_output = tf.tile(outputs, [1, self.input_num_capsule, 1, 1, 1]) 
                b += tf.matmul(inputs_hat_stopped, tiled_output, transpose_a=True)

        return tf.squeeze(outputs,[1,4])

    def compute_output_shape(self, input_shape):
        return tuple([None, self.num_capsule, self.dim_capsule])

    def get_config(self):
        config = {
            'num_capsule': self.num_capsule,
            'dim_capsule': self.dim_capsule,
            'routings': self.routings
        }
        base_config = super(CapsuleLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

In [None]:
def PrimaryCap(inputs, dim_capsule, n_channels, kernel_size, strides, padding):

    output = layers.Conv2D(filters=dim_capsule*n_channels, kernel_size=kernel_size, strides=strides, padding=padding, kernel_initializer="glorot_uniform", bias_initializer="zeros", use_bias= False)(inputs)
    outputs = layers.Reshape(target_shape=[-1, dim_capsule])(output)
    return layers.Lambda(squash)(outputs)


# Model Construction

In [None]:
def encoder_decoder(input,input_shape, n_class, routings):

    normalised = layers.BatchNormalization()(input)

    conv1 = layers.Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding='valid', activation='relu')(normalised)
    conv2 = layers.Conv2D(filters=128, kernel_size=(3, 3), strides=2, padding='valid', activation='relu')(conv1)

    primarycaps = PrimaryCap(conv2, dim_capsule=8, n_channels=32, kernel_size=(3,3), strides=2, padding='valid')


    capslayer = CapsuleLayer(num_capsule = n_class, dim_capsule=16, routings = routings)(primarycaps)



    reshape_layer = layers.Reshape(target_shape = [n_class*16])(capslayer)
    decoder_layer1 = layers.Dense(512, activation='relu')(reshape_layer)
    #decoder_layer2 = layers.Dense(1024, activation='relu')(decoder_layer1)
    decoder_layer3 = layers.Dense(np.prod(input_shape), activation=tf.keras.layers.LeakyReLU(alpha=0.3))(decoder_layer1)
    decoder_layer4         = layers.Reshape(target_shape=input_shape)(decoder_layer3)
    output = layers.Concatenate(axis=3)([input,decoder_layer4])
    return output


def decoder(input, input_shape):
    normalised = layers.BatchNormalization()(input)
    deconv1 = layers.Conv2DTranspose(filters = 3, kernel_size = (3,3), strides=1, padding='same', activation='relu', 
                                            use_bias=True, kernel_initializer='glorot_uniform',bias_initializer='zeros')(normalised)
    normalised2 = layers.BatchNormalization()(deconv1)
    deconv2 = layers.Conv2DTranspose(filters = 2, kernel_size = (3,3), strides=1, padding='same', activation='relu', 
                                            use_bias=True, kernel_initializer='glorot_uniform',bias_initializer='zeros')(normalised2)
    normalised3 = layers.BatchNormalization()(deconv2)
    deconv3 = layers.Conv2DTranspose(filters = 1, kernel_size = (3,3), strides=1, padding='same', activation=tf.keras.layers.LeakyReLU(alpha=0.3), 
                                            use_bias=True, kernel_initializer='glorot_uniform',bias_initializer='zeros')(normalised3)
    return deconv3

In [None]:
K.set_image_data_format('channels_last')
def CapsNet(input_shape, n_class, routings):
  x = layers.Input(shape=input_shape)
  encoded1 = encoder_decoder(input = x,input_shape = input_shape, n_class = n_class, routings = routings)
  encoded2 = encoder_decoder(input = encoded1,input_shape = input_shape, n_class = n_class, routings = routings)
  decoded = decoder(input = encoded2, input_shape = input_shape)
  train_model = models.Model(x,decoded)
  eval_model = models.Model(x, decoded)
  return train_model, eval_model

In [None]:
def plot_log(filename, show=True):

    data = pandas.read_csv(filename)

    fig = plt.figure(figsize=(4,6))
    fig.subplots_adjust(top=0.95, bottom=0.05, right=0.95)
    fig.add_subplot(211)
    for key in data.keyargs():
        if key.find('loss') >= 0 and not key.find('val') >= 0:  
            plt.plot(data['epoch'].values, data[key].values, label=key)
    plt.legend()
    plt.title('Training loss')

    fig.add_subplot(212)
    for key in data.keys():
        if key.find('acc') >= 0:  
            plt.plot(data['epoch'].values, data[key].values, label=key)
    plt.legend()
    plt.title('Training and validation accuracy')

    # fig.savefig('result/log.png')
    if show:
        plt.show()


def combine_images(generated_images, height=None, width=None):
    num = generated_images.shape[0]
    if width is None and height is None:
        width = int(math.sqrt(num))
        height = int(math.ceil(float(num)/width))
    elif width is not None and height is None:  
        height = int(math.ceil(float(num)/width))
    elif height is not None and width is None:  
        width = int(math.ceil(float(num)/height))

    shape = generated_images.shape[1:3]
    image = np.zeros((height*shape[0], width*shape[1]),
                     dtype=generated_images.dtype)
    for index, img in enumerate(generated_images):
        i = int(index/width)
        j = index % width
        image[i*shape[0]:(i+1)*shape[0], j*shape[1]:(j+1)*shape[1]] = \
            img[:, :, 0]
    return image

In [None]:
def train(model, x_train,y_train,x_test,y_test, args):


    # callbacks
    log = callbacks.CSVLogger(args.save_dir + '/log.csv')
    tb = callbacks.TensorBoard(log_dir=args.save_dir + '/tensorboard-logs', histogram_freq=int(args.debug))
    !mkdir "./result/vl_cp"
    !mkdir "./result/tr_cp"
    filepath_vlcp = args.save_dir + "/vl_cp/imp_vl.hdf5"
    filepath_trcp = args.save_dir + "/tr_cp/imp_tr.hdf5"
    vl_cp = tf.keras.callbacks.ModelCheckpoint(filepath=filepath_vlcp, monitor='val_loss', verbose=0, save_best_only=True, save_weights_only=True, mode='auto')
    tr_cp = tf.keras.callbacks.ModelCheckpoint(filepath=filepath_trcp, monitor='loss', verbose=0, save_best_only=True, save_weights_only=True, mode='auto')
    lr_decay = callbacks.LearningRateScheduler(schedule=lambda epoch: args.lr * (args.lr_decay ** epoch))

    # compile the model
    model.compile(optimizer=optimizers.Adam(learning_rate=args.lr),
                  loss=['mse'])

    tic = time.time()
    
    model.fit(x_train, y_train, batch_size=args.batch_size, epochs=args.epochs,
              validation_data=[x_test, y_test], callbacks=[log, tb,vl_cp,tr_cp, lr_decay])
    tic2 = time.time()

    model.save_weights(args.save_dir + '/trained_model.h5')
    print('Trained model saved to \'%s/trained_model.h5\'' % args.save_dir)
    plot_log(args.save_dir + '/log.csv', show=True)

    return model

In [None]:
def test(model, x_test,y_test, args):
    x_recon = model.predict(x_test, batch_size=100)
    print('-'*30 + 'Begin: test' + '-'*30)
    #print('Test acc:', np.sum(np.argmax(y_pred, 1) == np.argmax(y_test, 1))/y_test.shape[0])

    img = combine_images(np.concatenate([y_test[:50],x_recon[:50]]))
    image = img * 255
    Image.fromarray(image.astype(np.uint8)).save(args.save_dir + "/real_and_recon.png")
    print()
    print('Reconstructed images are saved to %s/real_and_recon.png' % args.save_dir)
    print('-' * 30 + 'End: test' + '-' * 30)
    plt.imshow(plt.imread(args.save_dir + "/real_and_recon.png"))
    plt.show()

In [None]:
parser = argparse.ArgumentParser(description="Capsule Network on MNIST.")
parser.add_argument('--epochs', default=140, type=int)
parser.add_argument('--batch_size', default=25, type=int)
parser.add_argument('--lr', default=0.05,type=float,
                        help="Initial learning rate")
parser.add_argument('--lr_decay', default=0.98, type=float,
                        help="The value multiplied by lr at each epoch. Set a larger value for larger epochs")
parser.add_argument('-r', '--routings', default=3, type=int,
                        help="Number of iterations used in routing algorithm. should > 0")

parser.add_argument('--debug', action='store_true',
                        help="Save weights by TensorBoard")
parser.add_argument('--save_dir', default='./result')

args, unknown = parser.parse_known_args()
print(args)

if not os.path.exists(args.save_dir):
    os.makedirs(args.save_dir)

In [None]:
model, eval_model = CapsNet(input_shape=x_train.shape[1:], n_class = 30, routings = args.routings)
model.summary()

In [None]:
train(model=model,x_train = x_train, y_train = y_train, x_test = x_val, y_test = y_val, args=args)

In [None]:
model2, eval_model2 = CapsNet(input_shape=x_train.shape[1:], n_class = 30, routings = args.routings)
model2.compile(optimizer=optimizers.Adam(learning_rate=args.lr),
                  loss=['mse'])
model2.load_weights('/content/result/tr_cp/imp_tr.hdf5')

In [None]:
x_recon1 = model2.predict(x_train, batch_size=50)



In [None]:
x_recon2 = model2.predict(x_test, batch_size=50)



In [None]:
x_recon3 = model2.predict(x_val, batch_size=50)



In [None]:
mse_val = model2.evaluate(x = x_val,y = y_val, batch_size=50)



In [None]:
mse_test = model2.evaluate(x = x_test,y = y_test, batch_size=50)



In [None]:
mse_train = model2.evaluate(x = x_train,y = y_train, batch_size=50)

