In [1]:
%pip install pyyaml

Note: you may need to restart the kernel to use updated packages.


In [4]:
import json
from pathlib import Path
import sys

import yaml

# Upewniamy siÄ™, Å¼e katalog "src" jest na Å›cieÅ¼ce importÃ³w Pythona
sys.path.append("src")

from axiomatic_kernel import (
    AxiomKernel,
    VariableSchema,
    AxiomDefinition,
    DecisionLogger,
)
from nl_rule_parser import (
    build_axiom_from_nl,
    RuleParseError,
)
from explanation_engine import (
    DecisionExplainer,
    ExplanationConfig,
)
from rules_io import load_ruleset_from_file, apply_ruleset_to_kernel
from ruleset_manager import RulesetRegistry, Environment
from rule_analytics import RuleAnalyticsEngine


In [5]:

# === 1) SCHEMA DOPASOWANA DO FRAUD_RULES (tylko typy obsÅ‚ugiwane przez kernel) ===
schema = [
    VariableSchema("amount", "int", "Kwota transakcji w jednostkach minimalnych."),
    VariableSchema("tx_count_24h", "int", "Liczba transakcji w ostatnich 24h."),
    VariableSchema("is_pep", "bool", "Czy klient jest PEP."),
    VariableSchema("is_suspicious", "bool", "Czy transakcja jest podejrzana."),
]

logger = DecisionLogger("logs/fraud_rules_demo.jsonl")

kernel = AxiomKernel(
    schema=schema,
    decision_variable="is_suspicious",
    logger=logger,
    rule_version="fraud_rules_v1",
)

# === 2) Wczytanie i rejestracja rulesetu FRAUD w Å›rodowisku DEV ===
rules_path = Path("rules") / "fraud_rules_v1.yaml"
rules_path.parent.mkdir(exist_ok=True)

registry = RulesetRegistry()

record = registry.register_ruleset(
    ruleset_id="fraud_rules_v1",
    path=rules_path,
    environment=Environment.DEV,
)

print(
    f"ðŸ“˜ Zarejestrowano ruleset: {record.key.ruleset_id} "
    f"(v{record.version}) w Å›rodowisku {record.key.environment.value}"
)
print(f"Plik: {record.file_path}")

# === 3) NaÅ‚oÅ¼enie rulesetu na kernel przez RulesetRegistry ===
summary = registry.apply_ruleset_to_kernel(
    ruleset_id="fraud_rules_v1",
    environment=Environment.DEV,
    kernel=kernel,
    schema=schema,
    decision_field_fallback="is_suspicious",
    strict=True,
    extra_metadata={"domain": 'fraud-demo'},
)

print("\nðŸ“Š Podsumowanie Å‚adowania reguÅ‚:")
print(f"- total_rules:   {summary.total_rules}")
print(f"- enabled_rules: {summary.enabled_rules}")
print(f"- loaded_rules:  {summary.loaded_rules}")
print(f"- skipped_rules: {summary.skipped_rules}")
print(f"- errors:        {summary.errors}")


ðŸ“˜ Zarejestrowano ruleset: fraud_rules_v1 (v1.0.0) w Å›rodowisku DEV
Plik: rules/fraud_rules_v1.yaml

ðŸ“Š Podsumowanie Å‚adowania reguÅ‚:
- total_rules:   3
- enabled_rules: 2
- loaded_rules:  2
- skipped_rules: 1
- errors:        {}


In [None]:
# Katalog i plik z reguÅ‚ami dla tego notebooka
rules_dir = Path("rules")
rules_dir.mkdir(exist_ok=True)

rules_file = rules_dir / "notebook_rules.yaml"

# DomyÅ›lny zestaw reguÅ‚ â€“ uÅ¼yty tylko, jeÅ›li plik nie istnieje.
default_rules_yaml = """rules:
  - id: "nl_high_risk_flag"
    text: "If amount > 10000 and risk_score > 5 then flag = true"
  - id: "nl_low_risk_clear"
    text: "If risk_score <= 2 then flag = false"
"""

if not rules_file.exists():
    rules_file.write_text(default_rules_yaml, encoding="utf-8")
    print(f"Utworzono domyÅ›lny plik z reguÅ‚ami: {rules_file}")
else:
    print(f"UÅ¼ywam istniejÄ…cego pliku z reguÅ‚ami: {rules_file}")

# Wczytanie reguÅ‚ z pliku YAML
with rules_file.open("r", encoding="utf-8") as f:
    rules_data = yaml.safe_load(f)

rules_list = rules_data.get("rules", [])

axioms = []
for raw_rule in rules_list:
    rule_id = raw_rule["id"]
    text = raw_rule["text"]

    axiom = build_axiom_from_nl(
        rule_id=rule_id,
        text=text,
        schema=schema,
        decision_field_fallback="flag",
    )
    kernel.add_axiom_safe(axiom)
    axioms.append(axiom)

print("Dodane reguÅ‚y:", [a.id for a in axioms])


In [None]:
explainer = DecisionExplainer(ExplanationConfig(language="pl"))

# Przypadek wysokiego ryzyka â€“ powinno byÄ‡ FLAGGED
case_flagged = {"amount": 15_000, "risk_score": 7}
bundle_flagged = kernel.evaluate(case_flagged)

print("=== RAW BUNDLE (FLAGGED) ===")
print(json.dumps(bundle_flagged, indent=2, ensure_ascii=False))

print("\n=== WYJAÅšNIENIE (FLAGGED) ===")
print(explainer.explain(bundle_flagged).to_text(language="pl"))



# Przypadek niskiego ryzyka â€“ powinno byÄ‡ CLEAN
case_clean = {"amount": 500, "risk_score": 1}
bundle_clean = kernel.evaluate(case_clean)

print("=== RAW BUNDLE (CLEAN) ===")
print(json.dumps(bundle_clean, indent=2, ensure_ascii=False))

print("\n=== WYJAÅšNIENIE (CLEAN) ===")
print(explainer.explain(bundle_clean).to_text(language="pl"))


In [None]:
from z3 import Implies  # type: ignore

unsat_kernel = AxiomKernel(
    schema=schema,
    decision_variable="flag",
    logger=None,
    rule_version="demo_unsat_v1",
)

def rule_flag_true(vars_z3):
    amount = vars_z3["amount"]
    flag = vars_z3["flag"]
    return Implies(amount > 10_000, flag == True)

def rule_flag_false(vars_z3):
    amount = vars_z3["amount"]
    flag = vars_z3["flag"]
    return Implies(amount > 10_000, flag == False)

unsat_kernel.add_axiom(
    AxiomDefinition(
        id="amount_flag_true",
        description="If amount > 10000 then flag must be True.",
        build_constraint=rule_flag_true,
    )
)
unsat_kernel.add_axiom(
    AxiomDefinition(
        id="amount_flag_false",
        description="If amount > 10000 then flag must be False.",
        build_constraint=rule_flag_false,
    )
)

case_conflict = {"amount": 15_000, "risk_score": 5}
bundle_unsat = unsat_kernel.evaluate(case_conflict)

print("=== RAW BUNDLE (UNSAT) ===")
print(json.dumps(bundle_unsat, indent=2, ensure_ascii=False))

print("\n=== WYJAÅšNIENIE (UNSAT) ===")
print(explainer.explain(bundle_unsat).to_text(language="pl"))
