### Short version

Same pipeline as in process_mining_AST.ipynb, in a condensed form.

In [1]:
import pm4py

from abc import ABC, abstractmethod
from typing import List, Any, cast
import json, re, subprocess, os, shutil
from pathlib import Path
from collections import defaultdict

In [2]:
def enumerate_tau(node, counter):
    """Give every unlabelled leaf of a tree a unique ``tau_<n>`` label.

    The tree is walked depth-first, parents before children, children in
    their stored order.  A node counts as a leaf when it has no ``children``
    attribute or when that attribute is empty/``None``.

    Args:
        node: Tree node; ``label`` and ``children`` attributes are optional.
        counter: One-element list holding the next number to hand out.  It is
            incremented in place so the numbering carries across the recursion.
    """
    kids = getattr(node, "children", None)

    if not kids and getattr(node, "label", None) is None:
        node.label = f"tau_{counter[0]}"
        counter[0] += 1

    if kids is not None:
        for sub_node in kids:
            enumerate_tau(sub_node, counter)

# Shared one-element counter handed to enumerate_tau by generate_tptp_from_log.
# Numbering starts at 1; nothing in this file resets it, so tau numbers keep
# growing across repeated calls.
none_counter = [1]

In [3]:
class ASTNode(ABC):
    """Abstract base for nodes of the process-tree AST."""

    @abstractmethod
    def to_string(self, depth: int = 0) -> str:
        """Return a printable rendering of this node indented by ``depth`` levels."""

class LiteralASTNode(ASTNode):
    """Leaf AST node carrying a single activity label."""

    def __init__(self, label: str):
        self.label = label

    def to_string(self, depth: int = 0) -> str:
        """Render the literal on one line, indented four spaces per level."""
        pad = "    " * depth
        return pad + f"Literal: '{self.label}'"

class OperatorASTNode(ASTNode):
    """Inner AST node: an operator name applied to an ordered list of children."""

    def __init__(self, operator: str, children: List[ASTNode]):
        self.operator = operator
        self.children = children

    def to_string(self, depth: int = 0) -> str:
        """Render the operator line followed by each child one level deeper."""
        pad = "    " * depth
        rendered_children = [kid.to_string(depth + 1) for kid in self.children]
        header = f"{pad}Operator: {self.operator}\n"
        return header + "\n".join(rendered_children)


In [4]:
def process_tree_to_ast(pt: Any) -> ASTNode:
    """Convert a process-tree-like object into the local AST.

    A node with a non-empty ``children`` attribute becomes an
    ``OperatorASTNode`` (operator defaults to ``"Seq"`` when missing or falsy);
    anything else becomes a ``LiteralASTNode`` (label defaults to ``"tau"``).
    Operator and label are stored as ``str(...)`` of the original values.
    """
    kids = getattr(pt, "children", None)
    if not kids:
        leaf_label = getattr(pt, "label", None) or "tau"
        return LiteralASTNode(label=str(leaf_label))

    operator_name = getattr(pt, "operator", None) or "Seq"
    converted = [process_tree_to_ast(kid) for kid in kids]
    return OperatorASTNode(operator=str(operator_name), children=converted)

In [5]:
class ASTVisitor(ABC):
    """Visitor base class that dispatches on the concrete AST node type."""

    @abstractmethod
    def visit_literal(self, node: LiteralASTNode):
        """Handle a leaf node."""

    @abstractmethod
    def visit_operator(self, node: OperatorASTNode):
        """Handle an operator node."""

    def visit(self, node: ASTNode) -> Any:
        """Route ``node`` to ``visit_literal`` or ``visit_operator``.

        Returns whatever the selected handler returns.  A node that is neither
        a literal nor an operator is ignored and ``None`` is returned.
        """
        if isinstance(node, LiteralASTNode):
            return self.visit_literal(cast(LiteralASTNode, node))
        if isinstance(node, OperatorASTNode):
            return self.visit_operator(cast(OperatorASTNode, node))
        # NOTE(review): unknown node kinds are silently skipped — confirm that
        # this is intended rather than an unfinished branch.
        return None


In [6]:
class LogicalSpecificationVisitor(ASTVisitor):
    """Visitor turning an AST into a list of textual formulas/expressions."""

    def visit_literal(self, node: LiteralASTNode):
        """Return the label wrapped in single quotes."""
        return f"'{node.label}'"

    def visit_operator(self, node: OperatorASTNode):
        """Return a list of strings describing ``node``.

        * Operator ``seq`` (case-insensitive) with exactly two children yields
          three formulas relating the first and second child.
        * ``seq`` with any other number of children yields one
          ``a => <> b`` formula per pair of adjacent children.
        * Every other operator yields a single ``operator(arg, ...)`` string;
          when a child renders to a list only its first element is used.
        """
        kids = node.children

        if node.operator.lower() != "seq":
            rendered = []
            for kid in kids:
                outcome = self.visit(kid)
                rendered.append(outcome[0] if isinstance(outcome, list) else outcome)
            joined = ", ".join(rendered)
            return [f"{node.operator}({joined})"]

        if len(kids) == 2:
            # NOTE(review): if a child is itself an operator, its visit result
            # is a list and is interpolated as a list repr here — confirm
            # whether nested operators can occur under a binary "seq".
            ini = self.visit(kids[0])
            fin = self.visit(kids[1])
            return [
                f"{ini} => <> {fin}",
                f"~{ini} => ~<> {fin}",
                f"[]~({ini} & {fin})",
            ]

        return [
            f"{self.visit(left)} => <> {self.visit(right)}"
            for left, right in zip(kids, kids[1:])
        ]


