In [None]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold
from sklearn.metrics import roc_auc_score

In [None]:
import tensorflow as tf
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

In [None]:
# Define synthetic data parameters
num_samples = int(1e6)
num_features = 500
num_classes = 2


# Generate coefficients for the logistic regression function
coefficients = np.random.uniform(low=-5, high=5, size=(num_features))

# Generate synthetic data
# X_synthetic = np.random.uniform(low=-3, high=3, size=(num_samples, num_features))
X_synthetic = np.random.normal(loc=0, scale=1, size=(num_samples, num_features))


# Generate labels using a logistic regression function
logits = np.dot(X_synthetic, coefficients)
probabilities = 1 / (1 + np.exp(-logits))
y_synthetic = np.round(probabilities).astype(int)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_synthetic, y_synthetic, test_size=0.2, random_state=42)

# Standardize the data
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test_all = scaler.transform(X_test)
y_test_all = y_test

In [None]:
"""
The abstract robust model
"""
import tensorflow as tf
import numpy as np


class RobustifyNetwork(tf.keras.Model):

    def __init__(self, num_classes, epsilon):
        super(RobustifyNetwork, self).__init__()

        self.num_classes = num_classes
        self.train_variables = []
        pass

    def feedforward_pass(self, input):
        pass

    def __call__(self, input):
        self.x_input = input

        return self.feedforward_pass(input)

    @tf.function
    def train_step(self, input, label, epsilon, robust=True, type_robust='linf',):
        self._full_call(input, label, epsilon, robust=robust,
                        evaluate=False, type_robust=type_robust)

    @tf.function
    def evaluate(self, input, label, epsilon, step=-1, summary=None, type_robust='linf', evaluate_bound=False):
        self._full_call(input, label, epsilon, step=step, robust=True,
                        evaluate=True, summary=summary, type_robust=type_robust)

    def evaluate_bound(self, input, label, epsilon):
        self._full_call(input, label, epsilon,  robust=True,  type_robust='certificate',
                      evaluate=True)

    def evaluate_approx_bound(self, input, label, epsilon, type_robust):
        self._full_call(input, label, epsilon,  robust=True,  type_robust=type_robust,
                      evaluate=True, evaluate_bound=True)

    def _full_call(self, input, label, epsilon, robust=True, evaluate=False, evaluate_bound=False,
                 summary=None, step=-1, type_robust='linf'):

        self.x_input = input
        self.y_input = label

        if not robust: #vanilla training

            print("Not robust")

            with tf.GradientTape() as self.tape:

                self.feedforward_pass(self.x_input)

                y_xent = tf.nn.sparse_softmax_cross_entropy_with_logits(
                      labels=tf.cast(self.y_input, tf.int32), logits=self.pre_softmax)
                self.loss = tf.reduce_mean(y_xent)

        else: #robust training

            if type_robust == "clipping":
                self.M = tf.math.minimum(1 - self.x_input, epsilon)
                self.m = tf.math.maximum(-self.x_input, -epsilon)

            if not type_robust == "certificate":

                with tf.GradientTape() as self.tape:
                    with tf.GradientTape(persistent=True) as self.second_tape:

                        self.second_tape.watch(self.x_input)
                        self.feedforward_pass(self.x_input)

                        if type_robust == 'grad': #linf
                            y_xent = tf.nn.sparse_softmax_cross_entropy_with_logits(
                                labels=self.y_input, logits=self.pre_softmax)
                            self.xent = tf.reduce_mean(y_xent)
                            self.loss = self.xent + epsilon*self.second_tape.gradient(self.xent, self.x_input)

                        else:
                            # Compute linear approximation for robust cross-entropy.
                            data_range = tf.range(tf.shape(self.y_input)[0])
                            indices = tf.map_fn(lambda n: tf.stack([n, tf.cast(self.y_input[n], tf.int32)]), data_range)

                            self.nom_exponent = []
                            for i in range(self.num_classes):
                                self.nom_exponent += [self.pre_softmax[:,i] - tf.gather_nd(self.pre_softmax, indices)]

                            sum_exps = 0
                            for i in range(self.num_classes):
                                grad = self.second_tape.gradient(self.nom_exponent[i], self.x_input)
                                if type_robust == 'clipping': #linf
                                    positive_terms = tf.math.multiply(self.M, tf.nn.relu(grad[0]))
                                    negative_terms = tf.math.multiply(self.m, tf.nn.relu(-grad[0]))
                                    exponent = tf.reduce_sum(positive_terms - negative_terms, axis=1) + self.nom_exponent[i]
                                elif type_robust == 'l1':
                                    exponent = np.sqrt(self.num_features) * epsilon * tf.reduce_max(tf.abs(grad), axis=1) + \
                                               self.nom_exponent[i]
                                elif type_robust == 'l1+inf':
                                    # exponent = np.sqrt(self.num_features) * epsilon * tf.reduce_max(tf.abs(grad), axis=1) + \
                                    #          epsilon * tf.reduce_sum(tf.abs(grad), axis=1) + self.nom_exponent[i]
                                    # exponent = tf.sqrt(tf.convert_to_tensor(self.num_features, dtype=tf.float32)) * epsilon * tf.reduce_max(tf.abs(grad), axis=1) + \
                                    #  epsilon * tf.reduce_sum(tf.abs(grad), axis=1) + self.nom_exponent[i]
                                    # exponent = tf.sqrt(tf.cast(self.num_features, dtype=tf.float32)) * epsilon * tf.reduce_max(tf.abs(grad), axis=1) + epsilon * tf.reduce_sum(tf.abs(grad), axis=1) + self.nom_exponent[i]
                                    exponent = tf.sqrt(tf.cast(self.num_features, dtype=tf.float32)) * epsilon * tf.reduce_max(tf.abs(grad), axis=1) + epsilon * tf.reduce_sum(tf.square(grad), axis=1) + self.nom_exponent[i]
                                    # exponent = epsilon * tf.reduce_sum(tf.square(grad), axis=1) + self.nom_exponent[i]

                                else: #linf
                                    exponent = epsilon * tf.reduce_sum(tf.abs(grad), axis=1) + self.nom_exponent[i]
                                sum_exps += tf.math.exp(exponent)

                            self.loss = tf.reduce_mean(tf.math.log(sum_exps))

            else: #certificate objective:
                if not evaluate:
                    with tf.GradientTape() as self.tape:

                        self.feedforward_pass(self.x_input)

                        #L1 certificate -- epsilon is converted
                        self.loss, self.acc_bound = self.certificate_loss(np.sqrt(self.num_features) * epsilon, label)

                else:
                    self.feedforward_pass(self.x_input)

                    self.loss, self.acc_bound = self.certificate_loss(epsilon, label)

                    self.acc_bound = (self.acc_bound).numpy()

        if not evaluate:

            self.optimizer.apply_gradients(zip(self.tape.gradient(self.loss, self.train_variables),
                                               self.train_variables))
            #print("\n Graph Created! \n")

        else:
            # Evaluation
            y_xent = tf.nn.sparse_softmax_cross_entropy_with_logits(
                labels=tf.cast(label, tf.int32), logits=self.pre_softmax)
            self.xent = tf.reduce_mean(y_xent)

            self.y_pred = tf.argmax(self.pre_softmax, 1)
            correct_prediction = tf.equal(tf.cast(self.y_pred, tf.int64), label)
            self.num_correct = tf.reduce_sum(tf.cast(correct_prediction, tf.int64))
            self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

            if evaluate_bound==True:
                self.eval_approx_bound = (self.loss).numpy()
                self.eval_xent = (self.xent).numpy()

            if summary:
                with summary.as_default():
                    tf.summary.scalar('Cross Entropy', self.xent, step)
                    tf.summary.scalar('Accuracy', self.accuracy, step)
                    tf.summary.scalar('Robust Loss', self.loss, step)
                    tf.summary.scalar('Learning Rate', self.optimizer.learning_rate(step), step)
                    tf.summary.scalar('Epsilon', epsilon, step)

            #print("\n Evaluate Graph Created! \n")

    def load_all(self, path, load_optimizer=True):

        if load_optimizer:
            opt_weights = np.load(path + '_optimizer.npy', allow_pickle=True)

            grad_vars = self.trainable_weights
            zero_grads = [tf.zeros_like(w) for w in grad_vars]
            self.optimizer.apply_gradients(zip(zero_grads, grad_vars))
            self.optimizer.set_weights(opt_weights)

        self.load_weights(path)

    def save_all(self, path):
        self.save_weights(path)
        np.save(path + '_optimizer.npy', self.optimizer.get_weights())

