In [4]:
from collections import defaultdict
from dwave.samplers import SimulatedAnnealingSampler
from dwave.system.samplers import DWaveSampler
from dwave.system.composites import EmbeddingComposite
from dwave.system import DWaveSampler, EmbeddingComposite
from dimod import BinaryQuadraticModel
import numpy as np
import networkx as nx

import matplotlib
import matplotlib.pyplot as plt
matplotlib.use("agg")
%matplotlib inline

from __future__ import annotations
import numpy as np
from typing import Sequence, Tuple, Dict
import time

QUBO MODELING
-------------
Minimise  
$$\min_{x_i  \in \{ 0, 1\}^5 } H = \min_{x_i  \in \{ 0, 1\}^5 } \big( Σ_t  [ (C_i - 2γ D_t P_{i,t} + γ P_{i,t}² + δ_i(1-A_{i,t}) - ε_i A_{i,t}) x_{t,i}
                   +  Σ_{i<j} 2γ P_{i,t} P_{j,t} x_{t,i} x_{t,j} ] \big)$$

- Binary x_{t,i} indicates whether source $i$ is ON at step $t$.
- Fuel (index 4) has no availability term ($A=1$ by definition).

In [121]:
class QURRENT:
    def __init__(self, 
        T: int,
        costs: Sequence[float],
        P: np.ndarray,
        demand: Sequence[float],
        avail: np.ndarray,
        delta: Sequence[float],
        gamma: float,
        epsilon: Sequence[float]) -> None:
    
        self.qubo = None
        self.Q = {}
        self.Qmat = np.zeros((5 * T, 5 * T), dtype=float)
        self.bqm = BinaryQuadraticModel.from_qubo(self.Q)
        self.sampler = None
        self.response = None
        self.solution = None
        self.energy = None
        self.num_reads = []
        self.elapsed_time = 0.0
        self.print = print

        # INPUTS
        self.T = T
        self.costs = costs
        self.P = P
        self.demand = demand
        self.avail = avail
        self.delta = delta
        self.gamma = gamma
        self.epsilon = epsilon


        """
        GOAL:
        Build a QUBO matrix for multi-source (Solar, Wind, WTE, Hydro, Fuel) grid-scheduling.

        SUMMARY
        -------
        Minimise  H = Σ_t  [ (C_i - 2γ D_t P_{i,t} + γ P_{i,t}² + δ_i(1-A_{i,t}) - ε_i A_{i,t}) x_{t,i}
                            +  Σ_{i<j} 2γ P_{i,t} P_{j,t} x_{t,i} x_{t,j} ]

        Binary x_{t,i} indicates whether source *i* is ON at step *t*.
        Fuel (index 4) has no availability term (A=1 by definition).

        """

    # --------------------------------------------------------------------------- #
    #   USER HELPER: THIS IS TO HELP THE USER DEFINE THE INPUT PARAMS
    # --------------------------------------------------------------------------- #
    def check_input(self):
        if len(self.costs) != 5:
            raise ValueError("costs must have length 5 (solar, wind, wte, hydro, fuel)")
        if self.P.shape != (self.T, 5):
            raise ValueError(f"P must be shape (T,5); got {self.P.shape}")
        if len(self.demand) != self.T:
            raise ValueError("demand must have length T")
        if self.avail.shape != (self.T, 4):
            raise ValueError(f"avail must be shape (T,4) for the 4 renewables; got {self.avail.shape}")
        if len(self.delta) != 4 or len(self.epsilon) != 4:
            raise ValueError("delta and epsilon must have length 4 (for the 4 renewable sources)")
        else:
            print("ALL IS GOOD!")


    # --------------------------------------------------------------------------- #
    #   UTILITY: FLATTEN
    # --------------------------------------------------------------------------- #
    def flatten_index(self, t: int, i: int) -> int:
        """Map (time-step, source) -> flat index in [0, 5T)."""
        return 5 * t + i

    # --------------------------------------------------------------------------- #
    #   BUILD QUBO MATRIX
    # --------------------------------------------------------------------------- #
    def build_mat(self) -> np.ndarray:
        """
        Parameters
        ----------
        T        : horizon length
        costs    : [C_solar, C_wind, C_wte, C_hydro, C_fuel]
        P        : ndarray shape (T,5) of available power for each source at each t
        demand   : list/array length T of demand D_t
        gamma    : demand-mismatch penalty (>0)
        delta    : length-4 list of unavailability penalties for the four renewables
        epsilon  : length-4 list of rewards for using available renewables
        avail    : ndarray shape (T,4) of availability flags (0/1) for the renewables

        Returns
        -------
        Qmat  : (5T × 5T) numpy array (symmetric)
        """

        for t in range(T):
            Dt = self.demand[t]
            # DIAGONAL TERMS
            for i in range(5):
                k = self.flatten_index(t, i)
                Ci = costs[i]
                Pi = P[t, i]
                diag = Ci - 2 * self.gamma * Dt * Pi + self.gamma * Pi ** 2
                if i < 4:  # renewable, add availability modifiers
                    Ai = self.avail[t, i]
                    diag += self.delta[i] * (1 - Ai) - self.epsilon[i] * Ai
                self.Qmat[k, k] = diag

            # OFF-DIAGONAL TERMS
            for i in range(5):
                for j in range(i + 1, 5):
                    k1, k2 = self.flatten_index(t, i), self.flatten_index(t, j)
                    self.Qmat[k1, k2] = self.Qmat[k2, k1] = self.gamma * self.P[t, i] * self.P[t, j]

    # --------------------------------------------------------------------------- #
    #   FORMAT QUBO MATRIX
    # --------------------------------------------------------------------------- #
    def build_Q(self) -> Dict[Tuple[int, int], float]:
        threshold: float = 1e-12
        """
        Convert dense Qmat to the sparse dict format.
        Entries with |value| < `threshold` are dropped.
        """
        n = self.Qmat.shape[0]
        for i in range(n):
            for j in range(i, n):
                val = self.Qmat[i, j]
                if abs(val) > threshold:
                    self.Q[(i, j)] = float(val)

    # --------------------------------------------------------------------------- #
    #   SOLVE USING QUNATUM ANNEALING
    # --------------------------------------------------------------------------- #
    def solveQA(self, num_reads):

        # LABLE
        label = "ENERGY OPTIMIZATION"
        
        t0 = time.time()
        self.sampler = EmbeddingComposite(DWaveSampler())
        t1 = time.time()
        elapsed_time = t1 - t0
        self.elapsed_time = elapsed_time
        
        sampler_name = self.sampler.properties['child_properties']['chip_id']
        self.response = self.sampler.sample(self.bqm, num_reads, label=label)
        
        # GET RESULT
        self.response = self.sampler.sample(self.bqm, num_reads)
        self.solution = list(self.response.first.sample.values())
        self.energy = self.response.first.energy
        
        # PRINT RESULTS
        if (self.print):
            print("NREADS: ", num_reads)
            print("CONFIG:", self.solution)
            print("ENERGY:", self.energy)
            print(f"TIME[s]:  {elapsed_time:.6f}")

