In [1]:
from lark import Lark, Transformer, Token
from dataclasses import dataclass, field
from typing import List, Optional, Any, Dict, Sequence
from enum import Enum

In [2]:
hoa_grammar = r"""
    start: header "--BODY--" body "--END--"

    header: header_item*
    header_item: version
               | properties
               | state_count
               | start_state
               | atomic_prepositions
               | acceptance
               | tool
               | name
               | owl_args
               | acc_name

    version: "HOA:" /v\d+/
    tool: "tool:" ESCAPED_STRING+
    name: "name:" ESCAPED_STRING
    owl_args: "owlArgs:" ESCAPED_STRING+
    start_state: "Start:" INT
    acc_name: "acc-name:" ("generalized-Buchi" INT | "Buchi")
    acceptance: "Acceptance:" INT acceptance_cond*
    properties: ("properties:" PROPERTY+)+
    atomic_prepositions: "AP:" INT (ESCAPED_STRING)*
    state_count: "States:" INT

    acceptance_cond: "Inf(" INT ")" ("&" "Inf(" INT ")")*

    body: state_chunk*
    state_chunk: state_name edge*
    state_name: "State:" label? INT ESCAPED_STRING? acc_sig?
    edge: label? INT acc_sig?
    label: "[" label_expr "]"
    label_expr: boolean
              | INT
              | LOGIC_NOT label_expr
              | L_PAR label_expr R_PAR
              | label_expr LOGIC_OP label_expr
    boolean: BOOLEAN_TRUE | BOOLEAN_FALSE
    acc_sig: "{" INT* "}"


    IDENTIFIER: /[a-zA-Z_][0-9a-zA-Z_-]*/
    STRING: /[a-zA-Z_-]+/
    PROPERTY: "state-labels" | "trans-labels" | "implicit-labels" | "explicit-labels"
             | "state-acc" | "trans-acc" | "univ-branch" | "no-univ-branch"
             | "deterministic" | "complete" | "unambiguous" | "stutter-invariant"
             | "weak" | "very-weak" | "inherently-weak" | "terminal" | "tight"
             | "colored"
    LOGIC_OP: "&" | "|"
    LOGIC_NOT: "!"
    BOOLEAN_TRUE: "true" | "t"
    BOOLEAN_FALSE: "false" | "f"
    L_PAR: "("
    R_PAR: ")"

    %import common.ESCAPED_STRING
    %import common.INT
    %import common.WS
    %ignore WS
"""


