<a href="https://colab.research.google.com/github/Dilshodyorqinovich/NumpyMohirdev/blob/main/expertsystemEnhanced.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
"""
Rule-Based Expert System for Online Transaction Risk Assessment
- Facts are key -> (value, confidence in [0,1], source)
- Rules are layered: OBSERVE (O*), THINK (T*), DECIDE (D*), CONFIRM (C*)
- Confidence propagation (MYCIN-style):
    AND premise confidence = min(condition confidences)
    inferred confidence = premise_confidence * rule_strength
    combine multiple supports for same conclusion with noisy-or:
        c = (c1 + c2) - (c1 * c2)

Implements:
A) Forward chaining (data-driven) with step-by-step calculation logging
B) Backward chaining (goal-driven) that returns targeted questions for missing prerequisites
C) Mixed chaining:
   1) Forward chain
   2) Take top-2 actions (ignoring threshold) to find discriminating question
   3) Ask best next question (if any)
   4) Update facts
   5) Forward chain again

Action threshold:
- For "final decision output", only actions with confidence >= 0.70 are accepted.
- We still compute and show all action scores for transparency and for mixed-chaining question selection.
"""

from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple


# -------------------------
# Confidence math (MYCIN-ish)
# -------------------------

def clamp01(x: float) -> float:
    """Clamp *x* into the closed interval [0.0, 1.0].

    Values below 0 become 0.0 and values above 1 become 1.0.
    NaN is mapped to 0.0: an undefined confidence carries no evidence, and
    without this guard ``min(1.0, nan)`` would silently turn it into 1.0.
    """
    if x != x:  # NaN is the only float that is not equal to itself
        return 0.0
    return max(0.0, min(1.0, x))

def noisy_or(c1: float, c2: float) -> float:
    """Combine two supporting confidences for the same conclusion.

    Both inputs are clamped to [0, 1] first; the result is
    ``(c1 + c2) - (c1 * c2)``, clamped again.
    """
    first = clamp01(c1)
    second = clamp01(c2)
    total = first + second
    overlap = first * second
    return clamp01(total - overlap)

def and_premise(conf_list: List[float]) -> float:
    """Return the confidence of an AND premise: the weakest condition confidence.

    An empty (or otherwise falsy) list yields 0.0.
    """
    if not conf_list:
        return 0.0
    return min(conf_list)


# -------------------------
# Facts + Working Memory
# -------------------------

@dataclass
class Fact:
    """A single entry of working memory: a value with its confidence and provenance."""

    # The asserted value (bool signal, number, ...).
    value: Any
    # Confidence attached to the value; WorkingMemory.set clamps it to [0, 1] before storing.
    conf: float
    # Free-text provenance, e.g. "input" or "<rule id>:<rule description>".
    source: str = ""

class WorkingMemory:
    """Key -> Fact store with confidence-aware updates."""

    def __init__(self):
        self.facts: Dict[str, Fact] = {}

    def has(self, key: str) -> bool:
        """Return True when a fact is stored under *key*."""
        return key in self.facts

    def get(self, key: str) -> Optional[Fact]:
        """Return the fact stored under *key*, or None when there is none."""
        return self.facts.get(key)

    def set(self, key: str, value: Any, conf: float, source: str = ""):
        """Assert ``key = value`` with confidence *conf* (clamped to [0, 1]).

        - New key: the fact is stored as given.
        - Same value as the stored fact: confidences are merged with noisy-or
          and the sources are joined with " | ".
        - Different value: the new fact replaces the stored one only when its
          confidence is strictly higher.
        """
        bounded = clamp01(conf)

        try:
            current = self.facts[key]
        except KeyError:
            self.facts[key] = Fact(value=value, conf=bounded, source=source)
            return

        if current.value == value:
            # Agreeing evidence: accumulate support and keep both provenances.
            merged_source = " | ".join((current.source, source)).strip(" |")
            merged_conf = noisy_or(current.conf, bounded)
            self.facts[key] = Fact(value=value, conf=merged_conf, source=merged_source)
        elif bounded > current.conf:
            # Conflicting evidence: the more confident assertion wins.
            self.facts[key] = Fact(value=value, conf=bounded, source=source)

    def confidence(self, key: str, expected_value: Any = True) -> float:
        """Return the stored confidence if the fact equals *expected_value*, else 0.0."""
        fact = self.get(key)
        if fact is not None and fact.value == expected_value:
            return fact.conf
        return 0.0


# -------------------------
# Conditions + Rules
# -------------------------

