In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import numpy as np
import pickle
import os

In [2]:
with open("data/safe_data_set.pkl", "rb") as f:
    safe_data = pickle.load(f)  # Load the Pickle file

# Convert to PyTorch tensor
safe_data = torch.tensor(safe_data, dtype=torch.float32)
# Use safe data in training
states_safe = safe_data  # Use this in your training loop

with open("data/unsafe_data_set.pkl", "rb") as f2:
    unsafe_data = pickle.load(f2)  # Load the Pickle file

unsafe_data = torch.tensor(unsafe_data, dtype=torch.float32)
states_unsafe = unsafe_data
next_states_safe = states_safe[1:]  # All but the first state
states_safe = states_safe[:-1]  # All but the last state
print(states_safe[3])
print(next_states_safe[3])


tensor([1331.0923,  860.7158,  156.6412,   10.0000])
tensor([1322.6477,  855.3596,  147.6138,   10.0000])


In [3]:
class BarrierFunctionNet(nn.Module):
    def __init__(self, input_dim):
        super(BarrierFunctionNet, self).__init__()
        self.fc1 = nn.Linear(input_dim, 256)
        self.fc2 = nn.Linear(256, 1)
        self.tanh = nn.Tanh()

    def forward(self, x):
        hidden = self.tanh(self.fc1(x))
        return self.fc2(hidden)  # Now matches the passage's full equation

In [4]:
def phi(x):
    """ReLU function to enforce barrier constraints."""
    return torch.maximum(x, torch.tensor(0.0))

def barrier_loss(B_s, B_u, Lf_B_s, gamma, ws=1.0, wu=1.0, wl=1.0):
    """
    Computes the barrier function loss L(θ).

    Args:
        B_s: Barrier function output for safe states (Tensor of shape [Ns]).
        B_u: Barrier function output for unsafe states (Tensor of shape [Nu]).
        Lf_B_s: Lie derivative of B(x) for safe states (Tensor of shape [Ns]).
        gamma: Weight for the Lie derivative constraint.
        ws, wu, wl: Weights for different loss terms.

    Returns:
        Total loss value.
    """
    Ns = B_s.shape[0]  # Number of safe samples
    Nu = B_u.shape[0]  # Number of unsafe samples

    # Compute each term of the loss
    loss_safety = ws * torch.mean(phi(-B_s))   # First term
    loss_usability = wu * torch.mean(phi(B_u)) # Second term
    loss_invariance = wl * torch.mean(phi(-Lf_B_s - gamma * B_s))  # Third term

    # Combine the terms
    total_loss = loss_safety + loss_usability + loss_invariance
    return total_loss

In [5]:
def compute_Lf_B_approx(barrier_net, states, next_states, delta_t):
    """
    Approximates the Lie derivative Lf B(x) using sampled trajectories.

    Args:
        barrier_net: Neural network for the barrier function.
        states: Current states (Tensor of shape [Ns, state_dim]).
        next_states: Next states (Tensor of shape [Ns, state_dim]).
        delta_t: Time difference between consecutive states.

    Returns:
        Approximation of Lf B(x).
    """
    # Compute B(s) and B(s0)
    B_s = barrier_net(states)
    B_next = barrier_net(next_states)

    # Approximation: (B(s0) - B(s)) / delta_t
    Lf_B = (B_next - B_s) / delta_t
    return Lf_B

In [6]:
# Load the saved checkpoint
checkpoint = torch.load('checkpoint2.pth')

# Reinitialize the model and optimizer
barrier_net = BarrierFunctionNet(4)
optimizer = Adam(barrier_net.parameters(), lr=1e-3)

# Load the state_dict for the model and optimizer
barrier_net.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# Get the last epoch and loss if needed
start_epoch = checkpoint['epoch']
last_loss = checkpoint['loss']

# Set the model to evaluation mode or training mode
barrier_net.train()  # or barrier_net.eval() if you want to evaluate

  checkpoint = torch.load('checkpoint2.pth')


BarrierFunctionNet(
  (fc1): Linear(in_features=4, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=1, bias=True)
  (tanh): Tanh()
)

