## Setup

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.layers import (
    Concatenate, Lambda, Dense, Dropout, Activation, Flatten, LSTM, SpatialDropout2D, Conv2D, MaxPooling2D,
    AveragePooling2D, GlobalAveragePooling2D, UpSampling2D, BatchNormalization, ReLU, Input
)
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras import optimizers, regularizers, backend as K
from sklearn.metrics import (
    confusion_matrix, accuracy_score, f1_score, recall_score, precision_score, classification_report,
    cohen_kappa_score, hamming_loss, log_loss, zero_one_loss, matthews_corrcoef, roc_curve, auc
)
from sklearn.preprocessing import MinMaxScaler, StandardScaler, label_binarize
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
from tensorflow.keras.utils import to_categorical, plot_model
from time import time
import time as tm
import datetime
from skimage.util.shape import view_as_blocks
import glob
import os
import random
import ntpath
import copy
from tensorflow import keras
from tensorflow.keras import layers

from tfkan.layers import DenseKAN

## Filters and Dataset 

In [2]:
################################################## 30 SRM FILTERS
srm_weights = np.load('../SRM_Kernels.npy') 
biasSRM=np.ones(30)
print (srm_weights.shape)
################################################## TLU ACTIVATION FUNCTION
T3 = 3;
def Tanh3(x):
    tanh3 = K.tanh(x)*T3
    return tanh3
##################################################
def thtanh(x,t):
    th=K.tanh(x)*t
    return th

(5, 5, 1, 30)


S-UNIWARD BOSSbase 1.01 PAYLOAD = 0.4bpp

In [3]:
#Train
X_train = np.load('../data_gbras/X_train.npy')
y_train = np.load('../data_gbras/y_train.npy')
#Valid
X_valid = np.load('../data_gbras/X_valid.npy')
y_valid = np.load('../data_gbras/y_valid.npy')
#Test
X_test = np.load('../data_gbras/X_test.npy')
y_test = np.load('../data_gbras/y_test.npy')

print(X_train.shape)
print(y_train.shape)
print(X_valid.shape)
print(y_valid.shape)
print(X_test.shape)
print(y_test.shape)

(12000, 256, 256, 1)
(12000, 2)
(4000, 256, 256, 1)
(4000, 2)
(6000, 256, 256, 1)
(4000, 2)


## Functions arquitecture

In [4]:
def squeeze_excitation_layer(input_layer, out_dim, ratio, conv):
  squeeze = tf.keras.layers.GlobalAveragePooling2D()(input_layer)
  excitation = tf.keras.layers.Dense(units=out_dim / ratio, activation='relu')(squeeze)
  excitation = tf.keras.layers.Dense(out_dim,activation='sigmoid')(excitation)
  excitation = tf.reshape(excitation, [-1,1,1,out_dim])
  scale = tf.keras.layers.multiply([input_layer, excitation])
  if conv:
    shortcut = tf.keras.layers.Conv2D(out_dim,kernel_size=1,strides=1,
                                      padding='same',kernel_initializer='he_normal')(input_layer)
    shortcut = tf.keras.layers.BatchNormalization()(shortcut)
  else:
    shortcut = input_layer
  out = tf.keras.layers.add([shortcut, scale])
  return out



def sreLu (input):
  return ReLU(negative_slope=0.1, threshold=0)(input)

def sConv(input,parameters,size,nstrides):
  return Conv2D(parameters, (size,size), strides=(nstrides,nstrides),padding="same", kernel_initializer='glorot_normal', kernel_regularizer=tf.keras.regularizers.l2(0.0001),bias_regularizer=tf.keras.regularizers.l2(0.0001))(input)

def sBN (input):
  return tf.keras.layers.BatchNormalization(momentum=0.2, epsilon=0.001, center=True, scale=True, trainable=True, fused=None, renorm=False, renorm_clipping=None, renorm_momentum=0.4, adjustment=None)(input)

def sGlobal_Avg_Pooling (input):
  return tf.keras.layers.GlobalAveragePooling2D()(input)

def sDense (input, n_units, activate_c):
  return tf.keras.layers.Dense(n_units,activation=activate_c)(input)

def smultiply (input_1, input_2):
  return tf.keras.layers.multiply([input_1, input_2])

