In [1]:
import keras
from keras.applications.resnet50 import ResNet50
from keras.applications.resnet50 import preprocess_input, decode_predictions
import numpy as np

import keras.ops as K
from keras.models import Model, Sequential
from keras.layers import Activation

  param_schemas = callee.param_schemas()
  param_schemas = callee.param_schemas()


In [2]:
model = ResNet50(weights='imagenet', classifier_activation=None)

img_path = 'elephant.jpg'
img = keras.utils.load_img(img_path, target_size=(224, 224))
x = keras.utils.img_to_array(img)
x = np.expand_dims(x, axis=0)
x = preprocess_input(x)

preds = model.predict(x)
# decode the results into a list of tuples (class, description, probability)
# (one such list for each sample in the batch)
print('Predicted:', decode_predictions(preds, top=3)[0])
# Predicted: [(u'n02504013', u'Indian_elephant', 0.82658225), (u'n01871265', u'tusker'

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 220ms/step
Predicted: [('n02504013', 'Indian_elephant', 16.045454), ('n02504458', 'African_elephant', 14.072982), ('n01871265', 'tusker', 13.382182)]


In [3]:
relu_name = [e.name for e in model.layers if isinstance(e, Activation) and e.name.split('_')[-1]=='out']

In [4]:
indices=[0, 4, 8, 12, -1]

In [5]:
split = [relu_name[i] for i in indices]+[model.layers[-1].name]

In [6]:
def forward_propagate(node, nested_input, layer_in):


    if not(layer_in is None) and node.operation.name == layer_in.name:
        return nested_input

    else:
        # get parent of nodes
        parent_nodes: List[None] = node.parent_nodes
        if len(parent_nodes):
            parents_inputs=[]
            for p_node in parent_nodes:
                p_input = forward_propagate(p_node, nested_input, layer_in)
                if isinstance(p_input, list):
                    parents_inputs+= p_input
                else:
                    parents_inputs.append(p_input)

    
            layer_node = node.operation
            if isinstance(layer_node.input, list):
                output = layer_node(parents_inputs)
            else:
                try:
                    output = layer_node(parents_inputs[0])
                except ValueError:
                    import pdb; pdb.set_trace()
            return output
        else:
            return nested_input
        

In [7]:
def get_nested_model(layer_out, layer_in, input_shape_wo_batch):

    if not layer_in is None:
        input_shape_wo_batch = layer_in.output.shape[1:]
        
    nested_input = keras.layers.Input(input_shape_wo_batch)
    print(nested_input)
    
    node_out_list = [n for subnodes in model._nodes_by_depth.values() for n in subnodes if n.operation.name==layer_out.name]
    node_out = node_out_list[0]

    output = forward_propagate(node_out, nested_input, layer_in)

    return Model(nested_input, output)
    

In [8]:
import keras

class LipschitzLayer(keras.layers.Layer):
    """
    Custom Keras Layer that contains an L lipschitz function in infinite norm for now ???
    """

    def __init__(self, model:Model, lipschitz:float=0.3, norm=np.inf, **kwargs):
        """
        bla bla bla

        Args:
            model: The keras model
            lipschitz: the lipschitz constant
            norm: the norm for lipschitz
        """
        super(LipschitzLayer, self).__init__(**kwargs)
        self.model: Model = model
        self.lipschitz:float = lipschitz
        self.norm:float = norm
        self.input_shape_wo_batch = list(self.model.input.shape[1:])
        self.output_shape_wo_batch = list(self.model.output.shape[1:])

    def call(self, inputs_):
        return self.model(inputs_)

    def compute_output_shape(self, input_shape):
        return self.output_shape_wo_batch

    def get_config(self):
        config = super().get_config()
        config_model = keras.saving.serialize_keras_object(self.model)
        
        config.update({"model": config_model})
        config["lipschitz"] = self.lipschitz
        config["norm"] = self.norm
        config["input_shape_wo_batch"] = self.input_shape_wo_batch
        
        return config

    @classmethod
    def from_config(cls, config):
        config_model = config.pop("model")
        model = keras.saving.deserialize_keras_object(config_model)

        lipschitz = config.pop("lipschitz")
        norm = config.pop("norm")
        input_shape_wo_batch = config.pop("input_shape_wo_batch")

        model.build(input_shape=input_shape_wo_batch)
        return cls(model=model, lipschitz=lipschitz, norm=norm, **config)

In [9]:
# override Activation and Maxpooling for jacobinet
from jacobinet.layers import BackwardLinearLayer, BackwardAveragePooling2D, BackwardBoundedLinearizedLayer

In [35]:
class BackwardLinearizedActivation(BackwardBoundedLinearizedLayer):

    # create two layer_backward: one upper and one lower
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # create layer_backward_up
        self.layer_up = keras.layers.Lambda(lambda x: x)
        self.layer_low = keras.layers.Lambda(lambda x: 0.*x)
        self.layer_backward_up = keras.layers.Lambda(lambda x: x)
        self.layer_backward_low = keras.layers.Lambda(lambda x: 0.*x)
        

    def call_on_reshaped_gradient(
        self, gradient, input=None, training=None, mask=None
    ):
        return gradient

class BackwardLinearizedMaxPooling(BackwardBoundedLinearizedLayer):
    
    # create two layer_backward: one upper and one lower
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # create layer_backward_up
        # use averagePooling2D
        pool_size =  np.prod(self.layer.pool_size)
        self.layer_backward_up = keras.layers.Lambda(lambda x: pool_size*BackwardAveragePooling2D(self.layer)(x)) # add pooling
        self.layer_backward_low = keras.layers.Lambda(lambda x: self.layer_backward_up(0*x))
        #self.layer_backward_low = self.layer_backward_up
        self.layer_backward = self.layer_backward_up 
        
        self.layer_up = self.layer # temporary, replace by averagepooling2d
        self.layer_low = keras.layers.Lambda(lambda x: self.layer(0*x))
        

In [36]:
# create a jaocbinet model
from jacobinet import clone_to_backward

mapping_keras2backward_classes = {keras.layers.Activation: BackwardLinearizedActivation,
                                  keras.layers.MaxPooling2D: BackwardLinearizedMaxPooling}

In [37]:
from jacobinet import clone_to_backward

layer_in = None
input_shape_wo_batch = list(model.input.shape[1:])
nested_layers=[]
for name in split:
    layer_out = model.get_layer(name)
    #nested_model = get_nested_model(layer_out, layer_in, input_shape_wo_batch)
    #nested_lipschitz_constant = clone_to_backward(nested_model, mapping_keras2backward_classes=mapping_keras2backward_classes)
    # compute over estimation of lipschitz
    break
    lip_layer = LipschitzLayer(model=nested_model)
    print(nested_model.input.shape[1:], nested_model.output.shape[1:])
    nested_layers.append(lip_layer)
    layer_in=layer_out
    
    