In [7]:
# Hyperparameters
input_dim = 4  # Example input dimension for state
gamma = 1.0    # Weight for invariance term
ws, wu, wl = 1.0, 1.0, 1.0  # Loss weights
delta_t = 0.1  # Time step

# Initialize the barrier function network and optimizer
barrier_net = BarrierFunctionNet(input_dim)
optimizer = Adam(barrier_net.parameters(), lr=1e-3)

# Training data (replace with actual safe/unsafe state data)
Ns, Nu = 20000, 20000  # Number of safe and unsafe samples


# Initialize the barrier function network and optimizer
barrier_net = BarrierFunctionNet(input_dim)
optimizer = Adam(barrier_net.parameters(), lr=1e-3)

# Check if a checkpoint exists and load it
checkpoint_path = 'checkpoint2.pth'
start_epoch = 0  # default start epoch if no checkpoint is found
if os.path.exists(checkpoint_path):
    checkpoint = torch.load(checkpoint_path)
    barrier_net.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch'] + 1  # Continue from the next epoch
    print(f"Resuming training from epoch {start_epoch} with loss: {checkpoint['loss']}")

# Training loop (starting from the right epoch)
for epoch in range(start_epoch, 2000):
    # Forward pass: Compute B(x) for safe and unsafe states
    B_s = barrier_net(states_safe)
    B_u = barrier_net(states_unsafe)

    # Compute Lf B(x) approximation for safe states
    Lf_B_s = compute_Lf_B_approx(barrier_net, states_safe, next_states_safe, delta_t)

    # Compute the loss
    loss = barrier_loss(B_s, B_u, Lf_B_s, gamma, ws, wu, wl)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}")

    # Save model and optimizer state every 10 epochs
    if (epoch + 1) % 10 == 0:
        torch.save({
            'epoch': epoch,  # current epoch
            'model_state_dict': barrier_net.state_dict(),  # model parameters
            'optimizer_state_dict': optimizer.state_dict(),  # optimizer parameters
            'loss': loss.item(),  # last loss value
        }, checkpoint_path)


Resuming training from epoch 3000 with loss: 0.0024752230383455753


  checkpoint = torch.load(checkpoint_path)


In [8]:
def verify_safe_states(barrier_net, states_safe):
    barrier_values = barrier_net(states_safe)  # Compute B(x) for all safe states
    if torch.all(barrier_values >= 0):  # Check if B(x) >= 0 for all safe states
        print("Condition 1 satisfied: B(x) >= 0 for all x in X_s")
        return True
    else:
        print("Condition 1 violated: B(x) is negative for some x in X_s")
        return False

In [9]:
def verify_unsafe_states(barrier_net, states_unsafe):
    barrier_values = barrier_net(states_unsafe)  # Compute B(x) for all unsafe states
    if torch.all(barrier_values < 0):  # Check if B(x) < 0 for all unsafe states
        print("Condition 2 satisfied: B(x) < 0 for all x in X_u")
        return True
    else:
        print("Condition 2 violated: B(x) is non-negative for some x in X_u")
        return False

In [26]:
def create_grid_around_boundary(states, threshold=0.1, delta=0.05):
    # Create a grid around the boundary where B(x) = 0
    grid_states = []
    
    # Loop over each state in the list of states
    for state in states:
        # Convert state to a tensor if it's not already
        state_tensor = torch.tensor(state, dtype=torch.float32)
        
        # Compute the barrier function for this state
        barrier_value = barrier_net(state_tensor).mean().item()  # Take the mean of the output vector
        
        # If B(x) is near 0 (within threshold), create a grid around this state
        if np.abs(barrier_value) < threshold:  
            # Create a grid around this state within a range of delta
            for dx in np.linspace(-delta, delta, num=5):
                # Apply dx element-wise to each dimension of the state
                grid_state = state + np.array([dx] * len(state))  # Add dx to each dimension of state
                grid_states.append(grid_state.tolist())  # Ensure grid_state is a list
                
    # Convert grid states to a tensor and return
    return torch.tensor(grid_states, dtype=torch.float32)