# Module-level visitor instance; generate_tptp_from_log uses it to render the AST.
visitor = LogicalSpecificationVisitor()

In [7]:
class Format:
    """Text clean-up helpers applied to the visitor output.

    Most helpers operate only on the text found between pairs of single quotes.
    """

    @staticmethod
    def lowercase_in_quotes(s: str) -> str:
        """Lower-case the content of every ``'...'`` span; quotes are kept."""
        return re.sub(
            r"'([^']*)'",
            lambda hit: "'" + hit.group(1).lower() + "'",
            s,
        )

    @staticmethod
    def replace_parens_in_quotes(s: str) -> str:
        """Replace ``(`` and ``)`` by ``-`` inside every ``'...'`` span."""
        def _dash_parens(hit: re.Match) -> str:
            cleaned = hit.group(1).replace('(', '-').replace(')', '-')
            return "'" + cleaned + "'"
        return re.sub(r"'([^']*)'", _dash_parens, s)

    @staticmethod
    def replace_colons_in_quotes(s: str) -> str:
        """Replace ``:`` by ``_`` inside every ``'...'`` span."""
        return re.sub(
            r"'([^']*)'",
            lambda hit: "'" + hit.group(1).replace(':', '_') + "'",
            s,
        )

    @staticmethod
    def replace_spaces_with_underscore(tree):
        """Collapse whitespace runs inside quotes to ``_`` and drop all quotes.

        Quoted spans are matched lazily and never across a newline; text
        outside quotes keeps its whitespace.
        """
        underscored = re.sub(
            r"'(.*?)'",
            lambda hit: re.sub(r'\s+', '_', hit.group(0)),
            tree,
        )
        return re.sub(r"'", '', underscored)

    @staticmethod
    def label_expressions(expression: str) -> str:
        """Annotate every parenthesis with its nesting depth.

        ``(`` becomes ``(k]`` and the matching ``)`` becomes ``[k)`` where
        ``k`` is the 1-based depth, e.g. ``X(a,Y(b))`` ->
        ``X(1]a,Y(2]b[2)[1)``.
        """
        pieces = []
        depth = 0
        for ch in expression:
            if ch == '(':
                depth += 1
                pieces.append(f"({depth}]")
            elif ch == ')':
                pieces.append(f"[{depth})")
                depth -= 1
            else:
                pieces.append(ch)
        return "".join(pieces)

In [8]:
class ProcessTreeAdapter:
    """String utilities for depth-labelled expressions such as ``X(1]a,b[1)``."""

    @staticmethod
    def extract_arguments_from_labelled_expression(labelled_expression):
        """Split the outermost argument list of a labelled expression.

        Returns:
            ``(arguments, label)`` where ``label`` is the integer between the
            first ``(`` and the first ``]`` and ``arguments`` are the
            comma-separated parts found between that ``]`` and the last ``[``.
            Commas inside nested parentheses do not split.
        """
        first_paren = labelled_expression.index("(")
        first_bracket = labelled_expression.index("]")
        pattern_label_number = int(labelled_expression[first_paren + 1:first_bracket])

        last_open_bracket = [hit.start() for hit in re.finditer(r'\[', labelled_expression)][-1]
        body = labelled_expression[first_bracket + 1:last_open_bracket]

        arguments = []
        pending = []
        depth = 0
        for piece in body.split(","):
            depth += piece.count('(') - piece.count(')')
            pending.append(piece)
            if depth == 0:
                # Balanced again: everything collected so far is one argument.
                arguments.append(",".join(pending))
                pending = []
        return arguments, pattern_label_number

    @staticmethod
    def find_symbol(labelled_expression):
        """Return the text before the first parenthesis with all whitespace removed."""
        found = re.match(r'^[^()]*', labelled_expression)
        if not found:
            raise Exception("No match")
        return re.sub(r'\s+', '', found.group())

    @staticmethod
    def replace_symbol_with_name(labelled_pattern_expression, pattern_label_number, old_symbol, new_name):
        """Rename the first ``old_symbol(<label>]`` occurrence to ``new_name(<label>]``."""
        target = rf"{re.escape(old_symbol)}\(\s*{pattern_label_number}\s*\]"
        replacement = f"{new_name}({pattern_label_number}]"
        return re.sub(target, replacement, labelled_pattern_expression, count=1)

    @staticmethod
    def get_highest_label(labelledExpression: str) -> int:
        """Return the largest number found in a ``(<number>]`` marker, or -1 if none."""
        highest = -1
        collecting = False
        digits = []
        for ch in labelledExpression:
            if ch == '(':
                collecting = True
            elif ch == ']':
                highest = max(highest, int("".join(digits)))
                digits = []
                collecting = False
            elif collecting:
                digits.append(ch)
        return highest

