In [1]:
from dataclasses import dataclass
from typing import Tuple
import numpy as np

@dataclass
class HVACParams:
    """Problem definition for the ON/OFF heating schedule optimisation.

    Temperatures are in °C and rates are per one-hour step. The comfort band
    is ``[min_temp, max_temp]``; the discretised state space extends
    ``grid_pad`` beyond it on both sides in increments of ``grid_step``.

    Raises:
        ValueError: if ``grid_step`` is not a positive finite number, or if
            the padded range ``[min_temp - grid_pad, max_temp + grid_pad]``
            is empty (either would make the temperature grid impossible to
            build).
    """

    prices: np.ndarray                # price paid for each hour the heater is ON; length T
    cooling_per_hour: float = 0.5     # °C lost every hour, whether or not the heater runs
    heating_per_hour: float = 2.0     # °C added in an hour when the heater is ON
    start_temp: float = 20.0          # temperature before the first step
    min_temp: float = 18.0            # lower edge of the comfort band
    max_temp: float = 22.0            # upper edge of the comfort band
    penalty_scale: float = 1_000.0    # multiplier on the cubed comfort-band violation
    grid_step: float = 0.1            # °C discretization
    grid_pad: float = 3.0             # °C extra range on both sides

    def __post_init__(self) -> None:
        # Fail here, with a clear message, rather than deep inside the grid
        # construction where a zero/negative step surfaces as an obscure
        # NumPy error or an empty state space.
        if not (np.isfinite(self.grid_step) and self.grid_step > 0):
            raise ValueError(
                f"grid_step must be a positive finite number, got {self.grid_step!r}"
            )
        if self.max_temp + self.grid_pad < self.min_temp - self.grid_pad:
            raise ValueError(
                "padded temperature range is empty: "
                f"min_temp={self.min_temp!r}, max_temp={self.max_temp!r}, "
                f"grid_pad={self.grid_pad!r}"
            )