def sadd (input_1, input_2):
  return tf.keras.layers.add([input_1, input_2])

# Initial learning rate for the optimizer
initial_learning_rate = 0.001

# Learning rate schedule with exponential decay
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate, decay_steps=100000, decay_rate=0.96, staircase=True
)
  

### Blocks

In [5]:
def Block_1 (input, parameter):
  output = sConv(input, parameter, 3, 1)
  output = sBN(output)
  output = sreLu(output)
  return output
  


def SE_Block(input, out_dim, ratio):
  output = sGlobal_Avg_Pooling(input)
  output = sDense(output, out_dim/ratio, 'relu')
  output = sDense(output, out_dim, 'sigmoid')
  return output
  
  
  
def Block_2 (input, parameter):
  output = Block_1(input, parameter)
  output = sConv(output, parameter, 3, 1)
  output = sBN(output)
  multiplier = SE_Block(output,  parameter, parameter)
  # output = smultiply(output, output)
  output = smultiply(multiplier, output)
  output = sadd(output, input)
  return output
  
  

def Block_3 (input, parameter):
  addition = sConv(input, parameter, 1, 2)
  addition = sBN(addition)
  output = sConv(input, parameter, 3, 2)
  output = sBN(output)
  output = sreLu(output)
  output = sConv(output, parameter, 3, 1)
  output = sBN(output)
  multiplier = SE_Block(output,  parameter, parameter)
  output = smultiply(multiplier, output)
  output = sadd(output, addition)
  return output  
  
def Block_4 (input, parameter):
  output = Block_1(input, parameter)
  output = sConv(input, parameter, 3, 1)
  output = sBN(output)
  
  return output  

### CapsNet

In [7]:
num_clases=5
vec=16
class Capsnet(Layer):
    # creating a layer class in keras
    def __init__(self, **kwargs):
        super(Capsnet, self).__init__(**kwargs)
        self.kernel_initializer = tf.keras.initializers.get('glorot_uniform')

    def build(self, input_shape):
        # initialize weight matrix for each capsule in lower layer
        self.W = self.add_weight(shape = [1,1,num_clases, vec, 512], initializer = self.kernel_initializer, name = 'weights')
        self.built = True

    def call(self, inputs):

        u = tf.reshape(inputs, (-1, 1, 512)) # u.shape: (None, 1152, 8)
        u = tf.expand_dims(u, axis=-2) # u.shape: (None, 1152, 1, 8)
        u = tf.expand_dims(u, axis=-1) # u.shape: (None, 1152, 1, 8, 1)
        u_hat = tf.matmul(self.W, u) # u_hat.shape: (None, 1152, 10, 16, 1)
        u_hat = tf.squeeze(u_hat, [4]) # u_hat.shape: (None, 1152, 10, 16)
        b = tf.zeros(shape = [K.shape(inputs)[0],2, num_clases, 1])

# routing algorithm with updating coupling coefficient c, using scalar product b/w input capsule and output capsule
        for i in range(3-1):
            c = tf.nn.softmax(b, axis=-2) # c.shape: (None, 1152, 10, 1)
            s = tf.reduce_sum(tf.multiply(c, u_hat), axis=1, keepdims=True) # s.shape: (None, 1, 10, 16)
            v = squash(s) # v.shape: (None, 1, 10, 16)
            agreement = tf.squeeze(tf.matmul(tf.expand_dims(u_hat, axis=-1), tf.expand_dims(v, axis=-1), transpose_a=True), [4]) # agreement.shape: (None, 1152, 10, 1)
            # Before matmul following intermediate shapes are present, they are not assigned to a variable but just for understanding the code.
            # u_hat.shape (Intermediate shape) : (None, 1152, 10, 16, 1)
            # v.shape (Intermediate shape): (None, 1, 10, 16, 1)
            # Since the first parameter of matmul is to be transposed its shape becomes:(None, 1152, 10, 1, 16)
            # Now matmul is performed in the last two dimensions, and others are broadcasted
            # Before squeezing we have an intermediate shape of (None, 1152, 10, 1, 1)
            b += agreement

        return v

    def compute_output_shape(self, input_shape):
        return tuple([None, num_clases, vec])

epsilon = 1e-7

def output_layer(inputs):
    return K.sqrt(K.sum(K.square(inputs), -1) + K.epsilon())

