## Train surrogate model

With the previous data, train a surrogate model (gaussian process or NN)

## Model training

We use data from a file or a function generation to train a perception surrogate $g(\Delta x)$.
In this case $\Delta x$ is defined as the relative position (distance) between a tree and the drone: $\Delta x = \| x - t_i \|$

In [2]:

import numpy as np
from tools import *
import plotly.graph_objects as go



In [None]:

x_data, y_label =  generate_fake_dataset(250, False, n_input=3)


In [3]:

fig = go.Figure()
fig.add_trace(go.Scatter3d(z=y_label.flatten(), x=x_data[:,0], y=x_data[:,1], name='Ground Truth',marker=dict(
        color=y_label.flatten(),  # Color based on z values
        colorscale='Viridis',     # Use Viridis color map
        colorbar=dict(title='Z Value'),  # Add color bar
        size=5,                   # Marker size
        opacity=0.8               # Marker opacity
    ),))

fig.update_layout(
    title="Ground Truth vs Predicted (2D)",
    scene=dict(
        xaxis_title="x1", 
        yaxis_title="x2", 
        zaxis_title="Output"
    ),
)
fig.show()

In [5]:
# TODO: Define a simple NN
import torch

# Define the Radial Basis Function Layer
class RBFLayer(torch.nn.Module):
    def __init__(self, input_dim, num_centers):
        """
        Radial Basis Function Layer.
        Parameters:
            input_dim (int): Dimension of the input.
            num_centers (int): Number of RBF centers.
        """
        super(RBFLayer, self).__init__()
        self.centers = torch.nn.Parameter(torch.randn(num_centers, input_dim))
        self.log_sigmas = torch.nn.Parameter(torch.zeros(num_centers))  # Log scale for stability

    def forward(self, x):
        """
        Computes the RBF activation manually.
        """
        # Calculate squared Euclidean distances manually
        x = x.unsqueeze(1)  # Add a dimension to allow broadcasting
        centers = self.centers.unsqueeze(0)  # Add a dimension for broadcasting
        distances = torch.sum((x - centers) ** 2, dim=-1)  # Squared Euclidean distance
        
        sigmas = torch.exp(self.log_sigmas)  # Convert log_sigma to sigma
        # Apply Gaussian kernel
        return torch.exp(-distances / (2 * sigmas ** 2))


class RBFNN3d(torch.nn.Module):
    def __init__(self, input_dim, num_centers, output_dim=1):
        """
        Radial Basis Function Neural Network.
        Parameters:
            input_dim (int): Dimension of the input (2 or 3).
            num_centers (int): Number of RBF centers.
            output_dim (int): Dimension of the output.
        """
        super(RBFNN3d, self).__init__()
        self.input_dim = input_dim
        self.rbf_layer = RBFLayer(input_dim if input_dim < 3 else input_dim - 1, num_centers)
        self.linear_layer = torch.nn.Linear(num_centers + (2 if input_dim == 3 else 0), 64)
        self.linear_layer1 = torch.nn.Linear(64, output_dim)

    def forward(self, x):
        if x.shape[-1] == 3:  # Check if the input has 3 dimensions
            # Replace the angle with its sin and cos values
            sin_cos = torch.cat([torch.sin(x[..., -1:]), torch.cos(x[..., -1:])], dim=-1)
            rbf_output = torch.cat([self.rbf_layer(x[..., :-1]), sin_cos], dim=-1)
        else:
            rbf_output = self.rbf_layer(x)
        
        x = self.linear_layer(rbf_output)
        x = torch.sigmoid(x)  # Apply Sigmoid activation here
        return self.linear_layer1(x)


# TODO: Define the GP

In [6]:

batch_size = 1
num_centers = 10
lr = 1e-4
epochs = 20
save_path = 'models/rbfnn_model_3d_jupyter.pth'


In [7]:

import torch
from torch.utils.data import DataLoader, TensorDataset

# Train
dataset = TensorDataset(x_data, y_label)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

model = RBFNN3d(input_dim=3, num_centers=num_centers)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
print( 'using cuda: ',torch.cuda.is_available())
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(epochs):
    epoch_loss = 0
    model.train()
    for x_batch, y_batch in data_loader:
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        y_pred = model(x_batch)
        loss = criterion(y_pred, y_batch)
        epoch_loss += loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss / len(data_loader):.6f}')
torch.save(model.state_dict(), save_path)
print(f"Model saved to {save_path}")




