# Hybrid quantum-classical battery arbitrage (QUBO + classical repair)
We keep the physical limits and price-driven objective from the MILP and push only the binary on/off charging decisions to a QUBO that can be sampled with QAOA/annealing. Continuous pieces (state of charge, feasibility repairs, and final scoring) stay classical.

## Problem recap
- 24 hourly prices \(p_{t}\); wind revenue is a constant.
- Battery: \(E^{\max}=16\) MWh, \(P^{\max}_{ch}=5\) MW, \(P^{\max}_{dis}=4\) MW, \(\eta_{ch}=0.8\), \(\eta_{dis}=1.0\), start/end SOC = 0.
- Cycle budget proxy: \(\sum_{t} d_{t} \le 32\) MWh (\(\le 2\) equivalent full discharges).
- Binary actions per hour: charge (`c_t`), discharge (`d_t`), or idle (`i_t`). We fix power at the limits (5/4 MW) so SOC is a deterministic linear function of the action string.\
This makes the model binary-only with linear constraints; it can be converted automatically to a QUBO.

In [3]:
import pandas as pd
from pathlib import Path

# Load price data
# Works whether run from repo root or test/
data_path = Path('input_data.csv')
if not data_path.exists():
    data_path = Path.cwd().parent / 'input_data.csv'

df = pd.read_csv(data_path)
hours = df['hour'].astype(int).tolist()
prices = df.set_index('hour')['price'].to_dict()

# Parameters (same as MILP)
Pch = 5.0
Pdis = 4.0
Emax = 16.0
eta_ch = 0.8
eta_dis = 1.0
max_cycles = 2  # EFC proxy


## Hybrid design
1) **Binary schedule as QUBO**: use two binaries per hour (`charge_t`, `dis_t`) with constraint `charge_t + dis_t <= 1`. Power is fixed at the limits.\
2) **Linear SOC constraints stay classical**: cumulative SOC after hour t is \(\sum_{\tau \le t} (\eta_{ch} P_{ch} c_{\tau} - P_{dis}/\eta_{dis} \; d_{\tau})\). We impose 0 <= SOC <= Emax for every t and SOC_{24}=0 as linear constraints. Cycle budget is another linear inequality.\
3) **Convert to QUBO**: the binary linear program is converted with penalty terms (Qiskit `QuadraticProgramToQubo`). The objective is price arbitrage \(\sum_{t} p_{t}(P_{dis} d_{t} - P_{ch} c_{t})\).\
4) **Quantum sampler**: solve the QUBO with QAOA/annealing to get candidate bitstrings.\
5) **Classical repair & scoring**: map bitstrings to SOC trajectory, drop infeasible ones, optionally greedily fix a few hours, and evaluate objective exactly. Use MILP as a verifier/benchmark if desired.

In [8]:
from qiskit_optimization import QuadraticProgram
from qiskit_optimization.converters import QuadraticProgramToQubo

# Optional solvers (commented to avoid execution if not installed)
from qiskit_algorithms import QAOA
from qiskit_algorithms.optimizers import COBYLA
from qiskit_optimization.algorithms import MinimumEigenOptimizer
#from qiskit.primitives import Sampler

