In [1]:
from ss2hcsp.sl.sl_diagram import SL_Diagram
from ss2hcsp.sl.get_hcsp import get_hcsp
from ss2hcsp.hcsp.pprint import pprint
from ss2hcsp.sf.sf_chart import get_common_ancestor
from ss2hcsp.sf.sf_state import OR_State, AND_State
from ss2hcsp.hcsp import hcsp
from ss2hcsp.hcsp import expr
from ss2hcsp.hcsp.pprint import pprint
from ss2hcsp.matlab.convert import convert_expr, convert_cmd

In [2]:
location = "./Examples/Stateflow/tests/no_event_trig.xml"
diagram = SL_Diagram(location=location)
model_name = diagram.parse_xml()
diagram.add_line_name()
_, _, charts, _, _, _, _, _ = diagram.seperate_diagram()
chart = charts[0]
print(charts[0])

Chart(Chart):
AND{id=35, name=Chart}
  Children:
    AND{id=4, name=A}
      Children:
        OR{id=7, name=A1}
          Default: (None)--{x = -1}-->(7)
          Out-Transitions:
            (7)--->(9)
        OR{id=9, name=A2}
          en: [f("en_A2")]
          Out-Transitions:
            (9)--->(32)
        OR{id=32, name=A3}
          en: [f("en_A3")]
    AND{id=6, name=B}
      Children:
        OR{id=16, name=B1}
          Default: (None)--->(16)
          Out-Transitions:
            (16)--{f("b")}/{E;
        f("tb")}-->(17)
        OR{id=17, name=B2}
          en: [f("en_B2")]



In [3]:
chart.diagram.children[0].children[2].en

FunctionCall('f',AConst('en_A3'))

In [4]:
chart.diagram.funs

[Function('f',('s',),FunctionCall('fprintf',Var('s')),None)]

In [5]:
chart.all_states

{'4': <ss2hcsp.sf.sf_state.AND_State at 0x1ee30b73c18>,
 '7': <ss2hcsp.sf.sf_state.OR_State at 0x1ee328229b0>,
 '9': <ss2hcsp.sf.sf_state.OR_State at 0x1ee32822780>,
 '32': <ss2hcsp.sf.sf_state.OR_State at 0x1ee32822550>,
 '6': <ss2hcsp.sf.sf_state.AND_State at 0x1ee32822eb8>,
 '16': <ss2hcsp.sf.sf_state.OR_State at 0x1ee32827470>,
 '17': <ss2hcsp.sf.sf_state.OR_State at 0x1ee3282e208>,
 '35': <ss2hcsp.sf.sf_state.AND_State at 0x1ee30b73080>}

In [6]:
type(chart.diagram)

ss2hcsp.sf.sf_state.AND_State

In [7]:
def get_procedures(chart):
    res = dict()
    for fun in chart.diagram.funs:
        res[fun.name] = fun
    return res

In [8]:
def get_chain_to_ancestor(state, ancestor):
    """Chain of states from the current state to ancestor (not including ancestor)."""
    chain = []
    while state != ancestor:
        chain.append(state)
        state = state.father
    return chain

In [9]:
def en_proc_name(state):
    """en action of state."""
    return "en_" + state.name

def du_proc_name(state):
    """du action of state."""
    return "du_" + state.name

def ex_proc_name(state):
    """ex action of state."""
    return "ex_" + state.name

def active_state_name(state):
    """Variable indicating which child state of the current state is active.
    
    This variable has type string. It is applicable only if the current state
    has child OR-states. If one of the child OR-states is active, this variable
    is assigned the name of the child state. If none of the child OR-states is
    active, this variable is assigned the empty string.

    """
    return state.name + "_st"

In [10]:
def entry_proc_name(state):
    """Procedure for entry into state.
    
    This does NOT include recursive entry into child states.
    
    """
    return "entry_" + state.name

def during_proc_name(state):
    """Procedure for executing at state.
    
    This does NOT include recursive execution of child states.
    
    """
    return "during_" + state.name

def exit_proc_name(state):
    """Procedure for exiting from state.
    
    This does NOT include recursive exit from child states.
    
    """
    return "exit_" + state.name

In [11]:
def get_en_proc(state):
    if not state.en:
        return hcsp.Skip()
    return convert_cmd(state.en, procedures=get_procedures(chart))