In [11]:
def create_grid_around_boundary2(barrier_net, states_safe, dynamics_model, threshold=0.1, delta=0.01):
    """
    Create a grid around boundary states and compute Lie derivative bounds.

    Args:
        barrier_net (torch.nn.Module): Neural network representing the barrier function B(x).
        states_safe (numpy array): Safe states (N x dim).
        dynamics_model (function): Function that computes f(x) given a state.
        threshold (float): Distance to define the boundary.
        delta (float): Perturbation for finite difference estimation.

    Returns:
        grid_states (numpy array): Sampled states around the boundary.
        Lf_B_bounds (list of tuples): List of (lower, upper) bounds for L_f B(x).
    """
    n, dim = states_safe.shape  # Number of states and state dimensions

    # Generate perturbations to create the grid
    perturbations = np.random.uniform(-threshold, threshold, size=(n, dim))
    grid_states = states_safe + perturbations  # Apply small shifts around safe states

    Lf_B_bounds = []  # Store Lie derivative bounds for each grid state

    for state in grid_states:
        # Compute Jacobian of the dynamics using finite differences
        J_f = np.zeros((dim, dim))  # Jacobian initialization
        control_input = np.array([0.0, 0.0])
        f_a = dynamics_model(state,control_input)  # Compute f(a)

        for i in range(dim):
            perturb = np.zeros(dim)
            perturb[i] = delta  # Perturb only the i-th dimension
            f_perturbed = dynamics_model(state + perturb,control_input)  # Compute f(a + δe_i)

            # Finite difference approximation for ∂f/∂x_i
            J_f[:, i] = (f_perturbed - f_a) / delta  

        # Compute ∇B(x)
        state_tensor = torch.tensor(state, dtype=torch.float32, requires_grad=True)
        B_x = barrier_net(state_tensor)  # Compute B(x)
        B_x.backward()  # Compute gradient ∇B
        grad_B = state_tensor.grad.detach().numpy()

        # Compute Lie derivative bounds using interval arithmetic
        Lf_B_lower = np.dot(grad_B, np.min(J_f, axis=1))  # Lower bound
        Lf_B_upper = np.dot(grad_B, np.max(J_f, axis=1))  # Upper bound

        Lf_B_bounds.append((Lf_B_lower, Lf_B_upper))

    return grid_states, Lf_B_bounds

In [12]:
def compute_lie_derivative(barrier_net, state, dynamics_model, delta_t=0.1, control_input=None):
    if control_input is None:
        control_input = np.array([0.0, 0.0])  # Default control input: [acceleration, steering] = [0, 0]
    
    state = np.array(state) 
    state_tensor = torch.tensor(state, dtype=torch.float32, requires_grad=True)
    B_x = barrier_net(state_tensor)
    
    # If B(x) is a vector, take the mean or sum to get a scalar
    B_x = B_x.mean() 
    
    # Compute the gradient of B(x) with respect to the state
    B_x.backward()  # Compute the gradient of B(x)
    
    # Detach state from the computation graph and compute next state
    state_detached = state_tensor.detach()  # Detach from the computation graph
    state_next = state_detached + delta_t * dynamics_model(state_detached.numpy(), control_input)  # Use detached state

    
    # Compute the change in barrier function based on the system dynamics
    Lf_B = B_x.item()  # Get the scalar value of the barrier function
    return Lf_B



In [13]:
def verify_lie_derivative(barrier_net, grid_states, dynamics_model, delta_t=0.1):
    default_control_input = np.array([0.0, 0.0])  # Example: [acceleration, steering] = [0, 0]
    violating_states = []
    for state in grid_states:
        # Ensure state is a numpy array or a list
        Lf_B = compute_lie_derivative(barrier_net, state, dynamics_model, delta_t, control_input=default_control_input)
        
        if Lf_B <= 0:
            # print(f"Condition 3 violated: L_f B(x) <= 0 at state {state}")
            violating_states.append(state)
    print(f"Condition 3 violated: for len of {len(violating_states)}")
    return violating_states

