# Alpha-Miner Playground: Footprint Table + Petri Net (Workflow net)

Enter **weighted traces** and compute:

1) **Directly-follows (weighted)**  
2) **Footprint relations**: `∥`, `→`, `←`, `#`  
3) **Alpha-Miner sets**: `T_in`, `T_out`, `X_L`, `Y_L` (maximal pairs)  
4) A **Petri net (workflow net)** rendered via Graphviz

**Trace format:** `a,b,c` (comma-separated activity names; whitespace around each name is ignored)  
**Weight:** non-negative number (a trace with weight 0 is ignored)


In [10]:
import itertools
import pandas as pd
import ipywidgets as widgets
from IPython.display import display, Markdown, clear_output
from graphviz import Digraph


In [11]:
def parse_trace(trace_str: str):
    """
    Split a comma-separated trace string into a list of activity names.

    Each token has its surrounding whitespace stripped and empty tokens are
    dropped, so ' a, ,b ' becomes ['a', 'b']. Whitespace inside a token is
    kept. ``None`` and blank input both yield an empty list; any other value
    is converted with ``str()`` first.
    """
    text = "" if trace_str is None else str(trace_str).strip()
    if text == "":
        return []
    stripped_tokens = (piece.strip() for piece in text.split(","))
    return [token for token in stripped_tokens if token]


def extract_activities(weighted_traces):
    """
    Return the sorted list of distinct activities occurring in the log.

    Only traces whose weight is neither ``None`` nor <= 0 contribute.
    ``weighted_traces`` is an iterable of ``(trace, weight)`` pairs.
    """
    counted_traces = (
        events
        for events, weight in weighted_traces
        if not (weight is None or weight <= 0)
    )
    return sorted(set().union(*counted_traces))


def start_activities(weighted_traces):
    """
    Return the sorted list of activities that open at least one counted trace.

    Traces that are empty, or whose weight is ``None`` or <= 0, are skipped.
    """
    openers = {
        events[0]
        for events, weight in weighted_traces
        if not (weight is None or weight <= 0 or not events)
    }
    return sorted(openers)


def end_activities(weighted_traces):
    """
    Return the sorted list of activities that close at least one counted trace.

    Traces that are empty, or whose weight is ``None`` or <= 0, are skipped.
    """
    closers = {
        events[-1]
        for events, weight in weighted_traces
        if not (weight is None or weight <= 0 or not events)
    }
    return sorted(closers)


def directly_follows_counts(A, weighted_traces):
    """
    Build the weighted directly-follows matrix as a DataFrame indexed by A.

    Cell [a, b] is the sum of the weights of every occurrence of ``a``
    immediately followed by ``b``. Traces with weight ``None`` or <= 0 are
    skipped, as are pairs involving an activity that is not listed in A.
    """
    position = {activity: k for k, activity in enumerate(A)}
    size = len(A)
    grid = [[0.0 for _ in range(size)] for _ in range(size)]

    for events, weight in weighted_traces:
        if weight is None or weight <= 0:
            continue
        amount = float(weight)
        # Walk the trace in lockstep with itself shifted by one position.
        successors = iter(events)
        next(successors, None)
        for current, following in zip(events, successors):
            row = position.get(current)
            col = position.get(following)
            if row is not None and col is not None:
                grid[row][col] += amount

    return pd.DataFrame(grid, index=A, columns=A)


def compute_relations(A, df_counts, threshold=0.0):
    """
    Derive the footprint relation for every ordered pair of activities.

    A pair (a, b) "exists" when count(a, b) > threshold. The symbol stored
    for (a, b) depends on whether (a, b) and (b, a) exist:

      both exist        -> ∥
      only (a, b)       -> →
      only (b, a)       -> ←
      neither           -> #

    On the diagonal both lookups are the same cell, so (a, a) is ∥ when the
    activity directly follows itself above the threshold, and # otherwise.

    Returns a dict mapping (a, b) to one of the four symbols.
    """
    above = df_counts > float(threshold)
    symbol_for = {
        (True, True): "∥",
        (True, False): "→",
        (False, True): "←",
        (False, False): "#",
    }

    footprint = {}
    for row in A:
        for col in A:
            forward = bool(above.loc[row, col])
            backward = bool(above.loc[col, row])
            footprint[(row, col)] = symbol_for[(forward, backward)]
    return footprint


