In [4]:
import casadi as cs
import torch
import l4casadi as l4c
import numpy as np


class Cs_Torch(torch.nn.Module):

    def __init__(self, input_size: int) -> None:
        super().__init__()
        self.input_size = input_size

    def check_cs_torch_consistancy(self, N_iter=2):
        cs_input_params = [cs.MX.sym('x', 2, 1)]
        for param in self.parameters():
            cs_input_params.append(cs.MX.sym('x', param.flatten().shape[0], 1))
        cs_input = cs.vcat(cs_input_params)
        self.set_model_type("casadi")
        cs_model = l4c.L4CasADi(self, has_batch=False, device='cpu')
        cs_model = cs.Function('f_cs', [cs_input], [cs_model(cs_input)])
        for _ in range(N_iter):
            random_inp = np.random.random(cs_input.shape[0])
            print(cs_model(cs.DM(random_inp)))
            ### TO DO complete comparison of both methods

    def linear(self, in_features, out_features, bias: bool = True):
        torch_layer = torch.nn.Linear(in_features, out_features, bias)
        cs_layer = self.cs_linear(in_features, out_features, bias)

        def linear_wrapper(input, weights=None, *args, **kwargs):
            if (self.type == "torch"):
                return torch_layer(input, args, kwargs)
            elif (self.type == "casadi"):
                return cs_layer(input, weights)

        return linear_wrapper

    def cs_linear(self, in_features, out_features, bias):

        def wrapper(input, weights):
            A = weights.pop()
            if bias:
                return input @ A.T + weights.pop()
            else:
                return input @ A.T

        return wrapper

    def pack_up_input(self, input):
        x = input[:self.input_size]
        weights = input[self.input_size:]

        weights_package = []
        pointer = 0
        for param in self.parameters():
            rows, cols = param.shape()
            w = weights[pointer:pointer + rows * cols].reshape(rows, cols)
            weights_package.append(w)
            pointer = pointer + rows * cols
        weights_package.reverse()
        return x, weights_package


class MultiLayerPerceptron(torch.nn.Module):

    def __init__(self, input_size):
        super().__init__(input_size)
        self.input_layer = torch.nn.Linear(2, 4, bias=False)
        self.out_layer = torch.nn.Linear(4, 1, bias=False)
        self.check_cs_torch_consistancy()

    def forward(self, inp):
        if (self.type == "torch"):
            x = self.input_layer(inp)
            x = self.out_layer(x)
            return x
        elif (self.type == "casadi"):
            x = inp[:2]
            weight0 = inp[2:2 + 2 * 4].reshape(2, 4)
            weight1 = inp[10:14].reshape(4, 1)
            x = x.T @ weight0
            x = x @ weight1
            return x
        else:
            Exception(
                "Please specify the model return type with set_model_type method"
            )

    def set_model_type(self, type):
        self.type = type


pyTorch_model = MultiLayerPerceptron(2)
pyTorch_model.set_model_type("casadi")
l4c_model = l4c.L4CasADi(pyTorch_model, has_batch=False,
                         device='cpu')  # device='cuda' for GPU

x_sym = cs.MX.sym('x', 2, 1)
w_sym = cs.MX.sym('x', 2, 1)

inp_sym = cs.vcat([x_sym, w_sym])

# f_sym = l4c_model(inp_sym)
# f = cs.Function('y', [inp_sym], [f_sym])
# df = cs.Function('dy', [inp_sym], [cs.jacobian(f_sym, x_sym)])
# ddf = cs.Function('ddy', [inp_sym], [cs.hessian(f_sym, x_sym)[0]])

# inp = cs.DM([[0.], [2.], [2.], [4.]])
# print("l4c_model(inp)", l4c_model(inp))
# print("f(inp)", f(inp))
# print("df(inp)", df(inp))
# print("ddf(inp)", ddf(inp))


0.253967
0.753405


In [20]:
a = [0, 1, 2, 3]
a.reverse()
print(a)


[3, 2, 1, 0]


In [None]:
from torch import nn
import math
from copy import deepcopy


In [None]:
import numpy as np
import pytest
import torch
from torch import nn
import casadi as cs
import l4casadi as l4c