def get_du_proc(state):
    if not state.du:
        return hcsp.Skip()
    return convert_cmd(state.du, procedures=get_procedures(chart))

def get_ex_proc(state):
    if not state.ex:
        return hcsp.Skip()
    return convert_cmd(state.ex, procedures=get_procedures(chart))

In [12]:
def get_entry_proc(state):
    """Entry procedure for the given state.
    
    The procedure does only two things:
    - if the current state is an OR-state, assign the corresponding active state
      variable.
    - call the en action of the state.

    """
    procs = []
    
    # Set the activation variable
    if isinstance(state, OR_State):
        procs.append(hcsp.Assign(active_state_name(state.father), expr.AConst(state.name)))
    
    # Perform en action
    procs.append(hcsp.Var(en_proc_name(state)))
    return hcsp.seq(procs)

In [13]:
def get_transition_proc(src, dst, middle_proc=None):
    """Get procedure for transitioning between two states.
    
    src - source state.
    dst - destination state.
    middle_proc : HCSP - procedure to execute in the middle.

    """
    ancestor = get_common_ancestor(src, dst)
    procs = []

    # Exit states from src to ancestor (not including ancestor)
    procs.append(get_rec_exit_proc(src))
    for state in get_chain_to_ancestor(src, ancestor)[1:]:
        procs.append(hcsp.Var(exit_proc_name(state)))
        
    if middle_proc is not None:
        procs.append(middle_proc)
        
    # Enter states from ancestor to state1
    for state in reversed(get_chain_to_ancestor(dst, ancestor)[1:]):
        procs.append(hcsp.Var(entry_proc_name(state)))
    procs.append(get_rec_entry_proc(dst))
        
    return hcsp.seq(procs)

In [14]:
def get_rec_entry_proc(state):
    """Return the process for recursively entering into state."""
    procs = []
    procs.append(hcsp.Var(entry_proc_name(state)))

    # Recursively enter into child states
    if state.children:
        if isinstance(state.children[0], AND_State):
            for child in state.children:
                procs.append(get_rec_entry_proc(child))
        elif isinstance(state.children[0], OR_State):
            for child in state.children:
                if child.default_tran:
                    procs.append(get_rec_entry_proc(child))
        else:
            raise TypeError
    return hcsp.seq(procs)

In [15]:
def get_rec_exit_proc(state):
    """Return the process for recursively exiting from state."""
    procs = []
    
    # Recursively exit from child states
    if state.children:
        if isinstance(state.children[0], AND_State):
            for child in reversed(state.children):
                procs.append(get_rec_exit_proc(child))
        elif isinstance(state.children[0], OR_State):
            ite = []
            for child in state.children:
                ite.append((expr.RelExpr("==", expr.AVar(active_state_name(state)), expr.AConst(child.name)),
                            get_rec_exit_proc(child)))
            procs.append(hcsp.ITE(ite))
        else:
            raise TypeError
    
    procs.append(hcsp.Var(exit_proc_name(state)))
    return hcsp.seq(procs)

In [16]:
def init_name():
    return "init_" + chart.name

def exec_name():
    return "exec_" + chart.name

def get_init_proc():
    return get_rec_entry_proc(chart.diagram)

def get_exec_proc():
    return get_rec_during_proc(chart.diagram)

def raise_event(event):
    return hcsp.Var(exec_name())

In [17]:
def get_during_proc(state):
    """During procedure for the given state.
    
    The procedure performs the following steps:
    - check if there are outgoing transitions from the state. If one of them is
      valid, carry out the transition.
    - If none of the transitions are valid, perform the du action of the state.

    """
    procs = []

    if isinstance(state, OR_State) and state.out_trans:
        ite = []
        for tran in state.out_trans:
            cond, cond_act, tran_act = expr.true_expr, None, None
            src = chart.all_states[tran.src]
            dst = chart.all_states[tran.dst]
            if tran.label is not None:
                if tran.label.event is not None:
                    raise NotImplementedError
                if tran.label.cond is not None:
                    cond = convert_expr(tran.label.cond)
                if tran.label.cond_act is not None:
                    cond_act = convert_cmd(tran.label.cond_act, raise_event=raise_event, procedures=get_procedures(chart))
                if tran.label.tran_act is not None:
                    tran_act = convert_cmd(tran.label.tran_act, raise_event=raise_event, procedures=get_procedures(chart))
            tran_act = get_transition_proc(src, dst, tran_act)
            act = tran_act if cond_act is None else hcsp.seq([cond_act, tran_act])
            ite.append((cond, act))
        procs.append(hcsp.ITE(ite, hcsp.Var(du_proc_name(state))))
    else:
        procs.append(hcsp.Var(du_proc_name(state)))
    return hcsp.seq(procs)

