In [4]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Literal, Union

import json
import textwrap
from IPython.display import HTML, display

import yaml

In [5]:
# --- 1. Policy rules in a YAML DSL (as a string) ---

RULES_YAML = """
- name: block_unhosted_wallets
  when:
    field: originator.kyc
    op: eq
    value: false
  action: block
  priority: 20

- name: flag_high_value_eur
  when:
    all:
      - field: amount
        op: gt
        value: 100000
      - field: currency
        op: eq
        value: "EUR"
  action: flag
  priority: 10

- name: flag_virtual_asset_transfer
  when:
    field: context
    op: eq
    value: "Virtual_Asset_Transfer"
  action: flag
  priority: 8

- name: default_allow
  when: {}
  action: allow
  priority: 0
"""

In [6]:
# --- 2. Demo transactions (mirroring your JSON files) ---

tx_high_value = {
    "id": "tx_high_value",
    "originator": {
        "id": "BANK_001",
        "name": "Tier 1 Bank",
        "centrality": 0.15,
        "kyc": True,
    },
    "beneficiary": {
        "id": "CORP_042",
        "name": "Corporate Account",
        "centrality": 0.05,
        "kyc": True,
    },
    "amount": 250000,
    "currency": "EUR",
    "context": "MiCA_Title_III_Settlement",
}

tx_normal = {
    "id": "tx_normal",
    "originator": {
        "id": "BANK_002",
        "name": "Tier 2 Bank",
        "centrality": 0.08,
        "kyc": True,
    },
    "beneficiary": {
        "id": "SME_123",
        "name": "Small Merchant",
        "centrality": 0.03,
        "kyc": True,
    },
    "amount": 15000,
    "currency": "EUR",
    "context": "Standard_EU_Transfer",
}

tx_unhosted = {
    "id": "tx_unhosted",
    "originator": {
        "id": "WALLET_X",
        "name": "Anonymous Wallet",
        "centrality": 0.02,
        "kyc": False,
    },
    "beneficiary": {
        "id": "EXCHANGE_Y",
        "name": "Registered VASP",
        "centrality": 0.10,
        "kyc": True,
    },
    "amount": 5000,
    "currency": "EUR",
    "context": "Virtual_Asset_Transfer",
}

TXS = {
    "tx_high_value": tx_high_value,
    "tx_normal": tx_normal,
    "tx_unhosted": tx_unhosted,
}

In [9]:
# section 2: ast
Decision = Literal["allow", "block", "flag"]


@dataclass
class Condition:
    field: str
    op: str
    value: Any

    def _get_nested(self, tx: Dict[str, Any]) -> Any:
        """
        Resolve dot-separated field paths, e.g. 'originator.kyc'.
        Returns None if any segment is missing.
        """
        current: Any = tx
        for part in self.field.split("."):
            if not isinstance(current, dict) or part not in current:
                return None
            current = current[part]
        return current

    def eval(self, tx: Dict[str, Any]) -> bool:
        v = self._get_nested(tx)
        if v is None:
            return False

        if self.op == "gt":
            return v > self.value
        if self.op == "lt":
            return v < self.value
        if self.op == "eq":
            return v == self.value
        if self.op == "in":
            return v in self.value

        raise ValueError(f"Unknown operator: {self.op!r}")


@dataclass
class CompositeCondition:
    mode: Literal["all", "any"]
    children: List["Node"]

    def eval(self, tx: Dict[str, Any]) -> bool:
        if self.mode == "all":
            return all(child.eval(tx) for child in self.children)
        else:
            return any(child.eval(tx) for child in self.children)


Node = Union[Condition, CompositeCondition]


@dataclass
class Rule:
    name: str
    root: Node
    action: Decision
    priority: int

In [12]:
# rules loader - parser
def build_node(spec: Dict[str, Any]) -> Node:
    """
    Turn a YAML 'when' spec into an AST node.

    Cases:
      - composite: {'all': [ ... ]} or {'any': [ ... ]}
      - simple:    {'field': ..., 'op': ..., 'value': ...}
    """
    if "all" in spec or "any" in spec:
        mode = "all" if "all" in spec else "any"
        children_specs = spec[mode]
        children = [build_node(child) for child in children_specs]
        return CompositeCondition(mode=mode, children=children)

    # simple leaf
    return Condition(
        field=spec["field"],
        op=spec["op"],
        value=spec["value"],
    )