class ModelNN(nn.Module):
    """
    Class of pytorch neural network models. This class is not to be used barebones.
    Instead, you should inherit from it and specify your concrete architecture.

    """

    model_name = "NN"

    def __call__(self, *args, weights=None, use_stored_weights=False):
        if use_stored_weights is False:
            if weights is not None:
                return self.forward(*args, weights=weights)
            else:
                return self.forward(*args)
        else:
            return self.cache.forward(*args)

    @property
    def cache(self):
        """
        Isolate parameters of cached model from the current model
        """
        return self.cached_model[0]

    def detach_weights(self):
        """
        Excludes the model's weights from the pytorch computation graph.
        This is needed to exclude the weights from the decision variables in optimization problems.
        An example is temporal-difference optimization, where the old critic is to be treated as a frozen model.

        """
        for variable in self.parameters():
            variable.detach_()

    def cache_weights(self, whatever=None):
        """
        Assign the active model weights to the cached model followed by a detach.

        This method also backs up itself and performs this operation only once upon the initialization procedure
        """
        if "cached_model" not in self.__dict__.keys():
            self.cached_model = (
                deepcopy(self),
            )  # this is needed to prevent cached_model's parameters to be parsed by model init hooks

        self.cache.load_state_dict(self.weights)
        self.cache.detach_weights()

    @property
    def weights(self):
        return self.state_dict()

    def update_weights(self, whatever=None):
        pass

    def weights2dict(self, weights_to_parse):
        """
        Transform weights as a numpy array into a dictionary compatible with pytorch.

        """

        weights_to_parse = torch.tensor(weights_to_parse)

        new_state_dict = {}

        length_old = 0

        for param_tensor in self.state_dict():
            weights_size = self.state_dict()[param_tensor].size()
            weights_length = math.prod(self.state_dict()[param_tensor].size())
            new_state_dict[param_tensor] = torch.reshape(
                weights_to_parse[length_old:length_old + weights_length],
                tuple(weights_size),
            )
            length_old = weights_length

        return new_state_dict

    def update_and_cache_weights(self, weights=None):
        if weights is not None:
            for item in weights:
                weights[item].requires_grad_()
            weights = self.load_state_dict(weights)
        # self.load_state_dict(self.weights2dict(weights))
        self.cache_weights()

    def restore_weights(self):
        """
        Assign the weights of the cached model to the active model.
        This may be needed when pytorch optimizer resulted in unsatisfactory weights, for instance.

        """

        self.update_and_cache_weights(self.cache.state_dict())

    def soft_update(self, tau):
        """Soft update model parameters.
        θ_target = τ*θ_local + (1 - τ)*θ_target

        Params
        ======
            local_model (Torch model): weights will be copied from
            target_model (Torch model): weights will be copied to
            tau (float): interpolation parameter

        """
        for target_param, local_param in zip(self.cache.parameters(),
                                             self.parameters()):
            target_param.data.copy_(tau * local_param.data +
                                    (1.0 - tau) * target_param.data)

    def __call__(self, *argin, weights=None, use_stored_weights=False):
        if len(argin) > 1:
            argin = cs.concatenate(argin)
        else:
            argin = argin[0]

        argin = argin if isinstance(argin,
                                    torch.Tensor) else torch.tensor(argin)

        if use_stored_weights is False:
            if weights is not None:
                result = self.forward(argin, weights)
            else:
                result = self.forward(argin)
        else:
            result = self.cache.forward(argin)

        return result


class ModelPerceptron(nn.Module):

    def __init__(
        self,
        dim_input: int,
        dim_output: int,
        dim_hidden: int,
        n_hidden_layers: int,
        leaky_relu_coef: float = 0.15,
        is_force_infinitesimal: bool = False,
        is_bias: bool = True,
        weights=None,
    ):
        super().__init__()
        self.dim_input = dim_input
        self.dim_output = dim_output
        self.n_hidden_layers = n_hidden_layers
        self.leaky_relu_coef = leaky_relu_coef
        self.is_force_infinitesimal = is_force_infinitesimal
        self.is_bias = is_bias

        self.input_layer = nn.Linear(dim_input, dim_hidden,
                                     bias=is_bias).float()
        self.hidden_layers = nn.ModuleList([
            nn.Linear(dim_hidden, dim_hidden, bias=is_bias).float()
            for _ in range(n_hidden_layers)
        ])
        self.output_layer = nn.Linear(dim_hidden, dim_output,
                                      bias=is_bias).float()

        if weights is not None:
            self.load_state_dict(weights)

        # self.cache_weights()

    def _forward(self, x):
        x = nn.functional.leaky_relu(self.input_layer(x),
                                     negative_slope=self.leaky_relu_coef)
        for layer in self.hidden_layers:
            x = nn.functional.leaky_relu(layer(x),
                                         negative_slope=self.leaky_relu_coef)
        x = self.output_layer(x)
        return x

    def forward(self, input_tensor, weights=None):
        # if weights is not None:
        #     self.update(weights)
        return self._forward(input_tensor) - self._forward(
            torch.zeros_like(input_tensor))

