In [28]:
import json
import yaml
import io
import re
import os

topics = set()
subs = {}
def expr_to_PlusCal(str_):
    return str_.replace("/", "").replace(".", "_").replace("!", "~").replace("==", "=").replace("(anonymous namespace)::","").replace("::","_").replace("True","TRUE").replace("False","FALSE").replace("||","\\/").replace("&&","/\\")

def input_port_name(topic, component):
    return f'{topic}_{component}'


In [29]:

def outVariablesTLA(jsonComp):
    vars = "variables\n\t\tmsg \in Data;\n"
    
    for state_var in jsonComp["potential_state_vars"]:
        variable = expr_to_PlusCal(state_var["qualified_name"])
        initial_value = expr_to_PlusCal(str(state_var["initial-value"]["literal"]))
        vars += f"\t\t{variable} = {initial_value};\n" 
    return vars


def topicVariables():
    variables = ''
    invariants = []
    for topic in topics:
        variables += f'\t{topic} = <<>>;\n'
        invariants.append(f'\t{topic} \in SeqOf(Data, MaxQueue)')

        if topic not in subs:
            continue
        for sub in subs[topic]:
            variables += f'\t{sub} = <<>>;\n'
            invariants.append(f'\t{sub} \in SeqOf(Data, MaxQueue)')
    
    for config in configuration:
        variable = expr_to_PlusCal(config)
        initial_value = expr_to_PlusCal(configuration[config])
        variables += f"\t{variable} = {initial_value};\n" 
    invariants_txt =  " /\\ \n".join(invariants)
    return f"""variables
    {variables}
    define 
        TypeInvariant == \n {invariants_txt}

        Response == <>({desired_output} /= <<>>)
    end define;"""

def transitionsTLA(jsonComp):
    reactive_behavior = {}
    comp_name = jsonComp["node"]
    transitions = ''
    for rb in jsonComp["reactive_behavior"]:
        if "event" in rb:
            continue
        input_topic = expr_to_PlusCal(rb["subscriber"]["topic"])
        topics.add(input_topic)
        if input_topic not in subs:
            subs[input_topic] = set()
        inport = input_port_name(input_topic, comp_name)

        
        subs[input_topic].add(inport)

        transitionID = expr_to_PlusCal(rb["subscriber"]["callback"])
        if inport not in reactive_behavior:
            reactive_behavior[inport] = ""
        outport = expr_to_PlusCal(rb["publisher"]["topic"])
        reactive_behavior[inport] += f"{outport} := {outport} (+) msg;\n"
        if outport not in topics:
            topics.add(outport)
        
    for t in jsonComp["transitions"]:
        if t["type"] == "message":
            input_topic = expr_to_PlusCal(t["topic"])
            topics.add(input_topic)
            if input_topic not in subs:
                subs[input_topic] = set()
            inport = input_port_name(input_topic, comp_name)
            subs[input_topic].add(inport)
            transitionID = expr_to_PlusCal(t["callback"])
            condition = expr_to_PlusCal(t["condition"])

            outputs = ""
            for o in t["outputs"]:
                outport = expr_to_PlusCal(o["publisher"]["topic"])
                if outport not in topics:
                    topics.add(outport)
                    outputs += f"                            {outport} := {outport} (+) msg;"
            
            state_changes = ""
            for sc in t["state_changes"]:
                variable  = expr_to_PlusCal(sc["variable"])
                new_value =  expr_to_PlusCal(sc["new_value"])
                state_changes += f"                            {variable} := {new_value};\n"

            if inport not in reactive_behavior:
                reactive_behavior[inport] = ""
            if len(state_changes) > 1 or len(outputs) > 1:
                reactive_behavior[inport] += (f"""
                            if {condition} then
    {state_changes}
    {outputs}
                            end if;
                    """)
            
        elif t["type"] == "interval":
            transitionID = expr_to_PlusCal(t["interval"])
            condition = expr_to_PlusCal(t["condition"])

            outputs = ""
            for o in t["outputs"]:
                outport = o["publisher"]["topic"]
                if outport not in topics:
                    topics.add(outport)
                    outputs += f"                        {outport} := {outport} (+) msg;\n"
            
            state_changes = ""
            for sc in t["state_changes"]:
                variable  = expr_to_PlusCal(sc["variable"])
                new_value =  expr_to_PlusCal(sc["new_value"])
                state_changes += f"{variable} := {new_value};\n"

            transitions += f"""
                {transitionID}:
                    if {condition} then
                        {state_changes}
{outputs}
                    end if;
                """

    return transitions, reactive_behavior