In [166]:
def QPUJob(Q, num_reads, label):
    # Initialize the QPU solver
    print("Initializing QPU solver...")
    bqm = BinaryQuadraticModel.from_qubo(Q)
    sampler = EmbeddingComposite(DWaveSampler())
    
    # Start timing the sampling process
    start_time = time.time()
    print("Submitting job to the QPU...")
    
    # Submit the job to the QPU
    response = sampler.sample(bqm, num_reads=num_reads, label=label)
    end_time = time.time()
    
    # Get the best result
    solution = list(response.first.sample.values())
    energy = response.first.energy
    
    # Print timing information
    total_time = end_time - start_time
    print("\n=== QPU Job Completed ===")
    print(f"Label: {label}")
    print(f"Total Time (seconds): {total_time:.4f}")
    print(f"Number of Reads: {num_reads}")
    
    # Print response details
    print("\n=== Detailed Response ===")
    print(f"Best configuration: {solution}")
    print(f"Best energy: {energy}")
    print(f"All samples: {response}")
    print(f"Number of solutions: {len(response)}")
    print(f"Occurrences of best solution: {response.first.num_occurrences}")
    
    return response

In [167]:
import pprint, textwrap
from __future__ import annotations
import numpy as np
from typing import Sequence, Tuple, Dict

model = QURRENT(T, costs, P, demand, avail, delta, gamma, epsilon)
model.check_input()
model.build_mat()
model.build_Q()
Q = model.Q


ALL IS GOOD!


In [None]:
T = 3

# Cost per unit (relative numbers; lower = better)
costs = [1.0, 2.5, 3.5, 5.0, 10.0]  # Solar, Wind, WTE, Hydro, Fuel

# Power available from each source at each hour (kW)
P = np.array([
    [60, 10, 20, 5, 100],    # 9:00 AM
    [70, 5, 20, 5, 100],     # 10:00 AM
    [80, 7, 20, 5, 100],     # 11:00 AM
], dtype=float)

# Demand (kW)
demand = [80, 85, 90]   # grid load at each hour

# Availability (binary 0/1 for renewable sources only: Solar, Wind, WTE, Hydro)
avail = np.array([
    [1, 1, 1, 1],  # all renewables OK at 9:00
    [1, 0, 1, 1],  # wind unavailable at 10:00
    [1, 1, 1, 1],  # all back at 11:00
])

# Penalty for using unavailable renewable sources (delta)
delta = [100, 100, 30, 10]  # strong penalty for solar/wind unavailability

# Reward for using available renewable sources (epsilon)
epsilon = [20, 15, 10, 5]

# Penalty for mismatch between demand and supplied power (gamma)
gamma = 200.0

In [148]:
# SOLUTION ENCODING
Energy_sources = ['Solar', 'Wind', 'Waste-to-Energy', 'Hydro', 'Fuel']

QPUJob(Q, 200, "OPT")

Initializing QPU solver...
Submitting job to the QPU...

