# Reaction-level CAL Theorem Examples

In [1]:
### A nice example of a timeline

import plotly.express as px

df = [
    dict(Task="Task 1", Start='2024-10-10 09:00', Finish='2024-10-10 11:00'),
    dict(Task="Task 2", Start='2024-10-10 10:00', Finish='2024-10-10 11:30'),
    dict(Task="Task 3", Start='2024-10-10 11:00', Finish='2024-10-10 12:00'),
]

fig = px.timeline(df, x_start="Start", x_end="Finish", y="Task", title="Real-Time System Example Timeline")
fig.update_yaxes(categoryorder="total ascending")
fig.show()


In [None]:
# Max-plus functions from https://github.com/msnhdyt/Max-Plus-Algebra
import numpy as np

# function for calculating A oplus B
def oplus(A, B):
  if not type(A) is np.ndarray or A.ndim != 2:
    raise TypeError("A must be a 2d numpy array")
  if not type(B) is np.ndarray or B.ndim != 2:
    raise TypeError("B must be a 2d numpy array")
  if A.shape != B.shape:
    raise TypeError("A and B must have the same shape: A.shape={}, B.shape={}".format(A.shape, B.shape))
  result = np.zeros(A.shape)
  for i, r in enumerate(A):
    for j, c in enumerate(r):
      result[i][j] = max(A[i][j], B[i][j])
  return result

# Function for calculating A otimes B, i.e., matrix multiplication
def otimes(A, B):
  if not type(A) is np.ndarray or A.ndim != 2:
    raise TypeError("A must be a 2d numpy array")
  if not type(B) is np.ndarray or B.ndim != 2:
    raise TypeError("B must be a 2d numpy array")
  if A.shape[1] != B.shape[0]:
    raise TypeError("A's 2nd dimension does not match B's 1st dimension: A.shape={}, B.shape={}".format(A.shape, B.shape))

  if A.shape == (1,1) or B.shape == (1,1):
    result = A + B
    return result
  else:
    result = np.zeros([A.shape[0], B.shape[1]])
    B = B.T
    for i, row_a in enumerate(A):
      for j, row_b in enumerate(B):
        result[i][j] = odot(row_a, row_b)
    return result
  
# Function for taking the max-plus dot product of two row vectors
def odot(a, b):
  assert a.ndim == b.ndim == 1, "Vector a and b must be 1D arrays"
  assert len(a) == len(b), "Vector a and b must have the same shape"
  result = np.array([float('-inf')] * len(a))
  for i in range(0, len(a)):
    result[i] = a[i] + b[i]
  return max(result)

# function for calculating A^n in otimes
def pow_otimes(A, n):
  if not type(A) is np.ndarray or A.ndim != 2:
    raise TypeError("the function only receive 2d numpy array")
  result = A
  for i in range(n-1):
    result = otimes(result, A)
  return result

# function for calculating trace(A)
def trace(A):
  if not type(A) is np.ndarray or A.ndim != 2:
    raise TypeError("the function only receive 2d numpy array")
  return max(A.diagonal())

# function for calculating A*
def star(A):
  if not type(A) is np.ndarray or A.ndim != 2:
    raise TypeError("the function only receive 2d numpy array")
  
  result = np.zeros(A.shape)
  # initialize result with E
  for i in range(len(A)):
    for j in range(len(A)):
      if i == j:
        result[i][j] = 0
      else:
        result[i][j] = float('-inf')

  for i in range(1,len(A)):
    result = oplus(result, pow_otimes(A,i))
  
  return result

# function for calculating A^+
def plus(A):
  if not type(A) is np.ndarray or A.ndim != 2:
    raise TypeError("the function only receive 2d numpy array")
  
  result = A

  for i in range(1,len(A)+1):
    result = oplus(result, pow_otimes(A,i))
  
  return result

In [3]:
A = np.array([[-2,3,1], [1,1,float('-inf')], [float('-inf'),2,1]])
A

array([[ -2.,   3.,   1.],
       [  1.,   1., -inf],
       [-inf,   2.,   1.]])

In [4]:
trace(A)

np.float64(1.0)

### Impossible Consistency

<img src="../img/ImpossibleConsistency.svg" alt="drawing" width="500"/>

In the formulation below, reaction 1s are ignored for simplicity.

In [7]:
# Negative infinity to denote nothing has happened yet
# epsilon = empty element = -inf
eps = float('-inf')
# Positive infinity to denote the exhaustion of future events
inf = float('inf')