In [9]:
class Sequence:
    """Naming rule for sequence patterns (``Seq2`` .. ``Seq5``)."""

    # Number of times each arity has been requested (class-wide, never reset).
    _counter = defaultdict(int)

    @staticmethod
    def change_symbol_into_name(labelled_expression, pattern_label_number):
        """Return ``(pattern_name, old_text, new_text)`` for a sequence.

        ``labelled_expression`` is the list of arguments.  For sequences the
        argument text is left untouched, so ``old_text == new_text``.

        Raises:
            Exception: when the number of arguments is outside 2..5.
        """
        arity = len(labelled_expression)
        if arity < 2 or arity > 5:
            raise Exception(f"Pattern for Seq{arity} does not exist")

        Sequence._counter[arity] += 1

        joined = ",".join(labelled_expression)
        wrapped = f"({pattern_label_number}]" + joined + f"[{pattern_label_number})"
        return f'Seq{arity}', wrapped, wrapped


class Loop:
    """Naming rule for loop patterns (``Loop2`` .. ``Loop30``)."""

    # Per-arity call counter; its value becomes the suffix of the generated
    # start/end marker names so every loop gets distinct markers.
    _counter = defaultdict(int)

    @staticmethod
    def change_symbol_into_name(labelled_expression, pattern_label_number):
        """Return ``(pattern_name, old_text, new_text)`` for a loop.

        ``new_text`` is ``old_text`` with a generated start marker
        (``l<n>_s_<idx>``) prepended and an end marker (``l<n>_e_<idx>``)
        appended to the argument list.  The input list is not modified.

        Raises:
            Exception: when the number of arguments is outside 2..30.
        """
        arity = len(labelled_expression)
        if arity < 2 or arity > 30:
            raise Exception(f"Pattern for Loop{arity} does not exist")

        Loop._counter[arity] += 1
        serial = Loop._counter[arity]

        opener = "(" + str(pattern_label_number) + "]"
        closer = "[" + str(pattern_label_number) + ")"

        old = opener + ",".join(labelled_expression) + closer
        with_markers = [f'l{arity}_s_{serial}'] + labelled_expression + [f'l{arity}_e_{serial}']
        new = opener + ",".join(with_markers) + closer

        return f'Loop{arity}', old, new


class ExclusiveChoice:
    """Naming rule for exclusive-choice patterns (``Xor2`` .. ``Xor30``)."""

    # Per-arity call counter; its value becomes the suffix of the generated
    # start/end marker names.
    _counter = defaultdict(int)

    @staticmethod
    def change_symbol_into_name(labelled_expression, pattern_label_number):
        """Return ``(pattern_name, old_text, new_text)`` for an exclusive choice.

        ``new_text`` is ``old_text`` with a generated start marker
        (``x<n>_s_<idx>``) prepended and an end marker (``x<n>_e_<idx>``)
        appended to the argument list.  The input list is not modified.

        Raises:
            Exception: when the number of arguments is outside 2..30.
        """
        arity = len(labelled_expression)
        if arity < 2 or arity > 30:
            raise Exception(f"Pattern for Xor{arity} does not exist")

        ExclusiveChoice._counter[arity] += 1
        serial = ExclusiveChoice._counter[arity]

        opener = "(" + str(pattern_label_number) + "]"
        closer = "[" + str(pattern_label_number) + ")"

        to_replace = opener + ",".join(labelled_expression) + closer
        with_markers = [f'x{arity}_s_{serial}'] + labelled_expression + [f'x{arity}_e_{serial}']
        replacement = opener + ",".join(with_markers) + closer

        return f'Xor{arity}', to_replace, replacement


class Parallelism:
    """Naming rule for parallel patterns (``And2`` .. ``And30``)."""

    # Per-arity call counter; its value becomes the suffix of the generated
    # start/end marker names.
    _counter = defaultdict(int)

    @staticmethod
    def change_symbol_into_name(labelled_expression, pattern_label_number):
        """Return ``(pattern_name, old_text, new_text)`` for a parallel block.

        ``new_text`` is ``old_text`` with a generated start marker
        (``a<n>_s_<idx>``) prepended and an end marker (``a<n>_e_<idx>``)
        appended to the argument list.  The input list is not modified.

        Raises:
            Exception: when the number of arguments is outside 2..30.
        """
        arity = len(labelled_expression)
        if arity < 2 or arity > 30:
            raise Exception(f"Pattern for And{arity} does not exist")

        Parallelism._counter[arity] += 1
        serial = Parallelism._counter[arity]

        opener = "(" + str(pattern_label_number) + "]"
        closer = "[" + str(pattern_label_number) + ")"

        to_replace = opener + ",".join(labelled_expression) + closer
        with_markers = [f'a{arity}_s_{serial}'] + labelled_expression + [f'a{arity}_e_{serial}']
        replacement = opener + ",".join(with_markers) + closer

        return f'And{arity}', to_replace, replacement

