In [1]:
import os

os.environ["GRB_LICENSE_FILE"] = r"C:\Users\tomas\Desktop\gurobi.lic"

import gurobipy as gp
from gurobipy import GRB 


In [2]:
import numpy as np
import gurobipy as gp
from gurobipy import GRB
from dataclasses import dataclass
from typing import Callable, List, Tuple, Optional, Dict, Any


# =========================================================
# Data structures
# =========================================================

@dataclass(frozen=True)
class StandardFormMILP:
    """
    Standard-form-ish MILP container.
    We support mixed senses (>=, <=, =) and mixed variable types.
    Objective: min c^T x
    Constraints: A x (sense) b
    """
    A: np.ndarray                 # shape (m,n)
    b: np.ndarray                 # shape (m,)
    sense: List[str]              # length m, each in {">=", "<=", "="}
    c: np.ndarray                 # shape (n,)
    lb: np.ndarray                # shape (n,)
    ub: np.ndarray                # shape (n,)
    vtype: List[str]              # length n, each in {"C","I","B"}


@dataclass(frozen=True)
class MutableSpec:
    """
    Which parameters are mutable, and their integer delta bounds.

    - b_idx: indices of RHS entries b_i that can change
    - c_idx: indices of objective coefficients c_j that can change
    - A_idx: list of (i,j) indices of A_ij coefficients that can change

    Delta bounds are uniform here for simplicity; you can extend to per-index bounds.
    """
    b_idx: List[int]
    c_idx: List[int]
    A_idx: List[Tuple[int,int]]
    delta_lb: int   # integer lower bound for each delta component
    delta_ub: int   # integer upper bound for each delta component


# =========================================================
# Model builder (standard form -> Gurobi)
# =========================================================

def build_forward_model(data: StandardFormMILP,
                        override: Optional[Dict[str, Any]] = None,
                        output_flag: int = 0):
    """
    Build a fresh Gurobi model from standard-form data.
    override can contain updated A, b, c (numpy arrays).
    Returns (model, x_vars, constrs, obj_expr).
    """
    A = data.A if override is None or "A" not in override else override["A"]
    b = data.b if override is None or "b" not in override else override["b"]
    c = data.c if override is None or "c" not in override else override["c"]

    m, n = A.shape
    model = gp.Model("forward")
    model.Params.OutputFlag = output_flag

    # Variables
    x = []
    for j in range(n):
        vt = data.vtype[j]
        if vt not in ("C", "I", "B"):
            raise ValueError(f"Invalid vtype[{j}]={vt}. Use 'C','I','B'.")
        xj = model.addVar(lb=float(data.lb[j]), ub=float(data.ub[j]),
                          vtype={"C":GRB.CONTINUOUS,"I":GRB.INTEGER,"B":GRB.BINARY}[vt],
                          name=f"x[{j}]")
        x.append(xj)

    # Objective
    obj = gp.quicksum(float(c[j]) * x[j] for j in range(n))
    model.setObjective(obj, GRB.MINIMIZE)

    # Constraints
    constrs = []
    for i in range(m):
        lhs = gp.quicksum(float(A[i,j]) * x[j] for j in range(n))
        s = data.sense[i]
        if s == ">=":
            ci = model.addConstr(lhs >= float(b[i]), name=f"con[{i}]")
        elif s == "<=":
            ci = model.addConstr(lhs <= float(b[i]), name=f"con[{i}]")
        elif s == "=":
            ci = model.addConstr(lhs == float(b[i]), name=f"con[{i}]")
        else:
            raise ValueError(f"Invalid sense[{i}]={s}. Use '>=','<=','='.")
        constrs.append(ci)

    model.optimize()
    return model, x, constrs, obj


# =========================================================
# Apply deltas to (A,b,c)
# =========================================================

def apply_deltas(base: StandardFormMILP,
                 spec: MutableSpec,
                 delta_b: np.ndarray,
                 delta_c: np.ndarray,
                 delta_A: np.ndarray):
    """
    Produces new (A,b,c) = (A0+ΔA, b0+Δb, c0+Δc) based on spec ordering.
    """
    A = base.A.copy()
    b = base.b.copy()
    c = base.c.copy()

    for k, i in enumerate(spec.b_idx):
        b[i] = b[i] + delta_b[k]
    for k, j in enumerate(spec.c_idx):
        c[j] = c[j] + delta_c[k]
    for k, (i,j) in enumerate(spec.A_idx):
        A[i,j] = A[i,j] + delta_A[k]

    return A, b, c


# =========================================================
# Witness check for WCE (existential / optimistic)
# =========================================================

def witness_exists(base: StandardFormMILP,
                   A: np.ndarray, b: np.ndarray, c: np.ndarray,
                   theta: float,
                   desired_space_cb: Callable[[gp.Model, List[gp.Var], List[gp.Constr]], None],
                   tol: float = 1e-6,
                   output_flag: int = 0):
    """
    Returns (True, witness_solution_dict) if exists x in desired space that is LL-optimal:
      feasible for (A,b), satisfies desired constraints, and objective <= theta+tol.
    """
    model, x, constrs, obj = build_forward_model(
        StandardFormMILP(A=A, b=b, sense=base.sense, c=c, lb=base.lb, ub=base.ub, vtype=base.vtype),
        override=None,
        output_flag=output_flag
    )

    # Add desired space constraints
    desired_space_cb(model, x, constrs)

    # Optimality cap
    model.addConstr(obj <= theta + tol, name="optimality_cap")

    model.optimize()
    if model.Status == GRB.OPTIMAL:
        sol = {"obj": model.ObjVal, "x": np.array([v.X for v in x])}
        return True, sol
    return False, None


# =========================================================
# Master over delta variables with no-good cuts (enumerative CCG baseline)
# =========================================================