=== QPU Job Completed ===
Label: OPT
Total Time (seconds): 0.1369
Number of Reads: 200

=== Detailed Response ===
Best configuration: [np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(0), np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(0), np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(0)]
Best energy: -5680199.0
All samples:      0  1  2  3  4  5  6  7  8  9 10 11 12 13 14     energy num_oc. ...
0    1  1  1  1  0  1  1  1  1  0  1  1  1  1  0 -5680199.0      12 ...
1    1  1  1  0  0  1  1  1  1  0  1  1  1  1  0 -5615199.0       6 ...
2    1  1  1  1  0  1  1  1  1  0  1  1  1  0  0 -5612199.0       6 ...
3    1  1  1  1  0  1  0  1  1  0  1  1  1  1  0 -5610301.5       8 ...
4    1  1  1  1  0  1  1  1  0  0  1  1  1  1  0 -5610199.0       6 ...
5    1  1  1  1  0  1  1  1  1  0  1  0  1  1  0 -5584986.5       5 ...
6    1  1  1  1  0  1  1  1  1  0  0  1  1  1  1 -5552170.0       6 ...
7    1  0  1  1

SampleSet(rec.array([([1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0], -5680199. , 12, 0.        ),
           ([1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0], -5615199. ,  6, 0.        ),
           ([1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0], -5612199. ,  6, 0.        ),
           ([1, 1, 1, 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0], -5610301.5,  8, 0.        ),
           ([1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0], -5610199. ,  6, 0.        ),
           ([1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 0], -5584986.5,  5, 0.        ),
           ([1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1], -5552170. ,  6, 0.        ),
           ([1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0], -5550186.5,  4, 0.        ),
           ([1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0], -5547199. ,  1, 0.        ),
           ([1, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0], -5545301.5,  3, 0.        ),
           ([1, 1, 1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0], -5545199. ,  5, 0.        ),
           ([1, 1, 1, 

In [149]:
T = 10  # 10 hours

# Cost per unit energy for each source (lower cost is better)
costs = [1.0, 2.5, 3.5, 5.0, 10.0]  # Solar, Wind, WTE, Hydro, Fuel

# Power available from each source at each hour (rows = hours, columns = sources)
P = np.array([
    [50,  8, 18,  5, 100],   # 1
    [55, 10, 20,  5, 100],   # 2
    [60, 12, 20,  5, 100],   # 3
    [65,  5, 18,  5, 100],   # 4 (wind drops)
    [70,  6, 20,  5, 100],   # 5
    [75,  8, 20,  5, 100],   # 6
    [80,  9, 22,  5, 100],   # 7
    [85, 11, 20,  5, 100],   # 8
    [90,  7, 20,  5, 100],   # 9
    [95,  8, 18,  5, 100],   # 10
], dtype=float)

# Demand (kW) for each hour
demand = [70, 75, 80, 85, 90, 95, 100, 100, 105, 110]

# Availability (only for renewables: Solar, Wind, WTE, Hydro) binary 0/1
avail = np.array([
    [1, 1, 1, 1],  # 1
    [1, 1, 1, 1],  # 2
    [1, 0, 1, 1],  # 3 (wind down)
    [1, 0, 1, 1],  # 4
    [1, 1, 1, 1],  # 5
    [1, 1, 1, 1],  # 6
    [1, 1, 1, 1],  # 7
    [1, 0, 1, 1],  # 8
    [1, 0, 1, 1],  # 9
    [1, 1, 1, 1],  # 10
], dtype=int)

# Penalty for using an unavailable renewable
delta = [100, 100, 30, 10]  # [Solar, Wind, WTE, Hydro]

# Incentive to use available renewables
epsilon = [20, 15, 10, 5]

# Penalty for mismatch between total supply and demand
gamma = 200.0


In [150]:
model = QURRENT(T, costs, P, demand, avail, delta, gamma, epsilon)
model.check_input()
model.build_mat()
model.build_Q()
Q = model.Q

ALL IS GOOD!


In [152]:
results = QPUJob(Q, 200, "10H-OPT")

Initializing QPU solver...
Submitting job to the QPU...

=== QPU Job Completed ===
Label: 10H-OPT
Total Time (seconds): 0.5353
Number of Reads: 200

=== Detailed Response ===
Best configuration: [np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(0), np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(0), np.int8(1), np.int8(1), np.int8(1), np.int8(0), np.int8(0), np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(0), np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(0), np.int8(0), np.int8(1), np.int8(1), np.int8(0), np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(0), np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(0), np.int8(0), np.int8(1), np.int8(1), np.int8(1), np.int8(1), np.int8(0), np.int8(1), np.int8(1), np.int8(1), np.int8(1)]
Best energy: -21589833.0
All samples:      0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 ... 49      energy num_oc. ...
0    1  1  1  1  0  1  1  1  1  0  1  1  1  0  0 ...  1 -21589833.0       1 ...
1    1

In [162]:
best_sample = results.first.sample
energy = results.first.energy
n_vars = len(best_sample)
flat = np.array([ best_sample[i] for i in range(n_vars) ], dtype=int)