def load_rules_from_yaml(yaml_text: str) -> List[Rule]:
    raw = yaml.safe_load(textwrap.dedent(yaml_text))

    rules: List[Rule] = []
    for r in raw:
        when_spec = r.get("when") or {}
        if not when_spec:  # empty dict → always true
            when_spec = {"field": "__always__", "op": "eq", "value": True}

        root = build_node(when_spec)

        rules.append(
            Rule(
                name=r["name"],
                root=root,
                action=r["action"],
                priority=int(r.get("priority", 0)),
            )
        )

    return rules

In [13]:
class PolicyEngine:
    """
    Minimal policy-as-code engine.

    - deterministic evaluation order
    - precedence: block > flag > allow
    - structured trace for inspectability
    """

    def __init__(self, rules: List[Rule]):
        self.rules = rules
        self.action_rank = {"block": 2, "flag": 1, "allow": 0}

    def evaluate(self, tx: Dict[str, Any]) -> Dict[str, Any]:
        trace: List[Dict[str, Any]] = []
        matched: List[Rule] = []

        # deterministic order: highest priority first, then name - imp. explain! the sorting - ordering the rules based on the priority defined before ... in case of time sensitivity
        ordered = sorted(self.rules, key=lambda r: (-r.priority, r.name))

        for rule in ordered:
            result = rule.root.eval(tx)
            trace.append({"rule": rule.name, "result": bool(result)}) # It doesn't just tell you the result; it keeps a Diary of what happened.
            if result:
                matched.append(rule)

        if not matched:
            decision: Decision = "allow"
            chosen = None
        else:
            chosen = max(
                matched,
                key=lambda r: (self.action_rank[r.action], r.priority),
            )
            decision = chosen.action # If multiple rules match (e.g., one rule says "Flag" and another says "Block"), how does it decide?

            # It uses the max function with a specific key: (Severity, Priority).

        return {
            "decision": decision,
            "rule": chosen.name if chosen else None,
            "matched_rules": [r.name for r in matched],
            "trace": trace,
        }

This is the 'Courtroom'. The Loader prepares the case files. The Engine acts as the Judge. It hears the evidence (the Trace), sees which laws were broken (the Matched Rules), and then hands down the strictest possible sentence based on the hierarchy we defined (Block > Flag > Allow).

In [14]:
rules = load_rules_from_yaml(RULES_YAML)
engine = PolicyEngine(rules)

for name, tx in TXS.items():
    result = engine.evaluate(tx)
    print("=" * 60)
    print(f"{name}")
    print("- decision:", result["decision"], f"(rule={result['rule']})")
    print("- matched_rules:", result["matched_rules"])
    print("- trace:")
    print(json.dumps(result["trace"], indent=2))