In [10]:
class PatternExpressionGenerator:
    """Rewrites operator symbols of a labelled expression into pattern names.

    The expression being rewritten is kept in ``converted_expression`` and is
    updated in place by every successful call to
    ``add_approved_workflow_patterns``.
    """

    def __init__(self, converted_expression):
        self.converted_expression = converted_expression

    def add_approved_workflow_patterns(self, expression):
        """Process one (sub)expression and return its arguments.

        Returns:
            ``None`` when ``expression`` is ``None`` or a list; the unchanged
            ``expression`` when it contains none of ``>``, ``X``, ``+``, ``*``;
            otherwise the list of its top-level arguments, after
            ``converted_expression`` has been updated for this pattern.

        Raises:
            Exception: when the leading symbol maps to no known pattern.
        """
        if expression is None or isinstance(expression, list):
            return None

        symbol = ProcessTreeAdapter.find_symbol(expression)
        upper_symbol = symbol.upper()
        # Spelled-out operator names are folded onto the short symbols.
        for prefix, short_form in (("SEQ", "->"), ("AND", "+"), ("XOR", "X"), ("LOOP", "*")):
            if upper_symbol.startswith(prefix):
                symbol = short_form
                break

        if not re.findall(r'>|X|\+|\*', expression):
            return expression

        arguments, pattern_label_number = (
            ProcessTreeAdapter.extract_arguments_from_labelled_expression(expression))

        handlers = {
            "->": Sequence,
            "*": Loop,
            "+": Parallelism,
            "X": ExclusiveChoice,
        }
        handler = handlers.get(symbol)
        if handler is None:
            raise Exception(f"Pattern for {symbol} does not exist")

        pattern_name, old_text, new_text = handler.change_symbol_into_name(
            arguments, pattern_label_number)
        # NOTE(review): the rename searches for the short symbol, so an
        # expression that spelled the operator out (e.g. "Seq(1]...") is not
        # renamed here — confirm whether such input can occur.
        renamed = ProcessTreeAdapter.replace_symbol_with_name(
            self.converted_expression, pattern_label_number, symbol, pattern_name)
        self.converted_expression = renamed.replace(old_text, new_text)

        return arguments

    def get_converted_expression(self):
        """Return the expression as rewritten so far."""
        return self.converted_expression

In [11]:
class GetPatternExpression:
    """Drives a PatternExpressionGenerator over a labelled expression."""

    @staticmethod
    def process_patterns(pattern_list: list, instance) -> list:
        """Apply ``instance.add_approved_workflow_patterns`` to every item.

        When the call returns a list, that list is processed recursively and
        the processed list takes the item's place in the result.
        """
        processed = []
        for item in pattern_list:
            outcome = instance.add_approved_workflow_patterns(item)
            if isinstance(outcome, list):
                outcome = GetPatternExpression.process_patterns(outcome, instance)
            processed.append(outcome)
        return processed

    @staticmethod
    def recursive_process(pattern_list: list, instance, depth: int) -> list:
        """Run ``process_patterns`` ``depth`` times, feeding each result back in."""
        remaining = depth
        while remaining > 0:
            pattern_list = GetPatternExpression.process_patterns(pattern_list, instance)
            remaining -= 1
        return pattern_list

    @staticmethod
    def get_pattern_expression(labelled_pattern_expression: str) -> str:
        """Return the expression with operator symbols replaced by pattern names.

        The number of processing rounds equals the highest depth label found
        in the input.
        """
        generator = PatternExpressionGenerator(labelled_pattern_expression)
        rounds = ProcessTreeAdapter.get_highest_label(labelled_pattern_expression)
        GetPatternExpression.recursive_process([labelled_pattern_expression], generator, rounds)
        return generator.get_converted_expression()

In [12]:
class WorkflowPatternTemplate:
    """Name, arity and rule strings of one workflow pattern."""

    def __init__(self, name, number_of_arguments, rules):
        self.name = name
        self.number_of_arguments = number_of_arguments
        self.rules = rules

    @staticmethod
    def load_pattern_property_set(path_to_pattern_rules_file):
        """Load all templates from a JSON file.

        The file must hold an object mapping each pattern name to an object
        with the keys ``"number of args"`` and ``"rules"``.  Templates are
        returned in file order.
        """
        with open(path_to_pattern_rules_file, 'r') as handle:
            raw = json.load(handle)
        return [
            WorkflowPatternTemplate(pattern_name, spec["number of args"], spec["rules"])
            for pattern_name, spec in raw.items()
        ]

    # --- plain accessors -------------------------------------------------

    def get_name(self):
        """Return the pattern name."""
        return self.name

    def set_name(self, name):
        """Replace the pattern name."""
        self.name = name

    def get_number_of_arguments(self):
        """Return the declared number of arguments."""
        return self.number_of_arguments

    def set_number_of_arguments(self, number_of_arguments):
        """Replace the declared number of arguments."""
        self.number_of_arguments = number_of_arguments

    def get_rules(self):
        """Return the rule strings."""
        return self.rules

    def set_rules(self, rules):
        """Replace the rule strings."""
        self.rules = rules

