# Example Temporal Logic Monitoring in Python

RTAMT is a Python library for monitoring Signal Temporal Logic (STL). It supports both offline and online monitoring of discrete-time and dense-time STL specifications. This tutorial provides a step-by-step guide to installing and using RTAMT.

For more details, refer to the [RTAMT GitHub repository](https://github.com/nickovic/rtamt).

## Installation

Install the following package for RTAMT:

In [None]:
%pip install rtamt

## Usage

rtamt can be used in four ways:
- **discrete-time** (discrete time-steps) versus **dense-time** (continuous signals), and
- **online** (signal values are added one-at-a-time) versus **offline** (the entire trace is available)

### Discrete-Time Online Monitor
The `StlDiscreteTimeSpecification` class is used for monitoring discrete-time STL specifications. Here is an example:

In [None]:
import sys
import rtamt

def monitor():
    # create the STL specification
    spec = rtamt.StlDiscreteTimeSpecification()
    spec.declare_var('a', 'int')
    spec.declare_var('b', 'int')
    spec.spec = 'eventually[0,1] (a >= b)' # spec: check on this time-step if a >= b

    # parse the specification
    try:
        spec.parse()
        spec.pastify()
    except rtamt.RTAMTException as err:
        print('RTAMT Exception: {}'.format(err))
        sys.exit()

    # check the specification on time-step t0
    robustness = spec.update(0, [('a', 100), ('b', 20)])
    print('time=' + str(0) + ' robustness= ' + str(robustness))

    # check the specification on time-step t1
    robustness = spec.update(1, [('a', 10), ('b', 20)])
    print('time=' + str(1) + ' robustness= ' + str(robustness))

    # check the specification on time-step t2
    robustness = spec.update(2, [('a', 1), ('b', 20)])
    print('time=' + str(2) + ' robustness= ' + str(robustness))
    
    # check the specification on time-step t3
    robustness = spec.update(3, [('a', 2), ('b', 1)])
    print('time=' + str(3) + ' robustness= ' + str(robustness))

if __name__ == '__main__':
    monitor()

### Discrete-time Offline Monitor

In [None]:
# Get CSV file from an incubator dataset
import os

# Get the current working directory.
current_dir = os.getcwd()

assert os.path.basename(current_dir) == '8-Monitoring', 'Current directory is not 8-Monitoring'

# Get the parent directory. Should be the root of the repository
parent_dir = os.path.dirname(current_dir)

# The root of the repo should contain the incubator_dt folder. Otherwise something went wrong in 0-Pre-requisites.
assert os.path.exists(os.path.join(parent_dir, 'incubator_dt')), 'incubator_dt folder not found in the repository root'

csv_file_path = os.path.join(parent_dir, 'incubator_dt', 'software', 'incubator', 'datasets', '20230501_calibration_empty_system', '20230501_calibration_empty_system.csv')

assert os.path.exists(csv_file_path), '20230501_calibration_empty_system.csv not found in the incubator repository.'

import matplotlib.pyplot as plt
import pandas as pd

data = pd.read_csv(csv_file_path)
data.head()

#### Specification 1: Checking incubator temperature

Here, we're going to see if the incubator temperature reaches a particular value.

In [None]:
import sys
import csv
import rtamt
import os

from rtamt.spec.stl.discrete_time.specification import Semantics

def monitor():

    # create the specification and the variables
    spec = rtamt.StlDiscreteTimeSpecification(semantics=rtamt.Semantics.STANDARD)
    spec.name = 'Incubator temp'
    spec.declare_var('average_temperature', 'float')
    spec.set_var_io_type('average_temperature', 'output')
    spec.spec = 'eventually[0:20](average_temperature>=37)'
    try:
        spec.parse()
        spec.pastify()
    except rtamt.RTAMTException as err:
        print('RTAMT Exception: {}'.format(err))
        sys.exit()

    #calculate the robustness for each time-step, and capture the first time-step where the property becomes true
    first_moment = -1
    robustness = []
    for i in range(len(data['average_temperature'])):
        rob = spec.update(i, [('average_temperature', data['average_temperature'][i])])
        robustness.append(rob)
        if rob > 0 and first_moment == -1:
            first_moment = i
        
    #print('Robustness: {}'.format(robustness))
    return robustness, first_moment
robustness,first_moment = monitor()
print("Satisifed at time stamp: " + str(first_moment))

In [None]:
plt.figure(figsize=(8, 4))

# Plot temperature data on the first subplot
plt.plot(data.index, data.average_temperature, label='Temperature')
plt.plot(data.index, robustness, label='Robustness')
plt.ylabel('Temperature (°C)')
plt.legend()

plt.axhline(y = 35, color = 'r', linestyle = '-') # shows when temperature is above 30 degrees
plt.axhline(y = 0, color = 'black', linestyle = '-') # shows when property is true
plt.axvline(x = first_moment, color = 'g', linestyle = '-') # shows at what time stamp the property becomes true

# Add a title to the shared X-axis
fig.suptitle('Temperature and Robustness Over Time')

# Show the plots
plt.show()

#### Specification 2: Checking incubator temperature in relation to the heater

Here, we're going to see if the incubator temperature reaches a particular value after the heater is on.

In [None]:
import sys
import csv
import rtamt
import os

from rtamt.spec.stl.discrete_time.specification import Semantics

def monitor():

    spec = rtamt.StlDiscreteTimeSpecification(semantics=rtamt.Semantics.STANDARD)
    spec.name = 'Incubator temp'
    spec.declare_var('average_temperature', 'float')
    spec.set_var_io_type('average_temperature', 'output')
    spec.declare_var('heater_on', 'float')
    spec.set_var_io_type('heater_on', 'input')
    spec.spec = '((heater_on == 1) implies eventually[0:5] (average_temperature >= 35))'
    try:
        spec.parse()
        spec.pastify()
    except rtamt.RTAMTException as err:
        print('RTAMT Exception: {}'.format(err))
        sys.exit()

    first_moment = -1
    robustness = []
    for i in range(len(data['average_temperature'])):
        rob = spec.update(i, [('average_temperature', data['average_temperature'][i]),('heater_on', data['heater_on'][i])])
        robustness.append(rob)
        if rob > 0 and first_moment == -1:
            first_moment = i
        
    #print('Robustness: {}'.format(robustness))
    return robustness, first_moment
robustness,first_moment = monitor()
print("Satisifed at time stamp: " + str(first_moment))

In [None]:
plt.figure(figsize=(8, 4))

# Plot temperature data on the first subplot
plt.plot(data.index, data.average_temperature, label='Temperature')
plt.plot(data.index, data.heater_on*10, label='Heater On')
plt.plot(data.index, robustness, label='Robustness')
plt.ylabel('Temperature (°C)')
plt.legend()

plt.axhline(y = 35, color = 'r', linestyle = '-') # shows when temperature is above 30 degrees
plt.axhline(y = 0, color = 'black', linestyle = '-') # shows when property is true
plt.axvline(x = first_moment, color = 'g', linestyle = '-') # shows at what time stamp the property becomes true

# Add a title to the shared X-axis
fig.suptitle('Temperature and Robustness Over Time')

# Show the plots
plt.show()

### Exercises:

1. What do the two specifications mean in **natural language**? As in, how would you explain them to someone else in words.
2. The second plot shows that the robustness is 0 near the beginning of the trace. This means the specification is failing. Why does this happen?
3. Experiment with the first specification. Try changing the 'eventually' operator to 'always', the time bounds, or changing the temperature value. Report three experiments by:
    - writing the specification you chose
    - showing the resulting plot, and
    - describing what the plot shows and how it relates to the specification
4. Perform and report three more experiments for the second specification.

### Dense-Time Offline Monitor
To perform offline monitoring of dense-time STL specifications (example from https://link.springer.com/article/10.1007/s10009-023-00720-3):

In [None]:
import rtamt

# Signals are lists of tuples (timestamp, value)
req = [[0.0, 0.0], [3.0, 6.0], [5.0, 0.0]]
gnt = [[0.0, 0.0], [7.0, 6.0], [9.0, 0.0]]

spec = rtamt.StlDenseTimeSpecification()
# Declare the variables that will correspond to the above signals.
spec.declare_var('req', 'float')
spec.declare_var('gnt', 'float')
spec.spec = 'always((req >= 3) implies (eventually[0:5](gnt >= 3)))'
spec.parse()

# Evaluate the specification and match the variables with the signals.
rob = spec.evaluate(['req', req], ['gnt', gnt])

print('Robustness:', rob)

Note that:
 1. The output format is [[timestamp, robustness]]
 2. RTAMT only computes the quantitative/robustness semantics

For more details, refer to the [RTAMT GitHub repository](https://github.com/nickovic/rtamt).


## Exercises

### 1: Quantitative v.s. Boolean Semantics

Can you convert the output from `spec.evaluate()` into boolean verdicts? (i.e. an array of form `[[float, bool]]`)

Compare this to the quntitative results. Which gives you the most information?

### 2: Streaming input

RTAMT also has an online monitoring algorithm, which receives streams incrementally.

To use this create the spec with the `rtamt.StlDenseTimeOnlineSpecification()` method and provide each value with the method `spec.update` method, which takes a list of stream values, and returns a single robustness value.

Try running the following supecification:

In [None]:
import sys
import rtamt

def monitor():
    a1 = [(0, 3), (3, 2)]
    b1 = [(0, 2), (2, 5), (4, 1), (7, -7)]

    a2 = [(5, 6), (6, -2), (8, 7), (11, -1)]
    b2 = [(10, 4)]

    a3 = [(13, -6), (15, 0)]
    b3 = [(15, 0)]

    # # stl
    spec = rtamt.StlDenseTimeSpecification()
    spec.name = 'STL dense-time specification'
    spec.declare_var('a', 'float')
    spec.spec = 'a>=2'
    try:
        spec.parse()
    except rtamt.RTAMTException as err:
        print('RTAMT Exception: {}'.format(err))
        sys.exit()

    rob = spec.update(['a', a1], ['b', b1])
    print('rob: ' + str(rob))

    rob = spec.update(['a', a2], ['b', b2])
    print('rob: ' + str(rob))

    rob = spec.update(['a', a3], ['b', b3])
    print('rob: ' + str(rob))

if __name__ == '__main__':
    monitor()


#### 2.1: Streams of inputs

How does the `update` method allow monitors to handle progressively arriving data?

Think about how you could use this to monitor data received from a network or a time-series database?


### 3: More info

Have a look at the description of the tool in https://link.springer.com/article/10.1007/s10009-023-00720-3 and the [RTAMT GitHub repository](https://github.com/nickovic/rtamt).

What other types of useful monitors can you write in RTAMT?