In [38]:
def get_nested_model_one_output(layer_out, layer_in, input_shape_wo_batch):

    if not layer_in is None:
        input_shape_wo_batch = layer_in.output.shape[1:]
        
    nested_input = keras.layers.Input(input_shape_wo_batch)
    print(nested_input)
    
    node_out_list = [n for subnodes in model._nodes_by_depth.values() for n in subnodes if n.operation.name==layer_out.name]
    node_out = node_out_list[0]

    output = forward_propagate(node_out, nested_input, layer_in)

    output_dim_wo_batch = output.shape[1:]
    output_flatten = keras.layers.Reshape((np.prod(output_dim_wo_batch),))(output)
    output_one = keras.layers.Dense(1, name='select_output', use_bias=False)(output_flatten)

    return Model(nested_input, output_one)
    

In [39]:
nested_model_one_output = get_nested_model_one_output(layer_out, layer_in, input_shape_wo_batch)

<KerasTensor shape=(None, 3, 224, 224), dtype=float32, sparse=False, name=keras_tensor_587>


In [40]:
dense = nested_model_one_output.get_layer('select_output')

In [41]:
mask = np.zeros((802816, 1), 'float32')
mask[0]=1


In [42]:
dense.set_weights([mask])

In [43]:
grad = clone_to_backward(nested_model_one_output, mapping_keras2backward_classes=mapping_keras2backward_classes)


In [44]:
from decomon.layers import DecomonLayer

In [45]:
from decomon.layers.backward.layer_backward import DecomonBoundedLinearizedLayerBackward

In [46]:
mapping_keras2decomon_classes = {BackwardLinearizedActivation: DecomonBoundedLinearizedLayerBackward, 
                                 BackwardLinearizedMaxPooling:DecomonBoundedLinearizedLayerBackward}

In [47]:
from decomon import clone

decomon_model_affine = clone(grad, method='forward-affine', 
                          mapping_keras2decomon_classes=mapping_keras2decomon_classes)



In [48]:
box = np.ones((1, 2, 1))

In [49]:
w_l, b_l, w_u, b_u = output = decomon_model_affine.predict(box)

Expected: ['perturbation_domain_input_backward_model_1']
Received: inputs=Tensor(shape=torch.Size([1, 2, 1]))


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 303ms/step


In [51]:
# compute upper and lower bounds to get bounds over backwardmaxpooling and relu ? to find linear state at least !

0.0

In [56]:
w_u.max()

7.97735

In [57]:
w_u.min()

0.0

In [None]:
# compare with ibp

In [None]:
k = i + floor(t)


In [30]:
np.maximum(np.abs(output[0]), np.abs(output[1])).max()

1.0

In [None]:
# compare with the real gradient to check that it is bounded ;)

# philippe mueller

In [None]:
decomon_model_ibp.inputs

In [45]:
mask = np.zeros((1, 256, 56, 56))
mask[0, 0, 2, 2]=1

In [46]:
lip_up = up_grad.predict(mask)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 79ms/step


In [47]:
lip_low = low_grad.predict(mask)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 85ms/step


In [48]:
(lip_low-lip_up).max()

0.25361466

In [13]:
input_ = keras.layers.Input((3, 224, 224))
output = input_
for layer in nested_layers:
    output = layer(output)

toto = Model(input_, output)

In [22]:
mask = np.zeros((1, 256, 56, 56))
mask[0, 0, 2, 2]=1

In [23]:
np.abs(jacobinet_model.predict(mask)).max()
# we need to run decomon with dedicated custom layers for BackwardLinearizedMaxPooling and BackwardLinearizedActivation

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 121ms/step


0.25304574

In [27]:
set_class = set()
for layer in jacobinet_model.layers:
    set_class.add(layer.__class__)

print(set_class)

{<class 'jacobinet.layers.merging.add.BackwardAdd'>, <class '__main__.BackwardLinearizedMaxPooling'>, <class 'jacobinet.models.utils.FuseGradients'>, <class 'jacobinet.layers.normalization.batch_normalization.BackwardBatchNormalization'>, <class '__main__.BackwardLinearizedActivation'>, <class 'jacobinet.layers.convolutional.conv2d.BackwardConv2D'>, <class 'keras.src.layers.core.input_layer.InputLayer'>, <class 'jacobinet.layers.reshaping.zero_padding2d.BackwardZeroPadding2D'>}


In [19]:
# compute lipschitz constant
from jacobinet.lipschitz import get_lipschitz_model
import numpy as np

In [21]:
lip_model = get_lipschitz_model(jacobinet_model, p=np.inf) # to check

In [23]:
lip_model.output

<KerasTensor shape=(1, 3, 224), dtype=float32, sparse=False, name=keras_tensor_821>

In [17]:


jacobinet_model.input

<KerasTensor shape=(None, 256, 56, 56), dtype=float32, sparse=False, name=keras_tensor_201_gradient>

In [13]:
layer.norm==np.inf

True

# create Decomon Layer

In [14]:
from decomon.layers import DecomonLayer

In [15]:
# create two layers 
class SurrogateLipschitzLayer(keras.layers.Layer):
    """
    Custom Keras Layer that contains an L lipschitz function in any norm
    """

    def __init__(self, lip_layer:LipschitzLayer, n, batch_size=10, **kwargs):
        """
        Initializes the Pow layer with a specified exponent.

        Args:
            model: The keras model
            lipschitz: the lipschitz constant
            norm: the norm for lipschitz
        """
        super(SurrogateLipschitzLayer,self).__init__(**kwargs)
        self.lip_layer:LipschitzLayer = lip_layer
        self.n = n
        self.batch_size = batch_size
        # build mask of n values
        self._mask = self.add_weight(
                name="mask",
                shape=[n]+self.lip_layer.input_shape_wo_batch,
                initializer=keras.initializers.RandomUniform(minval=0, maxval=1, seed=None),
                #constraint=self.kernel_constraint,
            ) # probably too high dimension, use a neural network instead


    def get_constant_range_norm(self, lower, upper):
        raise NotImplementedError()


    def get_range_abs(self, lower, upper):

        mask_pos = K.clip(K.sign(lower), 0, 1) # mask[i]=1 iff lower[i] >0
        mask_neg = K.clip(K.sign(-upper), 0, 1) # mask_neg[i]=1 iff upper[i] <0

        # mask_neg[i] = 1 (lower >=0)
        upper_neg = upper
        lower_neg = lower

        # mask_pos[i] = 1 (upper <=0)
        upper_pos = upper
        lower_pos = lower

        upper_else = K.maximum(lower, upper)
        lower_else = K.zeros_like(lower)

        mask_else = (1- mask_neg - mask_pos)
        upper = mask_else*upper_else + mask_pos*upper_pos + mask_neg*upper_neg
        lower = mask_else*upper_else + mask_pos*upper_pos + mask_neg*upper_neg

        return lower, upper
        
    def get_constant_upper(self, lower, upper):

        raise NotImplementedError()

    def get_constant_lower(self, lower, upper):

        raise NotImplementedError()

        