In [13]:
class WorkflowPattern:
    """A workflow pattern template bound to the arguments of one expression.

    Expressions have the labelled form ``Name(k]arg0,arg1,...[k)`` where ``k``
    is the nesting depth label; arguments may themselves be labelled
    expressions.
    """

    # One rule placeholder (``arg0``, ``arg7``, ``arg12`` ...) matched as a
    # whole token, so that ``arg1`` can never eat the front of ``arg10``.
    _ARG_PLACEHOLDER = re.compile(r"arg(0|[1-9]\d*)")

    def __init__(self, workflow_pattern_template, pattern_arguments):
        self.workflow_pattern_template = workflow_pattern_template
        self.pattern_arguments = pattern_arguments

    @staticmethod
    def _find_template(workflow_name, pattern_property_set):
        """Return the template called ``workflow_name`` or ``None``."""
        return next(
            (template for template in pattern_property_set
             if template.get_name() == workflow_name),
            None)

    @staticmethod
    def get_workflow_pattern_from_expression(pattern_expression, pattern_property_set):
        """Build a ``WorkflowPattern`` from a labelled expression.

        Raises:
            Exception: when no template matches the expression's name or the
                argument count differs from the template's arity.
        """
        workflow_name = pattern_expression[:pattern_expression.index("(")]
        workflow_pattern_template = WorkflowPattern._find_template(
            workflow_name, pattern_property_set)

        if workflow_pattern_template is None:
            raise Exception("Workflow pattern template not found! Workflow name: " + workflow_name)
        pattern_arguments = WorkflowPattern.extract_arguments_from_labelled_expression(
            pattern_expression, pattern_property_set)
        return WorkflowPattern(workflow_pattern_template, pattern_arguments)

    @staticmethod
    def extract_arguments_from_labelled_expression(labelled_expression, pattern_property_set):
        """Return the top-level arguments of ``labelled_expression``.

        Arguments are the comma-separated parts between the first ``]`` and
        the last ``[``; commas inside nested parentheses do not split.

        Raises:
            Exception: when no template matches the expression's name, or the
                number of arguments differs from the template's arity.
        """
        workflow_name = labelled_expression[:labelled_expression.index("(")]
        workflow_pattern_template = WorkflowPattern._find_template(
            workflow_name, pattern_property_set)
        if workflow_pattern_template is None:
            raise Exception("Workflow pattern template not found!")

        number_of_arguments = int(workflow_pattern_template.get_number_of_arguments())
        first_bracket = labelled_expression.index("]")
        pattern_label_number = int(
            labelled_expression[labelled_expression.index("(") + 1:first_bracket])
        last_open_bracket = list(re.finditer(r'\[', labelled_expression))[-1].start()
        body = labelled_expression[first_bracket + 1:last_open_bracket]

        arguments = []
        pending = []
        depth = 0
        for piece in body.split(","):
            depth += piece.count('(') - piece.count(')')
            pending.append(piece)
            if depth == 0:
                # Parentheses balanced again: the collected pieces form one argument.
                arguments.append(",".join(pending))
                pending = []

        if len(arguments) != number_of_arguments:
            # The old message claimed "too much" even when arguments were
            # missing; report what was expected and what was found instead.
            raise Exception(
                f"Wrong number of arguments for {workflow_name} "
                f"(label {pattern_label_number}): expected {number_of_arguments}, "
                f"found {len(arguments)} in {labelled_expression}")
        return arguments

    @staticmethod
    def count_occurrence_of_char(string, char):
        """Return how often ``char`` occurs in ``string``."""
        return string.count(char)

    @staticmethod
    def is_not_atomic(argument):
        """Return True when ``argument`` contains ``=>``, ``|``, ``^`` or ``]``."""
        return any(marker in argument for marker in ("=>", "|", "^", "]"))

    def get_workflow_pattern_template(self):
        """Return the bound template."""
        return self.workflow_pattern_template

    def set_workflow_pattern_template(self, workflow_pattern_template):
        """Replace the bound template."""
        self.workflow_pattern_template = workflow_pattern_template

    def get_workflow_pattern_filled_rules(self):
        """Return the template rules with ``arg<i>`` replaced by argument ``i``.

        Every placeholder is substituted in a single pass over the rule.  This
        keeps ``arg10`` intact while ``arg1`` is filled in (sequential
        ``str.replace`` calls turned ``arg10`` into ``<arg1>0``), and text that
        was just inserted is never scanned for placeholders again.
        Placeholders whose index has no argument are left unchanged.

        Raises:
            Exception: when the pattern has no arguments.
        """
        if len(self.pattern_arguments) == 0:
            raise Exception(
                "No arguments for the given pattern in the expression")

        arguments = self.pattern_arguments

        def _fill(hit):
            index = int(hit.group(1))
            return arguments[index] if index < len(arguments) else hit.group(0)

        return [
            WorkflowPattern._ARG_PLACEHOLDER.sub(_fill, rule)
            for rule in self.workflow_pattern_template.get_rules()
        ]

    def get_pattern_arguments(self):
        """Return the bound arguments."""
        return self.pattern_arguments

    def set_pattern_arguments(self, pattern_arguments):
        """Replace the bound arguments."""
        self.pattern_arguments = pattern_arguments