In [18]:
def get_exit_proc(state):
    """Exit procedure from the given state.
    
    The procedure does only two things:
    - if the current state is an OR-state, clear the corresponding active state
      variable.
    - call the ex action of state.

    """
    procs = []
    
    # Perform ex action
    procs.append(hcsp.Var(ex_proc_name(state)))

    # Set the activation variable
    if isinstance(state, OR_State):
        procs.append(hcsp.Assign(active_state_name(state.father), expr.AConst("")))
        
    return hcsp.seq(procs)

In [19]:
for ssid in chart.all_states:
    state = chart.all_states[ssid]
    print(en_proc_name(state), '::=', get_en_proc(state))

en_A ::= skip
en_A1 ::= skip
en_A2 ::= log("en_A2")
en_A3 ::= log("en_A3")
en_B ::= skip
en_B1 ::= skip
en_B2 ::= log("en_B2")
en_Chart ::= skip


In [20]:
for ssid in chart.all_states:
    state = chart.all_states[ssid]
    print(entry_proc_name(state), '::=', get_entry_proc(state))

entry_A ::= @en_A
entry_A1 ::= A_st := "A1"; @en_A1
entry_A2 ::= A_st := "A2"; @en_A2
entry_A3 ::= A_st := "A3"; @en_A3
entry_B ::= @en_B
entry_B1 ::= B_st := "B1"; @en_B1
entry_B2 ::= B_st := "B2"; @en_B2
entry_Chart ::= @en_Chart


In [21]:
for ssid in chart.all_states:
    state = chart.all_states[ssid]
    print(ex_proc_name(state), '::=', get_ex_proc(state))

ex_A ::= skip
ex_A1 ::= skip
ex_A2 ::= skip
ex_A3 ::= skip
ex_B ::= skip
ex_B1 ::= skip
ex_B2 ::= skip
ex_Chart ::= skip


In [22]:
for ssid in chart.all_states:
    state = chart.all_states[ssid]
    print(exit_proc_name(state), '::=', get_exit_proc(state))

exit_A ::= @ex_A
exit_A1 ::= @ex_A1; A_st := ""
exit_A2 ::= @ex_A2; A_st := ""
exit_A3 ::= @ex_A3; A_st := ""
exit_B ::= @ex_B
exit_B1 ::= @ex_B1; B_st := ""
exit_B2 ::= @ex_B2; B_st := ""
exit_Chart ::= @ex_Chart


In [23]:
for ssid in chart.all_states:
    state = chart.all_states[ssid]
    print(du_proc_name(state), '::=', get_du_proc(state))

du_A ::= skip
du_A1 ::= skip
du_A2 ::= skip
du_A3 ::= skip
du_B ::= skip
du_B1 ::= skip
du_B2 ::= skip
du_Chart ::= skip


In [24]:
for ssid in chart.all_states:
    state = chart.all_states[ssid]
    print(during_proc_name(state), '::=', get_during_proc(state))

during_A ::= @du_A
during_A1 ::= if true then @exit_A1; @entry_A2 else @du_A1 endif
during_A2 ::= if true then @exit_A2; @entry_A3 else @du_A2 endif
during_A3 ::= @du_A3
during_B ::= @du_B
during_B1 ::= if true then log("b"); @exit_B1; @exec_Chart; log("tb"); @entry_B2 else @du_B1 endif
during_B2 ::= @du_B2
during_Chart ::= @du_Chart


In [25]:
print(pprint(get_rec_entry_proc(chart.diagram)))

@entry_Chart;
@entry_A;
@entry_A1;
@entry_B;
@entry_B1


In [26]:
print(pprint(get_rec_exit_proc(chart.diagram)))

if B_st == "B1" then
  @exit_B1
elif B_st == "B2" then
  @exit_B2
else
  skip
endif;
@exit_B;
if A_st == "A1" then
  @exit_A1
elif A_st == "A2" then
  @exit_A2
elif A_st == "A3" then
  @exit_A3
else
  skip
endif;
@exit_A;
@exit_Chart