using cuda:  True
Epoch [1/20], Loss: 0.001310
Epoch [2/20], Loss: 0.000727
Epoch [3/20], Loss: 0.000681
Epoch [4/20], Loss: 0.000651
Epoch [5/20], Loss: 0.000630
Epoch [6/20], Loss: 0.000597
Epoch [7/20], Loss: 0.000557
Epoch [8/20], Loss: 0.000516
Epoch [9/20], Loss: 0.000470
Epoch [10/20], Loss: 0.000422
Epoch [11/20], Loss: 0.000393
Epoch [12/20], Loss: 0.000371
Epoch [13/20], Loss: 0.000349
Epoch [14/20], Loss: 0.000330
Epoch [15/20], Loss: 0.000309
Epoch [16/20], Loss: 0.000289
Epoch [17/20], Loss: 0.000272
Epoch [18/20], Loss: 0.000259
Epoch [19/20], Loss: 0.000248
Epoch [20/20], Loss: 0.000239
Model saved to models/rbfnn_model_3d_jupyter.pth


In [9]:
# PLOT NN INFERENCE
model = RBFNN3d(input_dim=3, num_centers=num_centers)
model.load_state_dict(torch.load(save_path))
model.eval()


grid, ground_truth = generate_fake_dataset(100, False, n_input=3)
Z_truth = ground_truth.numpy().reshape(100, 100)
y_test = model(grid).detach().numpy().reshape(100, 100)
fig = go.Figure()
print(grid.shape)
#fig.add_trace(go.Scatter3d(z=Z_truth.flatten(), x=grid[:,0], y=grid[:,1], name='Ground Truth'))
fig.add_trace(go.Scatter3d(z=y_test.flatten(), x=grid[:,0], y=grid[:,1], name='Predicted'))

fig.update_layout(
    title="Ground Truth vs Predicted (2D)",
    scene=dict(
        xaxis_title="x1", 
        yaxis_title="x2", 
        zaxis_title="Output"
    ),
)
fig.show()

torch.Size([10000, 3])


## Define the working environment

The problem definition is as follows:

$x$ state of the robot defined by the position of the drone and its velocity [x, y ,z, vx, vy, vz]

$f(x, u)$ state transition function for the drone

$u$ acceleration commands to the drone [ax, ay, az]

$t \in T$ tree positions for the $T$ trees [[tx_1, ty_2], ... , [tx_T, ty_T]]

$\lambda$ belief for the trees maturity confidence [\lambda_1, ..., \lambda_T] (values from 0 to 1). For practicity, it can be seen as part of $x$

$z$ observation vector for the tree maturity confidence [z_1, ..., z_T] (values from 0 to 1)

$g(\Delta x)$ observation surrogate. It is applied to every tree.

$b(\lambda, z)$ bayesian update to the previous belief.

$H(\lambda)$ entropy function for the belief defined (for the case of binary distribution) as: $-\lambda \log{\lambda} - (1-\lambda) \log(1-\lambda)$.

$J(\lambda)$ the cost function of the MPC defined as: $\sum_{1, ..., n} \delta_1 H(\lambda_i) * \delta_2 \Delta x_i^2 + \delta_3 \|u\|$. They correspond to trying to reduce the entropy for each of the trees using $\Delta x_i^2$ to guide the planner when there is no observation, and reduce the control inputs.

The steps of the system are as follows:
1. Load learned $g()$ which works for one tree.
2. Initialize $x$ in a $x_0$ position, $\lambda$ with $0.5$ values for each tree, and $t$ as known.
3. Run the NMPC from $x$ for $N$ iterations. In each step:
  - Compute $\Delta x$ for each tree with the new drone $x$.
  - Get estimation from NN for each tree: $z = g(\Delta x)$.
  - Fuse the estimation in $\lambda$ for each tree: $\lambda_{k} = b(\lambda_{k-1}, z)$
4. Apply the solution from the MPC.
5. Get a real observation.
6. Integrate the real observation into $\lambda$.
7. Go back to step 3.

In [None]:
import numpy as np
import casadi as ca
import l4casadi as l4c
import time

# Constants
delta1 = 1.0  # Weight for entropy
delta2 = 1.0  # Weight for delta x
delta3 = 1.0  # Weight for control input

def kin_model(T=1.0, N=20):
    nx = 2

    # Construct a CasADi function for the ODE right-hand side
    x = MX.sym('x', nx)  # states: pos_x [m], pos_y [m], vel_x [m/s], vel_y [m/s]
    u = MX.sym('u', 2)  # control force [N]
    rhs = vertcat(u)

    # Continuous system dynamics as a CasADi Function
    f = Function('f', [x, u], [rhs])
    xf = x + dt *  u

    F_ = Function('F', [x, u], [xf])

    return F_

# Bayesian update function
def bayesian_update(lambda_prev, z):
    return (lambda_prev * z) / (lambda_prev * z + (1 - lambda_prev) * (1 - z))