# Return an epsilon-filled row vector with the same shape as the input vector.
def reset(a):
    return np.array([[eps] * a.shape[1]])

# Create vectors. These are row vectors, i.e., shape = 1 x n.
x_p = np.array([[eps] * 4])   # Physical firing times in the k+1-th iteration
x   = np.array([[eps] * 4])   # Physical firing times in the k-th iteration
u   = np.array([[eps] * 2])   # Logical scheduling times of physical actions (which is also physical time)

# Indices
a1  = 0 # Reaction 1 in reactor A
a2  = 1 # Reaction 2 in reactor A
b1  = 2 # Reaction 1 in reactor B
b2  = 3 # Reaction 2 in reactor B
pa  = 0 # Physical action in reactor A
pb  = 1 # Physical action in reactor B

# Setting execution times
e   = np.array([[1] * 4])   # Execution times

In [8]:

# Schedule physical actions
u[0][pa] = 1 # t_A = 1
u[0][pb] = 1 # t_B = 1

# Evolution equations
x_p[0][a1] = max(x[0][a2]+e[0][a2], u[0][pa])
x_p[0][b1] = max(x[0][b2]+e[0][b2], u[0][pb]) # This needs to precede the next two lines because it is used later.
x_p[0][a2] = max(x_p[0][a1]+e[0][a1], x_p[0][b1]+e[0][b1])
x_p[0][b2] = max(x_p[0][a1]+e[0][a1], x_p[0][b1]+e[0][b1])

# Report results
print("A1 fires at t={time}".format(time=x_p[0][a1]))
print("A2 fires at t={time}".format(time=x_p[0][a2]))
print("B1 fires at t={time}".format(time=x_p[0][b1]))
print("B2 fires at t={time}".format(time=x_p[0][b2]))

A1 fires at t=1.0
A2 fires at t=2.0
B1 fires at t=1.0
B2 fires at t=2.0


Now encode the evolution equations into max-plus matrix form to make the
evolution more functional.

We are trying to write the following equation:
x(k+1) = M x(k) ⊕ N u(k)

In [9]:
# Evolution matrix
M = np.array([
    [eps, e[0][a2], eps, eps], 
    [eps, e[0][a1]+e[0][a2], eps, e[0][a1]+e[0][a2]],
    [eps, eps, eps, e[0][b2]],
    [eps, e[0][a1]+e[0][a2], eps, e[0][a1]+e[0][a2]],
])

# Control coefficient matrix
N = np.array([
    [0, eps],
    [e[0][a1], e[0][b1]],
    [eps, 0],
    [e[0][a1], e[0][b1]],
])

In [10]:
def step(x, u):
    assert x.shape[0] == 1, "x is not a row vector: x.shape={0}".format(x.shape)
    assert u.shape[0] == 1, "u is not a row vector: u.shape={0}".format(u.shape)
    Mx = otimes(M, x.T)
    Nu = otimes(N, u.T)
    # print("Mx:{}".format(Mx))
    # print("Nu:{}".format(Nu))
    # Returns a row vector to make iterations easy.
    return oplus(Mx, Nu).T

In [11]:
# Schedule physical actions
u[0][pa] = 1
u[0][pb] = eps

# Step the system from k=0 to k=1
x_p = step(x, u)
x_p

array([[  1.,   2., -inf,   2.]])

Let's create a machine that steps itself. Here we assume physical
actions behave like timers. Each iteration is a different timestamp.

### NOTE
This formulation assumes that each iteration `k`
represents a new tag described by `u(k)`, so it does not make sense for
`u(k)` to have two different non-negative-infinite values. Only three
cases are possible here:
1) One element of `u(k)` is a finite future logical time to be advanced to, and the other element is `-inf`.
2) Both elements are some identical future logical time.
3) Both elements are `-inf`.

In [12]:
def report_stats(k, x, u):
    next_logical_time = np.max(u)
    lag_k = x - next_logical_time # lag
    print("Tag index k={k}".format(k=k))
    print("u(k)= {0}".format(u))
    # print("next logical time = {0}".format(next_logical_time))
    print("x(k) = earliest possible firing times = {0}".format(x))
    print("lag(k) = {0}".format(lag_k))

The cell below sets all parameters.

In [13]:
# Initial firing of actions
pa_init_offset = 0
pb_init_offset = 0
u[0][pa] = pa_init_offset
u[0][pb] = pb_init_offset