def build_master(spec: MutableSpec,
                 output_flag: int = 0):
    """
    Master decision variables are integer deltas for all mutable parameters.
    Objective: L1 norm of all deltas.
    Returns (model, delta_vars, abs_vars_plus_minus) for later no-good cuts.
    """
    model = gp.Model("master")
    model.Params.OutputFlag = output_flag

    # Delta variables
    db = [model.addVar(vtype=GRB.INTEGER, lb=spec.delta_lb, ub=spec.delta_ub, name=f"db[{k}]")
          for k in range(len(spec.b_idx))]
    dc = [model.addVar(vtype=GRB.INTEGER, lb=spec.delta_lb, ub=spec.delta_ub, name=f"dc[{k}]")
          for k in range(len(spec.c_idx))]
    dA = [model.addVar(vtype=GRB.INTEGER, lb=spec.delta_lb, ub=spec.delta_ub, name=f"dA[{k}]")
          for k in range(len(spec.A_idx))]

    # L1 linearization: sum |delta|
    def add_abs(v: gp.Var, name: str):
        p = model.addVar(lb=0.0, name=f"{name}_p")
        m = model.addVar(lb=0.0, name=f"{name}_m")
        model.addConstr(v == p - m, name=f"{name}_abs")
        return p + m

    abs_terms = []
    for k, v in enumerate(db):
        abs_terms.append(add_abs(v, f"abs_db[{k}]"))
    for k, v in enumerate(dc):
        abs_terms.append(add_abs(v, f"abs_dc[{k}]"))
    for k, v in enumerate(dA):
        abs_terms.append(add_abs(v, f"abs_dA[{k}]"))

    model.setObjective(gp.quicksum(abs_terms), GRB.MINIMIZE)
    model.update()
    return model, db, dc, dA


def add_no_good_cut(model: gp.Model,
                    vars_list: List[gp.Var],
                    values: List[int],
                    name: str):
    """
    Correct no-good cut for bounded integer variables:
    exclude exactly (vars_list == valuesvalues.

    Enforces: exists k such that v_k <= val_k-1 OR v_k >= val_k+1.
    Implemented via two indicator binaries per component.
    """
    assert len(vars_list) == len(values)

    if len(vars_list) == 0:
        model.addConstr(0 >= 1, name=name)
        return

    diffs = []
    for k, (v, val) in enumerate(zip(vars_list, values)):
        u = model.addVar(vtype=GRB.BINARY, name=f"{name}_u[{k}]")  # v >= val+1
        l = model.addVar(vtype=GRB.BINARY, name=f"{name}_l[{k}]")  # v <= val-1
        model.addConstr(u + l <= 1, name=f"{name}_one_side[{k}]")

        # If u=1 => v >= val+1
        model.addGenConstrIndicator(u, True, v >= val + 1, name=f"{name}_ge[{k}]")
        # If l=1 => v <= val-1
        model.addGenConstrIndicator(l, True, v <= val - 1, name=f"{name}_le[{k}]")

        diffs.append(u + l)

    model.addConstr(gp.quicksum(diffs) >= 1, name=f"{name}_exclude_point")
    model.update()



# =========================================================
# General WCE solver (enumerative CCG baseline)
# =========================================================

def solve_wce_general(base: StandardFormMILP,
                      spec: MutableSpec,
                      desired_space_cb: Callable[[gp.Model, List[gp.Var], List[gp.Constr]], None],
                      max_iter: int = 50,
                      tol: float = 0.0,
                      master_output: int = 0,
                      oracle_output: int = 0):
    """
    General WCE solver:
      min ||ΔΩ||_1 s.t. exists optimal LL solution in desired space.

    Returns dict with best solution found, or None if none found within bounds/iterations.
    """
    master, db_vars, dc_vars, dA_vars = build_master(spec, output_flag=master_output)

    UB = float("inf")
    best = None

    for it in range(max_iter):
        master.optimize()
        if master.Status != GRB.OPTIMAL:
            break

        # Read deltas
        db_val = np.array([int(round(v.X)) for v in db_vars], dtype=int)
        dc_val = np.array([int(round(v.X)) for v in dc_vars], dtype=int)
        dA_val = np.array([int(round(v.X)) for v in dA_vars], dtype=int)
        dist = master.ObjVal

        # Apply deltas
        A_new, b_new, c_new = apply_deltas(base, spec, db_val, dc_val, dA_val)

        # Oracle: solve forward to get theta
        fwd_model, x_vars, constrs, obj_expr = build_forward_model(
            StandardFormMILP(A=A_new, b=b_new, sense=base.sense, c=c_new, lb=base.lb, ub=base.ub, vtype=base.vtype),
            override=None,
            output_flag=oracle_output
        )
        if fwd_model.Status != GRB.OPTIMAL:
            # treat as failure; cut this delta vector
            all_master_vars = db_vars + dc_vars + dA_vars
            all_vals = list(db_val) + list(dc_val) + list(dA_val)
            add_no_good_cut(master, all_master_vars, all_vals, name=f"nogood_infeas[{it}]")
            continue

        theta = fwd_model.ObjVal

        # Witness check
        ok, witness = witness_exists(base, A_new, b_new, c_new, theta, desired_space_cb,
                                     tol=1e-6, output_flag=oracle_output)

        if ok:
            if dist < UB:
                UB = dist
                best = {
                    "dist": UB,
                    "db": db_val, "dc": dc_val, "dA": dA_val,
                    "A": A_new, "b": b_new, "c": c_new,
                    "theta": theta,
                    "witness": witness,
                }
            # Since master is minimizing dist, if we found a witness at current optimum dist,
            # we are done (LB==UB) for this enumerative master.
            if UB - dist <= tol:
                break

        # If witness fails, exclude this delta vector
        all_master_vars = db_vars + dc_vars + dA_vars
        all_vals = list(db_val) + list(dc_val) + list(dA_val)
        add_no_good_cut(master, all_master_vars, all_vals, name=f"nogood[{it}]")

    return best


# =========================================================
# Demo on your toy (now in standard form)
# =========================================================