def relation_sets(A, df_counts, threshold=0.0):
    """
    Return the four relation sets as sets of ordered pairs.

    A pair (a, b) "exists" when count(a, b) > threshold. The returned tuple
    is ``(>L, ∥L, →L, #L)``:

      >L : every existing pair (a, b)
      ∥L : pairs where both (a, b) and (b, a) exist; this includes (a, a)
           when an activity directly follows itself
      →L : pairs where (a, b) exists and (b, a) does not
      #L : pairs where neither direction exists; this includes (a, a)
           when the activity never directly follows itself
    """
    above = df_counts > float(threshold)
    follows = {(x, y) for x in A for y in A if bool(above.loc[x, y])}

    parallel, causal, unrelated = set(), set(), set()
    for x in A:
        for y in A:
            forward = (x, y) in follows
            backward = (y, x) in follows
            if forward and backward:
                parallel.add((x, y))
            elif forward:
                causal.add((x, y))
            elif not backward:
                unrelated.add((x, y))
            # Remaining case (only the reverse direction exists) is recorded
            # as the causal pair (y, x) when the loop reaches it.

    return follows, parallel, causal, unrelated


def footprint_table(A, rel):
    """
    Lay out the footprint relation as a square DataFrame.

    Rows and columns are both labelled by A; cell [r, c] holds ``rel[(r, c)]``.
    """
    rows = []
    for row_activity in A:
        rows.append([rel[(row_activity, col_activity)] for col_activity in A])
    return pd.DataFrame(rows, index=A, columns=A)


def powerset_nonempty(items):
    """
    Lazily yield every non-empty subset of ``items`` as a frozenset.

    Subsets are produced by increasing size, and within one size in the
    order ``itertools.combinations`` generates them.
    """
    pool = tuple(items)
    combos_by_size = (
        itertools.combinations(pool, size) for size in range(1, len(pool) + 1)
    )
    yield from map(frozenset, itertools.chain.from_iterable(combos_by_size))


def compute_XL_YL(A, rel):
    """
    Compute the Alpha-Miner candidate pairs X_L and the maximal pairs Y_L.

    X_L: all pairs (A_set, B_set) of non-empty frozensets such that:
      - for all distinct a1,a2 in A_set: a1 # a2
      - for all distinct b1,b2 in B_set: b1 # b2
      - for all a in A_set, b in B_set: a → b

    Y_L: the maximal pairs of X_L under component-wise subset ordering:
      (A,B) <= (A',B') iff A ⊆ A' and B ⊆ B'

    Parameters:
      A   : iterable of activities
      rel : dict mapping every ordered pair (a, b) of activities to one of
            the footprint symbols

    Returns the tuple ``(XL, YL)``, both sets of (frozenset, frozenset).

    The search enumerates every non-empty subset of A, so it is exponential
    in the number of activities.

    NOTE(review): only *distinct* members are required to be in '#', so an
    activity that directly follows itself may still appear in a pair. The
    textbook alpha algorithm also demands a # a -- confirm this is intended.
    """
    activities = list(A)

    def is_independent(members):
        # Every unordered pair of distinct members must be unrelated.
        return all(
            rel[(x, y)] == "#" for x, y in itertools.combinations(members, 2)
        )

    # Whether a subset is pairwise-'#' does not depend on what it is paired
    # with, so filter the subsets once instead of inside the pairing loop.
    all_subsets = map(
        frozenset,
        itertools.chain.from_iterable(
            itertools.combinations(activities, size)
            for size in range(1, len(activities) + 1)
        ),
    )
    independent_sets = [subset for subset in all_subsets if is_independent(subset)]

    XL = {
        (sources, targets)
        for sources in independent_sets
        for targets in independent_sets
        if all(rel[(a, b)] == "→" for a in sources for b in targets)
    }

    def is_dominated(pair):
        # True when a different pair in X_L contains this one component-wise.
        sources, targets = pair
        return any(
            other != pair and sources <= other[0] and targets <= other[1]
            for other in XL
        )

    YL = {pair for pair in XL if not is_dominated(pair)}
    return XL, YL


def build_petri_net(T, T_in, T_out, YL):
    """
    Build a workflow net from Alpha-Miner result.

    Places: p_start, p_end, and one place for each (A,B) in YL. The pair
    places are named p0, p1, ... following the pairs sorted by
    (sorted(A), sorted(B)).
    Transitions: activities T
    Arcs:
      p_start -> t for t in T_in
      t -> p_end for t in T_out
      for each (A,B) in YL:
        a -> p_(A,B) for a in A
        p_(A,B) -> b for b in B

    Returns a dict with keys:
      "places"      : list of place ids
      "transitions" : list(T)
      "arcs"        : list of (src, dst) tuples
      "pair_places" : dict mapping each (A,B) pair to its place id

    Members of A and B are visited in sorted order so that the arc list is
    the same on every run; iterating the frozensets directly would follow
    their hash order.
    """
    def pair_key(pair):
        sources, targets = pair
        return (sorted(sources), sorted(targets))

    ordered_pairs = sorted(YL, key=pair_key)
    pair_place_ids = [f"p{k}" for k in range(len(ordered_pairs))]
    places = ["p_start", "p_end", *pair_place_ids]
    place_for_pair = dict(zip(ordered_pairs, pair_place_ids))

    arcs = [("p_start", t) for t in T_in]  # (src, dst)
    arcs.extend((t, "p_end") for t in T_out)

    for (sources, targets), pid in place_for_pair.items():
        arcs.extend((a, pid) for a in sorted(sources))
        arcs.extend((pid, b) for b in sorted(targets))

    return {
        "places": places,
        "transitions": list(T),
        "arcs": arcs,
        "pair_places": place_for_pair,
    }