# Entropy function
def entropy(lambda_val):
    return -lambda_val * ca.log(lambda_val) - (1 - lambda_val) * ca.log(1 - lambda_val)

# MPC optimization using CasADi
def mpc_opt(gaussians_func, centers, lb, ub, x0, y0, vx0, vy0, weights, steps=10):
    opti = Opti()
    
    F_ = kin_model(T=1.0, N=steps)
    bayes_f = bayes_func(len(centers))
    entropy_f = entropy_func(len(centers))


    X0 = opti.parameter(2 + len(centers))
    
    X = opti.variable(2, steps + 1)  # State trajectory
    U = opti.variable(2, steps)  # Control inputs

    # Initial constraints
    opti.subject_to( X[:, 0] ==  X0[:2])

    # Kinematic constraints
    for i in range(steps):
        opti.subject_to((lb[0]-5 <= X[0, i + 1]) <= ub[0]+5)
        opti.subject_to((lb[1]-5 <= X[1, i + 1]) <= ub[1]+5)
        opti.subject_to((-5 <= U[0, i]) <= 5)
        opti.subject_to((-5 <= U[1, i]) <= 5)
        opti.subject_to(X[:, i + 1] == F_(X[:, i], U[:, i]))  # State update using kinematic model


    # Objective function: Minimize the entropy
    obj = 0
    current_weights = X0[2:] #initialize the weights
    w0 =  X0[2:]

    for i in range(steps):
        z_k = gaussians_func(X[0, i+1], X[1, i+1], centers)+ 0.5
        current_weights = bayes_f(current_weights, z_k)
        obj += entropy_f(current_weights)

    opti.minimize(obj)
    options = {"ipopt": {"hessian_approximation": "limited-memory", "print_level":0, "sb": "no", "mu_strategy": "monotone", "tol":1e-3, "max_iter":200}} #reduced print level
    opti.solver('ipopt', options)


    # Initial step
    opti.set_value(X0, vertcat([x0],[y0], weights))

    start_time = time.time()
    sol = opti.solve()
    duration = time.time() - start_time

    inputs = [X0, opti.x,opti.lam_g]
    outputs = [U[:,0], X, opti.x,opti.lam_g]
    mpc_step = opti.to_function('mpc_step',inputs,outputs)

    return mpc_step, sol.value(U[:,0]), sol.value(X), sol.value(opti.x), sol.value(opti.lam_g)

# Main loop
def main():
    # Step 1: Load learned g(.) which works for one tree.
    gaussians_func = create_l4c_nn_f(len(trees), dev='cuda')

    # Step 2: Initialize x in a x_0 position, lambda$ with 0.5 values for each tree, and t as known.
    trees = generate_tree_positions ([10,10],8)
    lb, ub = get_domain(trees)
    lambda_vals = DM.ones(len(trees)) * 0.5
    steps = N


    bayes_f = bayes_func(len(trees))
    entropy_f = entropy_func(len(trees))

    # Initialize robot state
    x0 = 0.0
    y0 = 0.0
    vx0 = 0.0
    vy0 = 0.0

    # Log
    all_trajectories = []
    weights_history = []
    entropy_history = []
    durations = []

    # Step 2.5: Initialize mpc
    mpc_step, u, x_, x, lam = mpc_opt(gaussians_func, trees,lb, ub, x0, y0, vx0, vy0, lambda_vals,steps)

    # Number of iterations
    N = 10

    for iteration in range(10000):
        # Logging phase
        weights_history.append(lambda_vals.full().flatten().tolist())
        entropy_history.append(entropy_f(lambda_vals).full().flatten().tolist()[0])
        all_trajectories.append(( x_[0,:],  x_[1,:]))

        # Step 4: Apply command to the drone (update its pose)
        x0 =  x_[0,:][1]
        y0 =  x_[1,:][1]
        vx0 = u[ 0 ]
        vy0 = u[ 1 ]

        # Step 5: Get a real observation (simulated)
        z_k = np.round(gaussians_func(x0, y0, trees).full().flatten() + 0.5, 2)

        # Step 6: Integrate observation into lambda
        lambda_vals = bayes_f(lambda_vals, DM(z_k))        

        # Step 3: Run MPC
        u, x_, x, lam = mpc_step(vertcat([x0], [y0], lambda_vals), x, lam)
        u   = u.full()
        x_  = x_.full()

        # Print results
        print(f"Iteration {iteration}: x={x0:.2f}, y={y0:.2f}, Weights sum = {sum1(lambda_vals).full().flatten()}") #log the position and weight sum

# Run the main loop
if __name__ == "__main__":
    main()