# **Examples for CDC '25**

## **Case Study: Overtaking Scenario**

This is the running example from the paper. It illustrates the basic concepts in `pyspect` by verifying an overtaking scenario.

### **Imports**

In [None]:
from math import pi
from time import time
from functools import reduce
from contextlib import contextmanager
from pyspect import *

In [None]:
@contextmanager
def timectx(msgfunc):
    """Context manager to time a block of code."""
    start = time()
    yield
    end = time()
    print(msgfunc(end-start))

### **Hyperparameters**

In [None]:
AXES = [dict(name=  't', bounds=[     0,     15],   step=0.5, unit='s'),
        dict(name=  'x', bounds=[     0,    200], points=201, unit='m'),
        dict(name=  'y', bounds=[    -4,     +4], points= 15, unit='m'),
        dict(name='yaw', bounds=[-pi/16, +pi/16], points= 13, unit='rad'), # pi/16 = 11.25deg
        dict(name='vel', bounds=[    15,     30], points= 51, unit='m/s')] # 15 mps ~= 54 kmph | 30 mps ~= 108 kmph

MAX_STEER = pi/64   # [rad/s]   +/- 2.8 degrees/s
MAX_ACCEL = 3.5     # [mps2]

### **Program**

#### Task definition

1. Define the different regions. These are "constants" in the task specification. Note that we can define the sets lazily, such that we do not rely on a specific implementation or set representation yet. This is possible because `pyspect` internally defers evaluation until realization time.

2. Write the specification $\varphi = \varphi_\text{env} \:\mathsf{U}\: \texttt{goal}$, where $\varphi_\text{env}, \texttt{goal} \in \mathsf{AP}$. Formally, $\varphi_\text{env} = ( \neg p_\text{obs} \wedge \dots )$ with $p_\text{obs} \in \mathsf{AP}$ and $p_\text{obs} \leftrightarrow z \in \texttt{OBS}$; in the code, we use the set $\texttt{OBS}$ directly in the specification. The proposition $\texttt{goal}$ is left symbolic and will be bound later.
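To make the meaning of the until operator concrete, here is a minimal discrete-time sketch over a finite, sampled trace. It is not `pyspect` code: `holds_until`, the toy predicates, and the sample traces are hypothetical, and the notebook itself interprets the operator in continuous time through reachability analysis.

```python
from typing import Callable, Sequence, Tuple

State = Tuple[float, float]  # hypothetical (x, y) sample of the ego vehicle


def holds_until(trace: Sequence[State],
                env: Callable[[State], bool],
                goal: Callable[[State], bool]) -> bool:
    """True if `goal` is reached in the trace and `env` holds at every
    sample strictly before the first sample satisfying `goal`."""
    for state in trace:
        if goal(state):
            return True
        if not env(state):
            return False
    return False  # goal never reached within the finite trace


# Toy predicates with illustrative values only.
env = lambda s: -3.5 <= s[1] <= 3.5   # stay on the road
goal = lambda s: s[0] >= 100.0        # get far enough ahead

ok_trace = [(0.0, -2.0), (50.0, 2.0), (120.0, -2.0)]
bad_trace = [(0.0, -2.0), (50.0, 5.0), (120.0, -2.0)]   # leaves the road first
no_goal_trace = [(0.0, -2.0), (50.0, 2.0)]              # never reaches the goal

print(holds_until(ok_trace, env, goal))
print(holds_until(bad_trace, env, goal))
print(holds_until(no_goal_trace, env, goal))
```

The reachability-based realization answers a different question (from which initial states can a controller produce such a trace?), but the per-trace condition is the same.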

**Note A.** This example uses `functools.reduce` to [_fold-left_](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) binary operators (e.g. `AND`). This is simply to reduce the code length. In this example, the semantics allow for both left- and right-associative reduction (which is not necessarily true for all downstream applications). By default, `pyspect` does not assume associativity for any of its operators.
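As a small illustration of the fold-left behaviour, the sketch below uses plain Python `frozenset` intersection as a stand-in for a binary set operator (the helper `inter` is hypothetical, not the `pyspect` `Inter`). It also shows a non-associative operator, where the grouping chosen by `reduce` changes the result.

```python
from functools import reduce
from operator import sub


def inter(a: frozenset, b: frozenset) -> frozenset:
    """Binary intersection, standing in for a binary set operator."""
    return a & b


sets = [frozenset({1, 2, 3, 4}), frozenset({2, 3, 4, 5}), frozenset({3, 4, 5, 6})]

folded = reduce(inter, sets)                        # fold-left
left = inter(inter(sets[0], sets[1]), sets[2])      # ((A & B) & C)
right = inter(sets[0], inter(sets[1], sets[2]))     # (A & (B & C))

# Subtraction is not associative, so the grouping matters.
left_sub = reduce(sub, [10, 3, 2])                  # (10 - 3) - 2
right_sub = 10 - (3 - 2)

print(sorted(folded), left == right, left_sub, right_sub)
```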

**Note B.** We define `OBS` in an indirect way to make it easier to write `GOAL`. We want to describe the goal as "in front of the `OBS` vehicle", so we need to know where the front of `OBS` is. Once we have this, it is shorter to (informally) define `OBS` such that `BEHIND_OBS < OBS < INFRONT_OBS`.
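The geometry behind Note B can be checked by hand. The sketch below assumes that a half-space with normal $n$ and offset $c$ denotes the set $\{z : n \cdot (z - c) \ge 0\}$; this sign convention is an assumption and is not confirmed by the notebook. The helper names are hypothetical, and the numbers are those passed to `HalfSpaceSet` in the cell that follows. Under this assumption, "in front" means $x \ge 16t - 24$ and "behind" means $x \le 16t - 64$, so the obstacle occupies a 40 m window in $x$ that moves at 16 m/s.

```python
def in_halfspace(point, normal, offset) -> bool:
    """Membership test, ASSUMING the set is {z : normal . (z - offset) >= 0}."""
    return sum(n * (p - c) for n, p, c in zip(normal, point, offset)) >= 0


def infront_of_obs(x: float, t: float) -> bool:
    # Same numbers as INFRONT_OBS, over the axes (x, t).
    return in_halfspace([x, t], normal=[+1, -16], offset=[40, 4])


def behind_obs(x: float, t: float) -> bool:
    # Same numbers as BEHIND_OBS, over the axes (x, t).
    return in_halfspace([x, t], normal=[-1, +16], offset=[0, 4])


def in_obs_window(x: float, t: float) -> bool:
    """Longitudinal part of OBS: neither in front of nor behind the vehicle."""
    return not infront_of_obs(x, t) and not behind_obs(x, t)


for t in (4, 5):
    window = [x for x in range(0, 201) if in_obs_window(x, t)]
    print(t, window[0], window[-1])
```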

In [None]:
## (1) Define the different regions.

# All of the following objects are SetBuilder objects and will be propositions in later specs.

RIGHT_LANE  = BoundedSet(y=(-3.5, -0.5))
LEFT_LANE   = BoundedSet(y=(+0.5, +3.5))
BETWEEN     = BoundedSet(y=(-1.5, +1.5))

STRAIGHT = BoundedSet(yaw=(-pi/18, +pi/18))

INFRONT_OBS = HalfSpaceSet(normal=[ +1, -16],
                           offset=[ 40,   4],
                           axes = ['x', 't'])

BEHIND_OBS = HalfSpaceSet(normal=[ -1, +16],
                          offset=[  0,   4],
                          axes = ['x', 't'])

# See Note B.
OBS = reduce(Inter, [Compl(INFRONT_OBS), Compl(BEHIND_OBS), RIGHT_LANE])

GOAL = reduce(Inter, [
    INFRONT_OBS, 
    RIGHT_LANE,
    STRAIGHT,
    # NOTE: For stability of HJ solution, 
    #       target should not extend to domain boundary
    BoundedSet(x=(..., 195)),
])

## (2) Write the specification.

# All of the following objects are logical specifications.

ENV = reduce(AND, [
    reduce(OR, [
        AND(RIGHT_LANE, STRAIGHT),
        AND(LEFT_LANE, STRAIGHT),
        BETWEEN,
    ]),
    NOT(OBS),
    # NOTE: For stability of HJ solution,
    #       the constraint set should not extend to domain boundary
    BoundedSet(vel=(16, 24), yaw=(-pi/16 + pi/32, +pi/16 - pi/32)),
])

TASK = UNTIL(ENV, 'goal')

#### Construct the TLT

3. Select the set of primitive TLTs. This determines the logic fragment in which the specification is interpreted. In this example, we use continuous-time LTL, i.e., LTL without the "next" operator.

4. Construct the temporal logic tree and bind the symbolic proposition $\texttt{goal}$ to the concrete set $\texttt{GOAL}$. The `where={'goal': GOAL}` argument updates an internal proposition map $\mathsf{M}$ so that $\texttt{goal} \leftrightarrow z \in \texttt{GOAL}$.
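The idea of lazy sets and late binding of a symbolic proposition can be sketched without `pyspect`. In the toy model below, every "set builder" is a closure that only produces a concrete set once it is given a universe and a proposition map. All names (`bounded`, `prop`, `inter`, `compl`, `spec`) are hypothetical and do not reflect how `pyspect` is implemented internally.

```python
from typing import Callable, Dict, FrozenSet

# A builder maps (universe, proposition map) to a concrete set.
Builder = Callable[[FrozenSet[int], Dict[str, "Builder"]], FrozenSet[int]]


def bounded(lo: int, hi: int) -> Builder:
    """Lazy interval: no concrete set exists until the builder is called."""
    return lambda universe, bindings: frozenset(z for z in universe if lo <= z <= hi)


def prop(name: str) -> Builder:
    """Symbolic proposition, resolved through the proposition map when realized."""
    return lambda universe, bindings: bindings[name](universe, bindings)


def inter(a: Builder, b: Builder) -> Builder:
    return lambda universe, bindings: a(universe, bindings) & b(universe, bindings)


def compl(a: Builder) -> Builder:
    return lambda universe, bindings: universe - a(universe, bindings)


obs = bounded(3, 5)
spec = inter(compl(obs), prop('goal'))   # 'goal' is still symbolic here

# Realization: choose a concrete universe and bind 'goal'.
universe = frozenset(range(10))
result = spec(universe, {'goal': bounded(4, 8)})
print(sorted(result))
```

Because nothing is evaluated until the last call, the same `spec` could be realized against a different universe or with a different binding for `'goal'`.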

In [None]:
## (3) Select the set of primitive TLTs.

TLT.select(ContLTL)

## (4) Create the TLT and set the proposition 'goal'.

objective = TLT(TASK, where={'goal': GOAL})

#### Initialize the implementation object

Only now do we introduce a specific implementation. `pyspect` has some off-the-shelf implementations that can be imported. Note, however, that `pyspect` does not abstract _away_ the underlying back-end completely, as we see in the following cell.

5. Additional imports for the specific implementation. `TVHJImpl` is a wrapper class around the `hj_reachability` library. `pyspect` includes a forked version with convenience utilities.

6. Initialization of the implementation is fully backend-specific. In this case, we specify the dynamics (the bicycle model `Bicycle4D`), bounds, time horizon, and discretization. This configuration determines how sets like `GOAL` and operators like `UNTIL` will be computed (e.g., using backward reachable sets on a grid).

7. Realize the TLT. This is the key step where the temporal logic and reachability analysis meet: `objective.realize(impl)` recursively applies the reachability and set operations defined by the selected logic fragment and implemented in the backend. The output `out` is the satisfaction set (i.e., the states from which the specification holds) in the implementation's set representation.

8. Plotting. We visualize the satisfaction set using `impl.plot`, which displays it as a level set over the axes `x`, `y`, and `t`, together with the obstacle set in red. This step is backend-dependent (as value functions are used).
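Since the HJ backend represents sets as gridded value functions, it may help to see how set operations look in that representation. The following is a minimal one-dimensional sketch, assuming the common convention that a point belongs to the set where the value is non-positive. The helpers are hypothetical and are not the `TVHJImpl` API. Under this convention, adding a positive constant to a value function shrinks the set, which is the effect of the small plotting margin applied in the next cell.

```python
import numpy as np

x = np.arange(-10, 11, dtype=float)   # toy 1D grid with unit spacing


def interval(lo: float, hi: float) -> np.ndarray:
    """Value function of [lo, hi]: non-positive inside, positive outside."""
    return np.maximum(lo - x, x - hi)


def inter(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    return np.maximum(a, b)           # inside both sets


def union(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    return np.minimum(a, b)           # inside at least one set


def compl(a: np.ndarray) -> np.ndarray:
    return -a                         # flips inside and outside (boundary kept)


def members(v: np.ndarray) -> np.ndarray:
    return x[v <= 0]


a = interval(-5, 2)
b = interval(0, 6)
both = inter(a, b)                    # grid points in [0, 2]
shrunk = both + 0.5                   # removes a margin of 0.5 from the set

print(members(both), members(shrunk), members(union(a, b)).size)
```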

In [None]:
## (5) Additional imports for the specific implementation.

from pyspect.impls.hj_reachability import TVHJImpl

from pyspect.systems.hj_reachability import *

## (6) Define the implementation of the reachability algorithm.

dynamics = dict(cls=Bicycle4D,
                wheelbase=2.7,
                min_accel=-MAX_ACCEL,
                max_accel=+MAX_ACCEL,
                min_steer=-MAX_STEER,
                max_steer=+MAX_STEER)

impl = TVHJImpl(dynamics, AXES)

## (7) Run the reachability program ##

with timectx(lambda t: f"Realization with HJ took {t:.2f} seconds"):
    out = objective.realize(impl)

# `out` will have the same object type that `impl` operates with.
# For TVHJImpl, `out` will be a numpy array of the gridded value function.
print(f"{type(out) = }, {out.min() = }, {out.max() = }")

## (8) Plotting

# Generate levelset for the other vehicle.
obs = OBS(impl)
print(f"{obs.min() = }, {obs.max() = }")

# NOTE: We remove a small margin for plotting (clearer boundary/separation from obs).
out = out + 0.03

impl.plot(
    out,
    (obs, dict(colorscale='reds')),
    axes=('x', 'y', 't'),
    camera_eye=impl.PLOT.EYE_ML_SW,
    camera_center=dict(x=.9, y=.9, z=-.8),
    layout_margin=dict(t=40, r=0, l=0, b=0),
).show()

In [None]:
for t in [6., 8., 10., 12.]:
    impl.plot(
        out,
        (obs, dict(colorscale='reds')),
        method='bitmap',
        axes=('x', 'y'),
        transform_select=[('t', t)],
        layout_height=300,
        layout_width=600,
        layout_title=f'At Time: {t = }',
    ).show()

## **Specification: $\square \psi$**

This specification requires the system to remain within a safe region $\psi$ for the entire time horizon. It encodes an invariance property, meaning the condition $\psi$ must hold at all times. The HJ backend realizes this using an avoid set computed via universal control, treating $\Box \psi$ as $\neg \Diamond \neg \psi$, while the HZ backend uses a fixed-point formulation, i.e. $\psi \land \bigcirc(\psi \land \bigcirc(\dots))$. Both methods result in a valid set of initial states from which the system can satisfy $\psi$ throughout the trajectory.
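The fixed-point formulation $\psi \land \bigcirc(\psi \land \bigcirc(\dots))$ can be illustrated on a small finite-state system. The sketch below uses a toy integer double integrator ($x' = x + v$, $v' = v + a$ with $a \in \{-1, 0, 1\}$) and iterates $S_{k+1} = \psi \cap \mathrm{Pre}(S_k)$ until nothing changes. The system, the grid, and the function names are illustrative assumptions. This is neither the HZ backend nor the dynamics used in this notebook, and the notebook unrolls the operator a fixed number of times rather than iterating to convergence.

```python
from itertools import product

X = range(-5, 6)          # toy position grid
V = range(-3, 4)          # toy velocity grid
CONTROLS = (-1, 0, 1)     # admissible accelerations

psi = frozenset(product(X, V))   # safe region: the whole grid, leaving it is unsafe


def pre(target: frozenset) -> frozenset:
    """States from which SOME control leads into `target` in one step."""
    return frozenset(
        (x, v) for x, v in product(X, V)
        if any((x + v, v + a) in target for a in CONTROLS)
    )


def always(safe: frozenset) -> frozenset:
    """Greatest fixed point of S = safe & pre(S)."""
    current = safe
    while True:
        shrunk = safe & pre(current)
        if shrunk == current:
            return current
        current = shrunk


inv = always(psi)
print(len(psi), len(inv))
print((0, 0) in inv, (5, 0) in inv, (3, 2) in inv)
```

The state `(3, 2)` is excluded even though it lies inside $\psi$: every control moves it to $x = 5$ with positive velocity, from which the next step leaves the grid.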

### **Imports**

In [None]:
# Implementation independent

from time import time
from contextlib import contextmanager
import numpy as np  # needed by print_hzinfo and the HZ plotting cell
from tqdm import tqdm

from pyspect import *

# HJ specific

from pyspect.impls.hj_reachability import TVHJImpl
from pyspect.systems.hj_reachability import DoubleIntegrator as HJDoubleIntegrator
from pyspect.plotting.levelsets import *

# HZ specific

from pyspect.plotting.zonotopes import _hz2hj

from hz_reachability.hz_impl import TVHZImpl
from hz_reachability.systems.cars import *
from hz_reachability.systems.integrators import DoubleIntegrator as HZDoubleIntegrator
from hz_reachability.spaces import EmptySpace

In [None]:
@contextmanager
def timectx(msgfunc):
    """Context manager to time a block of code."""
    start = time()
    yield
    end = time()
    print(msgfunc(end-start))

def print_hzinfo(hz):
    if isinstance(hz, list):
        nz, ng, nb, nc = \
            np.array([[_out.dim, _out.ng, _out.nb, _out.nc]
                    for _out in hz]).max(axis=0)
    else:
        nz,ng,nb,nc = hz.dim, hz.ng, hz.nb, hz.nc
    print(f"nz: {nz}, ng: {ng}, nb: {nb}, nc: {nc}")

### **Hyperparameters**

In [None]:
AXES = [dict(name='t', bounds=[   0,   40],   step=0.5, unit='s'),
        dict(name='x', bounds=[-100, +100], points= 91, unit='m'),
        dict(name='v', bounds=[ -20,  +20], points= 91, unit='m/s')]

MAX_ACCEL = 1.0     # [mps2]

# HZ2HJ specific
TIME_STEP = AXES[0]['step']
TIME_HORIZON = AXES[0]['bounds'][1]
MIN_BOUNDS = [spec['bounds'][0] for spec in AXES if spec['name'] != 't']
MAX_BOUNDS = [spec['bounds'][1] for spec in AXES if spec['name'] != 't']
GRID_SHAPE = [spec['points'] for spec in AXES if spec['name'] != 't']

### **Program**

In [None]:
## SPECIFICATION

T = BoundedSet(x=(-50,  +50))

phi = ALWAYS(T)

#### Two interpretations of ALWAYS

In [None]:
# Define ALWAYS through RCI set (relating to ¬◇¬ψ)

@primitive(ALWAYS('_1'))
def Always_rci(_1: 'TLTLike') -> tuple[SetBuilder, APPROXDIR]:
    b1, a1 = _1._builder, _1._approx

    ao = APPROXDIR.UNDER
    return (
        AppliedSet('rci', b1),
        ao + a1 if ao * a1 == APPROXDIR.EXACT else 
        a1      if ao == a1 else
        APPROXDIR.INVALID,
    )

# Define Always through fixed-point iteration

@primitive(ALWAYS('_1'))
def Always_fp(_1: 'TLTLike') -> tuple[SetBuilder, APPROXDIR]:

    N = int(TIME_HORIZON / TIME_STEP)

    phi = 'psi'
    for _ in range(N-1):
        phi = AND('psi', NEXT(phi))

    tree = TLT(phi, where={'psi': _1})
    return (tree._builder, tree._approx)

#### HJ Implementation

In [None]:
## CONSTRUCT TLT

TLT.select(ContLTL | Always_rci)

tree = TLT(phi)

print(f"Approximation direction: {tree._approx = }")

## INITIALIZE IMPLEMENTATION

dynamics = dict(cls=HJDoubleIntegrator,
                min_accel=-MAX_ACCEL,
                max_accel=+MAX_ACCEL)

impl = TVHJImpl(dynamics, AXES)

## REALIZE ROOT NODE OF TLT

with timectx(lambda t: f"Realization with HJ took {t:.2f} seconds"):
    out = tree.realize(impl)

print(f"{type(out) = }")

## PLOT

impl.plot(
    out,
    axes=('x', 'v', 't'),
    colorscale='greens',
    camera_eye=impl.PLOT.EYE_MH_W,
)

#### HZ Implementation

In [None]:
## CONSTRUCT TLT

TLT.select(DiscLTL | Always_fp)

tree = TLT(phi)

print(f"Approximation direction: {tree._approx = }")

## INITIALIZE IMPLEMENTATION

space = EmptySpace(MIN_BOUNDS, MAX_BOUNDS)

dynamics = HZDoubleIntegrator(max_accel=MAX_ACCEL, dt=TIME_STEP)

impl = TVHZImpl(dynamics, space, AXES)
# impl.enable_reduce = True

## REALIZE ROOT NODE OF TLT

with timectx(lambda t: f"Realization with HZ took {t:.2f} seconds"):
    out = tree.realize(impl)

print(f"{type(out) = }")
print_hzinfo(out)

## Plot

if not isinstance(out, list):
    vf = np.array([_hz2hj(out, MIN_BOUNDS, MAX_BOUNDS, GRID_SHAPE)] * impl.N)
else:
    vf = np.array([_hz2hj(_out, MIN_BOUNDS, MAX_BOUNDS, GRID_SHAPE) for _out in tqdm(out)])

plot3D_levelset(
    vf,
    min_bounds=[           0, *MIN_BOUNDS],
    max_bounds=[TIME_HORIZON, *MAX_BOUNDS],
    xtitle='Position (m)',
    ytitle='Velocity (m/s)',
    colorscale='blues',
    camera_eye=EYE_MH_W,
)

## **Specification: $\square \lozenge \psi$**

This specification ensures that the system can reach $\psi$ infinitely often, i.e., $\psi$ must remain recurrently reachable throughout the time horizon. Formally, it is a liveness property requiring that, at every point in time, it is possible to reach a state satisfying $\psi$ in the future. The HJ backend supports this through nested reachability using negation and disjunction (e.g., $\Box \Diamond \psi = \neg \Diamond \neg \Diamond \psi$). However, HZ implementations typically cannot soundly support this, because approximation mismatches may violate the logical semantics during the fixed-point iteration. In response, `pyspect` flags and rejects this combination.
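How an approximation mismatch arises can be sketched with a small direction algebra. The enum `Dir` and the functions `combine` and `negate` below are hypothetical stand-ins, not the `pyspect` `APPROXDIR` type. The combination rule follows the one written out in `Always_rci` in this notebook: an exact operand is neutral, equal directions are preserved, and opposite directions are invalid. Which direction a given backend assigns to each operator is not specified here.

```python
from enum import Enum


class Dir(Enum):
    """Hypothetical approximation directions for a computed set."""
    UNDER = "under"
    EXACT = "exact"
    OVER = "over"
    INVALID = "invalid"


def combine(op: Dir, child: Dir) -> Dir:
    """Direction of an operator applied to a child with a known direction."""
    if Dir.INVALID in (op, child):
        return Dir.INVALID
    if op is Dir.EXACT:
        return child
    if child is Dir.EXACT:
        return op
    return op if op is child else Dir.INVALID


def negate(d: Dir) -> Dir:
    """Complementing a set swaps under- and over-approximation."""
    if d is Dir.UNDER:
        return Dir.OVER
    if d is Dir.OVER:
        return Dir.UNDER
    return d


# An under-approximating operator over an exact set stays sound.
print(combine(Dir.UNDER, Dir.EXACT))

# If an inner operator were over-approximated while the outer one is only
# sound as an under-approximation, no direction can be guaranteed.
print(combine(Dir.UNDER, Dir.OVER))
```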

### **Imports**

In [None]:
# Implementation independent

from time import time
from contextlib import contextmanager
import numpy as np  # needed by print_hzinfo and the HZ plotting cell
from tqdm import tqdm

from pyspect import *

# HJ specific

from pyspect.impls.hj_reachability import TVHJImpl
from pyspect.systems.hj_reachability import DoubleIntegrator as HJDoubleIntegrator
from pyspect.plotting.levelsets import *

# HZ specific

from pyspect.plotting.zonotopes import _hz2hj

from hz_reachability.hz_impl import TVHZImpl
from hz_reachability.systems.cars import *
from hz_reachability.systems.integrators import DoubleIntegrator as HZDoubleIntegrator
from hz_reachability.spaces import EmptySpace

In [None]:
@contextmanager
def timectx(msgfunc):
    """Context manager to time a block of code."""
    start = time()
    yield
    end = time()
    print(msgfunc(end-start))

def print_hzinfo(hz):
    if isinstance(hz, list):
        nz, ng, nb, nc = \
            np.array([[_out.dim, _out.ng, _out.nb, _out.nc]
                    for _out in hz]).max(axis=0)
    else:
        nz,ng,nb,nc = hz.dim, hz.ng, hz.nb, hz.nc
    print(f"nz: {nz}, ng: {ng}, nb: {nb}, nc: {nc}")

### **Hyperparameters**

In [None]:
AXES = [dict(name='t', bounds=[   0,   40],   step=0.5, unit='s'),
        dict(name='x', bounds=[-100, +100], points= 91, unit='m'),
        dict(name='v', bounds=[ -20,  +20], points= 91, unit='m/s')]

MAX_ACCEL = 1.0     # [mps2]

# HZ2HJ specific
TIME_STEP = AXES[0]['step']
TIME_HORIZON = AXES[0]['bounds'][1]
MIN_BOUNDS = [spec['bounds'][0] for spec in AXES if spec['name'] != 't']
MAX_BOUNDS = [spec['bounds'][1] for spec in AXES if spec['name'] != 't']
GRID_SHAPE = [spec['points'] for spec in AXES if spec['name'] != 't']

### **Program**

In [None]:
## SPECIFICATION

T = BoundedSet(x=(-50,  +50))

phi = ALWAYS(EVENTUALLY(T))

#### Two interpretations of ALWAYS

In [None]:
# Define ALWAYS through RCI set (relating to ¬◇¬ψ)

@primitive(ALWAYS('_1'))
def Always_rci(_1: 'TLTLike') -> tuple[SetBuilder, APPROXDIR]:
    b1, a1 = _1._builder, _1._approx

    ao = APPROXDIR.UNDER
    return (
        AppliedSet('rci', b1),
        ao + a1 if ao * a1 == APPROXDIR.EXACT else 
        a1      if ao == a1 else
        APPROXDIR.INVALID,
    )

# Define Always through fixed-point iteration

@primitive(ALWAYS('_1'))
def Always_fp(_1: 'TLTLike') -> tuple[SetBuilder, APPROXDIR]:

    N = int(TIME_HORIZON / TIME_STEP)

    phi = 'psi'
    for _ in range(N-1):
        phi = AND('psi', NEXT(phi))

    tree = TLT(phi, where={'psi': _1})
    return (tree._builder, tree._approx)

#### HJ Implementation

In [None]:
## CONSTRUCT TLT

TLT.select(ContLTL | Always_rci)
tree = TLT(phi)

## INITIALIZE IMPLEMENTATION

dynamics = dict(cls=HJDoubleIntegrator,
                min_accel=-MAX_ACCEL,
                max_accel=+MAX_ACCEL)

impl = TVHJImpl(dynamics, AXES)

## REALIZE ROOT NODE OF TLT

with timectx(lambda t: f"Realization with HJ took {t:.2f} seconds"):
    out = tree.realize(impl)

## Plot

impl.plot(
    out,
    axes=('x', 'v', 't'),
    colorscale='greens',
    camera_eye=impl.PLOT.EYE_MH_W,
)

#### HZ Implementation

In [None]:
## CONSTRUCT TLT

TLT.select(DiscLTL | Always_fp)
tree = TLT(phi)

## INITIALIZE IMPLEMENTATION

space = EmptySpace(MIN_BOUNDS, MAX_BOUNDS)

dynamics = HZDoubleIntegrator(max_accel=MAX_ACCEL, dt=TIME_STEP)

impl = TVHZImpl(dynamics, space, AXES)
# impl.enable_reduce = True

## REALIZE ROOT NODE OF TLT

with timectx(lambda t: f"Realization with HZ took {t:.2f} seconds"):
    out = tree.realize(impl) # NOTE: Fails due to invalid approximation direction!

print_hzinfo(out)

## Plot

if not isinstance(out, list):
    vf = np.array([_hz2hj(out, MIN_BOUNDS, MAX_BOUNDS, GRID_SHAPE)] * impl.N)
else:
    vf = np.array([_hz2hj(_out, MIN_BOUNDS, MAX_BOUNDS, GRID_SHAPE) for _out in tqdm(out)])

plot3D_levelset(
    vf,
    min_bounds=[           0, *MIN_BOUNDS],
    max_bounds=[TIME_HORIZON, *MAX_BOUNDS],
    xtitle='Position (m)',
    ytitle='Velocity (m/s)',
    colorscale='blues',
    camera_eye=EYE_MH_W,
)