def build_arbitrage_qp(prices, hours, *, Pch, Pdis, Emax, eta_ch, eta_dis, max_cycles):
    qp = QuadraticProgram(name='battery_arbitrage_qubo_ready')
    # Decision binaries and exclusivity
    for t in hours:
        qp.binary_var(name=f'charge_{t}')
        qp.binary_var(name=f'dis_{t}')
        qp.linear_constraint(linear={f'charge_{t}': 1, f'dis_{t}': 1}, sense='<=', rhs=1, name=f'mode_limit_{t}')

    # SOC running-sum constraints (prefix sums, all linear)
    for t in hours:
        coeffs = {}
        for tau in hours:
            if tau <= t:
                coeffs[f'charge_{tau}'] = coeffs.get(f'charge_{tau}', 0.0) + eta_ch * Pch
                coeffs[f'dis_{tau}'] = coeffs.get(f'dis_{tau}', 0.0) - Pdis / eta_dis
        qp.linear_constraint(linear=coeffs, sense='<=', rhs=Emax, name=f'soc_max_{t}')
        qp.linear_constraint(linear=coeffs, sense='>=', rhs=0.0, name=f'soc_min_{t}')

    # Terminal SOC = 0
    coeffs_terminal = {}
    for tau in hours:
        coeffs_terminal[f'charge_{tau}'] = coeffs_terminal.get(f'charge_{tau}', 0.0) + eta_ch * Pch
        coeffs_terminal[f'dis_{tau}'] = coeffs_terminal.get(f'dis_{tau}', 0.0) - Pdis / eta_dis
    qp.linear_constraint(linear=coeffs_terminal, sense='==', rhs=0.0, name='soc_end_zero')

    # Cycle/EFC budget
    qp.linear_constraint(linear={f'dis_{t}': Pdis for t in hours}, sense='<=', rhs=max_cycles * Emax, name='cycle_budget')

    # Objective: price-weighted arbitrage (maximize)
    linear_obj = {f'dis_{t}': prices[t] * Pdis for t in hours}
    linear_obj.update({f'charge_{t}': -prices[t] * Pch for t in hours})
    qp.maximize(linear=linear_obj)
    return qp

qp = build_arbitrage_qp(prices, hours, Pch=Pch, Pdis=Pdis, Emax=Emax, eta_ch=eta_ch, eta_dis=eta_dis, max_cycles=max_cycles)
print(qp.prettyprint())

# Convert to QUBO (penalty can be tuned; leave None for automatic choice)
converter = QuadraticProgramToQubo(penalty=None)
qubo = converter.convert(qp)
print(qubo)

# Example solver wiring (commented out to avoid execution):
# sampler = Sampler()
# qaoa = QAOA(sampler=sampler, reps=2, optimizer=COBYLA(maxiter=50))
# meo = MinimumEigenOptimizer(qaoa)
# result = meo.solve(qubo)
# print('Best candidate objective:', result.fval)
# print(result.variables_dict)


Problem name: battery_arbitrage_qubo_ready

Maximize
  -444.79999999999995*charge_1 - 354.45*charge_10 - 170.7*charge_11
  - 99.85*charge_12 - 82.44999999999999*charge_13 - 68.60000000000001*charge_14
  - 93.10000000000001*charge_15 - 168.25*charge_16 - 315*charge_17
  - 551.85*charge_18 - 625.15*charge_19 - 419.09999999999997*charge_2
  - 738*charge_20 - 757.0999999999999*charge_21 - 652.45*charge_22
  - 511.4*charge_23 - 459.25*charge_24 - 415*charge_3 - 412.8*charge_4
  - 414.09999999999997*charge_5 - 430.05*charge_6 - 516.05*charge_7
  - 665.45*charge_8 - 568.65*charge_9 + 355.84*dis_1 + 283.56*dis_10
  + 136.56*dis_11 + 79.88*dis_12 + 65.96*dis_13 + 54.88*dis_14 + 74.48*dis_15
  + 134.6*dis_16 + 252*dis_17 + 441.48*dis_18 + 500.12*dis_19 + 335.28*dis_2
  + 590.4*dis_20 + 605.68*dis_21 + 521.96*dis_22 + 409.12*dis_23 + 367.4*dis_24
  + 332*dis_3 + 330.24*dis_4 + 331.28*dis_5 + 344.04*dis_6 + 412.84*dis_7
  + 532.36*dis_8 + 454.92*dis_9

Subject to
  Linear constraints (74)
    char

In [9]:
print("QUBO binaries =", len(qubo.variables))


QUBO binaries = 350