In [3]:
class HOA_Transformer(Transformer):
    def start(self, items):
        return {
            'header': items[0],
            'body': items[1]
        }

    def header_item(self, items):
        return items[0]

    def header(self, items):
        result = {
            'version': None,
            'tool': None,
            'name': None,
            'owl_args': None,
            'start_state': None,
            'acc_name': None,
            'acceptance': None,
            'properties': None,
            'ap_decl': None,
            'state_count': None
        }
        for subdict in items:
            for key, val in subdict.items():
                result[key] = val
        return result

    def version(self, items):
        return {'version': items[0].value}

    def tool(self, items):
        vals = [s.strip('"') for s in items]
        return {'tool': vals}

    def name(self, items):
        return {'name': items[0].strip('"')}

    def owl_args(self, items):
        vals = [s.strip('"') for s in items]
        return {'owl_args': vals}

    def start_state(self, items):
        return {'start_state': int(items[0].value)}

    def acc_name(self, items):
        return {'acc_name': " ".join(str(i.value if isinstance(i, Token) else i) for i in items)}

    def acceptance(self, items):
        acc_count = int(items[0].value)
        conds = list()
        for c in items[1:]:
            conds.extend(c)
        return {'acceptance': {"buchi_count": acc_count, "buchi_sets": conds}}

    def properties(self, items):
        return {'properties': [p.value for p in items]}

    def atomic_prepositions(self, items):
        count = int(items[0].value)
        props = [it.value.strip('"') for it in items[1:]]
        return {'ap_decl': {'count': count, 'propositions': props}}

    def state_count(self, items):
        return {'state_count': int(items[0])}

    def acceptance_cond(self, items):
        return [int(i) for i in items]

    def body(self, items):
        return {'states': items}

    def state_chunk(self, items):
        state_info = items[0]
        edges = items[1:]
        return {
            'state': state_info,
            'edges': edges
        }

    def state_name(self, items):
        idx = 0
        label_ = None
        if isinstance(items[idx], dict) and 'label' in items[idx]:
            label_ = items[idx]['label']
            idx += 1
        state_id = int(items[idx].value)
        idx += 1
        name_ = None
        if idx < len(items) and isinstance(items[idx], Token) and items[idx].type == 'ESCAPED_STRING':
            name_ = items[idx].value.strip('"')
            idx += 1
        acc_sig_ = None
        if idx < len(items) and isinstance(items[idx], dict) and 'acc_sig' in items[idx]:
            acc_sig_ = items[idx]['acc_sig']
            idx += 1

        return {
            'label': label_,
            'state_id': state_id,
            'dstring': name_,
            'acc_sig': acc_sig_
        }

    def edge(self, items):
        idx = 0
        label_ = None
        if isinstance(items[idx], dict) and 'label' in items[idx]:
            label_ = items[idx]['label']
            idx += 1

        dest_state = int(items[idx].value)
        idx += 1

        acc_sig_ = None
        if idx < len(items) and isinstance(items[idx], dict) and 'acc_sig' in items[idx]:
            acc_sig_ = items[idx]['acc_sig']

        return {
            'label': label_,
            'destination': dest_state,
            'acc_sig': acc_sig_
        }

    def label(self, items):
        return {'label': items[0]}

    def acc_sig(self, items):
        return {'acc_sig': [int(t.value) for t in items]}

    def label_expr(self, items):
        if len(items) == 1:
            it = items[0]
            return str(it.value if isinstance(it, Token) else it)
        if len(items) == 2:
            op, subexpr = items
            if isinstance(op, Token) and op.type == "LOGIC_NOT":
                return f"!{subexpr}"
            else:
                raise ValueError(f"Unexpected 2-item label_expr: {op=}, {subexpr=}")
        if len(items) == 3:
            a, op, b = items
            if isinstance(op, Token) and op.type == "LOGIC_OP":
                return f"{a} {op.value} {b}"
            if (isinstance(a, Token) and a.type == "L_PAR") and (isinstance(b, Token) and b.type == "R_PAR"):
                return f"({op})"
            raise ValueError(f"Unknown 3-item label_expr pattern: {items}")
        raise ValueError(f"Invalid label expression: {items}")

    def boolean(self, items):
        val = items[0].value
        return "t" if val in ["true", "t"] else "f"


# Utils

In [4]:
class AcceptanceStatus(Enum):
    Accepting = "accepting"
    NonAccepting = "non-accepting"

    def __str__(self):
        if self == AcceptanceStatus.Accepting:
            return "(●)"
        return "( )"

    @classmethod
    def from_str(cls, value: str):
        if value in ["accepting", "acc"]:
            return cls.Accepting
        return cls.NonAccepting

class AutomataTransitionType(Enum):
    Epsilon = "epsilon"
    Propositional = "propositional"

    @classmethod
    def from_label(cls, from_label: str):
        value = from_label.strip()
        if value in ["epsilon", "e", "any", "t", True, ""] or len(value) == 0:
            return cls.Epsilon
        return cls.Propositional

    def to_string(self, label):
        return "ε" if self != AutomataTransitionType.Propositional else label

In [5]:
@dataclass
class AutomataTransition:
    destination: int
    acc_sig: List[int] = field(default_factory=list)
    label: Optional[str] = ""
    acceptance_status: AcceptanceStatus = field(init=False, default=AcceptanceStatus.NonAccepting)
    type: AutomataTransitionType = field(init=False, default=AutomataTransitionType.Propositional)

    def __post_init__(self):
        self.acceptance_status = AcceptanceStatus.Accepting if self.acc_sig else AcceptanceStatus.NonAccepting
        self.type = AutomataTransitionType.from_label(self.label)
        if self.type == AutomataTransitionType.Epsilon:
            self.label = ""

    def __str__(self):
        return f"[{self.type.to_string(self.label)}] -> {self.destination}" + (" {" + ",".join(map(str, self.acc_sig)) + "}" if self.acc_sig else "")