class UpperLipschitzLayer_inf(SurrogateLipschitzLayer):
    """
    Custom Keras Layer that contains an L lipschitz function in infinite norm
    """

    def get_constant_upper(self, lower, upper):
        
        coeff = K.clip(K.expand_dims(self._mask, 0), 0, 1)  #(1, n, n_i...)
        lower_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        upper_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        
        y = coeff*lower_e + (1-coeff)*upper_e # (None, n, n_i...)
        y_ = K.reshape(y, [-1]+self.lip_layer.input_shape_wo_batch) # (None*n, n_i...) 

        # do a routine to avoid memory usage ?

        # potentially need to split
        pred_y_list = [self.lip_layer(y_i) for y_i in K.split(y_, self.batch_size, 0)]
        pred_y_ = K.concatenate(pred_y_list, 0) 
        pred_y = K.reshape(pred_y_, [-1, self.n]+self.lip_layer.output_shape_wo_batch)

        n_in = np.prod(self.lip_layer.input_shape_wo_batch)
        u_flat =  K.reshape(upper, [-1, 1, n_in])
        l_flat =  K.reshape(lower, [-1, 1, n_in])
        y_flat = K.reshape(y, [-1, self.n, n_in])

        # compute L_inf distance
        lower_dist = l_flat - y_flat
        upper_dist = u_flat - y_flat
        lower_abs, upper_abs = self.get_range_abs(lower_dist, upper_dist) # |x-y|
        # lower_abs (None, n, n_in)

        # compute lower and upper of max |x-y_i] aka infinite norm
        n_out = len(self.lip_layer.output_shape_wo_batch)
        lower_norm = K.reshape(K.max(lower_abs, axis=-1, keepdims=False), [-1, self.n]+[1]*n_out) # (None, n)
        upper_norm = K.reshape(K.max(upper_abs, axis=-1, keepdims=False), [-1, self.n]+[1]*n_out) # (None, n)

        # improve lip_layer
        upper_input_min = pred_y + self.lip_layer.lipschitz*upper_norm # (None, n, n_out...)
        #lower_input_min = pred_y + self.lip_layer.lipschitz*lower_norm # (None, n, n_out...)

        return K.min(upper_input_min, 1)


class LowerLipschitzLayer_inf(SurrogateLipschitzLayer):
    """
    Custom Keras Layer that contains an L lipschitz function in infinite norm
    """

    def get_constant_lower(self, lower, upper):
        
        coeff = K.clip(K.expand_dims(self._mask, 0), 0, 1)  #(1, n, n_i...)
        lower_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        upper_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        
        y = coeff*lower_e + (1-coeff)*upper_e # (None, n, n_i...)
        y_ = K.reshape(y, [-1]+self.lip_layer.input_shape_wo_batch) # (None*n, n_i...) 


        # potentially need to split
        pred_y_list = [self.lip_layer(y_i) for y_i in K.split(y_, self.batch_size, 0)]
        pred_y_ = K.concatenate(pred_y_list, 0) 
        pred_y = K.reshape(pred_y_, [-1, self.n]+self.lip_layer.output_shape_wo_batch)

        n_in = np.prod(self.lip_layer.input_shape_wo_batch)
        u_flat =  K.reshape(upper, [-1, 1, n_in])
        l_flat =  K.reshape(lower, [-1, 1, n_in])
        y_flat = K.reshape(y, [-1, self.n, n_in])

        # compute L_inf distance
        lower_dist = l_flat - y_flat
        upper_dist = u_flat - y_flat
        lower_abs, upper_abs = self.get_range_abs(lower_dist, upper_dist) # |x-y|
        # lower_abs (None, n, n_in)

        # compute lower and upper of max |x-y_i] aka infinite norm
        n_out = len(self.lip_layer.output_shape_wo_batch)
        lower_norm = K.reshape(K.max(lower_abs, axis=-1, keepdims=False), [-1, self.n]+[1]*n_out) # (None, n)
        upper_norm = K.reshape(K.max(upper_abs, axis=-1, keepdims=False), [-1, self.n]+[1]*n_out) # (None, n)

        # improve lip_layer
        #upper_input_max = pred_y + self.lip_layer.lipschitz*upper_norm # (None, n, n_out...)
        lower_input_max = pred_y - self.lip_layer.lipschitz*upper_norm # (None, n, n_out...)

        return K.max(lower_input_max, 1)




In [16]:
class DecomonLipschitzLayer(DecomonLayer):

    layer: LipschitzLayer
    linear=True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # create UpperLayer
        if self.layer.norm==np.inf:
            self.layer_upper = UpperLipschitzLayer_inf(self.layer, n=100) # to increase
            self.layer_lower = LowerLipschitzLayer_inf(self.layer, n=100)
            

    def forward_ibp_propagate(self, lower, upper):

        upper_out = self.layer_upper.get_constant_upper(lower, upper)
        lower_out = self.layer_lower.get_constant_lower(lower, upper)
        return lower_out, upper_out

    
        

In [17]:
from decomon import clone

In [18]:
from decomon import clone

decomon_model_ibp = clone(toto, method='forward-ibp', 
                          mapping_keras2decomon_classes={LipschitzLayer:DecomonLipschitzLayer})

box = np.concatenate([x[:,None]]*2, axis=1)

In [19]:
box = np.concatenate([x[:,None]]*2, axis=1)
pred_min, pred_max = decomon_model_ibp.predict(box)

Expected: ['perturbation_domain_input_functional_63']
Received: inputs=Tensor(shape=torch.Size([1, 2, 3, 224, 224]))


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step


In [24]:
preds