In [11]:
"""
QAOA for the simplest discrete skeleton:
- exactly ONE full 4h charge block at max power (5 MWh/h)
- exactly ONE full 4h discharge block at max power (4 MWh/h)
- discharge must start AFTER charge finishes (j >= i+4)

Decision variables (one-hot):
  u_i = 1  if charge block starts at hour i  (i = 1..21)
  v_j = 1  if discharge block starts at hour j (j = 1..21)

Objective (maximize profit) turned into QUBO minimization:
  min  sum_i C_i u_i  - sum_j R_j v_j
      + A( sum_i u_i - 1 )^2
      + A( sum_j v_j - 1 )^2
      + B * sum_{i} sum_{j < i+4} u_i v_j

Requires:
  pip install qiskit qiskit-algorithms qiskit-optimization pandas numpy
"""

import numpy as np
import pandas as pd

from qiskit.primitives import StatevectorSampler as Sampler
from qiskit_algorithms import QAOA
from qiskit_algorithms.optimizers import COBYLA
from qiskit_optimization import QuadraticProgram
from qiskit_optimization.algorithms import MinimumEigenOptimizer


def build_qubo_from_prices(prices: np.ndarray, A: float | None = None, B: float | None = None):
    """
    Build a pure QUBO QuadraticProgram for the single full-cycle skeleton.
    prices: length-24 array, p_t in EUR/MWh for t=1..24
    """
    assert len(prices) == 24
    # feasible start times for a 4-hour block: 1..21
    starts = np.arange(1, 22)  # 1..21
    n = len(starts)

    # Precompute block costs/revenues
    # Charge cost if start i: C_i = 5 * sum_{t=i..i+3} p_t
    # Discharge revenue if start j: R_j = 4 * sum_{t=j..j+3} p_t
    C = np.array([5.0 * prices[i - 1 : i - 1 + 4].sum() for i in starts])
    R = np.array([4.0 * prices[j - 1 : j - 1 + 4].sum() for j in starts])

    # Choose penalty magnitudes if not provided
    scale = float(max(C.max(), R.max()))
    if A is None:
        A = 10.0 * scale  # strong enough to enforce one-hot
    if B is None:
        B = 10.0 * scale  # strong enough to enforce ordering

    qp = QuadraticProgram("battery_single_cycle_skeleton")

    # Variables: u_1..u_21, v_1..v_21
    u_names = [f"u_{i}" for i in starts]
    v_names = [f"v_{j}" for j in starts]
    for name in u_names + v_names:
        qp.binary_var(name)

    linear = {}
    quadratic = {}
    constant = 0.0

    def add_linear(var, coeff):
        linear[var] = linear.get(var, 0.0) + float(coeff)

    def add_quadratic(var1, var2, coeff):
        # Canonicalize key ordering for consistency
        a, b = (var1, var2) if var1 <= var2 else (var2, var1)
        quadratic[(a, b)] = quadratic.get((a, b), 0.0) + float(coeff)

    # Economic term: min sum C_i u_i - sum R_j v_j
    for k, i in enumerate(starts):
        add_linear(f"u_{i}", C[k])
    for k, j in enumerate(starts):
        add_linear(f"v_{j}", -R[k])

    # One-hot penalty for u: A( sum u - 1 )^2
    # Expands to: A( -sum u + 2*sum_{i<k} u_i u_k + 1 )
    constant += A
    for i in starts:
        add_linear(f"u_{i}", -A)
    for a in range(n):
        for b in range(a + 1, n):
            add_quadratic(f"u_{starts[a]}", f"u_{starts[b]}", 2.0 * A)

    # One-hot penalty for v: A( sum v - 1 )^2
    constant += A
    for j in starts:
        add_linear(f"v_{j}", -A)
    for a in range(n):
        for b in range(a + 1, n):
            add_quadratic(f"v_{starts[a]}", f"v_{starts[b]}", 2.0 * A)

    # Ordering penalty: forbid discharge starting before charge finishes
    # If charge starts at i, charge occupies i..i+3, so discharge must start at j >= i+4.
    # Penalize illegal pairs (j < i+4):  B * u_i * v_j
    for i in starts:
        for j in starts:
            if j < i + 4:
                add_quadratic(f"u_{i}", f"v_{j}", B)

    qp.minimize(constant=constant, linear=linear, quadratic=quadratic)

    return qp, starts, C, R, A, B