In [6]:
@dataclass
class AutomataState:
    state_id: int
    acc_sig: List[int] = field(default_factory=list)
    transitions: List[AutomataTransition] = field(default_factory=list)
    label: Optional[str] = ""
    docString: Optional[str] = ""
    acceptance_status: AcceptanceStatus = field(init=False, default=AcceptanceStatus.NonAccepting)

    def __post_init__(self):
        self.acceptance_status = AcceptanceStatus.Accepting if self.acc_sig else AcceptanceStatus.NonAccepting

    def __str__(self):
        parts = [str(self.acceptance_status), f" {self.state_id}"]
        if self.label:
            parts.append(f" ({self.label})")
        if self.docString:
            parts.append(f' "{self.docString}"')
        if self.acc_sig:
            acc_str = ",".join(map(str, self.acc_sig))
            parts.append(f" {{{acc_str}}}")
        header_str = "".join(parts)
        transitions_str_list = []
        sp = "\n\t "
        for tr in self.transitions:
            transitions_str_list.append(sp + str(tr))

        return header_str + "".join(transitions_str_list)

In [7]:
def build_automata_states(parsed_body: Dict[str, Any]) -> List[AutomataState]:
    result_states: List[AutomataState] = []
    for chunk in parsed_body.get("states", []):
        state_info = chunk["state"]
        state_label = state_info["label"]
        doc_string = state_info["dstring"]
        acc_sig_list = state_info["acc_sig"] if state_info["acc_sig"] is not None else []

        transitions_list: List[AutomataTransition] = []
        for edge_info in chunk["edges"]:
            edge_label = edge_info["label"]
            edge_acc_sig = edge_info["acc_sig"] if edge_info["acc_sig"] is not None else []
            trans = AutomataTransition(
                label=edge_label,
                acc_sig=edge_acc_sig,
                destination=edge_info["destination"]
            )
            transitions_list.append(trans)

        state_obj = AutomataState(
            state_id=state_info["state_id"],
            acc_sig=acc_sig_list,
            transitions=transitions_list,
            label=state_label,
            docString=doc_string
        )
        result_states.append(state_obj)

    return result_states

In [8]:
def convert_to_state_acceptance(states: List[AutomataState]):
    state_count = len(states)
    for st_idx in range(state_count):
        for tr_idx in range(len(states[st_idx].transitions)):
            if states[st_idx].transitions[tr_idx].acc_sig:
                back_tr = AutomataTransition(
                    destination=states[st_idx].state_id,
                    acc_sig=[],
                    label="",
                )
                forward_tr = AutomataTransition(
                    destination=state_count,
                    acc_sig=[],
                    label=states[st_idx].transitions[tr_idx].label,
                )
                new_state = AutomataState(
                    state_id=state_count,
                    acc_sig=states[st_idx].transitions[tr_idx].acc_sig,
                    transitions=[back_tr]
                )
                states[st_idx].transitions[tr_idx] = forward_tr
                states.append(new_state)
                state_count += 1

In [9]:
def build_graph(states: List[AutomataState]) -> Dict[int, List[int]]:
    return {
        int(st.state_id): [int(tr.destination) for tr in st.transitions]
        for st in states
    }

def tarjan_scc(graph: Dict[int, List[int]]) -> List[List[int]]:
    index = 0
    indices = {}
    lowlink = {}
    stack = []
    on_stack = set()
    sccs = []

    def strong_connect(v: int):
        nonlocal index
        indices[v] = index
        lowlink[v] = index
        index += 1
        stack.append(v)
        on_stack.add(v)

        for w in graph.get(v, []):
            if w not in indices:
                strong_connect(w)
                lowlink[v] = min(lowlink[v], lowlink[w])
            elif w in on_stack:
                lowlink[v] = min(lowlink[v], indices[w])

        if lowlink[v] == indices[v]:
            scc = []
            while True:
                w = stack.pop()
                on_stack.remove(w)
                scc.append(w)
                if w == v:
                    break
            sccs.append(scc)

    for _v in graph.keys():
        if _v not in indices:
            strong_connect(_v)

    return sccs