In [14]:
class CalculatingConsolidatedExpression:
    """Computes the consolidated ``ini``/``fin`` expression of a pattern."""

    @staticmethod
    def generate_consolidated_expression(pattern_expression: str, expression_type: str, pattern_property_set: List[WorkflowPatternTemplate]) -> str:
        """Return the first (``"ini"``) or second (``"fin"``) filled rule.

        Every non-atomic argument occurring in the selected rule is replaced
        by its own consolidated expression of the same type, recursively.

        Raises:
            Exception: when ``expression_type`` is neither ``"ini"`` nor
                ``"fin"``.
        """
        if not (expression_type == "ini" or expression_type == "fin"):
            raise Exception("Type must equal 'ini' or 'fin'!")

        bound_pattern = WorkflowPattern.get_workflow_pattern_from_expression(
            pattern_expression, pattern_property_set)
        filled_rules = bound_pattern.get_workflow_pattern_filled_rules()
        ini_rule, fin_rule = filled_rules[0], filled_rules[1]
        consolidated = ini_rule if expression_type == "ini" else fin_rule

        nested_arguments = WorkflowPattern.extract_arguments_from_labelled_expression(
            pattern_expression, pattern_property_set)
        for nested in nested_arguments:
            if not WorkflowPattern.is_not_atomic(nested):
                continue
            inner = CalculatingConsolidatedExpression.generate_consolidated_expression(
                nested, expression_type, pattern_property_set)
            consolidated = consolidated.replace(nested, inner)
        return consolidated

In [15]:
class GeneratingLogicalSpecifications:
    """Builds the logical specification of a labelled pattern expression."""

    @staticmethod
    def generate_logical_specifications(pattern_expression: str, pattern_property_set: List[WorkflowPatternTemplate], verbose=False) -> str:
        """Return all distinct rules of the expression, one per line.

        Patterns are visited from the deepest label down to label 1.  For each
        pattern the filled rules after the first two are collected; a
        non-atomic argument inside a rule is replaced by
        ``<its ini expression> | <its fin expression>``.

        Duplicates are removed while keeping the order of first occurrence, so
        the output is identical from run to run.  (De-duplicating through a
        ``set`` made the line order depend on string hash randomisation.)

        Args:
            pattern_expression: Labelled expression using pattern names.
            pattern_property_set: Available pattern templates.
            verbose: When true, also print the resulting rules.

        Returns:
            The rules joined into one string, each followed by a newline.
        """
        logical_specification = []
        highest_label_number = ProcessTreeAdapter.get_highest_label(
            pattern_expression)

        for label in range(highest_label_number, 0, -1):
            occurrence = 1
            pat = GeneratingLogicalSpecifications.get_pat(
                pattern_expression, label, occurrence, pattern_property_set)
            while pat is not None:
                # The first two filled rules are the ini/fin expressions; only
                # the remaining ones go into the specification.
                rules = pat.get_workflow_pattern_filled_rules()[2:]
                for arg in pat.get_pattern_arguments():
                    if WorkflowPattern.is_not_atomic(arg):
                        cons = (
                            CalculatingConsolidatedExpression.generate_consolidated_expression(
                                arg, "ini", pattern_property_set)
                            + " | "
                            + CalculatingConsolidatedExpression.generate_consolidated_expression(
                                arg, "fin", pattern_property_set))
                        rules = [rule.replace(arg, cons) for rule in rules]
                logical_specification.extend(rules)
                occurrence += 1
                pat = GeneratingLogicalSpecifications.get_pat(
                    pattern_expression, label, occurrence, pattern_property_set)

        # dict keys keep insertion order: stable, order-preserving de-duplication.
        unique_rules = list(dict.fromkeys(logical_specification))

        if verbose:
            print("\nResult: ")
            for rule in unique_rules:
                print(rule)
        return "".join(f"{rule}\n" for rule in unique_rules)

    @staticmethod
    def get_pat(labelled_expression: str, l: int, c: int, pattern_property_set: List[WorkflowPatternTemplate]) -> Any:
        """Return the ``c``-th pattern (1-based) carrying depth label ``l``.

        Returns:
            The parsed ``WorkflowPattern``, or ``None`` when fewer than ``c``
            patterns with that label exist.

        Raises:
            Exception: when the counts of ``(l]`` and ``[l)`` markers differ.
        """
        entry_occurrences = labelled_expression.count("(" + str(l) + "]")
        end_occurrences = labelled_expression.count("[" + str(l) + ")")
        if entry_occurrences != end_occurrences:
            raise Exception("(" + str(l) + "] not equal [" + str(l) + ")")

        if entry_occurrences < c:
            return None

        expression_split_by_entry = re.split(rf"\({l}\]", labelled_expression)
        # Text between the c-th "(l]" and the next "[l)" is the pattern body.
        pattern_content = re.split(
            rf"\[{l}\)", expression_split_by_entry[c])[0]
        # The pattern name is the tail of the preceding chunk, after the last
        # "]" and the last ",".
        split_by_bracket = re.split(r"\]", expression_split_by_entry[c - 1])
        workflow_name = re.split(r",", split_by_bracket[-1])[-1]
        workflow_exp = workflow_name + f"({l}]" + pattern_content + f"[{l})"
        return WorkflowPattern.get_workflow_pattern_from_expression(workflow_exp, pattern_property_set)