def build_grid(p: HVACParams) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Create temperature grid, transitions for a=0/1, and penalty lookup.

    The grid spans ``[min_temp - grid_pad, max_temp + grid_pad]`` in
    increments of ``grid_step``. When the span is not a whole multiple of the
    step, one extra point is added so the upper end is still covered.

    Args:
        p: problem parameters; reads ``min_temp``, ``max_temp``, ``grid_pad``,
            ``grid_step``, ``cooling_per_hour``, ``heating_per_hour`` and
            ``penalty_scale``.

    Returns:
        A tuple ``(temps_grid, next_idx_off, next_idx_on, penalty_lut)``:

        * ``temps_grid`` - grid temperatures, ascending, rounded to 10 decimals.
        * ``next_idx_off`` - int32 index of the state reached from each state
          after one step with the heater OFF (temperature drops by
          ``cooling_per_hour``).
        * ``next_idx_on`` - same with the heater ON (drop by
          ``cooling_per_hour``, then rise by ``heating_per_hour``).
        * ``penalty_lut`` - penalty of *being at* each grid temperature:
          ``penalty_scale`` times the cubed distance outside the comfort band.

        Transitions that would leave the grid are clamped to its first/last
        state.

    Raises:
        ValueError: if ``grid_step`` is not positive or the padded range is
            empty.
    """
    if not p.grid_step > 0:
        raise ValueError(f"grid_step must be positive, got {p.grid_step!r}")

    grid_min = p.min_temp - p.grid_pad
    grid_max = p.max_temp + p.grid_pad
    if grid_max < grid_min:
        raise ValueError(
            f"padded temperature range is empty: [{grid_min!r}, {grid_max!r}]"
        )

    # Derive the point count from an integer number of steps instead of
    # np.arange(start, stop + step, step): with a fractional step the length
    # of a float arange depends on rounding error and can gain a stray point
    # beyond grid_max. The small tolerance absorbs that rounding error while
    # still rounding a genuinely fractional step count upwards.
    n_steps = int(np.ceil((grid_max - grid_min) / p.grid_step - 1e-9))
    temps_grid = np.round(grid_min + p.grid_step * np.arange(n_steps + 1), 10)
    S = len(temps_grid)

    def clamp_to_idx(temps: np.ndarray) -> np.ndarray:
        # Nearest grid index for each temperature, clamped to the valid range.
        j = np.rint((temps - grid_min) / p.grid_step)
        return np.clip(j, 0, S - 1).astype(np.int32)

    # Transitions: next state index after applying action at this step
    cooled = temps_grid - p.cooling_per_hour
    next_idx_off = clamp_to_idx(cooled)
    next_idx_on = clamp_to_idx(cooled + p.heating_per_hour)

    # Penalty on the *resulting* temperature (post-step)
    low = np.maximum(0.0, p.min_temp - temps_grid)
    high = np.maximum(0.0, temps_grid - p.max_temp)
    penalty_lut = p.penalty_scale * (low**3 + high**3)

    return temps_grid, next_idx_off, next_idx_on, penalty_lut

def dp_solve(p: HVACParams):
    """Find the cheapest ON/OFF heating plan by backward dynamic programming.

    The state is the index of the current temperature on the grid produced by
    ``build_grid``. Each hour the heater is either OFF (no energy cost) or ON
    (costs ``p.prices[t]``); in both cases the penalty of the temperature
    reached *after* the step is charged. Ties are resolved in favour of OFF.

    Args:
        p: problem parameters.

    Returns:
        ``(actions, temps_post, energy, penalties, best_cost)`` where the
        first four are length-T arrays describing the optimal rollout from
        ``p.start_temp`` (action taken, temperature after the step, energy
        cost paid, penalty incurred) and ``best_cost`` is the optimal total
        cost from the start state as a Python float.
    """
    horizon = len(p.prices)
    temps_grid, ns0, ns1, penalty_lut = build_grid(p)
    n_states = len(temps_grid)

    # cost_to_go[t, s]: minimal cost from hour t onwards when in state s.
    # The terminal row (t == horizon) stays at zero.
    cost_to_go = np.zeros((horizon + 1, n_states))
    policy = np.zeros((horizon, n_states), dtype=np.uint8)  # 0=OFF, 1=ON

    # Backward induction, last hour first.
    for hour in reversed(range(horizon)):
        future = cost_to_go[hour + 1]
        stay_off = penalty_lut[ns0] + future[ns0]
        switch_on = penalty_lut[ns1] + future[ns1] + p.prices[hour]
        on_wins = switch_on < stay_off  # strict: equal cost keeps the heater OFF
        cost_to_go[hour] = np.where(on_wins, switch_on, stay_off)
        policy[hour] = on_wins.astype(np.uint8)

    # Snap the starting temperature to the nearest grid state, clamped to the
    # grid's index range.
    offset_steps = np.round((p.start_temp - temps_grid[0]) / p.grid_step)
    start_idx = int(np.clip(offset_steps, 0, n_states - 1))
    best_cost = float(cost_to_go[0, start_idx])

    # Forward rollout: follow the policy and remember each state reached.
    actions = np.zeros(horizon, dtype=np.uint8)
    reached = np.zeros(horizon, dtype=np.intp)
    state = start_idx
    for hour in range(horizon):
        heat_on = policy[hour, state]
        state = ns1[state] if heat_on else ns0[state]
        actions[hour] = heat_on
        reached[hour] = state

    # Per-hour results are looked up from the post-step states.
    temps_post = temps_grid[reached]
    penalties = penalty_lut[reached]
    energy = np.zeros(horizon)
    on_hours = actions.astype(bool)
    energy[on_hours] = np.asarray(p.prices)[on_hours]

    return actions, temps_post, energy, penalties, best_cost

def print_trajectory(actions, temps_post, energy, penalties, prices, start_temp, cool, heat):
    """Print the plan as an hour-by-hour table followed by a cost summary.

    Every table row shows the state *after* that hour's action was applied.

    Args:
        actions: per-hour heater decision (truthy = ON); must support ``.sum()``.
        temps_post: per-hour temperature after the step.
        energy: per-hour energy cost; must support ``.sum()``.
        penalties: per-hour penalty; must support ``.sum()``.
        prices: per-hour price.
        start_temp: initial temperature, echoed in the summary.
        cool: cooling per hour, echoed in the summary.
        heat: heating per hour, echoed in the summary.
    """
    energy_sum = energy.sum()
    penalty_sum = penalties.sum()
    grand_total = energy_sum + penalty_sum

    heading = (
        "Planned trajectory (post-step state each hour)",
        "t  Action  Temp(°C)  Price  Energy  Penalty",
        "-- ------  --------  -----  ------  -------",
    )
    for text in heading:
        print(text)

    rows = zip(actions, temps_post, prices, energy, penalties)
    for hour, (is_on, temp_c, price, spent, fine) in enumerate(rows):
        label = "ON " if is_on else "OFF"
        print(f"{hour:2d}  {label}    {temp_c:8.2f}  {price:5.0f}  {spent:6.0f}  {fine:7.1f}")

    print("\nSummary")
    print(f" Start temp: {start_temp:.2f}°C")
    print(f" Cooling/heating per hour: -{cool} / +{heat} °C")
    print(f" ON hours: {actions.sum()} of {len(actions)}")
    print(f" Energy cost:  {energy_sum:.1f}")
    print(f" Penalty cost: {penalty_sum:.1f}")
    print(f" Total cost:   {grand_total:.1f}")

# -------------------------
# Example usage
# -------------------------
if __name__ == "__main__":
    # Fifteen price levels, each held for ten consecutive steps (150 in total).
    PRICES = np.repeat([1, 10, 10, 1, 10, 10, 1, 1, 1, 1, 10, 10, 100, 1, 1], 10)

    params = HVACParams(
        prices=PRICES,
        # Thermal behaviour per step
        cooling_per_hour=0.5,
        heating_per_hour=2.0,
        # Initial temperature and comfort band
        start_temp=20.0,
        min_temp=18.0,
        max_temp=22.0,
        penalty_scale=1_000.0,
        # State-space discretisation: a smaller step is more accurate but slower
        grid_step=0.1,
        grid_pad=3.0,
    )

    # Solve, then report the optimal plan.
    actions, temps_post, energy, penalties, best_cost = dp_solve(params)
    print_trajectory(
        actions,
        temps_post,
        energy,
        penalties,
        prices=params.prices,
        start_temp=params.start_temp,
        cool=params.cooling_per_hour,
        heat=params.heating_per_hour,
    )

Planned trajectory (post-step state each hour)
t  Action  Temp(°C)  Price  Energy  Penalty
-- ------  --------  -----  ------  -------
 0  OFF       19.50      1       0      0.0
 1  OFF       19.00      1       0      0.0
 2  OFF       18.50      1       0      0.0
 3  OFF       18.00      1       0      0.0
 4  ON        19.50      1       1      0.0
 5  OFF       19.00      1       0      0.0
 6  OFF       18.50      1       0      0.0
 7  OFF       18.00      1       0      0.0
 8  ON        19.50      1       1      0.0
 9  ON        21.00      1       1      0.0
10  OFF       20.50     10       0      0.0
11  OFF       20.00     10       0      0.0
12  OFF       19.50     10       0      0.0
13  OFF       19.00     10       0      0.0
14  OFF       18.50     10       0      0.0
15  OFF       18.00     10       0      0.0
16  ON        19.50     10      10      0.0
17  OFF       19.00     10       0      0.0
18  OFF       18.50     10       0      0.0
19  OFF       18.00     10   