def is_bottom_scc(scc: Sequence[int], graph: Dict[int, List[int]]) -> bool:
    """
    Checks if an SCC is a bottom SCC.
    An SCC is a bottom SCC if none of its nodes have outgoing edges to nodes outside the SCC.
    """
    for node in scc:
        for neighbor in graph.get(node, []):
            if neighbor not in scc:
                return False
    return True

def find_bottom_sccs_covering_accepting_sink_sets(states: List[AutomataState], accepting_sink_sets_id: set[int]) -> List[List[str]]:
    graph = build_graph(states)
    sccs = tarjan_scc(graph)
    result_sccs = []

    for scc in sccs:
        if not is_bottom_scc(scc, graph):
            continue
        accepting_signatures = set()
        for state_id in scc:
            state = states[state_id]
            accepting_signatures.update(state.acc_sig)
        if accepting_sink_sets_id.issubset(accepting_signatures):
            result_sccs.append(scc)
    return [list(map(str, scc)) for scc in result_sccs]

In [10]:
class HOAParsedHeaderHelper:
    @staticmethod
    def extract_start_state_id(parsed_tree):
        return str(parsed_tree['header']['start_state'].children[0].value)

    @staticmethod
    def extract_accepting_sink_sets_id(parsed_tree):
        return [
            str(ch)
            for ch in parsed_tree['header']['acceptance'].children[1:][0]
        ]

    @staticmethod
    def extract_atomic_propositions_to_symbol(parsed_tree):
        return {
            int(v): str(k)
            for k, v in parsed_tree['header']['ap_decl']['propositions'].items()
        }

    @staticmethod
    def extract_useful_header_info(parsed_tree):
        return {
            'start_state_id': HOAParsedHeaderHelper.extract_start_state_id(parsed_tree),
            'accepting_sink_sets_id': HOAParsedHeaderHelper.extract_accepting_sink_sets_id(parsed_tree),
            'atomic_symbol_to_propositions': HOAParsedHeaderHelper.extract_atomic_propositions_to_symbol(parsed_tree)
        }


In [11]:
# class HOAParsedBodyHelper:
#     @staticmethod
#     def _extract_state_id(parsed_state):
#         return parsed_state['state_id'].value
#
#     @staticmethod
#     def _label_walk_helper(transition_label):
#         """
#         Recursively walks through the parsed tree of a transition label
#         and converts it into a string representation.
#         """
#         if isinstance(transition_label, Tree):
#             return "".join([HOAParsedBodyHelper._label_walk_helper(child) for child in transition_label.children])
#         elif isinstance(transition_label, Token):
#             return transition_label.value
#         return str(transition_label)
#
#     @staticmethod
#     def _extract_acc_sig(acc_sig):
#         if acc_sig is None:
#             return []
#         return [
#             ch.value for ch in acc_sig.children
#         ]
#
#     @staticmethod
#     def extract_transitions(state_transitions):
#         return [
#             HOAAutomataTransition(
#                 label=HOAParsedBodyHelper._label_walk_helper(tr['label']),
#                 destination=tr['destination'],
#                 accepting_signature=HOAParsedBodyHelper._extract_acc_sig(tr['acc_sig'])
#             )
#             for tr in state_transitions
#         ]
#
#     @staticmethod
#     def extract_states(parsed_tree):
#         return [
#             HOAAutomataState(
#                 state_id=HOAParsedBodyHelper._extract_state_id(st),
#                 transitions=HOAParsedBodyHelper.extract_transitions(st['transitions'])
#             )
#             for st in parsed_tree['body']['states']
#         ]


In [12]:
# class HOAParser:
#     __slots__ = ["parser"]
#
#     def __init__(self):
#         self.parser = Lark(hoa_grammar, parser='lalr', transformer=HOA_Transformer())
#
#     def __call__(self, hoa_format_ldba):
#         ldba = self.parser.parse(hoa_format_ldba)
#         return {
#             'header': HOAParsedHeaderHelper.extract_useful_header_info(ldba),
#             'states': HOAParsedBodyHelper.extract_states(ldba)
#         }