def decode_solution(result, starts):
    """Extract chosen (i,j) from the optimizer result."""
    x = result.x  # solution vector in the order qp.variables
    var_names = [v.name for v in result.variables]
    sol = dict(zip(var_names, x))

    # pick argmax among u_i and v_j (should be exactly 1 if penalties worked)
    u_vals = np.array([sol[f"u_{i}"] for i in starts])
    v_vals = np.array([sol[f"v_{j}"] for j in starts])

    i_star = int(starts[u_vals.argmax()])
    j_star = int(starts[v_vals.argmax()])
    return i_star, j_star, sol


def main():
    data_path = Path('input_data.csv')
    if not data_path.exists():
        data_path = Path.cwd().parent / 'input_data.csv'
    df = pd.read_csv(data_path)

    prices = df["price"].to_numpy()

    qp, starts, C, R, A, B = build_qubo_from_prices(prices, A=None, B=None)

    # QAOA setup (statevector simulation)
    sampler = Sampler(default_shots=4096)
    optimizer = COBYLA(maxiter=250)
    qaoa = QAOA(sampler=sampler, optimizer=optimizer, reps=2)

    # Solve QUBO with QAOA
    meo = MinimumEigenOptimizer(qaoa)
    result = meo.solve(qp)

    i_star, j_star, sol = decode_solution(result, starts)

    # Compute profit of the decoded schedule (in EUR)
    # profit = discharge revenue - charge cost
    Ci = 5.0 * prices[i_star - 1 : i_star - 1 + 4].sum()
    Rj = 4.0 * prices[j_star - 1 : j_star - 1 + 4].sum()
    profit = Rj - Ci

    print("=== QAOA result (single full cycle skeleton) ===")
    print(f"Chosen charge start i* = {i_star}  (hours {i_star}-{i_star+3})")
    print(f"Chosen discharge start j* = {j_star} (hours {j_star}-{j_star+3})")
    print(f"Profit (EUR) = {profit:.2f}")
    print(f"Penalty weights: A={A:.1f}, B={B:.1f}")
    print()

    # Optional: quick classical verification (brute force over feasible pairs)
    best = (-1e18, None, None)
    for i in starts:
        for j in starts:
            if j >= i + 4:
                Ci = 5.0 * prices[i - 1 : i - 1 + 4].sum()
                Rj = 4.0 * prices[j - 1 : j - 1 + 4].sum()
                pr = Rj - Ci
                if pr > best[0]:
                    best = (pr, i, j)
    print("=== Classical check (brute force) ===")
    print(f"Best feasible profit (EUR) = {best[0]:.2f} at i={best[1]}, j={best[2]}")

    # Build a readable 24h schedule for the decoded (i*,j*) plan
    schedule = []
    for t in range(1, 25):
        if i_star <= t <= i_star + 3:
            schedule.append(("CH", 5.0, 0.0))
        elif j_star <= t <= j_star + 3:
            schedule.append(("DIS", 0.0, 4.0))
        else:
            schedule.append(("IDLE", 0.0, 0.0))

    out = pd.DataFrame({
        "hour": np.arange(1, 25),
        "price": prices,
        "mode": [m for (m, _, _) in schedule],
        "charge_MWh": [c for (_, c, _) in schedule],
        "discharge_MWh": [d for (_, _, d) in schedule],
    })
    print("\n=== 24h schedule (decoded) ===")
    print(out.to_string(index=False))


if __name__ == "__main__":
    main()


MemoryError: Unable to allocate 64.0 TiB for an array with shape (4398046511104,) and data type complex128

## Classical post-processing
- Recompute SOC for each sampled bitstring; discard or repair any that violate 0 <= SOC <= Emax or the cycle budget.
- Greedy repair: if SOC dips below 0, flip the latest discharge to idle; if it exceeds Emax, flip the latest charge to idle; then re-check the end-SOC constraint. This keeps changes local.
- Score candidates with the exact objective, keep the best. Optionally warm-start the quantum solver with the classical MILP optimum.

Quantum calls are commented to keep the notebook runnable even without Qiskit primitives; enable them in an environment that has the providers you want (local simulator, cloud sampler, or annealer connector).