In [None]:
import tensorflow as tf
import numpy as np

class robustMLP(RobustifyNetwork):
    def __init__(self, config, num_features):
        super().__init__(config['num_classes'], config['epsilon'])

        l1_size = config['l1_size']
        l2_size = config['l2_size']
        l3_size = config['l3_size']
        l4_size = config['l4_size']

        initial_learning_rate = float(config['initial_learning_rate'])
        training_batch_size = config['training_batch_size']
        num_classes = config['num_classes']
        batch_decrease_learning_rate = float(config['batch_decrease_learning_rate'])

        self.mode = 'train'
        self.num_features = num_features
        self.train_variables = []

        self.W1 = self._weight_variable([num_features, l1_size])
        self.train_variables += [self.W1]
        self.b1 = self._bias_variable([l1_size])
        self.train_variables += [self.b1]

        self.W2 = self._weight_variable([l1_size, l2_size])
        self.train_variables += [self.W2]
        self.b2 = self._bias_variable([l2_size])
        self.train_variables += [self.b2]

        self.W3 = self._weight_variable([l2_size, l3_size])
        self.train_variables += [self.W3]
        self.b3 = self._bias_variable([l3_size])
        self.train_variables += [self.b3]

        self.W4 = self._weight_variable([l3_size, l4_size])
        self.train_variables += [self.W4]
        self.b4 = self._bias_variable([l4_size])
        self.train_variables += [self.b4]

        self.W5 = self._weight_variable([l4_size, num_classes])
        self.train_variables += [self.W5]
        self.b5 = self._bias_variable([num_classes])
        self.train_variables += [self.b5]

        # Setting up the optimizer
        self.learning_rate = tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate,
            training_batch_size * batch_decrease_learning_rate,
            0.85,
            staircase=True
        )

        self.optimizer = tf.keras.optimizers.Adam(self.learning_rate)

    def feedforward_pass(self, input):
        # Fully connected layers.
        self.h1 = tf.nn.relu(tf.matmul(tf.cast(input, dtype=tf.float32), self.W1) + self.b1)
        self.h2 = tf.nn.relu(tf.matmul(self.h1, self.W2) + self.b2)
        self.h3 = tf.nn.relu(tf.matmul(self.h2, self.W3) + self.b3)
        self.h4 = tf.nn.relu(tf.matmul(self.h3, self.W4) + self.b4)
        self.pre_softmax = tf.matmul(self.h4, self.W5) + self.b5
        return self.pre_softmax

    def set_mode(self, mode='train'):
        self.mode = mode

    @staticmethod
    def _weight_variable(shape):
        initial = tf.keras.initializers.GlorotUniform()
        return tf.Variable(initial_value=initial(shape), name=str(np.random.randint(1e10)), trainable=True)

    @staticmethod
    def _bias_variable(shape):
        initial = tf.constant(0.1, shape=shape)
        return tf.Variable(initial)