def demo():
    # Toy variables: x1,x2 continuous, y1,y2 binary
    # Put them into x vector as [x1,x2,y1,y2]
    # Objective: 2x1 + 1x2 + 3y1 + 1y2
    c0 = np.array([2, 1, 3, 1], dtype=float)

    # Constraints:
    # (1) x1 + x2 >= b
    # (2) x1 - 5 y1 <= 0
    # (3) x2 - 5 y2 <= 0
    # Standard form with mixed senses:
    A0 = np.array([
        [1, 1, 0, 0],
        [1, 0, -5, 0],
        [0, 1, 0, -5],
    ], dtype=float)
    sense = [">=", "<=", "<="]

    b0 = np.array([4, 0, 0], dtype=float)

    lb = np.array([0, 0, 0, 0], dtype=float)
    ub = np.array([GRB.INFINITY, GRB.INFINITY, 1, 1], dtype=float)
    vtype = ["C", "C", "B", "B"]

    base = StandardFormMILP(A=A0, b=b0, sense=sense, c=c0, lb=lb, ub=ub, vtype=vtype)

    # Desired space: y1 == 1  (x[2] is y1)
    def desired_cb(model: gp.Model, xvars: List[gp.Var], constrs: List[gp.Constr]):
        model.addConstr(xvars[2] == 1, name="desired_y1_on")
        model.addConstr(xvars[0] <= 1.5, name="desired_x2_limit")

    # Mutable spec: allow changing ONLY the RHS of constraint (1) (b[0]) by integer deltas
    spec = MutableSpec(
        b_idx=[0,1],     # only demand RHS mutable
        c_idx=[],      # no cost changes
        A_idx=[],      # no A changes
        delta_lb=-10,
        delta_ub=+10
    )

    best = solve_wce_general(
        base=base,
        spec=spec,
        desired_space_cb=desired_cb,
        max_iter=50,
        master_output=0,
        oracle_output=0
    )

    if best is None:
        print("No WCE found.")
        return

    print("=== BEST WCE FOUND ===")
    print("L1 dist:", best["dist"])
    print("delta b:", best["db"])
    print("new b:", best["b"])
    print("new A:", best["A"])
    print("new c:", best["c"])
    print("theta:", best["theta"])
    print("witness obj:", best["witness"]["obj"])
    print("witness x:", best["witness"]["x"])


if __name__ == "__main__":
    demo()


Set parameter WLSAccessID
Set parameter WLSSecret
Set parameter LicenseID to value 2653481
Academic license 2653481 - for non-commercial use only - registered to to___@ug.uchile.cl
=== BEST WCE FOUND ===
L1 dist: 1.0
delta b: [ 0 -1]
new b: [ 4. -1.  0.]
new A: [[ 1.  1.  0.  0.]
 [ 1.  0. -5.  0.]
 [ 0.  1.  0. -5.]]
new c: [2. 1. 3. 1.]
theta: 8.0
witness obj: 8.0
witness x: [0. 4. 1. 1.]


In [None]:
def solve_forward_only(base: StandardFormMILP, A: np.ndarray, b: np.ndarray, c: np.ndarray, output_flag: int = 0):
    """
    Solve the forward MILP once and return theta and primal x.
    """
    fwd_model, x_vars, constrs, obj_expr = build_forward_model(
        StandardFormMILP(A=A, b=b, sense=base.sense, c=c, lb=base.lb, ub=base.ub, vtype=base.vtype),
        override=None,
        output_flag=output_flag
    )
    if fwd_model.Status != GRB.OPTIMAL:
        return None, None
    x = np.array([v.X for v in x_vars])
    return fwd_model.ObjVal, x


def compute_l1_dist_from_deltas(db: np.ndarray, dc: np.ndarray, dA: np.ndarray):
    return float(np.sum(np.abs(db)) + np.sum(np.abs(dc)) + np.sum(np.abs(dA)))


def sanity_check_wce_result(base: StandardFormMILP,
                            spec: MutableSpec,
                            desired_space_cb: Callable[[gp.Model, List[gp.Var], List[gp.Constr]], None],
                            best: Dict[str, Any],
                            tol: float = 1e-6,
                            output_flag: int = 0):
    """
    Sanity-check that 'best' is a valid WCE for the forward model:
      1) best['dist'] matches |deltas|_1
      2) theta is the forward optimum under (A,b,c)
      3) witness exists in desired space with obj <= theta + tol
      4) witness objective approximately equals theta (optional check)
    """
    assert best is not None, "best is None (no WCE found)."

    # 1) Check distance
    dist_calc = compute_l1_dist_from_deltas(best["db"], best["dc"], best["dA"])
    print("=== SANITY 1: Distance ===")
    print(f"reported dist = {best['dist']}, computed dist = {dist_calc}")
    if abs(best["dist"] - dist_calc) > 1e-6:
        print("WARNING: dist mismatch. (If master used weighted distance, this is expected.)")

    # 2) Re-solve forward
    print("\n=== SANITY 2: Forward optimality ===")
    theta_star, x_star = solve_forward_only(base, best["A"], best["b"], best["c"], output_flag=output_flag)
    if theta_star is None:
        raise RuntimeError("Forward model at best parameters is not optimal/feasible.")
    print(f"theta(recomputed) = {theta_star:.6f} | theta(stored) = {best['theta']:.6f} | gap = {best['theta'] - theta_star:.3e}")

    # 3) Witness existence
    print("\n=== SANITY 3: Witness exists in desired space at optimum ===")
    ok, wit = witness_exists(
        base=base,
        A=best["A"],
        b=best["b"],
        c=best["c"],
        theta=theta_star,
        desired_space_cb=desired_space_cb,
        tol=tol,
        output_flag=output_flag
    )
    print("witness_exists ?", ok)
    if not ok:
        raise RuntimeError("Sanity failed: no witness exists, so this is NOT a WCE.")

    print(f"witness obj = {wit['obj']:.6f} | theta = {theta_star:.6f} | wit-theta = {wit['obj'] - theta_star:.3e}")

    # 4) Optional: check witness objective approximately equals theta (it should, given your formulation)
    if wit["obj"] > theta_star + 1e-5:
        print("WARNING: witness objective is above theta; check your witness constraint/tolerance.")

    print("\n=== SANITY PASSED: This is a valid Weak Optimality CE ===")
    return {"theta": theta_star, "x_opt": x_star, "witness": wit}


