In [4]:
from dataclasses import dataclass, field
from typing import Dict, List, Tuple
import math

PILLARS = ["T", "X", "F", "S"]  # Trade, Tech, Finance, Strategic

@dataclass
class Event:
    id: str
    p: float                          # probability p_i in [0,1]
    B: Dict[str, int]                 # binary memberships B_ik ∈ {0,1} per pillar
    s: Dict[str, int]                 # signs s_ik ∈ {-1,0,+1} per pillar

    def shares(self) -> Dict[str, float]:
        """Compute C_ik = B_ik / d_i with d_i = sum_k B_ik (and d_i=1 if 0)."""
        d_i = sum(self.B.get(k, 0) for k in PILLARS)
        if d_i == 0:
            d_i = 1
        return {k: (self.B.get(k, 0) / d_i) for k in PILLARS}

    def contribs(self) -> Dict[str, float]:
        """Per-pillar signed contributions: C_ik * s_ik * p_i."""
        C = self.shares()
        return {k: C[k] * self.s.get(k, 0) * self.p for k in PILLARS}

def compute_ai(events: List[Event], weights: Dict[str, float] = None) -> Dict:
    """
    Compute pillar counts N_k, numerators, pillar scores, S, and AI.
    weights w_k: nonnegative, sum to 1. If None, equal weights across pillars.
    """
    if weights is None:
        weights = {k: 1.0 / len(PILLARS) for k in PILLARS}

    # Validate weights
    for k in PILLARS:
        if weights.get(k, 0) < 0:
            raise ValueError(f"Weight for pillar {k} must be nonnegative.")
    sw = sum(weights.get(k, 0.0) for k in PILLARS)
    if not math.isclose(sw, 1.0, rel_tol=1e-9, abs_tol=1e-9):
        raise ValueError("Weights must sum to 1.")

    # Aggregate N_k and numerators
    N = {k: 0.0 for k in PILLARS}
    numer = {k: 0.0 for k in PILLARS}
    for e in events:
        shares = e.shares()
        contribs = e.contribs()
        for k in PILLARS:
            N[k] += shares[k]
            numer[k] += contribs[k]

    # Pillar scores (handle N_k=0 safely → score 0)
    pillar_scores = {k: (numer[k] / N[k]) if N[k] > 0 else 0.0 for k in PILLARS}

    # Index
    S = sum(weights[k] * pillar_scores[k] for k in PILLARS)
    AI = 50 * (1 + S)

    return {
        "N": N,
        "numerators": numer,
        "pillar_scores": pillar_scores,
        "S": S,
        "AI": AI,
        "weights": weights,
    }

# ---------- F1–F10 as agreed ----------

events_data = [
    # id,   p,    B (binary memberships),                    s (signs; West=+1, East=-1, Neutral=0)
    Event("F1", 0.68, {"T":1,"X":0,"F":0,"S":0}, {"T":+1,"X":0,"F":0,"S":0}),
    Event("F2", 0.22, {"T":1,"X":0,"F":0,"S":0}, {"T":-1,"X":0,"F":0,"S":0}),
    Event("F3", 0.09, {"T":0,"X":1,"F":0,"S":0}, {"T": 0,"X":0,"F":0,"S":0}),   # Neutral tilt
    Event("F4", 0.08, {"T":0,"X":1,"F":1,"S":0}, {"T": 0,"X":+1,"F":+1,"S":0}),
    Event("F5", 0.66, {"T":0,"X":0,"F":1,"S":0}, {"T": 0,"X":0,"F":-1,"S":0}),
    Event("F6", 0.64, {"T":1,"X":1,"F":0,"S":0}, {"T":+1,"X":+1,"F":0,"S":0}),
    Event("F7", 0.45, {"T":0,"X":0,"F":0,"S":1}, {"T": 0,"X":0,"F":0,"S":-1}),
    Event("F8", 0.95, {"T":1,"X":0,"F":0,"S":0}, {"T":+1,"X":0,"F":0,"S":0}),
    Event("F9", 0.46, {"T":1,"X":0,"F":0,"S":0}, {"T": -1,"X":0,"F":0,"S":0}),   # Neutral tilt
    Event("F10",0.88, {"T":0,"X":0,"F":0,"S":1}, {"T": 0,"X":0,"F":0,"S":+1}),
]

# Equal weights (0.25 each)
equal_weights = {k: 0.25 for k in PILLARS}

# Run
result = compute_ai(events_data, weights=equal_weights)

# Pretty print
print("Pillar counts N_k:", result["N"])
print("Numerators Σ C_ik s_ik p_i:", result["numerators"])
print("Pillar scores:", result["pillar_scores"])
print("S =", round(result["S"], 6))
print("AI =", round(result["AI"], 4))