In [None]:
with tf.device('/device:GPU:0'):

  num_runs = 5

  hypothesis_stability_res = []
  uniform_stability_res = []
  error_stability_res = []

  for i in range(num_runs):

    num_samples = 250
    X_synthetic = np.random.normal(loc=0, scale=1, size=(num_samples, num_features))
    logits = np.dot(X_synthetic, coefficients)
    probabilities = 1 / (1 + np.exp(-logits))
    y_synthetic = np.round(probabilities).astype(int)
    X_train, X_test, y_train, y_test = train_test_split(X_synthetic, y_synthetic, test_size=0.2, random_state=42)

    # Standardize the data
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    rhos = [0.00, 0.001, 0.01, 0.05, 0.1, 0.3, 0.5, 1.0, 1.5]
    # rhos = [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
    # rhos = [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
    rhos = np.logspace(-4, 0.3, 10)
    rhos = rhos.astype(np.float32)
    # rhos = rhos[0:3]
    # rhos = [1.5]
    Ws = []
    hypothesis_stability_rhos = []
    uniform_stability_rhos = []
    error_stability_rhos = []

    num_folds = 10

    # Split the training data into 10 folds
    kf = KFold(n_splits=num_folds, shuffle=True)


    for rho in rhos:

      config = {
          'num_classes': 2,
          'epsilon': rho,
          'l1_size': 16,
          'l2_size': 8,
          'l3_size': 8,
          'l4_size': 4,
          'initial_learning_rate': 0.01,
          'training_batch_size': 64,
          'batch_decrease_learning_rate': 1
      }

      num_features = num_features  # Replace with the actual number of features
      model_all = robustMLP(config, num_features)

      num_epochs = 10

      for epoch in range(num_epochs):
        for i in range(0, len(X_train), config['training_batch_size']):
            inputs = X_train[i:i + config['training_batch_size']]
            labels = y_train[i:i + config['training_batch_size']]

            inputs = inputs.astype(np.float32)
            labels = labels.astype(np.float32)

            # Train with 'l1+inf' loss
            model_all.train_step(inputs, labels, epsilon=config['epsilon'], type_robust='l1+inf')
            # model_all.train_step(inputs, labels, epsilon=config['epsilon'], robust=False)

        # # Evaluate on the test data after each epoch
        # train_predictions = tf.nn.softmax(model_all.feedforward_pass(X_train.astype(np.float32)), axis=None, name=None)
        # train_accuracy = accuracy_score(y_train, np.argmax(train_predictions, axis=1))
        # test_predictions = tf.nn.softmax(model_all.feedforward_pass(X_test.astype(np.float32)), axis=None, name=None)
        # test_accuracy = accuracy_score(y_test, np.argmax(test_predictions, axis=1))

        # # Assuming binary classification for AUC calculation
        # train_auc = roc_auc_score(y_train, train_predictions[:, 1])
        # test_auc = roc_auc_score(y_test, test_predictions[:, 1])

      W = np.array(model_all.get_weights(), dtype=object)
      Ws.append(sum(sum(abs(W[0]))) + sum(abs(W[1])))

      # Create an array to store the weights of each model
      all_model_weights = []

      for fold, (train_index, test_index) in enumerate(kf.split(X_train)):

          # Get the training and test data for this fold
          X_fold_train, X_fold_test = X_train[train_index], X_train[test_index]
          y_fold_train, y_fold_test = y_train[train_index], y_train[test_index]

          # Create an instance of your robustOneLayer model for each fold
          model = robustMLP(config, num_features)

          for epoch in range(num_epochs):
            for i in range(0, len(X_train), config['training_batch_size']):
                inputs = X_train[i:i + config['training_batch_size']]
                labels = y_train[i:i + config['training_batch_size']]

                inputs = inputs.astype(np.float32)
                labels = labels.astype(np.float32)

                # Train with 'l1+inf' loss
                model.train_step(inputs, labels, epsilon=config['epsilon'], type_robust='l1+inf')
                # model.train_step(inputs, labels, epsilon=config['epsilon'], robust=False)

          # Save the weights of the model after each fold
          all_model_weights.append(np.array(model.get_weights(), dtype=object))

      hypothesis_stability_all = []
      uniform_stability_all = []
      error_stability_all = []

      for fold2_weights in all_model_weights:  # Skip the first fold since it's already used for model_fold1

          # Calculate the loss at each point in the test set separately
          max_loss_diff = 0.0

          # Create a new instance of the model_fold2 for each fold
          model_fold2 = robustMLP(config, num_features)
          model_fold2.set_weights(fold2_weights)

          # Model 1
          logits_test_fold1 = tf.nn.softmax(model_all.feedforward_pass(X_test_all))  # Note: Pass the entire test set as a batch
          loss_fold1 = tf.keras.losses.sparse_categorical_crossentropy(y_test_all, logits_test_fold1)

          # Model 2
          logits_test_fold2 = tf.nn.softmax(model_fold2.feedforward_pass(X_test_all))
          loss_fold2 = tf.keras.losses.sparse_categorical_crossentropy(y_test_all, logits_test_fold2)

          # Calculate the absolute difference in loss
          loss_diff_at_point = np.abs(loss_fold1.numpy() - loss_fold2.numpy())

          # Update the maximum absolute difference
          hypothesis_stability = np.mean(loss_diff_at_point)
          uniform_stability = np.max(loss_diff_at_point)
          error_stability = np.abs(np.mean(loss_fold1.numpy()) - np.mean(loss_fold2.numpy()))
          error_stability_all.append(error_stability)
          hypothesis_stability_all.append(hypothesis_stability)
          uniform_stability_all.append(uniform_stability)

          # print(f"Maximum Absolute Difference in Cross-Entropy Loss at Each Point: {max_loss_diff2:.2f}")

      hypothesis_stability_rhos.append(np.max(hypothesis_stability_all))
      uniform_stability_rhos.append(np.max(uniform_stability_all))
      error_stability_rhos.append(np.max(error_stability_all))

    hypothesis_stability_res.append(hypothesis_stability_rhos)
    uniform_stability_res.append(uniform_stability_rhos)
    error_stability_res.append(error_stability_rhos)

In [None]:
Ws

In [None]:
import csv


# Specify the CSV file path
csv_file_path = 'hypothesis_stability_5_layer_2.csv'

# Open the CSV file in write mode
with open(csv_file_path, 'w', newline='') as csv_file:
    # Create a CSV writer object
    csv_writer = csv.writer(csv_file)

    # Write each inner array as a row in the CSV file
    for row in hypothesis_stability_res:
        csv_writer.writerow(row)

print(f'The data has been successfully written to {csv_file_path}.')