@dataclass
class Condition:
    """One premise of a rule, tested against a fact in working memory.

    The fact under ``key`` satisfies the condition when ``predicate(value)`` is
    truthy (if a predicate is given) or when the value equals ``expected_value``.
    """

    key: str
    expected_value: Any = True
    predicate: Optional[Callable[[Any], bool]] = None

    # Optional question to put to the user when the fact is missing,
    # and an optional parser for the raw answer.
    question: Optional[str] = None
    answer_parser: Optional[Callable[[str], Any]] = None

    def evaluate(self, wm: WorkingMemory) -> Tuple[bool, float, bool]:
        """Check the condition against *wm*.

        Returns a ``(known_and_true, confidence, is_missing)`` triple:
          * fact absent       -> (False, 0.0, True)
          * fact present, no  -> (False, 0.0, False)
          * fact present, yes -> (True, <fact confidence>, False)
        """
        fact = wm.get(self.key)
        if fact is None:
            return (False, 0.0, True)

        # A predicate, when supplied, takes precedence over the equality test.
        if self.predicate is None:
            holds = fact.value == self.expected_value
        else:
            holds = self.predicate(fact.value)

        if holds:
            return (True, fact.conf, False)
        return (False, 0.0, False)


@dataclass
class Rule:
    """IF every condition holds THEN ``conclusion_key = conclusion_value``.

    The inferred confidence is ``min(condition confidences) * strength``.
    """

    rid: str
    conditions: List[Condition]
    conclusion_key: str
    conclusion_value: Any
    strength: float  # 0..1
    kind: str        # "O" "T" "D" "C"
    description: str = ""

    def applicable(self, wm: WorkingMemory) -> Tuple[bool, float, List[Condition], Dict[str, Any]]:
        """Evaluate all conditions against *wm*.

        Returns:
          (can_fire_now, inferred_conf, missing_conditions, debug)

        * Every condition is evaluated, so ``missing_conditions`` lists ALL
          conditions whose fact is absent, not just the first one.
        * If any condition is known and false the rule is refuted: it cannot
          fire whatever the missing facts turn out to be, so
          ``missing_conditions`` is empty and no question is worth asking.
        * ``debug["conditions"]`` holds one row per condition; when the rule can
          fire, ``debug`` also carries the premise, strength, result and a
          printable formula.
        """
        cond_rows = []
        confs: List[float] = []
        missing: List[Condition] = []
        refuted = False

        for cond in self.conditions:
            ok, c, is_missing = cond.evaluate(wm)
            f = wm.get(cond.key)
            expected = cond.expected_value if cond.predicate is None else "predicate(...)"

            if f is None:
                cond_rows.append({
                    "key": cond.key,
                    "expected": expected,
                    "actual": None,
                    "conf": None,
                    "status": "MISSING"
                })
            else:
                cond_rows.append({
                    "key": cond.key,
                    "expected": expected,
                    "actual": f.value,
                    "conf": f.conf,
                    "status": "OK" if ok else "FAIL"
                })

            if is_missing:
                missing.append(cond)
            elif not ok:
                refuted = True
            else:
                confs.append(c)

        # Rule can't fire if any condition is not satisfied (missing or false).
        if refuted:
            return (False, 0.0, [], {"conditions": cond_rows})
        if missing:
            return (False, 0.0, missing, {"conditions": cond_rows})

        premise_conf = and_premise(confs)
        # Use the clamped strength both for the arithmetic and for the printed
        # formula so the trace always matches the computed value.
        strength = clamp01(self.strength)
        inferred = clamp01(premise_conf * strength)

        debug = {
            "conditions": cond_rows,
            "premise_conf": premise_conf,
            "rule_strength": self.strength,
            "inferred_conf": inferred,
            "formula": (
                f"premise=min({', '.join(f'{x:.2f}' for x in confs)})={premise_conf:.2f}; "
                f"inferred={premise_conf:.2f}*{strength:.2f}={inferred:.3f}"
            )
        }
        return (True, inferred, [], debug)


# -------------------------
# Expert System Engine
# -------------------------