# If you want tabular rows of contributions per event, uncomment below:
# for e in events_data:
#     print(e.id, e.contribs())

Pillar counts N_k: {'T': 4.5, 'X': 2.0, 'F': 1.5, 'S': 2.0}
Numerators Σ C_ik s_ik p_i: {'T': 1.27, 'X': 0.36, 'F': -0.62, 'S': 0.43}
Pillar scores: {'T': 0.2822222222222222, 'X': 0.18, 'F': -0.41333333333333333, 'S': 0.215}
S = 0.065972
AI = 53.2986


In [5]:
def clamp01(x): 
    return max(0.0, min(1.0, x))

def perturb_events(events: List[Event], idx: int, delta: float) -> List[Event]:
    out = []
    for j, e in enumerate(events):
        if j == idx:
            out.append(Event(e.id, clamp01(e.p + delta), e.B, e.s))
        else:
            out.append(Event(e.id, e.p, e.B, e.s))
    return out

def sensitivity_table(events: List[Event], weights: Dict[str, float], step: float = 0.10):
    """
    For each forecast i, compute AI with p_i up/down by 'step' (±10pp default).
    Returns list of dicts with dAI_up, dAI_down, and max_abs_dAI.
    """
    base_ai = compute_ai(events, weights)["AI"]
    rows = []
    for i, e in enumerate(events):
        ai_up   = compute_ai(perturb_events(events, i, +step), weights)["AI"]
        ai_down = compute_ai(perturb_events(events, i, -step), weights)["AI"]
        d_up, d_down = ai_up - base_ai, ai_down - base_ai
        rows.append({
            "id": e.id,
            "p": e.p,
            "AI_base": base_ai,
            "AI_up": ai_up, "dAI_up": d_up,
            "AI_down": ai_down, "dAI_down": d_down,
            "max_abs_dAI": max(abs(d_up), abs(d_down)),
            "direction_max": "up" if abs(d_up) >= abs(d_down) else "down",
        })
    # sort by influence
    rows.sort(key=lambda r: r["max_abs_dAI"], reverse=True)
    return rows

# Run it
sens = sensitivity_table(events_data, equal_weights, step=0.10)
for r in sens:
    print(f'{r["id"]}: p={r["p"]:.2f} | dAI_up={r["dAI_up"]:.3f} | '
          f'dAI_down={r["dAI_down"]:.3f} | max|ΔAI|={r["max_abs_dAI"]:.3f} ({r["direction_max"]})')

F5: p=0.66 | dAI_up=-0.833 | dAI_down=0.833 | max|ΔAI|=0.833 (up)
F4: p=0.08 | dAI_up=0.729 | dAI_down=-0.583 | max|ΔAI|=0.729 (up)
F7: p=0.45 | dAI_up=-0.625 | dAI_down=0.625 | max|ΔAI|=0.625 (up)
F10: p=0.88 | dAI_up=0.625 | dAI_down=-0.625 | max|ΔAI|=0.625 (down)
F6: p=0.64 | dAI_up=0.451 | dAI_down=-0.451 | max|ΔAI|=0.451 (down)
F1: p=0.68 | dAI_up=0.278 | dAI_down=-0.278 | max|ΔAI|=0.278 (down)
F2: p=0.22 | dAI_up=-0.278 | dAI_down=0.278 | max|ΔAI|=0.278 (up)
F8: p=0.95 | dAI_up=0.139 | dAI_down=-0.278 | max|ΔAI|=0.278 (down)
F9: p=0.46 | dAI_up=-0.278 | dAI_down=0.278 | max|ΔAI|=0.278 (up)
F3: p=0.09 | dAI_up=0.000 | dAI_down=0.000 | max|ΔAI|=0.000 (up)


In [6]:
# ========= Pillar-weight sensitivity (drop-in) =========

from copy import deepcopy

def compute_pillar_scores(events):
    """Return N_k, numerators, and pillar_scores dicts using current events (independent of weights)."""
    # reuse compute_ai with equal weights just to reuse its aggregation;
    # pillar scores don't depend on weights anyway.
    eq_w = {k: 1.0/len(PILLARS) for k in PILLARS}
    res = compute_ai(events, eq_w)
    return res["N"], res["numerators"], res["pillar_scores"]

def ai_from_weights(pillar_scores, weights):
    """AI = 50 * (1 + sum_k w_k * PillarScore_k)"""
    S = sum(weights[k] * pillar_scores[k] for k in PILLARS)
    return 50.0 * (1.0 + S), S

def weight_gradients(pillar_scores):
    """Closed-form gradients: dAI/dw_k = 50 * PillarScore_k"""
    return {k: 50.0 * pillar_scores[k] for k in PILLARS}