In [14]:
def dynamics_model(state, control_input):
    if isinstance(state, torch.Tensor):
        state_values = state.detach().numpy()  # Convert the tensor to a numpy array
    else:
        state_values = state  # If it's already a numpy array, use it directly
    
    # Unpack the state values
    x, y, theta, v = state_values[0], state_values[1], state_values[2], state_values[3]
    a, delta = control_input  # Extract control inputs (acceleration, steering angle)
    #print(x,y)
    L = 2.5  # Wheelbase length (for example)
    
    # Compute the next state using the dynamics equations
    dx = v * np.cos(theta) # Change in x position
    dy = v * np.sin(theta) # Change in y position
    delta = (v / L) * np.tan(delta) # Change in heading (yaw rate)
    dv = a  # Change in velocity (acceleration)
    #print(f"dx: {dx}, dy: {dy}, dtheta: {dtheta}, dv: {dv}")
    return np.array([dx, dy, delta, dv], dtype=float)  # Explicitly set the dtype to ensure consistent types



In [15]:

# Condition 1: Verify safe states
verify_safe_states(barrier_net, states_safe)

# Condition 2: Verify unsafe states
verify_unsafe_states(barrier_net, states_unsafe)

# Condition 3: Verify Lie derivative for boundary states
grid_states, _ = create_grid_around_boundary2(barrier_net, states_safe, dynamics_model, threshold=0.1, delta=0.05)
violating_states = verify_lie_derivative(barrier_net, grid_states, dynamics_model)


Condition 1 violated: B(x) is negative for some x in X_s
Condition 2 violated: B(x) is non-negative for some x in X_u


  grid_states = states_safe + perturbations  # Apply small shifts around safe states
  f_perturbed = dynamics_model(state + perturb,control_input)  # Compute f(a + δe_i)
  state_tensor = torch.tensor(state, dtype=torch.float32, requires_grad=True)
  state = np.array(state)
  state_next = state_detached + delta_t * dynamics_model(state_detached.numpy(), control_input)  # Use detached state


Condition 3 violated: for len of 5


In [16]:
from sklearn.neighbors import KNeighborsClassifier
def knn_labeling(states, labels, test_states, k=6):
    # Fit kNN classifier on the states with known labels
    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(states, labels)
    
    # Predict labels for the test states
    predicted_labels = knn.predict(test_states)
    
    return predicted_labels

In [17]:

# Prepare training data (safe and unsafe states)
training_states = np.vstack([states_safe, states_unsafe])  # Stack them together
training_labels = np.array([1] * len(states_safe) + [0] * len(states_unsafe))  # Safe = 1, Unsafe = 0

# Convert violating states into a NumPy array
violating_states_np = np.array([state.detach().numpy() for state in violating_states])

# Use kNN to predict the labels for the violating states
violating_labels = knn_labeling(training_states, training_labels, violating_states_np)

# Print out the labels for violating states
for i, state in enumerate(violating_states):
    print(f"State {state}: Label {violating_labels[i]}")

State tensor([ 397.6548,  852.4391, -122.5868,   10.0257], dtype=torch.float64): Label 1
State tensor([ 393.6957,  861.6785, -112.6384,   10.0402], dtype=torch.float64): Label 1
State tensor([ 401.0556,  855.4220, -116.1718,    9.9340], dtype=torch.float64): Label 1
State tensor([ 398.4114,  865.0586, -106.2958,   10.0813], dtype=torch.float64): Label 1
State tensor([397.2530, 875.1494, -96.3005,   9.9189], dtype=torch.float64): Label 1


In [27]:
cond1 = verify_safe_states(barrier_net, states_safe)
cond2 =  verify_unsafe_states(barrier_net, states_unsafe)


