In [None]:
# Install required packages
%pip install cupy-cuda11x  # Adjust version based on your CUDA installation
import cupy as cp
import numpy as np
from scipy.linalg import expm, solve_discrete_lyapunov
import itertools
from polytope import Polytope, HyperRectangle

In [None]:
%pip install scipy numpy cvxopt polytope matplotlib
import numpy as np
from scipy.linalg import expm, solve_discrete_lyapunov
import itertools
from polytope import Polytope, HyperRectangle

Note: you may need to restart the kernel to use updated packages.


ImportError: cannot import name 'HyperRectangle' from 'polytope' (c:\Users\sunan\AppData\Local\Programs\Python\Python311\Lib\site-packages\polytope\__init__.py)

In [2]:

class ClosedLoopSystem:
    """
    Represents the closed-loop control system as described in Section II of the paper.
    It encapsulates system matrices and derives the augmented state matrices for
    successful (A1) and missed (A0) control updates.
    """
    def __init__(self, A_c, B_c, C, K, L, h):
        """
        Args:
            A_c, B_c (np.array): Continuous-time system matrices.
            C (np.array): Output matrix.
            K (np.array): LQR feedback gain.
            L (np.array): Kalman filter gain.
            h (float): Sampling period.
        """
        self.n = A_c.shape[0]
        
        # Discretize the system matrices
        self.A = expm(A_c * h)
        self.B = np.linalg.inv(A_c) @ (self.A - np.eye(self.n)) @ B_c
        self.C = C
        self.K = K
        self.L = L

        # Augmented matrix for successful update (Eq. 2)
        # X = [x, x_hat]^T, state dimension is 2n
        self.A1 = np.block([
            [self.A, -self.B @ self.K],
            [self.L @ self.C, self.A - self.L @ self.C - self.B @ self.K]
        ])

        # Augmented matrix for missed update (Eq. 3)
        self.A0 = np.block([
            [self.A, -self.B @ self.K],
            [np.zeros((self.n, self.n)), np.eye(self.n)]
        ])
        
        # Helper matrices for SSCD calculation
        self.I = np.eye(self.n * 2)
        self.O = np.zeros_like(self.A1)

In [5]:
# --- Define a Sample System (e.g., a double integrator) ---
# This is a placeholder for a real system model from a CPS application.
# Continuous-time state-space representation
h = 0.1  # Sampling period
A_c = np.array([[0, 1], [0, 0]])
B_c = np.array([[0], [1]])
C = np.array([[1, 0]])

# Dummy gains for K (controller) and L (observer).
# In a real scenario, these would be designed (e.g., via LQR/Kalman).
K = np.array([[1, 2]])
L = np.array([[0.5], [0.1]])

# Create the closed-loop system object
system = ClosedLoopSystem(A_c, B_c, C, K, L, h)

LinAlgError: Singular matrix

In [None]:

# --- Set Algorithm Inputs ---
# Desired GUES decay rate (must be negative)
gamma_star = -0.05
# Safe state space X_S: [x, x_dot, x_hat, x_dot_hat]
# Here, we only constrain the physical states x and x_dot.
# The observer states are allowed to vary more widely.
safe_space = HyperRectangle([
    (-2.0, 2.0),    # position
    (-2.0, 2.0),    # velocity
    (-10.0, 10.0),   # estimated position
    (-10.0, 10.0)    # estimated velocity
])

# Grid granularity for partitioning X_S
# A finer grid gives more accuracy but takes longer.
partitions = []
grid_delta = [1.0, 1.0, 5.0, 5.0]
dims = len(grid_delta)
iterators = [np.arange(safe_space.intervals[i, 0], safe_space.intervals[i, 1], grid_delta[i]) for i in range(dims)]

for point in itertools.product(*iterators):
    intervals = [(point[i], point[i] + delta[i]) for i in range(dims)]
    partitions.append(HyperRectangle(intervals))
print(f"Step 1: Partitioned safe space into {len(partitions)} grid cells.")
# Hyperperiod / sequence length
K_hyperperiod = 6


NameError: name 'HyperRectangle' is not defined

# --- Run the Algorithm ---