def render_petri_net(net, pair_places, show_pair_labels=True):
    """
    Graphviz rendering: places as circles, transitions as boxes.

    Parameters:
      net              : dict with "places", "transitions" and "arcs" entries
      pair_places      : dict mapping each (A,B) pair to its place id; only
                         consulted when a pair place has to be labelled
      show_pair_labels : when true, every place other than p_start/p_end is
                         labelled with its id followed by '{A} -> {B}'

    Returns the Digraph (left-to-right layout).
    """
    dot = Digraph("petri", graph_attr={"rankdir": "LR"})

    # Inverse lookup place id -> (A,B). Built at most once, on first use,
    # rather than once per labelled place.
    pair_for_place = None

    # Places
    for p in net["places"]:
        label = p
        if show_pair_labels and p not in ("p_start", "p_end"):
            if pair_for_place is None:
                pair_for_place = {pid: pair for pair, pid in pair_places.items()}
            AS, BS = pair_for_place[p]
            label = f"{p}\n{{{','.join(sorted(AS))}}} -> {{{','.join(sorted(BS))}}}"
        dot.node(p, label=label, shape="circle")

    # Transitions
    for t in net["transitions"]:
        dot.node(t, label=t, shape="box")

    # Arcs
    for src, dst in net["arcs"]:
        dot.edge(src, dst)

    return dot


## Interactive GUI

Add/remove rows, set weights, then compute.

Tip: keep **Threshold = 0** to match the standard alpha-miner definition of `a >_L b` (existence in the log).


In [12]:
# ---------- UI widgets ----------

# Container for the trace rows (one HBox per row). Rows are added and removed
# by assigning a new tuple to `rows_box.children`.
rows_box = widgets.VBox([])

def make_row(trace_value="", weight_value=1.0):
    """
    Create one input row: a trace text field, a weight field and a Remove button.

    The returned HBox holds the three widgets in that order. Clicking Remove
    takes this row out of ``rows_box`` if it is still one of its children.
    """
    trace_input = widgets.Text(
        value=trace_value,
        placeholder="e.g. a,b,c",
        description="Trace:",
        layout=widgets.Layout(width="60%"),
    )
    weight_input = widgets.FloatText(
        value=float(weight_value),
        description="Weight:",
        layout=widgets.Layout(width="25%"),
    )
    delete_button = widgets.Button(
        description="Remove",
        button_style="danger",
        layout=widgets.Layout(width="12%"),
    )
    row = widgets.HBox(
        [trace_input, weight_input, delete_button],
        layout=widgets.Layout(width="100%"),
    )

    def _remove(_):
        remaining = list(rows_box.children)
        try:
            remaining.remove(row)
        except ValueError:
            # Row is no longer displayed; nothing to do.
            return
        rows_box.children = tuple(remaining)

    delete_button.on_click(_remove)
    return row

def add_row(_=None, trace_value="", weight_value=1.0):
    """
    Append a new trace row to ``rows_box``.

    The first parameter is ignored; it is there so the function can be used
    directly as a button click handler.
    """
    new_row = make_row(trace_value=trace_value, weight_value=weight_value)
    rows_box.children = (*rows_box.children, new_row)

# Example data: four weighted traces pre-filled into the row list.
add_row(trace_value="a,c,d", weight_value=3)
add_row(trace_value="a,d,c", weight_value=2)
add_row(trace_value="b,c,d", weight_value=2)
add_row(trace_value="b,d,c", weight_value=4)

# Button that appends another row using add_row's defaults (empty trace, weight 1.0).
add_btn = widgets.Button(description="Add trace row", button_style="success")
add_btn.on_click(add_row)

# Threshold for directly-follows counts: a pair counts as existing only when
# its weighted count is strictly greater than this value.
threshold = widgets.FloatSlider(
    value=0.0,
    min=0.0,
    max=10.0,
    step=0.5,
    description="Threshold:",
    readout=True,
    continuous_update=False,
    layout=widgets.Layout(width="60%")
)