class ExpertSystem:
    """Rule engine with forward, backward and mixed chaining over a WorkingMemory.

    Attributes:
        rules: the rule base, evaluated in list order.
        action_threshold: minimum confidence for an action to be accepted as final.
        firing_log: compact (rule id, key, value, inferred conf) records of the
            most recent forward-chaining run.
        calc_log: printable calculation traces of the most recent run.
    """

    # Actions the engine can recommend; their facts live under "action_<NAME>".
    # The order also breaks ties between equally scored actions.
    ACTIONS: Tuple[str, ...] = ("APPROVE", "STEP_UP_VERIFY", "MANUAL_REVIEW", "DECLINE")

    # Separator WorkingMemory.set puts between merged provenance entries.
    _SOURCE_SEP = " | "

    # Extra score for questions whose answer is expected to move the decision most.
    _LEVERAGE_BONUS: Dict[str, float] = {
        "user_confirmed_transaction": 1.0,
        "otp_passed": 0.8,
        "card_reported_stolen": 1.0,
        "device_new_to_account": 0.6,
        "ip_high_risk": 0.6,
        "billing_shipping_mismatch": 0.5,
    }

    def __init__(self, rules: List[Rule], action_threshold: float = 0.70):
        self.rules = rules
        self.action_threshold = action_threshold

        # Compact firing log: (rule id, conclusion key, conclusion value, inferred conf)
        self.firing_log: List[Tuple[str, str, Any, float]] = []

        # Detailed log with math steps
        self.calc_log: List[str] = []

    # ------------------------------------------------------------------
    # A) Forward chaining (data-driven)
    # ------------------------------------------------------------------
    def _already_counted(self, fact_source: str, tag: str) -> bool:
        """Return True when *tag* is one of the provenance entries in *fact_source*."""
        wanted = tag.strip(" |")
        return any(part.strip(" |") == wanted for part in fact_source.split(self._SOURCE_SEP))

    def forward_chain(self, wm: WorkingMemory, max_iters: int = 100, verbose: bool = True):
        """Fire applicable rules until nothing new can be concluded.

        Each rule contributes its evidence to a conclusion exactly once:
          * within a run a rule fires at most once (refraction);
          * across runs on the same working memory, a rule whose
            "<rid>:<description>" tag is already in the fact's source is skipped.
        Several rules supporting the same conclusion are therefore combined with
        noisy-or regardless of their order in the rule base, and re-running the
        chain does not inflate confidences. A consequence is that a later rise in
        a premise's confidence is not re-propagated through a rule that already
        fired.

        A rule is also skipped when firing could not change working memory:
        zero inferred confidence on an existing fact, an existing confidence of
        1.0, or a conflicting value that is at least as confident (WorkingMemory
        keeps the stronger fact).

        Args:
            wm: working memory to read premises from and write conclusions to.
            max_iters: upper bound on passes over the rule base.
            verbose: when True, print each calculation trace as it is produced.
        """
        self.firing_log.clear()
        self.calc_log.clear()

        fired_rules = set()  # positions of rules that already fired in this run

        for _ in range(max_iters):
            fired_any = False

            for idx, rule in enumerate(self.rules):
                if idx in fired_rules:
                    continue

                can_fire, inferred_conf, _missing, debug = rule.applicable(wm)
                if not can_fire:
                    continue

                tag = f"{rule.rid}:{rule.description}"
                existing = wm.get(rule.conclusion_key)
                prior_conf = None  # confidence this firing will be merged with, if any

                if existing is not None:
                    if existing.value == rule.conclusion_value:
                        if (
                            self._already_counted(existing.source, tag)
                            or inferred_conf <= 0.0
                            or existing.conf >= 1.0
                        ):
                            continue  # nothing new to add to this conclusion
                        prior_conf = existing.conf
                    elif inferred_conf <= existing.conf:
                        continue  # the stronger conflicting fact would be kept anyway

                wm.set(
                    rule.conclusion_key,
                    rule.conclusion_value,
                    inferred_conf,
                    source=tag
                )
                fired_rules.add(idx)
                self.firing_log.append((rule.rid, rule.conclusion_key, rule.conclusion_value, inferred_conf))

                # Build detailed calculation trace
                lines = [f"{rule.rid} ({rule.kind}) {rule.description}"]
                for row in debug.get("conditions", []):
                    lines.append(
                        f"  - cond: {row['key']} expected={row['expected']} actual={row['actual']} "
                        f"conf={row['conf']} status={row['status']}"
                    )
                if "formula" in debug:
                    lines.append(f"  - calc: {debug['formula']}")
                lines.append(f"  => infer {rule.conclusion_key}={rule.conclusion_value} conf={inferred_conf:.3f}")
                if prior_conf is not None:
                    merged = wm.confidence(rule.conclusion_key, rule.conclusion_value)
                    lines.append(
                        f"  => combined with existing support: "
                        f"noisy_or({prior_conf:.3f}, {inferred_conf:.3f})={merged:.3f}"
                    )

                text = "\n".join(lines)
                self.calc_log.append(text)
                if verbose:
                    print(text)
                    print()

                fired_any = True

            if not fired_any:
                break

    def explanations(self) -> List[str]:
        """Return one summary line per rule firing of the last forward run."""
        return [f"- {rid} inferred {k}={v} with conf={c:.2f}" for rid, k, v, c in self.firing_log]

    def detailed_calculations(self) -> str:
        """Return the calculation traces of the last forward run as one string."""
        return "\n".join(self.calc_log)

    # ------------------------------------------------------------------
    # Action ranking
    # ------------------------------------------------------------------
    def _action_scores(self, wm: WorkingMemory) -> List[Tuple[str, float]]:
        """Return (action, confidence) for every known action, in ACTIONS order."""
        return [(a, wm.confidence(f"action_{a}", True)) for a in self.ACTIONS]

    # All actions (no threshold) - for transparency and mixed chaining
    def rank_actions_all(self, wm: WorkingMemory) -> List[Tuple[str, float]]:
        """Return all actions with confidence > 0, highest confidence first."""
        scored = [(a, c) for a, c in self._action_scores(wm) if c > 0]
        return sorted(scored, key=lambda x: x[1], reverse=True)

    # Threshold-filtered actions - for "final decision"
    def rank_actions(self, wm: WorkingMemory, threshold: Optional[float] = None) -> List[Tuple[str, float]]:
        """Return actions with confidence >= *threshold*, highest first.

        *threshold* defaults to ``self.action_threshold``.
        """
        if threshold is None:
            threshold = self.action_threshold
        scored = [(a, c) for a, c in self._action_scores(wm) if c >= threshold]
        return sorted(scored, key=lambda x: x[1], reverse=True)

    def final_action(self, wm: WorkingMemory) -> Tuple[Optional[str], float, str]:
        """Return ``(action, confidence, message)`` for the final decision.

        When no action reaches the threshold the action is None, the confidence
        is 0.0 and the message names the best next question, if one exists. The
        question is searched between the two best-scored actions; if fewer than
        two actions have any score, the remaining actions (in ACTIONS order)
        fill the gap so the engine can still ask for missing information.
        """
        ranked = self.rank_actions(wm)  # thresholded
        if ranked:
            action, conf = ranked[0]
            return action, conf, f"Top action meets threshold {self.action_threshold:.2f}."

        candidates = [a for a, _ in self.rank_actions_all(wm)]
        candidates.extend(a for a in self.ACTIONS if a not in candidates)
        question = self.choose_best_question(wm, candidates[:2])
        if question:
            return None, 0.0, f"No action >= {self.action_threshold:.2f}. Ask next: {question[1]}"
        return None, 0.0, f"No action >= {self.action_threshold:.2f} and no discriminating question found."

    # ------------------------------------------------------------------
    # B) Backward chaining (goal-driven)
    # ------------------------------------------------------------------
    def _open_questions(self, goal_key: str, goal_value: Any, wm: WorkingMemory, visiting: set) -> List[Tuple[str, str]]:
        """Collect unanswered questions that could help establish ``goal_key == goal_value``.

        Every rule concluding the goal that cannot fire yet is inspected. A
        missing condition that carries a question is reported directly; a
        missing equality condition without one is treated as a sub-goal and
        resolved through the rules that conclude it. *visiting* holds the goal
        keys on the current descent path and stops cyclic rule dependencies.

        Returns (fact key, question text) pairs, de-duplicated by key in
        first-seen order.
        """
        if goal_key in visiting:
            return []
        visiting.add(goal_key)

        found: Dict[str, str] = {}
        for rule in self.rules:
            if rule.conclusion_key != goal_key or rule.conclusion_value != goal_value:
                continue
            can_fire, _conf, missing, _debug = rule.applicable(wm)
            if can_fire:
                continue
            for cond in missing:
                if wm.has(cond.key):
                    continue
                if cond.question:
                    found.setdefault(cond.key, cond.question)
                elif cond.predicate is None:
                    # Derived fact: ask about whatever its own rules are missing.
                    sub_questions = self._open_questions(cond.key, cond.expected_value, wm, visiting)
                    for key, text in sub_questions:
                        found.setdefault(key, text)

        visiting.discard(goal_key)
        return list(found.items())

    def backward_chain(self, goal_key: str, goal_value: Any, wm: WorkingMemory) -> Dict[str, Any]:
        """Work backwards from a goal to the questions that could establish it.

        Returns a dict with:
          * "goal_supported_conf": the stored confidence if the goal fact already
            holds, otherwise the best confidence any directly concluding rule
            could infer right now (0.0 if none can fire);
          * "candidate_rules": ids of the rules that conclude the goal directly
            (empty when the goal already holds);
          * "questions": (fact key, question) pairs for missing prerequisites,
            including prerequisites of derived sub-goals.
        """
        fact = wm.get(goal_key)
        if fact is not None and fact.value == goal_value:
            return {"goal_supported_conf": fact.conf, "candidate_rules": [], "questions": []}

        candidate_rules = [
            r for r in self.rules
            if r.conclusion_key == goal_key and r.conclusion_value == goal_value
        ]

        best_support = 0.0
        for rule in candidate_rules:
            can_fire, inferred, _missing, _debug = rule.applicable(wm)
            if can_fire:
                best_support = max(best_support, inferred)

        return {
            "goal_supported_conf": best_support,
            "candidate_rules": [r.rid for r in candidate_rules],
            "questions": self._open_questions(goal_key, goal_value, wm, set())
        }

    # ------------------------------------------------------------------
    # C) Mixed chaining - choose best discriminating question between top-2 actions
    # ------------------------------------------------------------------
    def choose_best_question(self, wm: WorkingMemory, top_actions: List[str]) -> Optional[Tuple[str, str]]:
        """Pick the most useful next question for the two leading actions.

        Open questions are gathered for both actions even when an action already
        has some support, because rules that have not fired yet may still add to
        it. A question relevant to only one of the two actions scores 2.0, a
        shared one 1.0, plus the per-key leverage bonus. Ties go to the question
        found first (first action's questions, then the second's), which keeps
        the choice deterministic.

        Returns (fact key, question text), or None when fewer than two actions
        are given or no question is open.
        """
        if len(top_actions) < 2:
            return None

        first, second = top_actions[0], top_actions[1]
        q1 = dict(self._open_questions(f"action_{first}", True, wm, set()))
        q2 = dict(self._open_questions(f"action_{second}", True, wm, set()))

        # Deterministic candidate order instead of set iteration order.
        ordered_keys = list(q1) + [k for k in q2 if k not in q1]

        best = None
        best_score = float("-inf")
        for key in ordered_keys:
            only_one_side = (key in q1) != (key in q2)
            score = (2.0 if only_one_side else 1.0) + self._LEVERAGE_BONUS.get(key, 0.0)
            q_text = q1.get(key) or q2.get(key)
            if q_text and score > best_score:
                best_score = score
                best = (key, q_text)

        return best