In [16]:
# Pattern rule templates are read once, when this cell runs; the path is
# relative to the current working directory.
pattern_rules = "../data/patterns.json"
ltl_pattern_property_set = WorkflowPatternTemplate.load_pattern_property_set(
    pattern_rules)

def get_results(pattern_expression):
    """Print the ini/fin expressions and return the logical specification.

    All spaces are removed from ``pattern_expression`` before it is processed.
    The templates come from the module-level ``ltl_pattern_property_set``.
    """
    compact = pattern_expression.replace(" ", "")

    for kind in ("ini", "fin"):
        consolidated = CalculatingConsolidatedExpression.generate_consolidated_expression(
            compact, kind, ltl_pattern_property_set)
        print(kind + ": " + consolidated)

    return GeneratingLogicalSpecifications.generate_logical_specifications(
        compact, ltl_pattern_property_set)

In [17]:
# If problems if vampire - adjust it to your needs
def _find_vampire() -> str:
    p = shutil.which("vampire")
    if p:
        return p

    candidates = [
        "~/.local/bin/vampire",
        "~/vampire/bin/vampire",
        "~/vampire/bin/vampire_rel",
        "~/vampire",
    ]
    for c in candidates:
        cp = Path(os.path.expanduser(c))
        if cp.exists() and os.access(cp, os.X_OK):
            return str(cp)

    raise FileNotFoundError(
        "No 'vampire'. Try better."
    )

# Entry point for running the Vampire prover on a TPTP problem file.
def run_vampire(tptp_file_path: str):
    """Run Vampire on ``tptp_file_path`` and print its output.

    Nothing is returned.  A missing problem file, a missing Vampire binary and
    a non-zero exit code are all reported on stdout instead of being raised.
    """
    problem = Path(tptp_file_path).expanduser()
    if not problem.exists():
        print(f"Plik TPTP nie istnieje: {problem.resolve()}")
        return

    try:
        vampire_bin = _find_vampire()
        # Put the binary's own directory first on PATH for the child process.
        child_env = os.environ.copy()
        child_env["PATH"] = str(Path(vampire_bin).parent) + os.pathsep + child_env.get("PATH", "")

        command = [vampire_bin, "--input_syntax", "tptp", str(problem)]
        result = subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            env=child_env,
        )
    except subprocess.CalledProcessError as e:
        print("Error during vampire run:")
        print(f"Exit code: {e.returncode}")
        print(f"Out:\n{e.stdout}")
        print(f"Errors:\n{e.stderr}")
    except FileNotFoundError as e:
        print(f"No file: {e}")
    else:
        print("Vampire Result:")
        print(result.stdout)
        if result.stderr:
            print("Errors:")
            print(result.stderr)


# Regular expressions used by transform_to_tptp.
# "Exist( ... )" in any letter case; the body is captured lazily up to the
# first ")" and may span several lines (DOTALL).
_exist_pattern     = re.compile(r"(?i)Exist\s*\((.*?)\)", re.DOTALL)
# A binary operator (&, | or =>) together with any surrounding whitespace.
_op_pattern        = re.compile(r"\s*(&|\||=>)\s*")
# An identifier-like token: a letter followed by letters, digits or "_".
_atom_pattern      = re.compile(r"\b[A-Za-z][A-Za-z0-9_]*\b")
# A lower-case predicate applied to X, e.g. "name(X)".
_pred_call_pattern = re.compile(r"([a-z][0-9a-z_]*\(\s*X\s*\))")

def balance_parens(s: str) -> str:
    """Return ``s`` with balanced parentheses.

    A ``)`` that has no open ``(`` before it is dropped; every ``(`` still
    open at the end gets a ``)`` appended.  All other characters are kept.
    """
    kept = []
    open_count = 0
    for ch in s:
        if ch == ')':
            if open_count == 0:
                # Stray closer: nothing to close, skip it.
                continue
            open_count -= 1
        elif ch == '(':
            open_count += 1
        kept.append(ch)
    return ''.join(kept) + ')' * open_count