In [None]:
pyTorch_model = ModelPerceptron(2, 1, 16, 2)
l4c_model = l4c.L4CasADi(pyTorch_model, has_batch=True,
                         device='cpu')  # device='cuda' for GPU

x_sym = cs.MX.sym('x', 2)
y_sym = l4c_model(x_sym)
f = cs.Function('y', [x_sym], [y_sym])
df = cs.Function('dy', [x_sym], [cs.jacobian(y_sym, x_sym)])
ddf = cs.Function('ddy', [x_sym], [cs.hessian(y_sym, x_sym)[0]])

x = cs.DM([[0.], [2.]])
print(l4c_model(x))
print(f(x))
print(df(x))
print(ddf(x))

-0.00713383
-0.00713383
[[-0.0304231, -0.006513]]

[[0, 0], 
 [0, 0]]


In [None]:
pyTorch_model(torch.FloatTensor(np.random.random(2)))

tensor(-0.0197)

In [None]:
import numpy as np
import pytest
import torch
import casadi as cs
import l4casadi as l4c


class DeepModel(torch.nn.Module):

    def __init__(self, dim_in, dim_out):
        super().__init__()

        self.input_layer = torch.nn.Linear(dim_in, 512)

        hidden_layers = []
        for i in range(20):
            hidden_layers.append(torch.nn.Linear(512, 512))

        self.ln = torch.nn.LayerNorm(512)

        self.hidden_layer = torch.nn.ModuleList(hidden_layers)
        self.out_layer = torch.nn.Linear(512, dim_out)

    def forward(self, x):
        x = self.input_layer(x)
        for layer in self.hidden_layer:
            x = torch.tanh(layer(x))
        x = self.ln(x)
        x = self.out_layer(x)
        return x


class TrigModel(torch.nn.Module):

    def forward(self, x):
        return torch.stack([torch.sin(x[:1]), torch.cos(x[1:2])], dim=-1)


class TestL4CasADi:

    @pytest.fixture(params=[(1, 3), (2, 3), (3, 1)])
    def deep_model(self, request):
        in_dim, out_dim = request.param
        return DeepModel(in_dim, out_dim)

    @pytest.fixture
    def triag_model(self):
        return TrigModel()

    def test_l4casadi_deep_model(self, deep_model):
        rand_inp = torch.rand((1, deep_model.input_layer.in_features))
        torch_out = deep_model(rand_inp)

        l4c_out = l4c.L4CasADi(deep_model, has_batch=True)(rand_inp.transpose(
            -2, -1).detach().numpy())

        np.allclose(l4c_out, torch_out.transpose(-2, -1).detach().numpy())

    def test_l4casadi_triag_model(self, triag_model):
        rand_inp = torch.rand((12, 12))
        torch_out = triag_model(rand_inp)

        l4c_out = l4c.L4CasADi(triag_model)(rand_inp.detach().numpy())

        np.allclose(l4c_out, torch_out.detach().numpy())

    def test_l4casadi_deep_model_jac(self, deep_model):
        rand_inp = torch.rand((1, deep_model.input_layer.in_features))
        torch_out = torch.func.vmap(torch.func.jacrev(deep_model))(rand_inp)[0]

        mx_inp = cs.MX.sym('x', deep_model.input_layer.in_features, 1)

        jac_fun = cs.Function('f_jac', [mx_inp], [
            cs.jacobian(
                l4c.L4CasADi(deep_model, has_batch=True)(mx_inp), mx_inp)
        ])

        l4c_out = jac_fun(rand_inp.transpose(-2, -1).detach().numpy())

        np.allclose(l4c_out, torch_out.detach().numpy())

In [None]:
deep_model = DeepModel(5, 40)
mx_inp = cs.MX.sym('x', deep_model.input_layer.in_features, 1)
l4c.L4CasADi(deep_model, has_batch=True, name='f1')(mx_inp)

deep_model = DeepModel(10, 40)
mx_inp = cs.MX.sym('x', deep_model.input_layer.in_features, 1)
a = l4c.L4CasADi(deep_model, has_batch=True, name='f2')(mx_inp)