def squash(inputs):
    # take norm of input vectors
    squared_norm = tf.keras.backend.sum(tf.keras.backend.square(inputs), axis = -1, keepdims = True)

    # use the formula for non-linear function to return squashed output
    return ((squared_norm/(1+squared_norm))/(K.sqrt(squared_norm+K.epsilon())))*inputs

def safe_norm(v, axis=-1, epsilon=1e-7):
    v_ = tf.reduce_sum(tf.square(v), axis = axis, keepdims=True)
    return tf.sqrt(v_ + epsilon)

## Functions to training

In [8]:

def train(model, X_train, y_train, X_valid, y_valid, X_test, y_test, batch_size, epochs, initial_epoch = 0, model_name="", num_test=""):
    start_time = tm.time()
    log_dir="D:/testing_"+num_test+"/"+model_name+"_"+"{}".format(time())
    tensorboard = tf.keras.callbacks.TensorBoard(log_dir)
    filepath = log_dir+"/saved-model.hdf5"
    checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath, 
        monitor='val_accuracy', 
        save_best_only=True, 
        mode='max',
        verbose=1
    )
    model.reset_states()
    history=model.fit(X_train, y_train, epochs=epochs, 
                        callbacks=[tensorboard,  checkpoint], 
                        batch_size=batch_size,
                        validation_data=(X_valid, y_valid),
                        initial_epoch=initial_epoch)
    
    metrics = model.evaluate(X_test, y_test, verbose=0)
    results_dir="D:/testing_"+num_test+"/"+model_name+"/"
    if not os.path.exists(results_dir):
        os.makedirs(results_dir)
      
        plt.figure(figsize=(10, 10))
        #plt.subplot(1,2,1)
        #Plot training & validation accuracy values
        plt.plot(history.history['accuracy'])
        plt.plot(history.history['val_accuracy'])
        plt.title('Accuracy Vs Epochs')
        plt.ylabel('Accuracy')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')
        plt.grid('on')
        plt.savefig(results_dir+'Accuracy_Xu_Net_'+model_name+'.eps', format='eps')
        plt.savefig(results_dir+'Accuracy_Xu_Net_'+model_name+'.svg', format='svg')
        plt.savefig(results_dir+'Accuracy_Xu_Net_'+model_name+'.pdf', format='pdf')
        plt.show()
        
        plt.figure(figsize=(10, 10))
        #plt.subplot(1,2,2)
        #Plot training & validation loss values
        plt.plot(history.history['loss'])
        plt.plot(history.history['val_loss'])
        plt.title('Loss Vs Epochs')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')
        plt.grid('on')
        plt.savefig(results_dir+'Loss_Xu_Net_'+model_name+'.eps', format='eps')
        plt.savefig(results_dir+'Loss_Xu_Net_'+model_name+'.svg', format='svg')
        plt.savefig(results_dir+'Loss_Xu_Net_'+model_name+'.pdf', format='pdf')
        plt.show()

    TIME = tm.time() - start_time
    print("Time "+model_name+" = %s [seconds]" % TIME)
    return {k:v for k,v in zip (model.metrics_names, metrics)}

## ConvCapsNet