# -------------------------
# Rule base: O / T / C / D
# -------------------------

def build_rules() -> List[Rule]:
    """Build the default rule base in layer order: OBSERVE, THINK, CONFIRM, DECIDE.

    Every rule concludes ``<key> = True``; the layers differ only in what
    their conditions look at.
    """

    def make(kind, rid, description, conditions, conclusion, strength):
        # All rules in this base assert their conclusion as True.
        return Rule(
            rid=rid,
            conditions=conditions,
            conclusion_key=conclusion,
            conclusion_value=True,
            strength=strength,
            kind=kind,
            description=description,
        )

    # OBSERVE (O*): raw inputs -> signals
    observe = [
        make("O", "O1", "High IP risk score implies ip_high_risk",
             [Condition("ip_risk_score", predicate=lambda x: x >= 80,
                        question="What is the IP risk score (0-100)?")],
             "ip_high_risk", 0.90),
        make("O", "O2", "Very high amount implies high_amount",
             [Condition("amount", predicate=lambda x: x >= 500,
                        question="What is the transaction amount?")],
             "high_amount", 0.70),
        make("O", "O3", "New device to account implies device_new_to_account",
             [Condition("device_seen_before", expected_value=False,
                        question="Has this device been seen on this account before? (yes/no)")],
             "device_new_to_account", 0.80),
        make("O", "O4", "Billing and shipping mismatch implies mismatch signal",
             [Condition("billing_shipping_same", expected_value=False,
                        question="Is billing address the same as shipping? (yes/no)")],
             "billing_shipping_mismatch", 0.85),
        make("O", "O5", "Account very new implies new_account",
             [Condition("account_age_days", predicate=lambda x: x < 7,
                        question="How old is the account (days)?")],
             "new_account", 0.75),
        make("O", "O6", "Geo distance large implies geo_mismatch",
             [Condition("geo_distance_km", predicate=lambda x: x >= 1500,
                        question="Approx geo distance between IP and billing country (km)?")],
             "geo_mismatch", 0.80),
        make("O", "O7", "Multiple failed attempts implies velocity_attack",
             [Condition("failed_attempts_1h", predicate=lambda x: x >= 3,
                        question="How many failed attempts in last 1 hour?")],
             "velocity_attack", 0.85),
    ]

    # THINK (T*): signals -> hypotheses
    # (rid, description, premise keys, conclusion, strength)
    think_specs = [
        ("T1", "ip_high_risk AND device_new_to_account -> account_takeover_suspected",
         ("ip_high_risk", "device_new_to_account"), "account_takeover_suspected", 0.80),
        ("T2", "billing_shipping_mismatch AND high_amount -> fraud_suspected",
         ("billing_shipping_mismatch", "high_amount"), "fraud_suspected", 0.75),
        ("T3", "velocity_attack AND new_account -> bot_fraud_suspected",
         ("velocity_attack", "new_account"), "bot_fraud_suspected", 0.90),
        ("T4", "geo_mismatch AND device_new_to_account -> anomalous_location",
         ("geo_mismatch", "device_new_to_account"), "anomalous_location", 0.75),
    ]
    think = [
        make("T", rid, description, [Condition(key) for key in premise_keys], conclusion, strength)
        for rid, description, premise_keys, conclusion, strength in think_specs
    ]

    # CONFIRM (C*): verification/mitigation
    # (rid, description, fact key, question, conclusion, strength)
    confirm_specs = [
        ("C1", "User confirms transaction increases trust", "user_confirmed_transaction",
         "Was this transaction made by you? (yes/no)", "legit_user_confirmed", 1.00),
        ("C2", "OTP/3DS passed increases trust", "otp_passed",
         "Did the OTP/3DS verification succeed? (yes/no)", "stepup_trust_gained", 1.00),
        ("C3", "User confirms travel explains geo mismatch", "user_confirmed_travel",
         "Are you currently traveling? (yes/no)", "geo_mismatch_explained", 0.90),
        ("C4", "Card reported stolen is very strong decline signal", "card_reported_stolen",
         "Is the card reported stolen? (yes/no)", "stolen_card_suspected", 1.00),
    ]
    confirm = [
        make("C", rid, description,
             [Condition(fact_key, expected_value=True, question=question)],
             conclusion, strength)
        for rid, description, fact_key, question, conclusion, strength in confirm_specs
    ]

    # DECIDE (D*): hypotheses -> actions; D6-D8 are the mitigation/support rules
    # (rid, description, hypothesis key, action fact, strength)
    decide_specs = [
        ("D1", "Strong stolen card -> DECLINE", "stolen_card_suspected", "action_DECLINE", 0.98),
        ("D2", "Bot fraud -> DECLINE", "bot_fraud_suspected", "action_DECLINE", 0.90),
        ("D3", "Account takeover suspected -> STEP_UP_VERIFY",
         "account_takeover_suspected", "action_STEP_UP_VERIFY", 0.85),
        ("D4", "Fraud suspected but not certain -> MANUAL_REVIEW",
         "fraud_suspected", "action_MANUAL_REVIEW", 0.80),
        ("D5", "Anomalous location -> STEP_UP_VERIFY", "anomalous_location", "action_STEP_UP_VERIFY", 0.70),
        ("D6", "Legit user confirmed -> APPROVE", "legit_user_confirmed", "action_APPROVE", 0.90),
        ("D7", "Step-up trust gained -> APPROVE", "stepup_trust_gained", "action_APPROVE", 0.85),
        ("D8", "Geo mismatch explained -> APPROVE", "geo_mismatch_explained", "action_APPROVE", 0.60),
    ]
    decide = [
        make("D", rid, description, [Condition(hypothesis)], action, strength)
        for rid, description, hypothesis, action, strength in decide_specs
    ]

    return [*observe, *think, *confirm, *decide]