array([[ 1.38383341e+00, -1.66132724e+00, -6.52221680e-01,
        -1.55606866e+00, -2.01280165e+00, -2.04981208e-01,
        -1.88581929e-01,  6.08108401e-01, -1.19801211e+00,
         2.84951949e+00, -2.69594026e+00, -1.78844631e+00,
        -2.34228516e+00, -1.71915734e+00, -2.11963367e+00,
        -1.36635661e+00, -2.60568810e+00, -1.68987823e+00,
        -1.22667205e+00, -2.80112696e+00, -2.27826333e+00,
        -9.44791555e-01, -8.04038703e-01,  2.97354865e+00,
        -3.01802009e-01, -1.80710733e+00, -8.26533198e-01,
        -2.14105487e+00, -2.56034756e+00, -2.83927393e+00,
        -1.52490067e+00, -2.78349376e+00, -2.42548752e+00,
         1.13246775e+00,  1.24704145e-01,  4.67589229e-01,
         1.45470262e+00,  1.66417751e-02, -1.02337694e+00,
         9.83343840e-01, -1.58516836e+00, -1.98346519e+00,
        -9.58953977e-01, -7.50093400e-01,  4.01561052e-01,
         2.02960324e+00, -1.85688102e+00, -1.32057950e-01,
         6.38707256e+00,  1.29813623e+00,  1.45504564e-0

In [23]:
(pred_min- pred_max).max()

2.3841858e-07

In [21]:
pred_max

array([[6.11042083e-07, 2.74317046e-07, 3.03645237e-07, 2.76184551e-07,
        2.69330940e-07, 3.29675714e-07, 3.30868971e-07, 4.20255049e-07,
        2.84232499e-07, 1.78854316e-06, 2.63471065e-07, 2.72308597e-07,
        2.66007731e-07, 2.73371597e-07, 2.68131402e-07, 2.80089438e-07,
        2.64035776e-07, 2.73843398e-07, 2.83476993e-07, 2.62874096e-07,
        2.66570765e-07, 2.91938221e-07, 2.97144481e-07, 1.99071201e-06,
        3.23014262e-07, 2.72034697e-07, 2.96262442e-07, 2.67905904e-07,
        2.64339292e-07, 2.62672671e-07, 2.76776348e-07, 2.62969849e-07,
        2.65327913e-07, 5.32460319e-07, 3.57865133e-07, 3.98918246e-07,
        6.37006451e-07, 3.47583949e-07, 2.89334906e-07, 4.94366361e-07,
        2.75648404e-07, 2.69683369e-07, 2.91453830e-07, 2.99342076e-07,
        3.89881677e-07, 9.31872250e-07, 2.71328531e-07, 3.35135837e-07,
        5.28990277e-05, 5.82004532e-07, 3.59974706e-07, 7.87486904e-04,
        2.72713635e-07, 3.01077137e-07, 3.66969488e-07, 3.400078

In [38]:
# simple comparison on one layer
nested_model = toto.layers[1].model
simple_lip = Sequential(layers=[toto.layers[1]])

_ = nested_model(x)
_ = simple_lip(x)

In [39]:
decomon_v0 = clone(nested_model, method='forward-ibp')

In [40]:
decomon_v1 = clone(simple_lip, method='forward-ibp', mapping_keras2decomon_classes={LipschitzLayer:DecomonLipschitzLayer})

In [41]:
l_0, u_0 = decomon_v0.predict(box)

Expected: ['perturbation_domain_input_functional']
Received: inputs=Tensor(shape=torch.Size([1, 2, 3, 224, 224]))


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step


In [42]:
l_1, u_1 = decomon_v1.predict(box)

Expected: ['perturbation_domain_input_sequential_2']
Received: inputs=Tensor(shape=torch.Size([1, 2, 3, 224, 224]))


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 492ms/step


In [50]:
l_0 >l_1

array([[[[ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         ...,
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True]],

        [[ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         ...,
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True]],

        [[ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         ...,
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  T

In [49]:
u_0 <u_1

array([[[[ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         ...,
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True]],

        [[ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         ...,
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True]],

        [[ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  True, ...,  True,  True,  True],
         ...,
         [ True,  True,  True, ...,  True,  True,  True],
         [ True,  True,  T

In [29]:
# step 1: use jacobinet to obtain lipschitz constant
# step 2: create an oracle map to optimize ibp ?
# step 2: create a siamese network ...

In [27]:
pred_min

array([[-244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244.59842, -244.59842,
        -244.59842, -244.59842, -244.59842, -244

In [None]:
self._mask = self.add_weight(
                name="mask",
                shape=[n]+self.lip_layer.input_shape_wo_batch,
                initializer=keras.initializers.RandomUniform(minval=0, maxval=1, seed=None),
                constraint=self.kernel_constraint,
            )

In [27]:
tmp_model = Sequential(nested_layers)

In [28]:
nested_layers

[<LipschitzLayer name=lipschitz_layer_6, built=False>,
 <LipschitzLayer name=lipschitz_layer_7, built=False>,
 <LipschitzLayer name=lipschitz_layer_8, built=False>,
 <LipschitzLayer name=lipschitz_layer_9, built=False>,
 <LipschitzLayer name=lipschitz_layer_10, built=False>,
 <LipschitzLayer name=lipschitz_layer_11, built=False>]

In [13]:
pred_tmp=tmp_model.predict(x)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 144ms/step


In [33]:
import keras.ops as K

class LowerLipschitzLayer(keras.layers.Layer):
    """
    Custom Keras Layer that contains an L lipschitz function in infinite norm for now ???
    """

    def __init__(self, lip_layer:LipschitzLayer, n, **kwargs):
        """
        Initializes the Pow layer with a specified exponent.

        Args:
            model: The keras model
            lipschitz: the lipschitz constant
            norm: the norm for lipschitz
        """
        super(LipschitzLayer, self).__init__(**kwargs)
        self.lip_layer:LipschitzLayer = lip_layer
        self.n = n
        # build mask of n values
        self._mask = self.add_weight(
                name="mask",
                shape=[n]+self.lip_layer.input_shape_wo_batch,
                initializer=keras.initializers.RandomUniform(minval=0, maxval=1, seed=None),
                constraint=self.kernel_constraint,
            )

    def call(self, inputs_):
        x, lower, upper = inputs_ # (None, n_in..)
        # compute inner points
        coeff = K.clip(K.expand(self._mask, 0), 0, 1)  #(1, n, n_i...)
        lower_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        upper_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        
        y = coeff*lower_e + (1-coeff)*upper_e # (None, n, n_i...)
        y_ = K.reshape(y, [-1]+self.lip_layer.input_shape_wo_batch) # (None*n, n_i...) 

        pred_y_ = self.lip_layer(y_) # (None*n, n_out...) 
        n_in = np.prod(self.lip_layer.input_shape_wo_batch)
        x_flat = K.reshape(x, [-1, n_in])
        y_flat = K.reshape(y, [-1, self.n, n_in])
        dist = K.max(K.abs(K.expand_dims(x_flat, 1) - y_flat), -1, keepdims=True) # (None, n, 1)

        return K.max(K.reshape(pred_y_, [-1, self.n]+self.lip_layer.output_shape_wo_batch) - L*dist, axis=1)


class UpperLipschitzLayer(keras.layers.Layer):
    """
    Custom Keras Layer that contains an L lipschitz function in infinite norm for now ???
    """

    def __init__(self, lip_layer:LipschitzLayer, n, **kwargs):
        """
        Initializes the Pow layer with a specified exponent.

        Args:
            model: The keras model
            lipschitz: the lipschitz constant
            norm: the norm for lipschitz
        """
        super(LipschitzLayer, self).__init__(**kwargs)
        self.lip_layer:LipschitzLayer = lip_layer
        self.n = n
        # build mask of n values
        self._mask = self.add_weight(
                name="mask",
                shape=[n]+self.lip_layer.input_shape_wo_batch,
                initializer=keras.initializers.RandomUniform(minval=0, maxval=1, seed=None),
                constraint=self.kernel_constraint,
            ) # probably too high dimension, use a neural network instead

    def call(self, inputs_):
        x, lower, upper = inputs_ # (None, n_in..)
        # compute inner points
        coeff = K.clip(K.expand(self._mask, 0), 0, 1)  #(1, n, n_i...)
        lower_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        upper_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        
        y = coeff*lower_e + (1-coeff)*upper_e # (None, n, n_i...)
        y_ = K.reshape(y, [-1]+self.lip_layer.input_shape_wo_batch) # (None*n, n_i...) 

        pred_y_ = self.lip_layer(y_) # (None*n, n_out...) 
        n_in = np.prod(self.lip_layer.input_shape_wo_batch)
        x_flat = K.reshape(x, [-1, n_in])
        y_flat = K.reshape(y, [-1, self.n, n_in])
        dist = K.max(K.abs(K.expand_dims(x_flat, 1) - y_flat), -1, keepdims=True) # (None, n, 1)

        return K.min(K.reshape(pred_y_, [-1, self.n]+self.lip_layer.output_shape_wo_batch) + L*dist, axis=1)

    def get_range_abs(lower, upper):

        mask_pos = K.clip(K.sign(lower), 0, 1) # mask[i]=1 iff lower[i] >0
        mask_neg = K.clip(K.sign(-upper), 0, 1) # mask_neg[i]=1 iff upper[i] <0

        # mask_neg[i] = 1 (lower >=0)
        upper_neg = upper
        lower_neg = lower

        # mask_pos[i] = 1 (upper <=0)
        upper_pos = upper
        lower_pos = lower

        upper_else = K.maximum(lower, upper)
        lower_else = K.zeros_like(lower)

        mask_else = (1- mask_neg - mask_pos)
        upper = mask_else*upper_else + mask_pos*upper_pos + mask_neg*upper_neg
        lower = mask_else*upper_else + mask_pos*upper_pos + mask_neg*upper_neg

        return lower, upper
        

    def get_constant_upper_bound(lower, upper):

        
        coeff = K.clip(K.expand(self._mask, 0), 0, 1)  #(1, n, n_i...)
        lower_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        upper_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        
        y = coeff*lower_e + (1-coeff)*upper_e # (None, n, n_i...)
        y_ = K.reshape(y, [-1]+self.lip_layer.input_shape_wo_batch) # (None*n, n_i...) 

        pred_y_ = self.lip_layer(y_) # (None*n, n_out...) 
        pred_y = K.reshape(pred_y_, [-1, self.n]+self.lip_layer.output_shape_wo_batch)

        n_in = np.prod(self.lip_layer.input_shape_wo_batch)
        x_flat = K.reshape(x, [-1, 1, n_in])
        u_flat =  K.reshape(upper, [-1, 1, n_in])
        u_flat =  K.reshape(upper, [-1, 1, n_in])
        y_flat = K.reshape(y, [-1, self.n, n_in])

        # compute L_inf distance
        lower_dist = l_flat - y_flat
        upper_dist = u_flat - y_flat
        lower_abs, upper_abs = get_range_abs(lower_dist, upper_dist) # |x-y|
        # lower_abs (None, n, n_in)

        # compute lower and upper of max |x-y_i] aka infinite norm
        lower_norm = K.max(lower_abs, axis=-1, keepdims=True) # (None, n, 1)
        upper_norm = K.max(upper_abs, axis=-1, keepdims=True) # (None, n, 1)

        upper_input_min = pred_y + self.lip_layer.lipschitz*upper_norm # (None, n, n_out...)
        lower_input_min = pred_y + self.lip_layer.lipschitz*upper_norm # (None, n, n_out...)

        return K.min(upper_input_min, 1)

    def get_range_norm_inf(lower, upper, axis):
        # propagate bounds depending on the norm
        #norm(x) = max_i(abs(x)_i)

        # step 1: compute range of absolute value
        # lower (None, n_0.., n_axis, n_1...)
        # upper (None, n_0.., n_axis, n_1...)
        lower_abs, upper_abs = get_range_abs(lower, upper)
        # lower_abs, upper_abs (None, n_0.., n_axis, n_1...)

        # compute max over axis
        return K.max(lower_abs, axis=axis), K.max(upper_abs, axis=axis)

    def get_range_norm_2(lower, upper, axis):
        # propagate bounds depending on the norm
        #norm(x) = sqrt(sum_i(abs(x)_i**2))

        # step 1: compute range of absolute value
        # lower (None, n_0.., n_axis, n_1...)
        # upper (None, n_0.., n_axis, n_1...)
        lower_abs, upper_abs = get_range_abs(lower, upper)
        # lower_abs, upper_abs (None, n_0.., n_axis, n_1...)

        # abs(x_i)**2
        lower_pow_2= lower_abs**2
        upper_pow_2= upper_abs**2

        upper_sum = K.sum(upper_pow_2, axis=axis)
        lower_sum = K.sum(lower_pow_2, axis=axis)

        # compute max over axis
        return K.sqrt(lower_sum), K.sqrt(upper_sum)

    def get_range_norm_1(lower, upper, axis):
        # propagate bounds depending on the norm
        #norm(x) = sum_i(abs(x)_i))

        # step 1: compute range of absolute value
        # lower (None, n_0.., n_axis, n_1...)
        # upper (None, n_0.., n_axis, n_1...)
        lower_abs, upper_abs = get_range_abs(lower, upper)
        # lower_abs, upper_abs (None, n_0.., n_axis, n_1...)

        # compute max over axis
        return K.sum(lower_abs, axis=axis), K.sum(upper_abs, axis=axis)

    
    def get_affine_upper_bound_inf_old(lower, upper, w_u_out):

        # return upper and lower bounds at each step
        
        coeff = K.clip(K.expand(self._mask, 0), 0, 1)  #(1, n, n_i...)
        lower_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        upper_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        
        y = coeff*lower_e + (1-coeff)*upper_e # (None, n, n_i...)
        y_ = K.reshape(y, [-1]+self.lip_layer.input_shape_wo_batch) # (None*n, n_i...) 

        pred_y_ = self.lip_layer(y_) # (None*n, n_out...) 
        pred_y = K.reshape(pred_y_, [-1, self.n]+self.lip_layer.output_shape_wo_batch) # (None, n, n_out...) 

        n_in = np.prod(self.lip_layer.input_shape_wo_batch)
        x_flat = K.reshape(x, [-1, 1, n_in]) # (None, 1, n_in)
        u_flat =  K.reshape(upper, [-1, 1, n_in]) # (None, 1, n_in)
        u_flat =  K.reshape(upper, [-1, 1, n_in]) #(None, 1, n_in)
        y_flat = K.reshape(y, [-1, self.n, n_in]) #(None, n, n_in)

        # compute L_inf distance
        lower_dist = l_flat - y_flat # (None, n, n_in)
        upper_dist = u_flat - y_flat  # (None, n, n_in)
        lower_abs, upper_abs = get_range_abs(lower_dist, upper_dist) # |x-y|
        # lower_abs (None, n, n_in)

        # compute lower and upper of max |x-y_i] aka infinite norm
        lower_norm = K.max(lower_abs, axis=-1, keepdims=True) # (None, n, 1)
        upper_norm = K.max(upper_abs, axis=-1, keepdims=True) # (None, n, 1)

        upper_input_min = pred_y + self.lip_layer.lipschitz*upper_norm # (None, n, n_out...)
        lower_input_min = pred_y + self.lip_layer.lipschitz*upper_norm # (None, n, n_out...)

        # consider propagation

        # warning, take into inpu
        w_u_min, b_u_min = get_affine_upper_bound_min(lower_input_min, upper_input_min, axis=1, keepdims=False)
        # w_u_min (None, n, n_out...) b_u_min (None, n_out...)

        # w_u_min*z = w_u_min*(y_i+ L*max(abs_ni) )
        # w_u_min*y_i + L*w_u_min*max(abs_ni)
        # update bias with w_u_min*y_i
        b_u_min = b_u_min + K.sum(w_u_min*pred_y, axis=1)
        w_u_min*= self.lip_layer.lipschitz

        n_out = np.prod(self.lip_layer.output_shape_wo_batch)
        w_u_min_ = K.reshape(w_u_min, [-1, self.n, 1, n_out]) # (None, n, 1, n_out) 

        # compute upper and lower of abs_ni
        # warning w_u_min is positive so we just multiply with the pre computed bounds before max
        upper_tmp = w_u_min_* K.expand_dims(upper_abs, -1) #(None, n, n_in, n_out)
        lower_tmp = w_u_min_* K.expand_dims(lower_abs, -1) #(None, n, n_in, n_out)

        #relaxation for max
        w_u_max, b_u_max = get_affine_upper_bound_max(lower_tmp, upper_tmp, axis=-1, keepdims=False)
        # w_u_max (None, n, n_in, n_out) b_u_max (None, n, n_in, n_out)
        # update bias
        b_u = b_u_min + K.reshape(K.sum(b_u_max, axis=1), [1]+self.lip_layer.output_shape_wo_batch) # (None, n_out...)

        # affine upper bound for abs
        w_abs, b_abs = get_convex_upper_affine_bound_unary(lower_abs, upper_abs, func=K.abs, func_prime=K.sign)
        # w_abs (None, n, n_in) b_abs (None, n, n_in)

        # w_u_max(abs(x-y)) + b_u
        # w_u_max(abs(x-y)) + b_u <= w_u_max*(w_abs*(x-y) + b_abs) + b_u
        # w_u_max*w_abs*x + w_u_max*(b_abs - w_abs*y) + b_u

        #w_u = K.sum(w_u_max*w_abs (None, n, n_in, n_out), 1)=  (None, n_in)
        w_u = K.reshape(K.sum(K.expand_dims(w_abs, -1)*w_u_max, 1), 
                        [-1]+ self.lip_layer.input_shape_wo_batch+ self.lip_layer.out_shape_wo_batch)
        # not enough to take into account previous linear relaxations ...
        # w_u (None, n_in.., n_out..)

        # w_u_max*(b_abs - w_abs*y) + b_u
        b_u = b_u + K.sum(w_u_max*(K.expand_dims(b_abs, -1) - w_abs*y_flat), 1)

        return w_u, b_u

        def get_affine_bound_upper(self, lower, upper, w_u_out=None):

            coeff = K.clip(K.expand(self._mask, 0), 0, 1)  #(1, n, n_i...)
            lower_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
            upper_e = K.expand_dims(lower, 1) # (None, 1, n_i...)
        
            y = coeff*lower_e + (1-coeff)*upper_e # (None, n, n_i...) --- OK
            y_ = K.reshape(y, [-1]+self.lip_layer.input_shape_wo_batch) # (None*n, n_i...) 

            pred_y_ = self.lip_layer(y_) # (None*n, n_out...) 
            pred_y = K.reshape(pred_y_, [-1, self.n]+self.lip_layer.output_shape_wo_batch) # (None, n, n_out...)  -- OK

            n_in = np.prod(self.lip_layer.input_shape_wo_batch)
            x_flat = K.reshape(x, [-1, 1, n_in]) # (None, 1, n_in)
            u_flat =  K.reshape(upper, [-1, 1, n_in]) # (None, 1, n_in)
            u_flat =  K.reshape(upper, [-1, 1, n_in]) #(None, 1, n_in)
            y_flat = K.reshape(y, [-1, self.n, n_in]) #(None, n, n_in)


            lower_dist = K.expand_dims(lower, 1) - y
            upper_dist = K.expand_dims(upper, 1) - y
            lower_abs, upper_abs = get_range_abs(lower_dist, upper_dist) # |x-y| ---OK

            n_in = np.prod(self.layer.input_shape_wo_batch)
            lower_abs_flat = K.reshape(lower_abs, [-1, self.n, n_in])
            upper_abs_flat = K.reshape(upper_abs, [-1, self.n, n_in])

            
            # compute lower and upper of max |x-y_i] aka infinite norm
            lower_norm = K.max(lower_abs, axis=-1, keepdims=True) # (None, n, 1)
            upper_norm = K.max(upper_abs, axis=-1, keepdims=True) # (None, n, 1)

            upper_input_min = pred_y + self.lip_layer.lipschitz*upper_norm # (None, n, n_out...)
            lower_input_min = pred_y + self.lip_layer.lipschitz*upper_norm # (None, n, n_out...)

            # if w_h is not None
            if not w_h is None:
                # do something
                ...
                # split w_u in positive and negative weights
                # TO DO
            else:
                w_u, b_u = get_affine_upper_bound_(self, lower_input_min, 
                                                   upper_input_min, 
                                                   lower_abs, 
                                                   upper_abs, pred_y, y)

            # derive bounds until min


            #lower_min, upper_min, lower_abs, upper_abs, pred_y, y


        def get_affine_upper_bound_(self, lower_min, upper_min, lower_abs, upper_abs, pred_y, y):

            # compute lower bound as well !
            
            # provide the shape ...
            # lower_min (None, n, n_out..., n_h..)
            # upper_min (None, n, n_out..., n_h..)
            # lower_abs (None, n, n_in.., n_out..)
            # upper_abs (None, n, n_in.., n_out..)
            # pred_y (None, n, n_out...)
            # y (None, n , n_in..)            

            out_dim = np.prod(self.layer.output_shape_wo_batch)
            in_dim = np.prod(self.layer.input_shape_wo_batch)
            N_out = len(self.layer.output_shape_wo_batch)
            N_in = len(self.layer.output_shape_wo_batch)

            # lower_min (None, n, n_out.., n_h..)
            # compute n_h
            backward_shape_wo_batch = list(lower_min.shape[N_out:])
            back_dim = np.prod(backward_shape_wo_batch)
            N_back = len(backward_shape_wo_batch)
            
            # warning, take into inpu
            w_u_min, b_u_min = get_affine_upper_bound_min(lower_min, upper_min, axis=1, keepdims=False)
            # w_u_min (None, n, n_out..., n_h..) b_u_min (None, n_out..., n_h..)
    
            # w_u_min*z = w_u_min*(p_i+ L*max(abs_ni) )
            # w_u_min*p_i + L*w_u_min*max(abs_ni)
            # update bias with w_u_min*p_i
            pred_y_ = K.reshape(pred_y, [-1, self.n]+self.layer.out_shape_wo_batch+[1]*N_back #(None, n, n_out.., 1..)
            b_u_min = b_u_min + K.sum(w_u_min*pred_y_, axis=1) #(None, n_out..., n_h..)
            w_u_min*= self.lip_layer.lipschitz #(None, n, n_out..., n_h..) 
            # first version with lipschitz is unique for every output
            # even better would be to have a vector to ensure completeness ... <3
    
            w_u_min_ = K.expand_dims(w_u_min, 2) #(None, n, 1, n_out..., n_h..), 
    
            # compute upper and lower of abs_ni
            # warning w_u_min is positive so we just multiply with the pre computed bounds before max
            upper_abs_ = K.reshape(upper_abs, [-1, self.n , in_dim]+[1]*(N_out+N_back)) #(None, n, n_in, 1..., 1..), 
            lower_abs_ = K.reshape(lower_abs, [-1, self.n, in_dim]+[1]*(N_out+N_back)) #(None, n, n_in, 1..., 1..), 
            
            upper_tmp = w_u_min_* upper_abs_ #(None, n, n_in, n_out.., n_h..)
            lower_tmp = w_u_min_* lower_abs #(None, n, n_in, n_out.., n_h..)
    
            #relaxation for max
            w_u_max, b_u_max = get_affine_upper_bound_max(lower_tmp, upper_tmp, axis=2, keepdims=False)
            # w_u_max (None, n, n_in, n_out.., n_h..) b_u_max (None, n, n_in, n_out.., n_h..)
            # update bias
            b_u = b_u_min + K.reshape(K.sum(b_u_max, axis=1), 
                                      [1]+self.layer.output_shape_wo_batch+backward_shape_wo_batch) # (None, n_out.., n_h..)
    
            # affine upper bound for abs
            w_abs, b_abs = get_convex_upper_affine_bound_unary(lower_abs, upper_abs, func=K.abs, func_prime=K.sign)
            # w_abs (None, n, n_in) b_abs (None, n, n_in)
    
            # w_u_max(abs(x-y)) + b_u
            # w_u_max(abs(x-y)) + b_u <= w_u_max*(w_abs*(x-y) + b_abs) + b_u
            # w_u_max*w_abs*x + w_u_max*(b_abs - w_abs*y) + b_u

            w_u_max = K.reshape(w_u_max, [-1, self.n, in_dim, out_dim]+backward_shape_wo_batch)
            w_abs_ = K.reshape(w_abs, [-1, self.n, in_dim, 1]+[1]*N_back)
            b_abs_ = K.reshape(b_abs, [-1, self.n, in_dim, 1]+[1]*N_back)
            b_abs_ = K.reshape(b_abs, [-1, self.n, in_dim, 1]+[1]*N_back)

            # y (None, n, n_in..)
            

            # w_u_max*w_abs_
            w_u_ = w_u_max*w_abs_ # (None, n, in_dim, out_dim, n_h..)
            b_u_ = w_u_max*(b_abs_ - w_abs_*y_) # (None, n, in_dim, out_dim, n_h..)

            if N_back:
                w_u = K.reshape(K.sum(w_u_, (1, 3)), [-1]+ self.layer.input_shape_wo_batch+backward_shape_wo_batch)# (None, n_in.., n_h..)
                b_u = K.sum(K.sum(b_u_, (1, 2)) + K.reshape(b_u, [-1, out_dim]+backward_shape_wo_batch), 1) # (None, n_h...)
            else:
                w_u = K.reshape(K.sum(w_u, 1), 
                                [-1]+ self.layer.input_shape_wo_batch + self.layer.output_shape_wo_batch)# (None, n_in, n_out..)
                b_u = K.reshape(K.sum(b_u_(1, 2)), [-1]+self.layer.output_shape_wo_batch) + b_u

            return w_u, b_u
                                
                

        

        

        

        
        
        
        

    

# how to apply crown 

SyntaxError: invalid syntax (1641914335.py, line 356)

In [None]:
# fast upper bound of gradient with jacobinet
overide BackwardRelu with BackwardLinearRelu
# split into positive and negative weights ...

In [15]:
_ = split_model.predict(np.zeros((1, 128, 28, 28)))

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step


In [None]:
def get_output_nodes(model: Model) -> list[Node]:
    """Get list of output nodes ordered as model.outputs

    Args:
        model:

    Returns:

    """
    nodes_by_operation = {n.operation: n for subnodes in model._nodes_by_depth.values() for n in subnodes}
    return [nodes_by_operation[output._keras_history.operation] for output in model.outputs]

['conv3_block1_1_relu',
 'conv3_block4_2_relu',
 'conv4_block3_out',
 'conv5_block1_1_relu',
 'conv5_block3_out']

In [35]:
model._nodes_by_depth[node_index][0].operation

<Dense name=predictions, built=True>

In [None]:
node_index = model_output._keras_history.node_index
        nodes = model._nodes_by_depth[node_index]
        node = [
            node
            for node in nodes
            if node.operation.output.name == model_output.name
        ][0]
        output_nodes.append(node)

In [None]:
def get_submoddel(layer_in, layer_out):

    # get the node of layer_out
    

In [None]:
def get_backward_node(
    node: None,
    gradient: Tensor,
    mapping_keras2backward_classes: Optional[
        dict[type[Layer], type[BackwardLayer]]
    ] = None,
    input_name=None,
    get_backward: Callable = get_backward_layer,
):
    # step 1: get parents
    parent_nodes: List[None] = node.parent_nodes

    # step 2: get layer from node
    layer_node: Union[Layer, Model, Sequential] = node.operation

    # step 3.1 if layer is an InputLayer stop the algorithm
    if isinstance(layer_node, InputLayer):
        # check we reach the right input
        if input_name is None:
            return gradient, True, True
        else:
            return gradient, True, layer_node.output.name == input_name

    # step 4: get backward layer
    backward_layer_node: BackwardLayer = get_backward(
        layer_node,
        mapping_keras2backward_classes=mapping_keras2backward_classes,
    )

    is_linear = True
    if isinstance(backward_layer_node, BackwardLinearLayer):
        gradients = backward_layer_node(gradient)
    elif isinstance(layer_node, Sequential):
        if backward_layer_node.is_linear:
            gradients = backward_layer_node(gradient)
        else:
            is_linear = False
            layer_node_inputs = to_list(layer_node.input)
            gradients = backward_layer_node(layer_node_inputs + [gradient])
    elif isinstance(layer_node, Model):
        if backward_layer_node.is_linear:
            gradients = backward_layer_node(gradient)
        else:
            is_linear = False
            # warning: in case of nested models the following can happen: node.operation.input != node.parent_nodes[0].operation.output
            # consider working on the parents'outputs to avoid disconnected graph
            layer_node_inputs = []
            for parent_node in parent_nodes:
                layer_node_inputs = to_list(parent_node.operation.output)
            gradients = backward_layer_node(layer_node_inputs + [gradient])
    else:
        layer_node_inputs = to_list(layer_node.input)
        gradients = backward_layer_node([gradient] + layer_node_inputs)
        is_linear = False

    keep_output = True
    if backward_layer_node.n_input != 1:
        results = [
            get_backward_node(
                p_node,
                p_grad,
                mapping_keras2backward_classes,
                input_name,
                get_backward=get_backward,
            )
            for (p_node, p_grad) in zip(parent_nodes, gradients)
        ]

        results = [
            r for r in results if r[-1]
        ]  # keep the parents whom is connected to the right input

        if len(results):
            outputs = [
                r[0] for r in results
            ]  # output tensor is the first output of get_backward_node

            if is_linear:
                is_linear = min(is_linear, min([r[1] for r in results]))
            # chain rule along multiple inputs: expand, concat and sum
            fuse_layer = FuseGradients()
            output = fuse_layer(outputs)
        else:
            output = None
            is_linear = False
            keep_output = False
    else:
        result = get_backward_node(
            parent_nodes[0],
            gradients,
            mapping_keras2backward_classes,
            input_name,
            get_backward=get_backward,
        )
        output = result[0]
        keep_output = result[-1]
        if is_linear:
            is_linear = min(is_linear, result[1])

    return output, is_linear, keep_output

In [29]:
split

['conv3_block1_1_relu',
 'conv3_block4_2_relu',
 'conv4_block3_out',
 'conv5_block1_1_relu',
 'conv5_block3_out']

<Activation name=conv3_block4_2_relu, built=True>

In [22]:
import torch
import torch.nn as nn

# Define a Torch wrapper around the Keras model
class Keras2Torch(nn.Module):
    def __init__(self, keras_model):
        super().__init__()
        self.keras_model = keras_model

    def forward(self, x):
        z = self.keras_model(x)
        return z

# Wrap the Keras backward model in the PyTorch interface
torch_model = Keras2Torch(model)

# Generate random input and gradient for Torch
torch_input = torch.randn(1, 3, 224, 224)  # Batch size 1, 3 features

# Run a forward pass to ensure no errors
_ = torch_model(torch_input) 

# Export the backward model to ONNX format
torch.onnx.export(
    torch_model,                         # Model to export
    torch_input,           # Model inputs (as a tuple)
    "backward_model_torch.onnx",       # File name to save as
    input_names=['input_x'],
    output_names=['output'],
    dynamic_axes={'input_x': {0: 'batch_size'}, 'output': {0: 'batch_size'}}  # Handle batch size changes
)

print("ONNX model successfully exported as 'backward_model_torch.onnx'")

  shape = tuple(map(lambda x: int(x) if x is not None else None, shape))
  if channels % kernel_in_channels > 0:
  if isinstance(axis, int) and x.shape[axis] == 1:
  _C._jit_pass_onnx_node_shape_type_inference(node, params_dict, opset_version)
  _C._jit_pass_onnx_graph_shape_type_inference(
  _C._jit_pass_onnx_graph_shape_type_inference(


ONNX model successfully exported as 'backward_model_torch.onnx'


In [3]:
!wget https://upload.wikimedia.org/wikipedia/commons/f/f9/Zoorashia_elephant.jpg -O elephant.jpg

--2025-02-21 14:48:29--  https://upload.wikimedia.org/wikipedia/commons/f/f9/Zoorashia_elephant.jpg
Resolving upload.wikimedia.org (upload.wikimedia.org)... 2a02:ec80:600:ed1a::2:b, 185.15.58.240
Connecting to upload.wikimedia.org (upload.wikimedia.org)|2a02:ec80:600:ed1a::2:b|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 168370 (164K) [image/jpeg]
Saving to: ‘elephant.jpg’


2025-02-21 14:48:30 (779 KB/s) - ‘elephant.jpg’ saved [168370/168370]



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 195ms/step
Predicted: [('n02504013', 'Indian_elephant', 0.8239707), ('n02504458', 'African_elephant', 0.114624634), ('n01871265', 'tusker', 0.057447013)]


In [5]:
model.summary()

In [6]:
from keras.layers import Activation

In [7]:
len([e for e in model.layers if isinstance(e, Activation)])

49

In [21]:
x.shape

(1, 3, 224, 224)

In [8]:
from decomon.models import clone


In [9]:
decomon_model_ibp = clone(model, method='forward-ibp')

In [11]:
# onnx export

177

In [19]:
box = K.concatenate([x[:, None]-0., x[:,None]+0.], 1)

In [20]:
decomon_model_ibp.predict(box)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 337ms/step


[array([[ 1.38385296e+00, -1.66132593e+00, -6.52233601e-01,
         -1.55607080e+00, -2.01279640e+00, -2.04975128e-01,
         -1.88587189e-01,  6.08097553e-01, -1.19802046e+00,
          2.84951496e+00, -2.69594717e+00, -1.78844738e+00,
         -2.34228992e+00, -1.71915770e+00, -2.11964226e+00,
         -1.36635399e+00, -2.60569191e+00, -1.68987989e+00,
         -1.22668266e+00, -2.80113602e+00, -2.27827644e+00,
         -9.44791317e-01, -8.04038048e-01,  2.97355843e+00,
         -3.01806450e-01, -1.80711508e+00, -8.26531410e-01,
         -2.14105320e+00, -2.56035709e+00, -2.83927536e+00,
         -1.52490139e+00, -2.78349066e+00, -2.42549419e+00,
          1.13248205e+00,  1.24706745e-01,  4.67588425e-01,
          1.45471478e+00,  1.66530609e-02, -1.02338171e+00,
          9.83333588e-01, -1.58517456e+00, -1.98347569e+00,
         -9.58968639e-01, -7.50083447e-01,  4.01549339e-01,
          2.02961683e+00, -1.85689354e+00, -1.32053375e-01,
          6.38710499e+00,  1.29814339e+0