# Set the period of the actions 
# (Either same or one is -inf. See NOTE above)
# NOTE 2: eigenvalue(M) = 2. Ie., smallest minimum spacing of the physical action to make the systme feasible
pa_period = 2 # If actions scheduled simultaneously, use 1.999 to see divergence. Use inf if no future events are scheduled.
pb_period = 2 # If actions scheduled simultaneously, use 1.999 to see divergence. Use inf if no future events are scheduled.

# If following is uncommented, set pa_init_offset=0 and
# pb_init_offset=0.001, then set pa_period=pb_period=3.999 to observe
# divergence, which disappears when pa_period=pb_period=4
"""
toggle = True
pa_cache = u[0][pa]
pb_cache = u[0][pb]
"""

# STAs 
"""
sta_a = 1
sta_b = 1
"""

# Interation count
k = 0

# Reset x to eps.
x = reset(x)

In [14]:
# Evolution matrix
M = np.array([
    [eps, e[0][a2], eps, eps], 
    [eps, e[0][a1]+e[0][a2], eps, e[0][a1]+e[0][a2]],
    [eps, eps, eps, e[0][b2]],
    [eps, e[0][a1]+e[0][a2], eps, e[0][a1]+e[0][a2]],
])

# Control coefficient matrix
N = np.array([
    [0, eps],
    [e[0][a1], e[0][b1]],
    [eps, 0],
    [e[0][a1], e[0][b1]],
])

# FIXME: Check to see if adding STAs this way makes sense.
# N = np.array([
#     [sta_a, eps],
#     [e[0][a1]+sta_a, e[0][b1]+sta_b],
#     [eps, sta_b],
#     [e[0][a1]+sta_a, e[0][b1]+sta_b],
# ])

Execute the following *once*.

In [15]:
# Step the system from k=0 to k=1
x = step(x, u)
k += 1

# Statistics
report_stats(k, x, u)

Tag index k=1
u(k)= [[0. 0.]]
x(k) = earliest possible firing times = [[0. 1. 0. 1.]]
lag(k) = [[0. 1. 0. 1.]]


Then execute the following *repeatedly*.

In [16]:
# Bump the next physical action firing times
u[0][pa] = u[0][pa] + pa_period
u[0][pb] = u[0][pb] + pb_period

"""
if toggle:
    u[0][pa] = pa_cache + pa_period
    pb_cache = u[0][pb]
    u[0][pb] = eps
else:
    pa_cache = u[0][pa]
    u[0][pa] = eps
    u[0][pb] = pb_cache + pb_period
toggle = not toggle
print(f"toggle={toggle}")
"""

# Step the system from k to k+1
x = step(x, u)
k += 1

# Statistics
report_stats(k, x, u)

Tag index k=2
u(k)= [[2. 2.]]
x(k) = earliest possible firing times = [[2. 3. 2. 3.]]
lag(k) = [[0. 1. 0. 1.]]


### Divergence
Divergence occurs when the gap between `u(k)` and `x(k)` *grows* over
iterations. We should not see divergence when the action periods = 10,
but should see divergence when the action periods = 1. 

Note that even if
one action is always absent, i.e., `offset = period = eps`, divergence
still occurs. This is because `x(k)` currently represents the "earliest
physical firing times *if triggered*". The formulation does not
currently include information of whether a reaction gets triggered or not.

### Theorem: Infeasibility

A sufficient STA needs to satisfy: $\forall u(k) \in \mathbb{T}, u(k) +
\textit{STA} \ge x(k)$.

If Consistency is indeed Impossible for this program, we should prove
that "No finite STA satisfies this constraint for all possible u(k)."

### Proof

In other words, a sufficient STA needs to satisfy: $\forall u(k) \in \mathbb{T}, \textit{STA} \ge x(k+1) -
u(k)$.

Define $\textit{STA}(k) = x(k+1) - u(k)$, which means the "minimum STA
required to handle the k+1-th time tag". For an STA to be a sufficient parameter, it
needs to satisfy $\forall k \in \mathbb{I}, \textit{STA} \ge \textit{STA}(k)$.

Because of divergence, $\textit{STA}(k)$ approaches $\infty$ as $k$ increases.

In the special case where $u(k) = u(k-1) + \textit{period}$ and $\textit{period} = 1$, the
only $\textit{STA} >= \textit{STA}(k)$ for all $k$ is $\textit{STA} = \infty$.

Therefore, in the general case when $u(k)$ is *unknown*, the only satisfying $\textit{STA}$ is $\textit{STA} = \infty$.