# -------------------------
# Utilities / Demo
# -------------------------

def seed_transaction(wm: WorkingMemory, **kwargs):
    """Store every keyword argument as an input fact with confidence 1.0 and source "input"."""
    for name in kwargs:
        wm.set(name, kwargs[name], 1.0, source="input")

def pretty_facts(wm: WorkingMemory) -> str:
    """Render all facts as "key = value (conf=0.00)" lines in sorted order."""
    return "\n".join(
        f"{name} = {fact.value} (conf={fact.conf:.2f})"
        for name, fact in sorted(wm.facts.items())
    )

def demo():
    """Run the default rule base on a sample transaction and print every stage."""
    engine = ExpertSystem(build_rules(), action_threshold=0.70)
    memory = WorkingMemory()

    # Sample transaction; every input is seeded with confidence 1.0.
    sample = {
        "amount": 2000,
        "ip_risk_score": 90,
        "device_seen_before": False,
        "billing_shipping_same": True,
        "account_age_days": 2,
        "geo_distance_km": 2000,
        "failed_attempts_1h": 0,
    }
    seed_transaction(memory, **sample)

    print("=== FORWARD CHAIN (DETAILED CALCULATIONS) ===")
    engine.forward_chain(memory, verbose=True)

    print("=== FACTS AFTER FORWARD CHAIN ===")
    print(pretty_facts(memory))

    print("\n=== FIRING LOG (COMPACT) ===")
    print("\n".join(engine.explanations()))

    # Every action with a non-zero score, whether or not it reaches the threshold.
    all_scores = engine.rank_actions_all(memory)
    print("\n=== ACTION SCORES (NO THRESHOLD) ===")
    print(all_scores)

    # Only the actions at or above the threshold (the list may be empty).
    print("\n=== ACTIONS RANKED (>= THRESHOLD 0.70) ===")
    print(engine.rank_actions(memory))

    print("\n=== FINAL ACTION (THRESHOLD-BASED) ===")
    print(*engine.final_action(memory))

    # Mixed chaining: the question is chosen from the two best unthresholded actions.
    leading = [action for action, _ in engine.rank_actions_all(memory)[:2]]
    next_question = engine.choose_best_question(memory, leading)
    print("\n=== BEST NEXT QUESTION (MIXED) ===")
    print(next_question)

    # Backward chaining example: what would be needed to support DECLINE?
    decline_report = engine.backward_chain("action_DECLINE", True, memory)
    print("\n=== BACKWARD CHAIN FOR DECLINE ===")
    print(decline_report)