# Test trans-acc

In [13]:
_hoa_input = '''
HOA: v1
tool: "owl ltl2ldgba" "21.0"
name: "Automaton for ((F(a)) & (F(b)) & (F(G(c))))"
owlArgs: "ltl2ldgba" "-f" "F a & F b & F G c"
Start: 0
acc-name: Buchi
Acceptance: 1 Inf(0)
properties: trans-acc no-univ-branch
AP: 3 "a" "b" "c"
--BODY--
State: 0
[!0 & 1] 1
[!0 & !1] 0
[0 & 1] 2
[0 & !1] 3
State: 1
[!0] 1
[0] 2
State: 2
[t] 2
[2] 4
State: 3
[1] 2
[!1] 3
State: 4
[2] 4 {0}
--END--
'''

# _parser = HOAParser()
# _parsed_hoa = _parser(_hoa_input)
parser = Lark(hoa_grammar, parser='lalr', transformer=HOA_Transformer())
_parsed_hoa = parser.parse(_hoa_input)

In [14]:
import json
print(json.dumps(_parsed_hoa["header"], indent=2))

{
  "version": "v1",
  "tool": [
    "owl ltl2ldgba",
    "21.0"
  ],
  "name": "Automaton for ((F(a)) & (F(b)) & (F(G(c))))",
  "owl_args": [
    "ltl2ldgba",
    "-f",
    "F a & F b & F G c"
  ],
  "start_state": 0,
  "acc_name": "",
  "acceptance": {
    "buchi_count": 1,
    "buchi_sets": [
      0
    ]
  },
  "properties": [
    "trans-acc",
    "no-univ-branch"
  ],
  "ap_decl": {
    "count": 3,
    "propositions": [
      "a",
      "b",
      "c"
    ]
  },
  "state_count": null
}


In [15]:
print(_parsed_hoa["body"])