for i in range(10):
    count = 0
    if os.path.exists(checkpoint_path):
        checkpoint = torch.load(checkpoint_path)
        barrier_net.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        start_epoch = checkpoint['epoch'] + 1  # Continue from the next epoch
        print(f"Resuming training from epoch {start_epoch} with loss: {checkpoint['loss']}")
    for epoch in range(start_epoch, start_epoch+1000):
        # Forward pass: Compute B(x) for safe and unsafe states
        B_s = barrier_net(states_safe)
        B_u = barrier_net(states_unsafe)

        # Compute Lf B(x) approximation for safe states
        Lf_B_s = compute_Lf_B_approx(barrier_net, states_safe, next_states_safe, delta_t)

        # Compute the loss
        loss = barrier_loss(B_s, B_u, Lf_B_s, gamma, ws, wu, wl)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}")

        # Save model and optimizer state every 10 epochs
        if (epoch + 1) % 10 == 0:
            torch.save({
                'epoch': epoch,  # current epoch
                'model_state_dict': barrier_net.state_dict(),  # model parameters
                'optimizer_state_dict': optimizer.state_dict(),  # optimizer parameters
                'loss': loss.item(),  # last loss value
            }, checkpoint_path)

    # Condition 1: Verify safe states
    cond1 = verify_safe_states(barrier_net, states_safe)

    # Condition 2: Verify unsafe states
    cond2 = verify_unsafe_states(barrier_net, states_unsafe)

    # Condition 3: Verify Lie derivative for boundary states
    grid_states = create_grid_around_boundary(states_safe, threshold=0.1, delta=0.05)
    violating_states = verify_lie_derivative(barrier_net, grid_states, dynamics_model)

    training_states = np.vstack([states_safe, states_unsafe])  # Stack them together
    training_labels = np.array([1] * len(states_safe) + [0] * len(states_unsafe))  # Safe = 1, Unsafe = 0

    # Convert violating states into a NumPy array
    violating_states_np = np.array([state.detach().numpy() for state in violating_states])


    # Use kNN to predict the labels for the violating states
    if violating_states_np.size > 0:
        violating_labels = knn_labeling(training_states, training_labels, violating_states_np)
    else:
        print("no violating states")
    
    if cond1:
        count+=1
    if cond2:
        count+=1
    if violating_states_np.size == 0:
        count+=1
    if count > 1:
        print("goood enough")
        break

    # Print out the labels for violating states
    # for i, state in enumerate(violating_states):
    #     print(f"State {state}: Label {violating_labels[i]}")


Condition 1 violated: B(x) is negative for some x in X_s
Condition 2 violated: B(x) is non-negative for some x in X_u
Resuming training from epoch 3020 with loss: 0.0014649800723418593


  checkpoint = torch.load(checkpoint_path)


Epoch 3030, Loss: 0.0013
Epoch 3040, Loss: 0.0010
Epoch 3050, Loss: 0.0010
Epoch 3060, Loss: 0.0008
Epoch 3070, Loss: 0.0042
Epoch 3080, Loss: 0.0188
Epoch 3090, Loss: 0.0091
Epoch 3100, Loss: 0.0060
Epoch 3110, Loss: 0.0031
Epoch 3120, Loss: 0.0037
Epoch 3130, Loss: 0.0055
Epoch 3140, Loss: 0.0057
Epoch 3150, Loss: 0.0021
Epoch 3160, Loss: 0.0019
Epoch 3170, Loss: 0.0015
Epoch 3180, Loss: 0.0012
Epoch 3190, Loss: 0.0010
Epoch 3200, Loss: 0.0143
Epoch 3210, Loss: 0.0124
Epoch 3220, Loss: 0.0092
Epoch 3230, Loss: 0.0053
Epoch 3240, Loss: 0.0025
Epoch 3250, Loss: 0.0019
Epoch 3260, Loss: 0.0015
Epoch 3270, Loss: 0.0012
Epoch 3280, Loss: 0.0013
Epoch 3290, Loss: 0.0016
Epoch 3300, Loss: 0.0039
Epoch 3310, Loss: 0.0037
Epoch 3320, Loss: 0.0029
Epoch 3330, Loss: 0.0017
Epoch 3340, Loss: 0.0014
Epoch 3350, Loss: 0.0011
Epoch 3360, Loss: 0.0010
Epoch 3370, Loss: 0.0009
Epoch 3380, Loss: 0.0009
Epoch 3390, Loss: 0.0294
Epoch 3400, Loss: 0.0149
Epoch 3410, Loss: 0.0084
Epoch 3420, Loss: 0.0059


  state_tensor = torch.tensor(state, dtype=torch.float32)
  grid_state = state + np.array([dx] * len(state))  # Add dx to each dimension of state
  state = np.array(state)
  state_next = state_detached + delta_t * dynamics_model(state_detached.numpy(), control_input)  # Use detached state


Condition 3 violated: for len of 0
no violating states
goood enough