# if __name__ == "__main__":
#     demo()

# --- User-specific scenario ---
# The code below builds only the rules the user listed and seeds only the inputs they supplied.

from typing import List

def build_rules_user_case() -> List[Rule]:
    """Build the reduced rule base of the user-specific scenario.

    Rule ids follow the user's own numbering; the strength of each rule is the
    number given in the comment next to it.
    """
    return [
        # ===== OBSERVE (O*) =====
        # O2: amount > 1500 -> very_high_amount (0.80)
        Rule(
            rid="O2",
            description="IF amount > 1500 THEN very_high_amount",
            kind="O",
            strength=0.80,
            conditions=[Condition("amount", predicate=lambda x: x > 1500)],
            conclusion_key="very_high_amount",
            conclusion_value=True,
        ),
        # O8: device_seen_before is False -> new_device (0.70)
        Rule(
            rid="O8",
            description="IF device_seen_before=False THEN new_device",
            kind="O",
            strength=0.70,
            conditions=[Condition("device_seen_before", expected_value=False)],
            conclusion_key="new_device",
            conclusion_value=True,
        ),
        # O11: ip_risk_score >= 80 -> high_ip_risk (0.00)
        # The strength is kept exactly as specified, so the inferred confidence
        # is always 0. NOTE: if 0.80 was intended, change strength to 0.80.
        Rule(
            rid="O11",
            description="IF ip_risk_score >= 80 THEN high_ip_risk",
            kind="O",
            strength=0.00,
            conditions=[Condition("ip_risk_score", predicate=lambda x: x >= 80)],
            conclusion_key="high_ip_risk",
            conclusion_value=True,
        ),
        # O13: country_mismatch -> location_mismatch (0.70)
        Rule(
            rid="O13",
            description="IF country_mismatch THEN location_mismatch",
            kind="O",
            strength=0.70,
            conditions=[Condition("country_mismatch", expected_value=True)],
            conclusion_key="location_mismatch",
            conclusion_value=True,
        ),

        # ===== THINK (T*) =====
        # T1: location_mismatch AND new_device -> account_takeover_risk (0.75)
        Rule(
            rid="T1",
            description="IF location_mismatch AND new_device THEN account_takeover_risk",
            kind="T",
            strength=0.75,
            conditions=[Condition("location_mismatch"), Condition("new_device")],
            conclusion_key="account_takeover_risk",
            conclusion_value=True,
        ),

        # ===== DECIDE (D*) =====
        # D2: account_takeover_risk -> STEP_UP_VERIFY (0.70)
        Rule(
            rid="D2",
            description="IF account_takeover_risk THEN STEP_UP_VERIFY_recommended",
            kind="D",
            strength=0.70,
            conditions=[Condition("account_takeover_risk")],
            conclusion_key="action_STEP_UP_VERIFY",
            conclusion_value=True,
        ),
        # D3: payment_fraud_risk AND high_amount -> MANUAL_REVIEW (0.70)
        # No rule in this base concludes payment_fraud_risk or high_amount, so D3
        # can only fire if those facts are seeded directly.
        Rule(
            rid="D3",
            description="IF payment_fraud_risk AND high_amount THEN MANUAL_REVIEW_recommended",
            kind="D",
            strength=0.70,
            conditions=[Condition("payment_fraud_risk"), Condition("high_amount")],
            conclusion_key="action_MANUAL_REVIEW",
            conclusion_value=True,
        ),
        # D7: high_ip_risk AND very_high_amount -> STEP_UP_VERIFY (0.55)
        Rule(
            rid="D7",
            description="IF high_ip_risk AND very_high_amount THEN STEP_UP_VERIFY_recommended",
            kind="D",
            strength=0.55,
            conditions=[Condition("high_ip_risk"), Condition("very_high_amount")],
            conclusion_key="action_STEP_UP_VERIFY",
            conclusion_value=True,
        ),

        # ===== CONFIRM (C*) =====
        # C5: known_device -> trusted_device (0.60)
        # No rule concludes known_device, so C5 needs known_device=True to be seeded.
        Rule(
            rid="C5",
            description="IF known_device THEN trusted_device",
            kind="C",
            strength=0.60,
            conditions=[Condition("known_device", expected_value=True)],
            conclusion_key="trusted_device",
            conclusion_value=True,
        ),
    ]