In [None]:
def demo():
    # -------------------------------
    # 1) Define the base MILP
    # -------------------------------
    c0 = np.array([2, 1, 3, 1], dtype=float)

    A0 = np.array([
        [1, 1, 0, 0],
        [1, 0, -5, 0],
        [0, 1, 0, -5],
    ], dtype=float)

    sense = [">=", "<=", "<="]
    b0 = np.array([4, 0, 0], dtype=float)

    lb = np.array([0, 0, 0, 0], dtype=float)
    ub = np.array([GRB.INFINITY, GRB.INFINITY, 1, 1], dtype=float)
    vtype = ["C", "C", "B", "B"]

    base = StandardFormMILP(
        A=A0, b=b0, sense=sense,
        c=c0, lb=lb, ub=ub, vtype=vtype
    )

    # -------------------------------
    # 2) Desired space
    # -------------------------------
    def desired_cb(model, xvars, constrs):
        # y1 == 1  (x[2])
        model.addConstr(xvars[2] == 1, name="desired_y1_on")

    # -------------------------------
    # 3) Mutable specification
    # -------------------------------
    spec = MutableSpec(
        b_idx=[0],     # only demand RHS
        c_idx=[],
        A_idx=[],
        delta_lb=-10,
        delta_ub=10
    )

    # -------------------------------
    # 4) Solve WCE
    # -------------------------------
    best = solve_wce_general(
        base=base,
        spec=spec,
        desired_space_cb=desired_cb,
        max_iter=50,
        master_output=0,
        oracle_output=0
    )

    if best is None:
        print("No WCE found.")
        return

    print("\n=== BEST WCE FOUND ===")
    print("L1 dist:", best["dist"])
    print("delta b:", best["db"])
    print("new b:", best["b"])
    print("theta:", best["theta"])
    print("witness obj:", best["witness"]["obj"])
    print("witness x:", best["witness"]["x"])

    # -------------------------------
    # 5) Sanity check
    # -------------------------------
    print("\n=== RUN SANITY CHECK ===")
    sanity_check_wce_result(
        base=base,
        spec=spec,
        desired_space_cb=desired_cb,
        best=best,
        tol=1e-6,
        output_flag=0
    )


# ✅ ONLY THIS at top level
demo()


# Mixed-Integer Weak-Optimality Counterfactual Explanations (WCE) via Projection / Value-Function Cuts (RHS-only)

We consider a **lower-level MILP** (forward optimization) with continuous dispatch variables and binary commitment variables.  
We want a **Weak-Optimality Counterfactual Explanation (WCE)**: the smallest change to selected parameters such that there exists **at least one lower-level optimal solution** that lies in a *desired space* (e.g., a generator must be ON).

This notebook implements **Option 1**: only the **RHS** vector \(b\) is mutable (demand / limits), i.e.
$$
b(\Delta) = b^0 + \Delta b,
$$
with \(\Delta b\) bounded and chosen to minimize an L1 distance.

---

## 1. Lower-level (forward) MILP

Let the forward MILP be
$$
\theta(\Delta b) \;=\; \min_{x,y} \; c_x^\top x + c_y^\top y
\quad \text{s.t.}\quad
A_x x + A_y y \ge b^0 + \Delta b,\;
x \in X,\; y \in Y,
$$
where
- $x$ are continuous variables (dispatch),
- $y$ are integer/binary variables (commitments),
- only RHS entries in $b$ are mutable.

---

## 2. Weak-optimality counterfactual explanation (WCE)

Given a desired region $\mathcal{D}$ (e.g. $u_1=1$), a **WCE** solves
$$
\min_{\Delta b} \; \|\Delta b\|_1
\quad \text{s.t.}\quad
\exists (x^w,y^w)\in \mathcal{D} \cap \arg\min \{c_x^\top x + c_y^\top y \;:\; A_x x + A_y y \ge b^0 + \Delta b\}.
$$
“Weak optimality” means: **there exists at least one optimal solution** in $\mathcal{D}$ (not necessarily all optimal solutions).

---

## 3. Value-function (projection) formulation for fixed integer pattern

Fix an integer pattern $y\in Y$. The remaining problem in $x$ is an LP:
$$
\phi_y(\Delta b) = c_y^\top y + \min_{x\in X}\{c_x^\top x : A_x x \ge (b^0+\Delta b) - A_y y \}.
$$

By LP strong duality (for fixed $y$), there exists a dual feasible polyhedron $\Pi$ (independent of $\Delta b$ when only RHS is mutable) such that
$$
\phi_y(\Delta b) \;=\; c_y^\top y \;+\; \max_{\pi\in \Pi}\; \pi^\top\big((b^0+\Delta b)-A_y y\big).
$$
Therefore, the epigraph of $\phi_y$ can be outer-approximated by **value-function cuts** (projection cuts):
$$    
\eta_y \;\ge\; c_y^\top y + \pi^\top(b^0-A_y y) + \pi^\top \Delta b,
$$
where $\pi$ is a dual extreme point (or any dual optimal multiplier) for the fixed-$y$ LP.

---

## 4. Master problem (restricted)

We introduce
- witness variables $(x^w,y^w)$ that must satisfy feasibility and desired-space constraints,
- a scalar $\eta$ tied to the witness objective,
- for each tracked competitor pattern $y$, a scalar $\eta_y$ representing $\phi_y(\Delta b)$ via accumulated cuts.

The master solves:
- minimize $\|\Delta b\|_1$,
- enforce witness feasibility and desired constraints,
- enforce $\eta = c_x^\top x^w + c_y^\top y^w$,
- enforce $\eta \le \eta_y$ for tracked patterns $y$,
- enforce value-function cuts for each tracked pattern.

---

## 5. Separation (oracle) and cut generation

Given a master solution $\Delta b$,
1) Solve the full lower-level MILP to get the true optimum $\theta(\Delta b)$ and an optimal integer pattern $y^{sep}$.
2) Check whether there exists an optimal solution in desired space (solve a witness-feasibility model with objective cap $\le \theta(\Delta b)$).
3) If WCE is not satisfied, generate a projection cut:
   - Fix $y=y^{sep}$, solve the continuous LP in $x$,
   - read dual multipliers $\pi^\star$ for the RHS constraints,
   - add the cut
     $$
     \eta_{y^{sep}} \ge c_y^\top y^{sep} + (\pi^\star)^\top(b^0-A_y y^{sep}) + (\pi^\star)^\top \Delta b.
     $$

Repeat until WCE is found (or iteration limit is reached).

---

## 6. Why this is more rigorous than no-good cuts

- No-good cuts eliminate **one** parameter vector $\Delta b$ per iteration.
- Projection/value-function cuts tighten a **convex, piecewise-linear** outer approximation of each $\phi_y(\Delta b)$, cutting off **regions** of $\Delta b$.
- For RHS-only mutability, the cuts are linear and theoretically grounded in LP duality and decomposition.

---


In [72]:
import numpy as np
import gurobipy as gp
from gurobipy import GRB
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional, Callable, Any