In [None]:
initial_cm_guess = k // 2
a0 = mkwhsa(clsys, gamma_star, 0, k, initial_cm_guess)
nl = len(a0)
cm_bar = max(a0.keys()) if a0 else 0
m_bar_max = k - (k // (cm_bar + 1)) if cm_bar > -1 else 0
print(f"Step 2-3: Initial analysis -> {nl} locations, Max consecutive miss (CM)={cm_bar}, Max total miss (M)={m_bar_max}")

# Line 4: Initialize the specification list
spec_list = [{'X': [], 'm_bar': m, 'cm_bar': cm_bar, 'cm': 0, 'K': k} for m in range(m_bar_max + 1)]

# Line 5: Initialize Safety Guard (SG) data structure
sg = {m: {} for m in range(m_bar_max + 1)}

# --- Main Loop (Line 6) ---
for m_j in range(m_bar_max + 1):
    print(f"\n--- Verifying for m_bar = {m_j} misses ---")
    
    # Line 7: Build WHSA for current miss count
    automaton_j = 
        
    # Line 8-9: Grid-wise safety verification
    print("Step 8-9: Performing grid-wise safety verification...")
    for i, partition in enumerate(partitions):
        _, sg[m_j] = verha(automaton_j, partition, x_s, m_j, k, sg[m_j])

    # Line 13-17: Derive specifications from SG
    print("Step 13-17: Deriving specifications from verification results...")
    spec = spec_list[m_j]
    all_safe_regions_for_mj = []
    
    # Find all safe regions from the SG table for this miss count
    for jump, regions in sg[m_j].items():
        all_safe_regions_for_mj.extend(regions)

    # The union represents the total safe initialization area found so far
    spec['X'] = union_of_rects(all_safe_regions_for_mj)

    # Update max consecutive misses based on what was found to be safe
    safe_locations = set(j[0] for j in sg[m_j].keys()) | set(j[1] for j in sg[m_j].keys())
    spec['cm_bar'] = max(safe_locations) if safe_locations else 0

    # Line 18: MDADT calculation (simplified)
    # The paper requires MDADT for SSCD '1'. This ensures recovery.
    # We'll use a placeholder value, as a full calculation requires
    # solving for Lyapunov functions not fully implemented in mkwhsa.
    spec['cm'] = 1 # Minimum 1 successful update for recovery.

return [s for s in spec_list if s['X']]

# --- Print Results ---
print("\n\n--- Final Synthesized Safe Weakly Hard Specifications ---")
if not final_specifications:
    print("No safe specifications could be found.")
else:
    for i, spec in enumerate(final_specifications):
        print(f"\nSpecification #{i+1}:")
        print(f"  (m_bar, cm_bar, cm, K) = ({spec['m_bar']}, {spec['cm_bar']}, {spec['cm']}, {spec['K']})")
        print(f"  Description: Allow max {spec['m_bar']} total misses and {spec['cm_bar']} consecutive misses in {spec['K']} steps.")
        print(f"  Requires at least {spec['cm']} consecutive successful updates for recovery.")
        print(f"  Safe Initial Regions (X):")
        if not spec['X']:
            print("    None found.")
        else:
            for region in spec['X']:
                print(f"    - {region}")

In [None]:
class GPUClosedLoopSystem:
    """
    GPU-accelerated version of ClosedLoopSystem
    """
    def __init__(self, A_c, B_c, C, K, L, h):
        self.n = A_c.shape[0]
        
        # Move matrices to GPU
        self.A_c_gpu = cp.array(A_c)
        self.B_c_gpu = cp.array(B_c)
        self.C_gpu = cp.array(C)
        self.K_gpu = cp.array(K)
        self.L_gpu = cp.array(L)
        
        # Discretize the system matrices on GPU
        self.A_gpu = cp.array(expm(A_c * h))  # Keep expm on CPU as it's not available in CuPy
        self.B_gpu = cp.linalg.inv(self.A_c_gpu) @ (self.A_gpu - cp.eye(self.n)) @ self.B_c_gpu
        
        # Augmented matrix for successful update
        self.A1 = cp.block([
            [self.A_gpu, -self.B_gpu @ self.K_gpu],
            [self.L_gpu @ self.C_gpu, self.A_gpu - self.L_gpu @ self.C_gpu - self.B_gpu @ self.K_gpu]
        ])

        # Augmented matrix for missed update
        self.A0 = cp.block([
            [self.A_gpu, -self.B_gpu @ self.K_gpu],
            [cp.zeros((self.n, self.n)), cp.eye(self.n)]
        ])
        
        self.I = cp.eye(self.n * 2)
        self.O = cp.zeros_like(self.A1)
        
    def to_cpu(self):
        """Convert all matrices back to CPU for compatibility with existing code"""
        return {
            'A1': cp.asnumpy(self.A1),
            'A0': cp.asnumpy(self.A0),
            'I': cp.asnumpy(self.I),
            'O': cp.asnumpy(self.O)
        }
        
    def batch_compute_trajectories(self, initial_states, sequence):
        """
        Compute multiple system trajectories in parallel on GPU
        
        Args:
            initial_states: Array of initial states (batch_size x state_dim)
            sequence: Array of 1s and 0s indicating successful/missed updates
        """
        states_gpu = cp.array(initial_states)
        sequence_gpu = cp.array(sequence)
        
        trajectories = []
        trajectories.append(cp.asnumpy(states_gpu))
        
        for update in sequence_gpu:
            if update:
                states_gpu = self.A1 @ states_gpu.T
            else:
                states_gpu = self.A0 @ states_gpu.T
            trajectories.append(cp.asnumpy(states_gpu.T))
            
        return np.array(trajectories)

In [None]:
# Example usage of GPU-accelerated system
def verify_partitions_gpu(system_gpu, partitions, sequence):
    """
    Parallel verification of multiple partitions using GPU
    """
    # Convert partitions to initial states
    initial_states = np.array([p.center for p in partitions])
    
    # Compute all trajectories in parallel
    trajectories = system_gpu.batch_compute_trajectories(initial_states, sequence)
    
    # Verify safety conditions (can be done on CPU as it's less compute-intensive)
    safe_partitions = []
    for i, trajectory in enumerate(trajectories):
        if is_trajectory_safe(trajectory):  # Define this based on your safety conditions
            safe_partitions.append(partitions[i])
    
    return safe_partitions

# Convert existing system to GPU version
system_gpu = GPUClosedLoopSystem(A_c, B_c, C, K, L, h)

# Example sequence of updates (1=success, 0=miss)
test_sequence = np.array([1, 0, 1, 1, 0, 1])

# Time both CPU and GPU versions for comparison
import time

print("Testing performance comparison:")

# CPU timing
start_time = time.time()
# Your existing verification code here
cpu_time = time.time() - start_time
print(f"CPU time: {cpu_time:.4f} seconds")

# GPU timing
start_time = time.time()
safe_regions_gpu = verify_partitions_gpu(system_gpu, partitions, test_sequence)
gpu_time = time.time() - start_time
print(f"GPU time: {gpu_time:.4f} seconds")
print(f"Speedup: {cpu_time/gpu_time:.2f}x")