def demo_user_case():
    """Run the user-specific rule base on the user's inputs and print every stage."""
    engine = ExpertSystem(build_rules_user_case(), action_threshold=0.70)
    memory = WorkingMemory()

    # (fact key, value, confidence) exactly as supplied by the user.
    user_inputs = [
        ("amount", 2000, 1.0),
        ("ip_risk_score", 80, 1.0),
        ("country_mismatch", True, 0.7),      # "yes" with 0.7 confidence
        ("device_seen_before", False, 1.0),   # "no" => False, 1.0 confidence
    ]
    for fact_key, fact_value, fact_conf in user_inputs:
        memory.set(fact_key, fact_value, fact_conf, source="input")

    print("=== FORWARD CHAIN (DETAILED CALCULATIONS) ===")
    engine.forward_chain(memory, verbose=True)

    print("=== FACTS AFTER FORWARD CHAIN ===")
    print(pretty_facts(memory))

    print("\n=== FIRING LOG (COMPACT) ===")
    print("\n".join(engine.explanations()))

    print("\n=== ACTION SCORES (NO THRESHOLD) ===")
    print(engine.rank_actions_all(memory))

    print("\n=== ACTIONS RANKED (>= THRESHOLD 0.70) ===")
    print(engine.rank_actions(memory))

    print("\n=== FINAL ACTION (THRESHOLD-BASED) ===")
    print(*engine.final_action(memory))