def systemTLA(files):

    spec = ""
    comp_names = []
    topic_names = []
    for file in files:
        comp_spec, comp_name = compTLA(file)
        spec += comp_spec
        comp_names.append(comp_name)

    for topic in topics:
        topic_spec = topicTLA(topic)
        if topic_spec:
            topic_names.append(topic.capitalize())
            spec += topic_spec
    tla_out= f'''
------------------------------- MODULE {system} -------------------------------
EXTENDS Sequences, Integers, TLC, FiniteSets
CONSTANTS {", ".join(comp_names)}, {", ".join(topic_names)}, Data, NULL, MaxQueue

ASSUME NULL \\notin Data


\* helper functions
''' + "SeqOf(set, n) == UNION {[1..m -> set] : m \\in 0..n} \* generates all sequences no longer than n consisting of elements in set" + f'''
seq (+) elem == Append(seq, elem)

(*--fair algorithm polling
{topicVariables()}
{spec}
end algorithm; *)
====
'''
    
    print(tla_out)

    with open(f'./results/{system}_gen.tla', 'w') as file:
        file.write(tla_out)



def topicTLA(topic_name):
    inports_spec = ""
    if topic_name not in subs:
        return ''
    
    for sub in subs[topic_name]:
        inports_spec += f"                        {sub} := {sub} (+) msg;\n"
    return f'''
    fair process {topic_name} \in {topic_name.capitalize()}
        
        begin 
            Write: 
                if {topic_name} /= <<>> then
                    msg := Head({topic_name});
                    {topic_name} := Tail({topic_name});
{inports_spec}
                end if;
        end process;
    '''



def compTLA(file):

    jsonComp = json.load(open(file))
    compInstance = jsonComp["node"] 
    compType = compInstance.capitalize()
    

    transitions, reactive_behavior = transitionsTLA(jsonComp)
    for inport in reactive_behavior:
        beh = re.sub(r'end if;\s*if', 'elsif', reactive_behavior[inport])
        transitions += f"""
            {inport}:
                if {inport} /= <<>> then
                    msg := Head({inport});
                    {inport} := Tail({inport});
                    {beh}
                end if;
            """
    
    return f'''
    fair process {compInstance} \in {compType}
        {outVariablesTLA(jsonComp)}
        begin 
            {transitions}
        end process;
    ''', compType

    

In [30]:
system = "autoware02"
desired_output = "cubic_splines_viz"
configuration = {"g_sim_mode" : "TRUE"}
files =  ["./results/autoware-02/driving_planner.lattice_trajectory_gen.json"]
topics = set()
subs = {}
tla_out = systemTLA(files)


------------------------------- MODULE autoware02 -------------------------------
EXTENDS Sequences, Integers, TLC, FiniteSets
CONSTANTS Lattice_trajectory_gen, Odom_pose, Base_waypoints, Control_pose, Data, NULL, MaxQueue

ASSUME NULL \notin Data


\* helper functions
SeqOf(set, n) == UNION {[1..m -> set] : m \in 0..n} \* generates all sequences no longer than n consisting of elements in set
seq (+) elem == Append(seq, elem)