def transform_to_tptp(input_str: str) -> str:
    """Turn one formula per input line into ``fof(...)`` axiom lines.

    Blank lines are skipped and the remaining lines are numbered ``f1``,
    ``f2``, ...  Per line, in this order: a leading ``ForAll``/``Exist``
    becomes a ``![X]:``/``?[X]:`` prefix, one outer pair of parentheses is
    removed, nested ``Exist(...)`` are inlined, ``^`` becomes ``&``, operator
    spacing is normalised, ``.`` and ``-`` are deleted, every identifier
    except ``X`` becomes a lower-case predicate over ``X``, and parentheses
    are balanced.
    """
    def inline_existentials(formula: str) -> str:
        # Rewrite "Exist(...)" occurrences until a pass changes nothing.
        def _rewrite(hit):
            body = hit.group(1).strip().rstrip('.-')
            return f"?[X]: ( {inline_existentials(body)} )"

        while True:
            rewritten = _exist_pattern.sub(_rewrite, formula)
            if rewritten == formula:
                return rewritten
            formula = rewritten

    untouched_tokens = {"&", "|", "=>", "?", "!", "[", "]", ":", "X"}

    def atom_to_pred(m: re.Match) -> str:
        token = m.group(0)
        return token if token in untouched_tokens else f"{token.lower()}(X)"

    quantifier_keywords = (("ForAll", "!"), ("Exist", "?"))

    axioms = []
    non_blank = filter(None, (raw.strip() for raw in input_str.splitlines()))
    for idx, content in enumerate(non_blank, start=1):
        quant = ""
        for keyword, symbol in quantifier_keywords:
            if content.startswith(keyword):
                quant = symbol
                content = content[len(keyword):].strip()
                break

        if content.startswith("(") and content.endswith(")"):
            content = content[1:-1].strip()

        content = inline_existentials(content).replace("^", "&")
        content = _op_pattern.sub(r" \1 ", content)
        content = " ".join(content.split())
        content = content.replace(".", "").replace("-", "")

        expr = _atom_pattern.sub(atom_to_pred, content)
        # Wrap each predicate call in its own parentheses.
        expr = _pred_call_pattern.sub(r"(\1)", expr)
        expr = balance_parens(expr)

        prefix = f"{quant}[X]:" if quant else ""
        inner = f"{prefix} {expr}".strip()
        axioms.append(f"fof(f{idx}, axiom, {inner}).")

    return "\n".join(axioms)


In [18]:
# Read the XES event log with pm4py; the path is relative to the working directory.
log = pm4py.read_xes("../data/Hospital Billing - Event Log.xes")

  from .autonotebook import tqdm as notebook_tqdm
parsing log, completed traces :: 100%|██████████| 100000/100000 [00:07<00:00, 13897.73it/s]


In [19]:
def generate_tptp_from_log(x):
    """Run the whole pipeline on event log ``x`` and return TPTP text.

    Steps: discover a process tree with pm4py's inductive miner, label silent
    leaves, convert to the AST, render it with the module-level ``visitor``,
    normalise and depth-label the text, substitute pattern names, generate the
    logical specification and translate it to TPTP.

    The module-level ``none_counter`` is advanced as a side effect, and
    ``get_results`` prints the ini/fin expressions.
    """
    tree = pm4py.discover_process_tree_inductive(x)
    enumerate_tau(tree, none_counter)

    rendered = visitor.visit(process_tree_to_ast(tree))
    text = "\n".join(str(formula) for formula in rendered)

    normalisation_steps = (
        Format.lowercase_in_quotes,
        Format.replace_parens_in_quotes,
        Format.replace_colons_in_quotes,
        Format.replace_spaces_with_underscore,
        Format.label_expressions,
    )
    for step in normalisation_steps:
        text = step(text)

    pattern_expression = GetPatternExpression.get_pattern_expression(text)
    specification = get_results(pattern_expression)
    return transform_to_tptp(f"{specification}")

In [22]:
# Run the full log -> TPTP pipeline; the ini/fin expressions are printed on the way.
tptp_formulas_log = generate_tptp_from_log(log)

ini: l2_s_11
fin: x2_e_20


In [23]:
# Overwrite the problem file with the freshly generated axioms.
with open("../problems/data.p", "w") as f:
    f.write(tptp_formulas_log)

In [24]:
# Run Vampire on the generated problem file; the prover output is printed.
run_vampire("../problems/data.p")

Vampire Result:
% SZS status Satisfiable for data
% SZS output start Saturation.
cnf(u2065,axiom,
    ~x2_s_32(sK288)).

cnf(u2074,axiom,
    ~x2_e_32(sK288)).

cnf(u2079,axiom,
    ~l2_s_11(sK286)).

cnf(u2088,axiom,
    ~tau_46(sK283)).

cnf(u2097,axiom,
    ~code_ok(sK283)).

cnf(u2102,axiom,
    ~manual(sK283)).

cnf(u2107,axiom,
    ~l2_s_19(sK281)).

cnf(u2116,axiom,
    ~l2_s_18(sK279)).

cnf(u2125,axiom,
    x2_s_31(sK277)).

cnf(u2133,axiom,
    ~x3_s_5(sK275)).

cnf(u2136,axiom,
    ~sP4).

cnf(u2140,axiom,
    ~sP5).

cnf(u2175,axiom,
    ~x3_s_6(sK271)).

cnf(u2178,axiom,
    ~sP2).

cnf(u2182,axiom,
    ~sP3).

cnf(u2217,axiom,
    ~x3_s_4(sK267)).

cnf(u2229,axiom,
    ~x3_e_4(sK267)).

cnf(u2244,axiom,
    ~tau_48(sK264)).

cnf(u2253,axiom,
    ~l2_s_17(sK264)).

cnf(u2258,axiom,
    ~l2_e_17(sK264)).

cnf(u2263,axiom,
    ~x2_s_29(sK262)).

cnf(u2294,axiom,
    ~x2_s_27(sK259)).

cnf(u2307,axiom,
    ~x2_e_27(sK259)).

cnf(u2312,axiom,
    ~tau_42(sK257)).

cnf(u2321,ax