In [27]:
def get_rec_during_proc(state):
    """Return the process for recursively executing an state."""
    procs = []
    # First, execute the during procedure
    procs.append(hcsp.Var(during_proc_name(state)))
    
    # Next, recursively execute child states
    if state.children:
        if isinstance(state.children[0], AND_State):
            for child in state.children:
                procs.append(get_rec_during_proc(child))
        elif isinstance(state.children[0], OR_State):
            ite = []
            for child in state.children:
                ite.append((expr.RelExpr("==", expr.AVar(active_state_name(state)), expr.AConst(child.name)),
                            get_rec_during_proc(child)))
            procs.append(hcsp.ITE(ite))
        else:
            raise TypeError
    return hcsp.seq(procs)

In [28]:
print(pprint(get_rec_during_proc(chart.diagram)))

@during_Chart;
@during_A;
if A_st == "A1" then
  @during_A1
elif A_st == "A2" then
  @during_A2
elif A_st == "A3" then
  @during_A3
else
  skip
endif;
@during_B;
if B_st == "B1" then
  @during_B1
elif B_st == "B2" then
  @during_B2
else
  skip
endif


In [29]:
def get_procs(chart):
    """Returns the list of procedures."""
    all_procs = []

    for ssid, state in chart.all_states.items():
        all_procs.append(hcsp.Procedure(en_proc_name(state), get_en_proc(state)))
        all_procs.append(hcsp.Procedure(du_proc_name(state), get_du_proc(state)))
        all_procs.append(hcsp.Procedure(ex_proc_name(state), get_ex_proc(state)))
        all_procs.append(hcsp.Procedure(entry_proc_name(state), get_entry_proc(state)))
        all_procs.append(hcsp.Procedure(during_proc_name(state), get_during_proc(state)))
        all_procs.append(hcsp.Procedure(exit_proc_name(state), get_exit_proc(state)))
    
    all_procs.extend([
        hcsp.Procedure(init_name(), get_init_proc()),
        hcsp.Procedure(exec_name(), get_exec_proc())
    ])

    return all_procs

In [30]:
def get_toplevel_process(chart):
    """Returns the top-level process for chart."""
    return hcsp.Sequence(
        hcsp.Var(init_name()),
        hcsp.Loop(hcsp.Sequence(
            hcsp.Var(exec_name()),
            hcsp.Wait(expr.AConst(0.1))
        ))
    )

In [31]:
procs = get_procs(chart)
for proc in procs:
    print(proc.name + " ::= " + str(proc.hp))

en_A ::= skip
du_A ::= skip
ex_A ::= skip
entry_A ::= @en_A
during_A ::= @du_A
exit_A ::= @ex_A
en_A1 ::= skip
du_A1 ::= skip
ex_A1 ::= skip
entry_A1 ::= A_st := "A1"; @en_A1
during_A1 ::= if true then @exit_A1; @entry_A2 else @du_A1 endif
exit_A1 ::= @ex_A1; A_st := ""
en_A2 ::= log("en_A2")
du_A2 ::= skip
ex_A2 ::= skip
entry_A2 ::= A_st := "A2"; @en_A2
during_A2 ::= if true then @exit_A2; @entry_A3 else @du_A2 endif
exit_A2 ::= @ex_A2; A_st := ""
en_A3 ::= log("en_A3")
du_A3 ::= skip
ex_A3 ::= skip
entry_A3 ::= A_st := "A3"; @en_A3
during_A3 ::= @du_A3
exit_A3 ::= @ex_A3; A_st := ""
en_B ::= skip
du_B ::= skip
ex_B ::= skip
entry_B ::= @en_B
during_B ::= @du_B
exit_B ::= @ex_B
en_B1 ::= skip
du_B1 ::= skip
ex_B1 ::= skip
entry_B1 ::= B_st := "B1"; @en_B1
during_B1 ::= if true then log("b"); @exit_B1; @exec_Chart; log("tb"); @entry_B2 else @du_B1 endif
exit_B1 ::= @ex_B1; B_st := ""
en_B2 ::= log("en_B2")
du_B2 ::= skip
ex_B2 ::= skip
entry_B2 ::= B_st := "B2"; @en_B2
during_B2 ::= @

In [32]:
hp = get_toplevel_process(chart)
print(pprint(hp))

@init_Chart;
(
  @exec_Chart;
  wait(0.1)
)**