# Toggles the '{A} -> {B}' annotation on the pair places of the rendered net.
show_pair_labels = widgets.Checkbox(value=True, description="Show (A,B) labels on places")

# Triggers the computation; its click handler is registered further below.
compute_btn = widgets.Button(description="Compute + render Petri net", button_style="primary")

# Output area that receives every table and the rendered net.
out = widgets.Output()

def read_rows():
    """
    Collect the current rows of ``rows_box`` as ``(trace, weight)`` pairs.

    The trace text is parsed with ``parse_trace`` and the weight is converted
    to float. Rows are returned in display order, including rows whose trace
    is empty or whose weight is zero.
    """
    def as_weighted_trace(row):
        trace_widget, weight_widget, _ = row.children
        return parse_trace(trace_widget.value), float(weight_widget.value)

    return [as_weighted_trace(row) for row in rows_box.children]

def on_compute(_):
    """
    Click handler: recompute everything from the current rows and display it.

    Inside the ``out`` widget it clears the previous output and shows, in
    order: the activity / start / end sets, the weighted directly-follows
    counts, the footprint table, the four relation sets, X_L, Y_L and the
    rendered Petri net. When no activities are found it shows a hint and stops.
    """
    def show_md(text):
        display(Markdown(text))

    def pairs_text(pairs):
        return ", ".join(f"({left},{right})" for left, right in sorted(pairs))

    def braced(members):
        return "{" + ",".join(sorted(members)) + "}"

    def set_pairs_text(set_pairs):
        # Sort by the sorted member lists so the listing is stable.
        ordered = sorted(set_pairs, key=lambda sp: (sorted(sp[0]), sorted(sp[1])))
        listing = ", ".join(
            f"({braced(sources)}, {braced(targets)})" for sources, targets in ordered
        )
        return listing or "_(empty)_"

    with out:
        clear_output()
        log = read_rows()
        activities = extract_activities(log)

        if not activities:
            show_md("**No activities found.** Add at least one non-empty trace with weight > 0.")
            return

        # Basic Alpha-Miner ingredients
        first_activities = start_activities(log)
        last_activities = end_activities(log)
        cutoff = threshold.value

        counts = directly_follows_counts(activities, log)
        relations = compute_relations(activities, counts, threshold=cutoff)
        footprint = footprint_table(activities, relations)
        follows, parallel, causal, unrelated = relation_sets(
            activities, counts, threshold=cutoff
        )
        candidates, maximal = compute_XL_YL(activities, relations)

        show_md("## Activities / Start / End")
        summary = {
            "Set": ["T (activities)", "T_in (start)", "T_out (end)"],
            "Elements": [
                ", ".join(activities),
                ", ".join(first_activities),
                ", ".join(last_activities),
            ],
        }
        display(pd.DataFrame(summary))

        show_md("## Directly-follows counts (weighted)")
        display(counts)

        show_md("## Footprint table")
        display(footprint)

        show_md("## Relation sets")
        show_md(f"**>L:** {pairs_text(follows)}")
        show_md(f"**∥L:** {pairs_text(parallel)}")
        show_md(f"**→L (causal):** {pairs_text(causal)}")
        show_md(f"**#L (incl. diagonal):** {pairs_text(unrelated)}")

        show_md("## X_L (candidate place pairs)")
        show_md(set_pairs_text(candidates))

        show_md("## Y_L (maximal pairs)")
        show_md(set_pairs_text(maximal))

        # Build and render Petri net
        net = build_petri_net(activities, first_activities, last_activities, maximal)
        dot = render_petri_net(
            net, net["pair_places"], show_pair_labels=show_pair_labels.value
        )

        show_md("## Petri net (workflow net)")
        display(dot)

# Run the full computation whenever the compute button is clicked.
compute_btn.on_click(on_compute)

# Assemble and show the GUI: heading, trace rows, threshold slider,
# label checkbox, the two buttons side by side, then the output area.
display(widgets.VBox([
    widgets.HTML("<h3>Enter weighted traces</h3>"),
    rows_box,
    threshold,
    show_pair_labels,
    widgets.HBox([compute_btn, add_btn]),
    out
]))


VBox(children=(HTML(value='<h3>Enter weighted traces</h3>'), VBox(children=(HBox(children=(Text(value='a,c,d',…

## Why it shows in Jupyter but not in PyCharm

This notebook uses **ipywidgets**, which needs a **Jupyter front-end** (browser notebook, JupyterLab, or a notebook UI inside an IDE).

If you run the same code as a plain Python script, the widgets have no front-end to render.  
Two practical options:

- Open and run the **.ipynb** in a Jupyter front-end (including PyCharm's notebook support, if enabled).
- Use a script-based UI (e.g., Streamlit) and reuse the same computation functions.