# Script entry point: run the user-specific scenario when the file is executed directly.
if __name__ == "__main__":
    demo_user_case()

=== FORWARD CHAIN (DETAILED CALCULATIONS) ===
O2 (O) IF amount > 1500 THEN very_high_amount
  - cond: amount expected=predicate(...) actual=2000 conf=1.0 status=OK
  - calc: premise=min(1.00)=1.00; inferred=1.00*0.80=0.800
  => infer very_high_amount=True conf=0.800

O8 (O) IF device_seen_before=False THEN new_device
  - cond: device_seen_before expected=False actual=False conf=1.0 status=OK
  - calc: premise=min(1.00)=1.00; inferred=1.00*0.70=0.700
  => infer new_device=True conf=0.700

O11 (O) IF ip_risk_score >= 80 THEN high_ip_risk
  - cond: ip_risk_score expected=predicate(...) actual=80 conf=1.0 status=OK
  - calc: premise=min(1.00)=1.00; inferred=1.00*0.00=0.000
  => infer high_ip_risk=True conf=0.000

O13 (O) IF country_mismatch THEN location_mismatch
  - cond: country_mismatch expected=True actual=True conf=0.7 status=OK
  - calc: premise=min(0.70)=0.70; inferred=0.70*0.70=0.490
  => infer location_mismatch=True conf=0.490

T1 (T) IF location_mismatch AND new_device THEN accou