In [9]:
def new_arch():
  tf.keras.backend.clear_session()
  inputs = tf.keras.Input(shape=(256,256,1), name="input_1")
  #Layer 1
  layers_ty = tf.keras.layers.Conv2D(30, (5,5), weights=[srm_weights,biasSRM], strides=(1,1), padding='same', trainable=False, activation=Tanh3, use_bias=True)(inputs)
  layers_tn = tf.keras.layers.Conv2D(30, (5,5), weights=[srm_weights,biasSRM], strides=(1,1), padding='same', trainable=True, activation=Tanh3, use_bias=True)(inputs)

  layers = tf.keras.layers.add([layers_ty, layers_tn])
  layers1 = tf.keras.layers.BatchNormalization(momentum=0.2, epsilon=0.001, center=True, scale=False, trainable=True, fused=None, renorm=False, renorm_clipping=None, renorm_momentum=0.4, adjustment=None)(layers)
  #Layer 2
  
  # L1
  layers = Block_1(layers1,64)

  # L2
  layers = Block_1(layers,64)

  # L3 - L7
  for i in range(5):
    layers = Block_2(layers,64)

  # L8 - L11
  for i in [64, 64, 128, 256]:
    layers = Block_3(layers,i)

  # L12
  layers = Block_4(layers,512)
  print('output last layer before transformer', layers.shape)

  Pool_1=tf.keras.layers.MaxPool1D(2)(layers)
  #---------------------------------------------------Fin de Convolutional stage 2------------------------------------------------------------------------#
  # Classify outputs.
      #CAPSNET
  squashed_output = tf.keras.layers.Lambda(squash)(Pool_1)
  representation = Capsnet()(squashed_output)
  representation = tf.keras.layers.GlobalAvgPool1D()(representation)
  #Acont= safe_norm(representation)
  #mast = tf.reshape(Acont, (-1,Acont.shape[2],Acont.shape[3]))
  #predictions = tf.keras.layers.Lambda(output_layer)(mast)

      #FC
  layers = Dense(128,kernel_initializer='glorot_normal',kernel_regularizer=tf.keras.regularizers.l2(0.0001),bias_regularizer=tf.keras.regularizers.l2(0.0001))(representation)
  layers = ReLU(negative_slope=0.1, threshold=0)(layers)
  layers = Dense(64,kernel_initializer='glorot_normal',kernel_regularizer=tf.keras.regularizers.l2(0.0001),bias_regularizer=tf.keras.regularizers.l2(0.0001))(layers)
  layers = ReLU(negative_slope=0.1, threshold=0)(layers)
  layers = Dense(32,kernel_initializer='glorot_normal',kernel_regularizer=tf.keras.regularizers.l2(0.0001),bias_regularizer=tf.keras.regularizers.l2(0.0001))(layers)
  layers = ReLU(negative_slope=0.1, threshold=0)(layers)

  predictions = Dense(2, activation="softmax", name="output_1",kernel_regularizer=tf.keras.regularizers.l2(0.0001),bias_regularizer=tf.keras.regularizers.l2(0.0001))(layers)
  model =tf.keras.Model(inputs = inputs, outputs=predictions)
 

  model =tf.keras.Model(inputs = inputs, outputs=predictions)
 
  # Compile the model if the compile flag is set

  optimizer = tf.keras.optimizers.SGD(learning_rate=lr_schedule, momentum=0.95)
  if compile:
      model.compile(optimizer= optimizer,
                    loss='binary_crossentropy',
                    metrics=['accuracy'])
      
      print ("ConvCapsNet create")

  return model


model = new_arch()  
  

output last layer before transformer (None, 16, 16, 512)
(None, 16, 16, 512)
Transformer_create


## Training

In [None]:
base_name="04S-UNIWARD"
name="Model_"+'CAPSNET_prueba1'+"_"+base_name
_, history  = train(model, X_train, y_train, X_valid, y_valid, X_test, y_test, batch_size=8, epochs=800, model_name=name, num_test='1_capsnet')

Epoch 1/800
Epoch 1: val_accuracy improved from -inf to 0.50225, saving model to D:/testing_1_cvt/Model_CVT_prueba1_04S-UNIWARD_1729515488.1369982\saved-model.hdf5
Epoch 2/800
Epoch 2: val_accuracy improved from 0.50225 to 0.53375, saving model to D:/testing_1_cvt/Model_CVT_prueba1_04S-UNIWARD_1729515488.1369982\saved-model.hdf5
Epoch 3/800
Epoch 3: val_accuracy improved from 0.53375 to 0.55350, saving model to D:/testing_1_cvt/Model_CVT_prueba1_04S-UNIWARD_1729515488.1369982\saved-model.hdf5
Epoch 4/800
Epoch 4: val_accuracy improved from 0.55350 to 0.63100, saving model to D:/testing_1_cvt/Model_CVT_prueba1_04S-UNIWARD_1729515488.1369982\saved-model.hdf5
Epoch 5/800
Epoch 5: val_accuracy improved from 0.63100 to 0.66850, saving model to D:/testing_1_cvt/Model_CVT_prueba1_04S-UNIWARD_1729515488.1369982\saved-model.hdf5
Epoch 6/800
Epoch 6: val_accuracy improved from 0.66850 to 0.67175, saving model to D:/testing_1_cvt/Model_CVT_prueba1_04S-UNIWARD_1729515488.1369982\saved-model.hdf5
E