tx_high_value
- decision: flag (rule=flag_high_value_eur)
- matched_rules: ['flag_high_value_eur']
- trace:
[
  {
    "rule": "block_unhosted_wallets",
    "result": false
  },
  {
    "rule": "flag_high_value_eur",
    "result": true
  },
  {
    "rule": "flag_virtual_asset_transfer",
    "result": false
  },
  {
    "rule": "default_allow",
    "result": false
  }
]
tx_normal
- decision: allow (rule=None)
- matched_rules: []
- trace:
[
  {
    "rule": "block_unhosted_wallets",
    "result": false
  },
  {
    "rule": "flag_high_value_eur",
    "result": false
  },
  {
    "rule": "flag_virtual_asset_transfer",
    "result": false
  },
  {
    "rule": "default_allow",
    "result": false
  }
]
tx_unhosted
- decision: block (rule=block_unhosted_wallets)
- matched_rules: ['block_unhosted_wallets', 'flag_virtual_asset_transfer']
- trace:
[
  {
    "rule": "block_unhosted_wallets",
    "result": true
  },
  {
    "rule": "flag_high_value_eur",
    "result": false
  },
  {
    "rule": "fla

In [15]:
def build_html_report(results: Dict[str, Dict[str, Any]]) -> str:
    sections = []
    for name, data in results.items():
        tx = data["tx"]
        res = data["result"]

        tx_pretty = json.dumps(tx, indent=2)
        trace_pretty = json.dumps(res["trace"], indent=2)

        section = f"""
        <section>
          <h2>{name}</h2>
          <p><strong>Decision:</strong> {res['decision']}
             {' (rule: ' + res['rule'] + ')' if res['rule'] else ''}</p>
          <p><strong>Matched rules:</strong> {', '.join(res['matched_rules']) or 'none'}</p>

          <h3>Transaction</h3>
          <pre>{tx_pretty}</pre>

          <h3>Evaluation trace</h3>
          <pre>{trace_pretty}</pre>
        </section>
        <hr/>
        """
        sections.append(section)

    html = f"""<!doctype html>
<html>
<head>
  <meta charset="utf-8">
  <title>Aspasia Policy Engine Demo</title>
  <style>
    body {{
      font-family: system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
      margin: 2rem;
      max-width: 1200px;
    }}
    h1 {{
      margin-bottom: 0.5rem;
    }}
    pre {{
      background: #f5f5f5;
      padding: 0.75rem 1rem;
      border-radius: 6px;
      overflow-x: auto;
      font-size: 0.85rem;
    }}
    section {{
      margin-bottom: 2rem;
    }}
  </style>
</head>
<body>
  <h1>Encoding Regulation – Minimal Policy-as-Code Engine</h1>
  <p>This page is generated by a small Python engine that evaluates
     sample transactions against YAML rules and records an evaluation trace.</p>

  {''.join(sections)}

</body>
</html>
"""
    return html

In [16]:
# Compute results and build HTML

results = {}
for name, tx in TXS.items():
    results[name] = {
        "tx": tx,
        "result": engine.evaluate(tx),
    }

html = build_html_report(results)

# Show inside the notebook
display(HTML(html))

# Optionally also write to a local file you can open in a browser
with open("demo.html", "w", encoding="utf-8") as f:
    f.write(html)
print("Wrote demo.html in current directory.")

Wrote demo.html in current directory.


In [17]:
import ipywidgets as widgets
from IPython.display import display, clear_output

# Text area prefilled with your DSL
rules_text = widgets.Textarea(
    value=RULES_YAML,
    description="Rules YAML",
    layout=widgets.Layout(width="100%", height="250px"),
    style={"description_width": "100px"},
)

# Dropdown to choose which transaction to evaluate
tx_dropdown = widgets.Dropdown(
    options=list(TXS.keys()),
    value="tx_high_value",
    description="Transaction:",
    style={"description_width": "100px"},
)

run_button = widgets.Button(
    description="Evaluate",
    button_style="primary",
)

output = widgets.Output()


def on_run_clicked(_):
    with output:
        clear_output()
        try:
            # parse rules from the current textbox
            rules = load_rules_from_yaml(rules_text.value)
            engine = PolicyEngine(rules)

            # get selected transaction
            tx_name = tx_dropdown.value
            tx = TXS[tx_name]

            result = engine.evaluate(tx)

            print(f"=== {tx_name} ===")
            print("Transaction:")
            print(json.dumps(tx, indent=2))
            print()
            print("Decision:", result["decision"], f"(rule={result['rule']})")
            print("Matched rules:", result["matched_rules"])
            print("Trace:")
            print(json.dumps(result["trace"], indent=2))
        except Exception as e:
            print("Error while evaluating rules:", e)


run_button.on_click(on_run_clicked)

ui = widgets.VBox(
    [
        rules_text,
        widgets.HBox([tx_dropdown, run_button]),
        output,
    ]
)

display(ui)

VBox(children=(Textarea(value='\n- name: block_unhosted_wallets\n  when:\n    field: originator.kyc\n    op: e…