In [73]:
@dataclass(frozen=True)
class UCInstance:
    """
    Small UC-like MILP:
      - continuous generation g[i,t] >= 0
      - binary commitment u[i] in {0,1} (shared across time for simplicity)
      - demand constraints per period
      - capacity constraints g[i,t] <= Pmax[i]*u[i]

    We only allow RHS mutability on demand constraints: D_t = D0_t + db_t
    """
    I: int
    T: int
    Pmax: np.ndarray          # (I,)
    gen_cost: np.ndarray      # (I,) variable cost per MWh
    fixed_cost: np.ndarray    # (I,) fixed cost for u_i
    D0: np.ndarray            # (T,) base demand


@dataclass(frozen=True)
class RHSOnlySpec:
    """
    RHS-only mutable demand deltas.
    db_t are integer deltas within [lb,ub] (finite => finite convergence on this grid).
    """
    db_lb: int
    db_ub: int
    l1_weight: float = 1.0


In [74]:
def solve_forward_uc(inst: UCInstance, D: np.ndarray, output_flag: int = 0) -> Dict[str, Any]:
    """
    Solve full lower-level MILP for given demand vector D.
    Returns objective, g, u, and the optimal u-pattern.
    """
    m = gp.Model("uc_forward")
    m.Params.OutputFlag = output_flag

    I, T = inst.I, inst.T
    g = m.addVars(I, T, lb=0.0, vtype=GRB.CONTINUOUS, name="g")
    u = m.addVars(I, vtype=GRB.BINARY, name="u")

    # Objective
    m.setObjective(
        gp.quicksum(inst.gen_cost[i] * g[i,t] for i in range(I) for t in range(T))
        + gp.quicksum(inst.fixed_cost[i] * u[i] for i in range(I)),
        GRB.MINIMIZE
    )

    # Demand constraints
    demand_con = []
    for t in range(T):
        con = m.addConstr(gp.quicksum(g[i,t] for i in range(I)) >= float(D[t]), name=f"demand[{t}]")
        demand_con.append(con)

    # Capacity constraints
    for i in range(I):
        for t in range(T):
            m.addConstr(g[i,t] <= float(inst.Pmax[i]) * u[i], name=f"cap[{i},{t}]")

    m.optimize()
    if m.Status != GRB.OPTIMAL:
        return {"status": m.Status}

    u_pat = tuple(int(round(u[i].X)) for i in range(I))
    return {
        "status": GRB.OPTIMAL,
        "obj": float(m.ObjVal),
        "g": np.array([[g[i,t].X for t in range(T)] for i in range(I)], dtype=float),
        "u": np.array([int(round(u[i].X)) for i in range(I)], dtype=int),
        "u_pat": u_pat
    }


In [75]:
def solve_fixed_u_lp_get_cutdata(inst: UCInstance, D: np.ndarray, u_pat: Tuple[int, ...], output_flag: int = 0):
    """
    Solve the fixed-u LP and return OA cut data:
      Vu(Dk) and pi(Dk) where pi are dual multipliers of demand constraints.

    OA/Benders cut:
        eta_u >= Vu(Dk) + sum_t pi_t(Dk) * (D_t - Dk_t)

    IMPORTANT:
      - demand constraints must be written as sum g >= D  (exactly)
      - we only use Pi of those demand constraints
    """
    I, T = inst.I, inst.T

    m = gp.Model("uc_fixed_u_lp")
    m.Params.OutputFlag = output_flag

    g = m.addVars(I, T, lb=0.0, vtype=GRB.CONTINUOUS, name="g")

    fixed_term = gp.quicksum(float(inst.fixed_cost[i]) * int(u_pat[i]) for i in range(I))
    var_term   = gp.quicksum(float(inst.gen_cost[i]) * g[i, t] for i in range(I) for t in range(T))
    obj = var_term + fixed_term
    m.setObjective(obj, GRB.MINIMIZE)

    # Demand constraints: sum_i g_it >= D_t
    demand_con = []
    for t in range(T):
        con = m.addConstr(gp.quicksum(g[i, t] for i in range(I)) >= float(D[t]), name=f"demand[{t}]")
        demand_con.append(con)

    # Capacity constraints: g_it <= Pmax_i * u_i (fixed)
    for i in range(I):
        for t in range(T):
            m.addConstr(g[i, t] <= float(inst.Pmax[i]) * int(u_pat[i]), name=f"cap[{i},{t}]")

    m.optimize()
    if m.Status != GRB.OPTIMAL:
        return {"status": m.Status}

    Vu = float(m.ObjVal)
    pi = np.array([demand_con[t].Pi for t in range(T)], dtype=float)  # duals w.r.t. demand

    return {"status": GRB.OPTIMAL, "Vu": Vu, "pi": pi, "Dk": np.array(D, dtype=float), "u_pat": u_pat}