{'states': [{'state': {'label': None, 'state_id': 0, 'dstring': None, 'acc_sig': None}, 'edges': [{'label': '!0 & 1', 'destination': 1, 'acc_sig': None}, {'label': '!0 & !1', 'destination': 0, 'acc_sig': None}, {'label': '0 & 1', 'destination': 2, 'acc_sig': None}, {'label': '0 & !1', 'destination': 3, 'acc_sig': None}]}, {'state': {'label': None, 'state_id': 1, 'dstring': None, 'acc_sig': None}, 'edges': [{'label': '!0', 'destination': 1, 'acc_sig': None}, {'label': '0', 'destination': 2, 'acc_sig': None}]}, {'state': {'label': None, 'state_id': 2, 'dstring': None, 'acc_sig': None}, 'edges': [{'label': 't', 'destination': 2, 'acc_sig': None}, {'label': '2', 'destination': 4, 'acc_sig': None}]}, {'state': {'label': None, 'state_id': 3, 'dstring': None, 'acc_sig': None}, 'edges': [{'label': '1', 'destination': 2, 'acc_sig': None}, {'label': '!1', 'destination': 3, 'acc_sig': None}]}, {'state': {'label': None, 'state_id': 4, 'dstring': None, 'acc_sig': None}, 'edges': [{'label': '2', 'de

In [16]:
ldba = build_automata_states(_parsed_hoa["body"])
convert_to_state_acceptance(ldba)
bottom = find_bottom_sccs_covering_accepting_sink_sets(ldba, set(_parsed_hoa["header"]["acceptance"]["buchi_sets"]))
accepting_nodes = {int(node) for component in bottom for node in component}
for node in accepting_nodes:
    ldba[node].acceptance_status = AcceptanceStatus.Accepting
for st in ldba:
    print("-", st)

- ( ) 0
	 [!0 & 1] -> 1
	 [!0 & !1] -> 0
	 [0 & 1] -> 2
	 [0 & !1] -> 3
- ( ) 1
	 [!0] -> 1
	 [0] -> 2
- ( ) 2
	 [ε] -> 2
	 [2] -> 4
- ( ) 3
	 [1] -> 2
	 [!1] -> 3
- (●) 4
	 [2] -> 5
- (●) 5 {0}
	 [ε] -> 4


# Test state-acc

In [17]:
_hoa_input = '''
HOA: v1
properties: no-univ-branch state-acc
States: 6
Start: 0
AP: 3 "a" "b" "c"
Acceptance: 1 Inf(0)
acc-name: Buchi
name: "Automaton for ((F(a)) & (F(b)) & (F(G(c))))"
tool: "owl ltl2ldgba" "21.0"
--BODY--
State: 0
[!0 & !1] 0
[!0 & 1] 1
[0 & !1] 2
[0 & 1] 3
State: 1
[!0] 1
[0] 3
State: 2
[!1] 2
[1] 3
State: 3
[t] 3
[2] 4
State: 4
[2] 5
State: 5 {0}
[2] 5
--END--
'''

# _parser = HOAParser()
# _parsed_hoa = _parser(_hoa_input)
parser = Lark(hoa_grammar, parser='lalr', transformer=HOA_Transformer())
_parsed_hoa = parser.parse(_hoa_input)

In [18]:
import json
print(json.dumps(_parsed_hoa["header"], indent=2))

{
  "version": "v1",
  "tool": [
    "owl ltl2ldgba",
    "21.0"
  ],
  "name": "Automaton for ((F(a)) & (F(b)) & (F(G(c))))",
  "owl_args": null,
  "start_state": 0,
  "acc_name": "",
  "acceptance": {
    "buchi_count": 1,
    "buchi_sets": [
      0
    ]
  },
  "properties": [
    "no-univ-branch",
    "state-acc"
  ],
  "ap_decl": {
    "count": 3,
    "propositions": [
      "a",
      "b",
      "c"
    ]
  },
  "state_count": 6
}


In [19]:
print(_parsed_hoa["body"])

{'states': [{'state': {'label': None, 'state_id': 0, 'dstring': None, 'acc_sig': None}, 'edges': [{'label': '!0 & !1', 'destination': 0, 'acc_sig': None}, {'label': '!0 & 1', 'destination': 1, 'acc_sig': None}, {'label': '0 & !1', 'destination': 2, 'acc_sig': None}, {'label': '0 & 1', 'destination': 3, 'acc_sig': None}]}, {'state': {'label': None, 'state_id': 1, 'dstring': None, 'acc_sig': None}, 'edges': [{'label': '!0', 'destination': 1, 'acc_sig': None}, {'label': '0', 'destination': 3, 'acc_sig': None}]}, {'state': {'label': None, 'state_id': 2, 'dstring': None, 'acc_sig': None}, 'edges': [{'label': '!1', 'destination': 2, 'acc_sig': None}, {'label': '1', 'destination': 3, 'acc_sig': None}]}, {'state': {'label': None, 'state_id': 3, 'dstring': None, 'acc_sig': None}, 'edges': [{'label': 't', 'destination': 3, 'acc_sig': None}, {'label': '2', 'destination': 4, 'acc_sig': None}]}, {'state': {'label': None, 'state_id': 4, 'dstring': None, 'acc_sig': None}, 'edges': [{'label': '2', 'de

In [20]:
ldba = build_automata_states(_parsed_hoa["body"])
convert_to_state_acceptance(ldba)
bottom = find_bottom_sccs_covering_accepting_sink_sets(ldba, set(_parsed_hoa["header"]["acceptance"]["buchi_sets"]))
accepting_nodes = {int(node) for component in bottom for node in component}
for node in accepting_nodes:
    ldba[node].acceptance_status = AcceptanceStatus.Accepting
for st in ldba:
    print("-", st)

- ( ) 0
	 [!0 & !1] -> 0
	 [!0 & 1] -> 1
	 [0 & !1] -> 2
	 [0 & 1] -> 3
- ( ) 1
	 [!0] -> 1
	 [0] -> 3
- ( ) 2
	 [!1] -> 2
	 [1] -> 3
- ( ) 3
	 [ε] -> 3
	 [2] -> 4
- ( ) 4
	 [2] -> 5
- (●) 5 {0}
	 [2] -> 5