(*--fair algorithm polling
variables
    	odom_pose = <<>>;
	odom_pose_lattice_trajectory_gen = <<>>;
	base_waypoints = <<>>;
	base_waypoints_lattice_trajectory_gen = <<>>;
	cubic_splines_viz = <<>>;
	control_pose = <<>>;
	control_pose_lattice_trajectory_gen = <<>>;
	unknown_topic = <<>>;
	g_sim_mode = TRUE;

    define 
        TypeInvariant == 
 	odom_pose \in SeqOf(Data, MaxQueue) /\ 
	odom_pose_lattice_trajectory_gen \in SeqOf(Data, MaxQueue) /\ 
	base_waypoints \in SeqOf(Data, MaxQueue) /\ 
	base_waypoints_lattice_trajectory_gen \in SeqOf(Data, MaxQueue) /\

In [31]:
system = "autoware03"
desired_output = "closest_waypoint"
configuration = {}
files =  ["./results/autoware-03/driving_planner.velocity_set.json"]
topics = set()
subs = {}
tla_out = systemTLA(files)


------------------------------- MODULE autoware03 -------------------------------
EXTENDS Sequences, Integers, TLC, FiniteSets
CONSTANTS Velocity_set, Odom_pose, Base_waypoints, Localizer_pose, Data, NULL, MaxQueue

ASSUME NULL \notin Data


\* helper functions
SeqOf(set, n) == UNION {[1..m -> set] : m \in 0..n} \* generates all sequences no longer than n consisting of elements in set
seq (+) elem == Append(seq, elem)

(*--fair algorithm polling
variables
    	odom_pose = <<>>;
	odom_pose_velocity_set = <<>>;
	base_waypoints = <<>>;
	base_waypoints_velocity_set = <<>>;
	closest_waypoint = <<>>;
	obstacle = <<>>;
	detection_range = <<>>;
	temporal_waypoints = <<>>;
	localizer_pose = <<>>;
	localizer_pose_velocity_set = <<>>;

    define 
        TypeInvariant == 
 	odom_pose \in SeqOf(Data, MaxQueue) /\ 
	odom_pose_velocity_set \in SeqOf(Data, MaxQueue) /\ 
	base_waypoints \in SeqOf(Data, MaxQueue) /\ 
	base_waypoints_velocity_set \in SeqOf(Data, MaxQueue) /\ 
	closest_waypoint \in Seq

In [32]:
system = "autoware10"
desired_output = "obj_label_marker"
assumed_inputs = {"points_raw"}
configuration = {}
files =  ["./results/autoware-10/cv_tracker.obj_reproj.json"]
topics = set()
subs = {}
systemTLA(files)


------------------------------- MODULE autoware10 -------------------------------
EXTENDS Sequences, Integers, TLC, FiniteSets
CONSTANTS Obj_reproj, Image_obj_tracked, Current_pose, Unknown, Data, NULL, MaxQueue

ASSUME NULL \notin Data


\* helper functions
SeqOf(set, n) == UNION {[1..m -> set] : m \in 0..n} \* generates all sequences no longer than n consisting of elements in set
seq (+) elem == Append(seq, elem)

(*--fair algorithm polling
variables
    	image_obj_tracked = <<>>;
	image_obj_tracked_obj_reproj = <<>>;
	current_pose = <<>>;
	current_pose_obj_reproj = <<>>;
	obj_label_marker = <<>>;
	obj_label = <<>>;
	unknown = <<>>;
	unknown_obj_reproj = <<>>;
	obj_label_bounding_box = <<>>;

    define 
        TypeInvariant == 
 	image_obj_tracked \in SeqOf(Data, MaxQueue) /\ 
	image_obj_tracked_obj_reproj \in SeqOf(Data, MaxQueue) /\ 
	current_pose \in SeqOf(Data, MaxQueue) /\ 
	current_pose_obj_reproj \in SeqOf(Data, MaxQueue) /\ 
	obj_label_marker \in SeqOf(Data, MaxQueue) /\ 