def reallocate(weights, a, b, delta):
    """
    Move 'delta' weight from pillar b -> a (delta can be +/-).
    Returns a NEW weights dict (does not mutate input).
    """
    if a == b:
        raise ValueError("a and b must be different pillars.")
    if a not in PILLARS or b not in PILLARS:
        raise ValueError("Unknown pillar name.")
    w = deepcopy(weights)
    w[a] = w.get(a, 0.0) + delta
    w[b] = w.get(b, 0.0) - delta
    # sanity checks
    if any(w[k] < -1e-12 for k in PILLARS):
        raise ValueError(f"Negative weight produced: {w}")
    s = sum(w[k] for k in PILLARS)
    # renormalize tiny numerical drift if any
    if abs(s - 1.0) > 1e-12:
        for k in PILLARS:
            w[k] /= s
    return w

def scenario_table_weight_moves(pillar_scores, base_weights, moves, delta=0.10):
    """
    Evaluate a list of (from_pillar, to_pillar) moves with size 'delta'.
    Returns sorted list of dicts by AI_new descending.
    """
    AI_base, S_base = ai_from_weights(pillar_scores, base_weights)
    rows = []
    for frm, to in moves:
        w_new = reallocate(base_weights, to, frm, delta)  # move delta from frm -> to
        AI_new, S_new = ai_from_weights(pillar_scores, w_new)
        rows.append({
            "move": f"{frm} -> {to} ({delta:.2f})",
            "S_new": S_new,
            "AI_new": AI_new,
            "dAI": AI_new - AI_base,
            "weights_new": w_new
        })
    # sort by dAI descending
    rows.sort(key=lambda r: r["dAI"], reverse=True)
    return AI_base, S_base, rows

# ----- Run the analysis -----

# 1) Pillar scores at current inputs/signs (F9 Trade, East tilt)
Nk, numer, pscore = compute_pillar_scores(events_data)

print("Pillar counts N_k:", Nk)
print("Numerators:", numer)
print("Pillar scores:", pscore)

# 2) Gradients dAI/dw_k = 50 * PillarScore_k
grads = weight_gradients(pscore)
print("\nWeight gradients (dAI/dw_k):")
for k in PILLARS:
    print(f"  {k}: {grads[k]:.4f}")

# 3) Scenarios: move 10pp of weight between pillars (feel free to add more)
base_w = {k: 0.25 for k in PILLARS}  # equal weights baseline
moves = [
    ("F","T"), ("F","X"), ("F","S"),
    ("T","F"), ("X","F"), ("S","F"),
    # add your own moves here, e.g. ("T","X"), ("S","T"), ...
]
AI_base, S_base, rows = scenario_table_weight_moves(pscore, base_w, moves, delta=0.10)

print(f"\nBaseline: S={S_base:.6f}, AI={AI_base:.3f}")
print("\n10pp reallocation scenarios (others unchanged):")
for r in rows:
    print(f'{r["move"]:8s} | S_new={r["S_new"]:.5f} | AI_new={r["AI_new"]:.3f} | ΔAI={r["dAI"]:+.3f} | w_new={r["weights_new"]}')

Pillar counts N_k: {'T': 4.5, 'X': 2.0, 'F': 1.5, 'S': 2.0}
Numerators: {'T': 1.27, 'X': 0.36, 'F': -0.62, 'S': 0.43}
Pillar scores: {'T': 0.2822222222222222, 'X': 0.18, 'F': -0.41333333333333333, 'S': 0.215}

Weight gradients (dAI/dw_k):
  T: 14.1111
  X: 9.0000
  F: -20.6667
  S: 10.7500

Baseline: S=0.065972, AI=53.299

10pp reallocation scenarios (others unchanged):
F -> T (0.10) | S_new=0.13553 | AI_new=56.776 | ΔAI=+3.478 | w_new={'T': 0.35, 'X': 0.25, 'F': 0.15, 'S': 0.25}
F -> S (0.10) | S_new=0.12881 | AI_new=56.440 | ΔAI=+3.142 | w_new={'T': 0.25, 'X': 0.25, 'F': 0.15, 'S': 0.35}
F -> X (0.10) | S_new=0.12531 | AI_new=56.265 | ΔAI=+2.967 | w_new={'T': 0.25, 'X': 0.35, 'F': 0.15, 'S': 0.25}
X -> F (0.10) | S_new=0.00664 | AI_new=50.332 | ΔAI=-2.967 | w_new={'T': 0.25, 'X': 0.15, 'F': 0.35, 'S': 0.25}
S -> F (0.10) | S_new=0.00314 | AI_new=50.157 | ΔAI=-3.142 | w_new={'T': 0.25, 'X': 0.25, 'F': 0.35, 'S': 0.15}
T -> F (0.10) | S_new=-0.00358 | AI_new=49.821 | ΔAI=-3.478 | w_new