In [76]:
def witness_exists_uc(
    inst: UCInstance,
    D: np.ndarray,
    theta: float,
    desired_cb: Callable[[gp.Model, Dict[Tuple[int,int], gp.Var], Dict[int, gp.Var]], None],
    tol: float = 1e-6,
    output_flag: int = 0
) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Check if exists an optimal solution in desired space:
      feasible + desired constraints + objective <= theta + tol.
    """
    m = gp.Model("uc_witness")
    m.Params.OutputFlag = output_flag

    I, T = inst.I, inst.T
    g = m.addVars(I, T, lb=0.0, vtype=GRB.CONTINUOUS, name="g")
    u = m.addVars(I, vtype=GRB.BINARY, name="u")

    obj = (
        gp.quicksum(inst.gen_cost[i] * g[i,t] for i in range(I) for t in range(T))
        + gp.quicksum(inst.fixed_cost[i] * u[i] for i in range(I))
    )
    m.setObjective(obj, GRB.MINIMIZE)

    # Demand constraints
    for t in range(T):
        m.addConstr(gp.quicksum(g[i,t] for i in range(I)) >= float(D[t]), name=f"demand[{t}]")

    # Capacity constraints
    for i in range(I):
        for t in range(T):
            m.addConstr(g[i,t] <= float(inst.Pmax[i]) * u[i], name=f"cap[{i},{t}]")

    # Desired-space constraints
    desired_cb(m, g, u)

    # Weak optimality cap
    m.addConstr(obj <= float(theta) + float(tol), name="opt_cap")

    m.optimize()
    if m.Status == GRB.OPTIMAL:
        return True, {
            "obj": float(m.ObjVal),
            "u": np.array([int(round(u[i].X)) for i in range(I)], dtype=int),
            "g": np.array([[g[i,t].X for t in range(T)] for i in range(I)], dtype=float),
        }
    return False, None


In [77]:
def build_master_uc_rhs_only(
    inst: UCInstance,
    spec: RHSOnlySpec,
    desired_cb: Callable[[gp.Model, Any, Any], None],
    tracked_patterns: List[Tuple[int, ...]],
    cuts_by_pattern: Dict[Tuple[int, ...], List[Dict[str, Any]]],
    output_flag: int = 0
) -> Dict[str, Any]:
    """
    Correct OA Master (RHS-only) for WCE.

    Master variables:
      - db[t] integer (demand shifts)
      - witness vars g_w[i,t], u_w[i]
      - eta[u_pat] (epigraph) for each tracked pattern

    Constraints:
      - witness feasibility at D = D0 + db
      - desired constraints on witness
      - witness_cost <= eta[u_pat] for all tracked u_pat
      - eta[u_pat] >= OA cuts: Vu(Dk) + pi(Dk)^T (D - Dk)

    Objective:
      min sum |db[t]|
    """
    I, T = inst.I, inst.T
    m = gp.Model("master_uc_rhs_only")
    m.Params.OutputFlag = output_flag

    # --------------------------
    # db vars and L1 norm
    # --------------------------
    db = []
    abs_terms = []
    for t in range(T):
        v = m.addVar(vtype=GRB.INTEGER, lb=spec.db_lb, ub=spec.db_ub, name=f"db[{t}]")
        db.append(v)
        p = m.addVar(lb=0.0, name=f"abs_db_p[{t}]")
        n = m.addVar(lb=0.0, name=f"abs_db_n[{t}]")
        m.addConstr(v == p - n, name=f"abs_link[{t}]")
        abs_terms.append(p + n)

    D_expr = [float(inst.D0[t]) + db[t] for t in range(T)]

    # --------------------------
    # Witness variables
    # --------------------------
    g_w = m.addVars(I, T, lb=0.0, vtype=GRB.CONTINUOUS, name="g_w")
    u_w = m.addVars(I, vtype=GRB.BINARY, name="u_w")

    # Witness feasibility
    for t in range(T):
        m.addConstr(gp.quicksum(g_w[i, t] for i in range(I)) >= D_expr[t], name=f"w_demand[{t}]")
    for i in range(I):
        for t in range(T):
            m.addConstr(g_w[i, t] <= float(inst.Pmax[i]) * u_w[i], name=f"w_cap[{i},{t}]")

    # Desired constraints
    desired_cb(m, g_w, u_w)

    # Witness cost
    witness_cost = (
        gp.quicksum(float(inst.gen_cost[i]) * g_w[i, t] for i in range(I) for t in range(T)) +
        gp.quicksum(float(inst.fixed_cost[i]) * u_w[i] for i in range(I))
    )

    # --------------------------
    # Epigraph variables eta[u] + OA cuts
    # --------------------------
    eta = {}
    for u_pat in tracked_patterns:
        key = "".join(map(str, u_pat))
        eta_u = m.addVar(lb=-GRB.INFINITY, name=f"eta[{key}]")
        eta[u_pat] = eta_u

        # Witness must be no more expensive than value of each tracked competitor pattern
        m.addConstr(witness_cost <= eta_u, name=f"w_le_eta[{key}]")

        # OA cuts for this pattern
        for k, cut in enumerate(cuts_by_pattern.get(u_pat, [])):
            Vu_k = float(cut["Vu"])
            pi_k = np.array(cut["pi"], dtype=float)
            Dk   = np.array(cut["Dk"], dtype=float)

            rhs = Vu_k + gp.quicksum(float(pi_k[t]) * (D_expr[t] - float(Dk[t])) for t in range(T))
            m.addConstr(eta_u >= rhs, name=f"oa_cut[{key},{k}]")

        # IMPORTANT: if there are no cuts yet, eta is unbounded below and master becomes too weak.
        # To prevent that, add a trivial lower bound cut at D0 if missing in caller logic.
        if len(cuts_by_pattern.get(u_pat, [])) == 0:
            # conservative: eta_u >= -1e9
            m.addConstr(eta_u >= -1e9, name=f"eta_lb_guard[{key}]")

    m.setObjective(float(spec.l1_weight) * gp.quicksum(abs_terms), GRB.MINIMIZE)
    m.update()

    return {"model": m, "db": db, "D_expr": D_expr}


In [78]:
def check_oa_cut_validity(inst, u_pat, cut, D_test_list, output_flag=0, eps=1e-7):
    """
    Verifies that the OA cut:
        eta >= Vu(Dk) + pi(Dk)^T (D - Dk)
    is a LOWER bound on V_u(D) for multiple D points.

    For each D_test:
      compute V_u(D_test) by solving fixed-u LP
      compute RHS of cut at D_test
      assert V_u(D_test) >= RHS - eps
    """
    Dk = cut["Dk"]
    Vu_k = cut["Vu"]
    pi_k = cut["pi"]

    for D in D_test_list:
        lp = solve_fixed_u_lp_get_cutdata(inst, D, u_pat, output_flag=output_flag)
        if lp["status"] != GRB.OPTIMAL:
            continue
        Vu_D = lp["Vu"]
        rhs = Vu_k + float(np.dot(pi_k, (np.array(D, dtype=float) - np.array(Dk, dtype=float))))
        if Vu_D + eps < rhs:
            raise RuntimeError(
                f"OA cut INVALID for u={u_pat}\n"
                f"Dk={Dk}, Vu(Dk)={Vu_k}, pi={pi_k}\n"
                f"At D={D}: Vu(D)={Vu_D} < RHS={rhs}\n"
                f"This indicates the cut sign/sense is wrong (dual convention mismatch)."
            )


In [79]:
def wce_projection_oa_rhs_only_uc(
    inst: UCInstance,
    spec: RHSOnlySpec,
    desired_cb: Callable[[gp.Model, Any, Any], None],
    max_iter: int = 50,
    tol: float = 1e-6,
    master_output: int = 0,
    oracle_output: int = 0
) -> Optional[Dict[str, Any]]:
    """
    OA / projection algorithm for RHS-only WCE in the UC-like toy.

    Key invariant:
      tracked_patterns only contains patterns for which we have at least one valid OA cut.
    """
    I, T = inst.I, inst.T

    tracked_patterns: List[Tuple[int, ...]] = []
    cuts_by_pattern: Dict[Tuple[int, ...], List[Dict[str, Any]]] = {}

    # --------------------------
    # Init at D0
    # --------------------------
    D0 = np.array(inst.D0, dtype=float)
    fwd0 = solve_forward_uc(inst, D0, output_flag=oracle_output)
    if fwd0.get("status") != GRB.OPTIMAL:
        return None

    u0 = fwd0["u_pat"]

    # add pattern + first cut
    cut0 = solve_fixed_u_lp_get_cutdata(inst, D0, u0, output_flag=oracle_output)
    if cut0.get("status") != GRB.OPTIMAL:
        # If we cannot generate a cut, OA cannot proceed rigorously.
        return None

    tracked_patterns.append(u0)
    cuts_by_pattern[u0] = [cut0]

    # --------------------------
    # OA loop
    # --------------------------
    for it in range(max_iter):
        # 1) solve master
        pack = build_master_uc_rhs_only(
            inst=inst,
            spec=spec,
            desired_cb=desired_cb,
            tracked_patterns=tracked_patterns,
            cuts_by_pattern=cuts_by_pattern,
            output_flag=master_output
        )
        m = pack["model"]
        m.optimize()
        if m.Status != GRB.OPTIMAL:
            return None

        db_val = np.array([int(round(v.X)) for v in pack["db"]], dtype=int)
        D = np.array(inst.D0, dtype=float) + db_val
        dist_l1 = float(m.ObjVal)

        # 2) oracle: true forward solve at D
        fwd = solve_forward_uc(inst, D, output_flag=oracle_output)
        if fwd.get("status") != GRB.OPTIMAL:
            # shouldn’t happen often here, but skip if it does
            continue

        theta = float(fwd["obj"])
        u_hat = fwd["u_pat"]

        # 3) witness existence (weak optimality)
        ok, wit = witness_exists_uc(inst, D, theta, desired_cb, tol=tol, output_flag=oracle_output)
        if ok:
            return {
                "iterations": it + 1,
                "dist_l1": dist_l1,
                "db": db_val,
                "D": D,
                "theta": theta,
                "forward": fwd,
                "witness": wit,
                "tracked_patterns": list(tracked_patterns),
            }

        # 4) add a cut for u_hat at this D
        new_cut = solve_fixed_u_lp_get_cutdata(inst, D, u_hat, output_flag=oracle_output)
        if new_cut.get("status") != GRB.OPTIMAL:
            # If fixed-u LP infeasible, then V_u(D)=+inf; that pattern is irrelevant at this D,
            # but since u_hat came from a feasible forward MILP, fixed-u LP SHOULD be feasible.
            # If it's not, there is a bug in fixed-u LP formulation.
            continue

        if u_hat not in cuts_by_pattern:
            tracked_patterns.append(u_hat)
            cuts_by_pattern[u_hat] = [new_cut]
        else:
            # avoid exact duplicates
            duplicate = False
            for old in cuts_by_pattern[u_hat]:
                if np.max(np.abs(np.array(old["Dk"]) - np.array(new_cut["Dk"]))) <= 1e-12:
                    duplicate = True
                    break
            if not duplicate:
                cuts_by_pattern[u_hat].append(new_cut)

    return None


In [80]:
def brute_scan_wce(inst, desired_cb, db_lb, db_ub, output_flag=0):
    best = None
    for d1 in range(db_lb, db_ub+1):
        for d2 in range(db_lb, db_ub+1):
            db = np.array([d1, d2], dtype=int)
            D = inst.D0 + db
            fw = solve_forward_uc(inst, D, output_flag=output_flag)
            if fw.get("status") != GRB.OPTIMAL:
                continue
            theta = fw["obj"]
            ok, wit = witness_exists_uc(inst, D, theta, desired_cb, tol=1e-6, output_flag=output_flag)
            if ok:
                dist = abs(d1) + abs(d2)
                if (best is None) or (dist < best["dist"]):
                    best = {"db": db, "D": D, "dist": dist, "theta": theta, "wit": wit, "fw": fw}
    return best


In [81]:
def sanity_check_best_uc(inst: UCInstance, desired_cb, best: Dict[str, Any], tol: float = 1e-6, output_flag: int = 0):
    if best is None:
        raise ValueError("best is None")

    D = best["D"]
    # recompute forward
    fw = solve_forward_uc(inst, D, output_flag=output_flag)
    if fw.get("status") != GRB.OPTIMAL:
        raise RuntimeError("Sanity: forward not optimal")

    theta = fw["obj"]
    # witness existence at theta
    ok, _ = witness_exists_uc(inst, D, theta, desired_cb, tol=tol, output_flag=output_flag)
    if not ok:
        raise RuntimeError("Sanity: witness does not exist at recomputed theta")

    print("Sanity check passed.")
    print(f"theta={theta:.6f}, stored theta={best['theta']:.6f}, gap={best['theta']-theta:+.2e}")


In [82]:
def demo():
    # -------------------------------
    # 1) Define the base MILP (UC-like)
    # -------------------------------
    I, T = 3, 2
    Pmax = np.array([6.0, 6.0, 6.0])

    gen_cost = np.array([2.0, 2.1, 6.0])
    fixed_cost = np.array([3.0, 1.0, 0.0])

    D0 = np.array([6.0, 6.0])
    base = UCInstance(I=I, T=T, Pmax=Pmax, gen_cost=gen_cost, fixed_cost=fixed_cost, D0=D0)

    # -------------------------------
    # 2) Desired space
    # -------------------------------
    def desired_cb(model, gvars, uvars):
        model.addConstr(uvars[0] == 1, name="desired_u1_on")


    # -------------------------------
    # 3) Mutable specification (RHS-only)
    # -------------------------------
    spec = RHSOnlySpec(
        db_lb=-10,
        db_ub=10,
        l1_weight=1.0
    )

    # -------------------------------
    # 3.5) Brute existence sanity scan
    # -------------------------------
    brute = brute_scan_wce(base, desired_cb, db_lb=-10, db_ub=10, output_flag=0)
    print("\n=== BRUTE SCAN (existence check) ===")
    if brute is None:
        print("No WCE exists within the db bounds (so OA cannot find one).")
        return
    print("Found WCE by brute force:")
    print("dist:", brute["dist"], "db:", brute["db"], "D:", brute["D"])
    print("forward u:", brute["fw"]["u"], "theta:", brute["theta"])
    print("witness u:", brute["wit"]["u"], "witness obj:", brute["wit"]["obj"])

    # -------------------------------
    # 4) Solve WCE (Projection / OA)
    # -------------------------------
    best = wce_projection_oa_rhs_only_uc(
        inst=base,
        spec=spec,
        desired_cb=desired_cb,
        max_iter=50,
        tol=1e-6,
        master_output=0,
        oracle_output=0
    )
    print("best from OA:", best)
    if best is None:
        print("No WCE found (OA failed) — usually indicates cut implementation is incorrect.")
        return

    print("\n=== BEST WCE FOUND (OA) ===")
    print("iterations:", best["iterations"])
    print("L1 dist:", best["dist_l1"])
    print("db:", best["db"], "=> D:", best["D"])
    print("theta:", best["theta"])
    print("forward u:", best["forward"]["u"], "pattern:", best["forward"]["u_pat"])
    print("witness u:", best["witness"]["u"])
    print("witness obj:", best["witness"]["obj"])

    # -------------------------------
    # 5) Sanity check
    # -------------------------------
    print("\n=== RUN SANITY CHECK ===")
    sanity_check_best_uc(base, desired_cb, best, tol=1e-6, output_flag=0)


# ✅ ONLY THIS at top level
demo()



=== BRUTE SCAN (existence check) ===
Found WCE by brute force:
dist: 1 db: [0 1] D: [6. 7.]
forward u: [1 1 1] theta: 30.1
witness u: [1 1 1] witness obj: 30.1
best from OA: None
No WCE found (OA failed) — usually indicates cut implementation is incorrect.


In [83]:
D_test = np.array([6.0, 7.0])
I, T = 3, 2
Pmax = np.array([6.0, 6.0, 6.0])

gen_cost = np.array([2.0, 2.1, 6.0])
fixed_cost = np.array([3.0, 1.0, 0.0])

D0 = np.array([6.0, 7.0])
base = UCInstance(I=I, T=T, Pmax=Pmax, gen_cost=gen_cost, fixed_cost=fixed_cost, D0=D0)
def desired_cb(model, gvars, uvars):
    model.addConstr(uvars[0] == 1, name="desired_u1_on")

fw = solve_forward_uc(base, D_test, output_flag=1)



theta = fw["obj"]
ok, wit = witness_exists_uc(base, D_test, theta, desired_cb, tol=1e-6, output_flag=1)

print("Witness exists?", ok)
if ok:
    print("Witness obj:", wit["obj"])
    print("Witness u:", wit["u"])

print("FW status:", fw["status"])
print("FW obj:", fw["obj"])
print("FW u:", fw["u"], "u_pat:", fw["u_pat"])


Set parameter OutputFlag to value 1
Gurobi Optimizer version 13.0.0 build v13.0.0rc1 (win64 - Windows 11+.0 (26200.2))

CPU model: AMD Ryzen 7 5800H with Radeon Graphics, instruction set [SSE2|AVX|AVX2]
Thread count: 8 physical cores, 16 logical processors, using up to 16 threads

Academic license 2653481 - for non-commercial use only - registered to to___@ug.uchile.cl
Optimize a model with 8 rows, 9 columns and 18 nonzeros (Min)
Model fingerprint: 0xd684668a
Model has 8 linear objective coefficients
Variable types: 6 continuous, 3 integer (3 binary)
Coefficient statistics:
  Matrix range     [1e+00, 6e+00]
  Objective range  [1e+00, 6e+00]
  Bounds range     [1e+00, 1e+00]
  RHS range        [6e+00, 7e+00]
Presolve removed 2 rows and 1 columns
Presolve time: 0.00s
Presolved: 6 rows, 8 columns, 14 nonzeros
Variable types: 0 continuous, 8 integer (2 binary)
Found heuristic solution: objective 30.1000000

Root relaxation: objective 2.860000e+01, 5 iterations, 0.00 seconds (0.00 work unit

In [87]:
from itertools import product
all_patterns = [tuple(p) for p in product([0,1],[0,1],[0,1])]
print("patterns:", all_patterns)
spec = RHSOnlySpec(db_lb=-10, db_ub=10, l1_weight=1.0)


patterns: [(0, 0, 0), (0, 0, 1), (0, 1, 0), (0, 1, 1), (1, 0, 0), (1, 0, 1), (1, 1, 0), (1, 1, 1)]


In [88]:
best = wce_projection_oa_rhs_only_uc(
    inst=base,
    spec=spec,
    desired_cb=desired_cb,
    max_iter=50,
    tol=1e-6,
    master_output=1,
    oracle_output=0
)
print(best)

Set parameter OutputFlag to value 1
Gurobi Optimizer version 13.0.0 build v13.0.0rc1 (win64 - Windows 11+.0 (26200.2))

CPU model: AMD Ryzen 7 5800H with Radeon Graphics, instruction set [SSE2|AVX|AVX2]
Thread count: 8 physical cores, 16 logical processors, using up to 16 threads

Academic license 2653481 - for non-commercial use only - registered to to___@ug.uchile.cl
Optimize a model with 13 rows, 16 columns and 39 nonzeros (Min)
Model fingerprint: 0x44f9727f
Model has 4 linear objective coefficients
Variable types: 11 continuous, 5 integer (3 binary)
Coefficient statistics:
  Matrix range     [1e+00, 6e+00]
  Objective range  [1e+00, 1e+00]
  Bounds range     [1e+00, 1e+01]
  RHS range        [1e+00, 3e+01]
Presolve removed 13 rows and 16 columns
Presolve time: 0.00s
Presolve: All rows and columns removed

Explored 0 nodes (0 simplex iterations) in 0.01 seconds (0.00 work units)
Thread count was 1 (of 16 available processors)

Solution count 1: 0 

Optimal solution found (tolerance 