# Function definitions

Import

In [1]:
import gurobipy as gp
from gurobipy import GRB
import copy
import numpy as np
import plotly.graph_objects as go
from ipywidgets import FloatSlider, VBox, HBox, Tab, Output, Layout, Label, Dropdown
from IPython.display import display, clear_output


Data container

In [2]:
class NetworkData:
    """
    Common input data for the 3-node network.
    This object will be reused across Model 1, 2, 3 and 4.
    """
    def __init__(self):
        # Nodes and lines
        self.nodes = [1, 2, 3]  # N
        self.lines = [(1, 2), (1, 3), (2, 3)]  # undirected, i<j

        # generation costs [€/MWh]
        self.c = {
            1: 30.0,
            2: 500.0,
            3: 200.0,
        }

        # Load demands [MW]
        self.d = {
            1: 80.0,
            2: 400.0,
            3: 100.0,
        }

        # Max generation capacities [MW]
        self.Gmax = {
            1: 500.0,
            2: 150.0,
            3: 250.0,
        }

        # Susceptances B_ij (pu)
        self.B = {
            (1, 2): 1.0,
            (1, 3): 1.0,
            (2, 3): 1.0,
        }

        # Maximum capacity of existing lines [MW]
        self.Pmax = {
            (1, 2): 200.0,
            (1, 3): 150.0,
            (2, 3): 100.0,
        }

        # Extra capacity if reinforcement is built [MW]
        # Per ora permettiamo il rinforzo solo sulla linea (1,2)
        self.DeltaP = {
            (1, 2): 200.0,
            (1, 3): 0.0,
            (2, 3): 0.0,
        }

        # Investment cost [€] for reinforcing line (1,2)
        self.C_inv = 50_000_000.0  # total CAPEX
        r = 0.05     # discount rate
        n = 30       # lifetime
        CRF = r * (1 + r)**n / ((1 + r)**n - 1)
        self.C_inv = self.C_inv * CRF    # annualized CAPEX


        # Node coordinates for plotting (triangle layout)
        self.node_pos = {
            1: (-1.0, 0.0),
            2: (0.0, -1.0),
            3: (1.0, 0.0),
        }


        #Multi-period extension (used by Model 2 and beyond)
        # Time periods t (e.g. t=1,2,3 may represent 2030,2040,2050 in the report).
        self.periods = [1, 2, 3]

        # Default time-dependent parameters:
        # By default, they copy the single-period values for all t.
        self.c_t = {(i, t): self.c[i] for i in self.nodes for t in self.periods}
        # Profili temporali baseline (puoi adattare i numeri come preferisci)
        self.d_t = {
            (1, 1): 80.0,  (2, 1): 200.0, (3, 1): 100.0,   # Periodo 1
            (1, 2): 80.0, (2, 2): 300.0, (3, 2): 100.0,   # Periodo 2
            (1, 3): 80.0, (2, 3): 400.0, (3, 3): 100.0,   # Periodo 3
        }

        self.Gmax_t = {
            (1, 1): 200.0,  (2, 1): 150.0, (3, 1): 250.0,   # Periodo 1
            (1, 2): 350.0, (2, 2): 150.0, (3, 2): 250.0,   # Periodo 2
            (1, 3): 500.0, (2, 3): 150.0, (3, 3): 250.0,   # Periodo 3
        }

        # Number of hours represented by each period (for now, assume full year)
        self.H = 8760  # can be changed if needed

        
        # Scenario data for Model 4
        

        # Scenario set and probabilities
        self.scenarios = ["high", "base", "low"]
        self.prob = {
            "high": 0.3,
            "base": 0.4,
            "low":  0.3
        }

        # Scenario-dependent parameters:
        # c_omega[(i, t, omega)]: marginal cost [€/MWh]
        # d_omega[(i, t, omega)]: demand [MW]
        # Gmax_omega[(i, t, omega)]: maximum generation capacity [MW]
        self.c_omega = {}
        self.d_omega = {}
        self.Gmax_omega = {}

        # Demand scaling factors by period and scenario
        # Assuming periods = [1, 2, 3] corresponding to early, mid, late
        demand_factor = {
            1: {"high": 1.10, "base": 1.00, "low": 0.90},  # early
            2: {"high": 1.15, "base": 1.00, "low": 0.85},  # mid
            3: {"high": 1.20, "base": 1.00, "low": 0.80},  # late
        }

        # RES capacity scaling factors at node 1 by period and scenario
        res_factor = {
            1: {"high": 1.20, "base": 1.00, "low": 0.90},
            2: {"high": 1.30, "base": 1.00, "low": 0.90},
            3: {"high": 1.40, "base": 1.00, "low": 0.85},
        }

        # Scenario-dependent data construction
        for t in self.periods:
            for omega in self.scenarios:
                for i in self.nodes:
                    # Marginal cost: start from deterministic profile
                    base_c = self.c_t[(i, t)]
                    self.c_omega[(i, t, omega)] = base_c

                    # Demand: deterministic profile times scenario factor
                    base_d = self.d_t[(i, t)]
                    self.d_omega[(i, t, omega)] = base_d * demand_factor[t][omega]

                    # Generation capacity:
                    # node 1 treated as RES-rich node with scenario scaling
                    base_G = self.Gmax_t[(i, t)]
                    if i == 1:
                        self.Gmax_omega[(i, t, omega)] = base_G * res_factor[t][omega]
                    else:
                        # other nodes follow the deterministic profile
                        self.Gmax_omega[(i, t, omega)] = base_G


BaseModel

In [3]:
class BaseModel:
    """
    Abstract base class for all models (1–4).
    Each derived model should implement:
      - build()
      - solve()
      - postprocess()
    """
    def __init__(self, data: NetworkData):
        self.data = data
        self.model = None
        self.results = {}

    def build(self):
        raise NotImplementedError

    def solve(self):
        if self.model is None:
            self.build()
        self.model.optimize()

    def postprocess(self):
        raise NotImplementedError

    def run(self):
        """
        Convenience method: build + solve + postprocess.
        Returns results dict.
        """
        self.build()
        self.solve()
        self.postprocess()
        return self.results


Generic analysis tools (for all models)


In [4]:
def parameter_scan(model_cls, invest_indicator_fn, base_data, update_fn, values):
    """
    Generic parameter scan usable for Model1, Model2, Model3, Model4.
    """
    results_list = []
    for val in values:
        data = copy.deepcopy(base_data)
        update_fn(data, val)
        model = model_cls(data)
        res = model.run()
        invests = invest_indicator_fn(res)
        obj_val = res.get("obj", None) if res.get("status") == GRB.OPTIMAL else None

        results_list.append({
            "value": val,
            "invests": invests,
            "obj": obj_val,
            "results": res,
        })
    return results_list


def plot_investment_scan(scan_results, xlabel="parameter", title="Investment scan"):
    """
    Plot 0/1 investment decision vs parameter value, plus objective value (if available).
    """
    xs = [r["value"] for r in scan_results]
    invest = [1 if r["invests"] else 0 for r in scan_results]
    obj = [r["obj"] for r in scan_results]

    fig = go.Figure()

    fig.add_trace(go.Scatter(
        x=xs, y=invest,
        mode="lines+markers",
        name="Invests (0/1)",
        yaxis="y1"
    ))

    if any(v is not None for v in obj):
        fig.add_trace(go.Scatter(
            x=xs, y=obj,
            mode="lines+markers",
            name="Objective",
            yaxis="y2"
        ))
        fig.update_layout(
            xaxis=dict(title=xlabel),
            yaxis=dict(title="Investment (0/1)", range=[-0.1, 1.1]),
            yaxis2=dict(
                title="Objective",
                overlaying="y",
                side="right"
            ),
            title=title,
            height=450
        )
    else:
        fig.update_layout(
            xaxis=dict(title=xlabel),
            yaxis=dict(title="Investment (0/1)", range=[-0.1, 1.1]),
            title=title,
            height=450
        )

    return fig


# Models

Model 1

In [5]:
class Model1(BaseModel):
    """
    Model 1: Myopic deterministic expansion (single-period DC-OPF with investment x).
    """
    def __init__(self, data: NetworkData):
        super().__init__(data)
        self.p = None
        self.delta = None
        self.x = None

    def build(self):
        d = self.data
        m = gp.Model("Model1_MyopicDeterministic")
        m.Params.OutputFlag = 0  # silent

        # Decision variables
        self.p = m.addVars(d.nodes, lb=0.0, name="p")  # generation p_i
        self.delta = m.addVars(d.nodes, lb=-GRB.INFINITY, name="delta")  # voltage angles δ_i
        self.x = m.addVar(vtype=GRB.BINARY, name="x")  # investment decision on line (1,2)

        HOURS = d.H  # hours in one year

        # Objective: annual generation cost + investment cost
        m.setObjective(
            HOURS * gp.quicksum(d.c[i] * self.p[i] for i in d.nodes) + d.C_inv * self.x,
            GRB.MINIMIZE
        )

        # 1. Generation capacity limits: 0 <= p_i <= Gmax_i
        for i in d.nodes:
            m.addConstr(self.p[i] <= d.Gmax[i], name=f"GenCap_{i}")

        # 2. Nodal power balance:
        # p_i - d_i = sum_j B_ij (δ_i - δ_j)
        for i in d.nodes:
            expr = self.p[i] - d.d[i]
            for (u, v) in d.lines:
                if u == i:
                    j = v
                    expr -= d.B[(u, v)] * (self.delta[i] - self.delta[j])
                elif v == i:
                    j = u
                    expr -= d.B[(u, v)] * (self.delta[i] - self.delta[j])
            m.addConstr(expr == 0, name=f"Balance_{i}")

        # 3. Line capacity limits (compact form)
        # - (P_ij + ΔP_ij x) <= B_ij(δ_i - δ_j) <= P_ij + ΔP_ij x
        for (i, j) in d.lines:
            eff_cap = d.Pmax[(i, j)] + d.DeltaP[(i, j)] * self.x
            m.addConstr(d.B[(i, j)] * (self.delta[i] - self.delta[j]) <= eff_cap,
                        name=f"LineCapUp_{i}_{j}")
            m.addConstr(d.B[(i, j)] * (self.delta[i] - self.delta[j]) >= -eff_cap,
                        name=f"LineCapLo_{i}_{j}")

        # 4. Reference angle
        m.addConstr(self.delta[1] == 0.0, name="RefAngle")

        self.model = m

    def postprocess(self):
        """
        Extracts p_i, δ_i, x and computes implied flows B_ij(δ_i - δ_j).
        """
        if self.model.status != GRB.OPTIMAL:
            self.results = {"status": self.model.status}
            return

        d = self.data
        p_sol = {i: self.p[i].X for i in d.nodes}
        delta_sol = {i: self.delta[i].X for i in d.nodes}
        x_sol = self.x.X

        flows = {}
        for (i, j) in d.lines:
            flows[(i, j)] = d.B[(i, j)] * (delta_sol[i] - delta_sol[j])

        self.results = {
            "status": self.model.status,
            "obj": self.model.objVal,
            "p": p_sol,
            "delta": delta_sol,
            "x": x_sol,
            "flows": flows,
        }


Plot function

In [6]:
def plot_dispatch(data: NetworkData, results: dict):
    """
    Plotly figure:
    - nodes with generation and demand
    - line flows with thickness proportional to |flow|
    """
    from math import fabs

    fig = go.Figure()

    if results.get("status") != GRB.OPTIMAL:
        fig.add_annotation(text="No optimal solution", x=0.5, y=0.5, showarrow=False)
        fig.update_xaxes(visible=False)
        fig.update_yaxes(visible=False)
        fig.update_layout(height=400, margin=dict(l=10, r=10, t=30, b=10))
        return fig

    p = results["p"]
    flows = results["flows"]
    x_sol = results["x"]

    # Node positions and hover text
    xs, ys, texts, labels = [], [], [], []
    for i in data.nodes:
        x, y = data.node_pos[i]
        xs.append(x)
        ys.append(y)
        labels.append(f"N{i}")
        txt = f"N{i}<br>G: {p[i]:.1f} MW<br>D: {data.d[i]:.1f} MW"
        texts.append(txt)

    # Draw lines
    if flows:
        max_flow = max(fabs(v) for v in flows.values())
    else:
        max_flow = 1.0

    for (i, j), f_ij in flows.items():
        x0, y0 = data.node_pos[i]
        x1, y1 = data.node_pos[j]
        width = 2 + 8 * (abs(f_ij) / max_flow if max_flow > 0 else 0)

        fig.add_trace(go.Scatter(
            x=[x0, x1],
            y=[y0, y1],
            mode="lines",
            line=dict(width=width),
            hoverinfo="text",
            text=[f"flow {i}-{j}: {f_ij:.1f} MW"],
            showlegend=False,
        ))

    # Draw nodes
    fig.add_trace(go.Scatter(
        x=xs,
        y=ys,
        mode="markers+text",
        marker=dict(size=20, line=dict(width=2, color="black")),
        text=labels,
        textposition="top center",
        hoverinfo="text",
        hovertext=texts,
        showlegend=False,
    ))

    invest_text = "built" if x_sol > 0.5 else "not built"
    fig.update_layout(
        title=f"Model 1 – Dispatch (line 1–2 {invest_text})",
        xaxis=dict(visible=False),
        yaxis=dict(visible=False),
        height=450,
        margin=dict(l=10, r=10, t=40, b=10),
    )
    return fig


Dasboard model 1

In [7]:
# =========================
# DASHBOARD MODEL 1 (v2)
# =========================

import gurobipy as gp
from gurobipy import GRB
from ipywidgets import FloatSlider, VBox, Tab, Output, Layout
from IPython.display import display, clear_output
import plotly.graph_objects as go


def compute_nodal_prices_lp(data: NetworkData, x_value: int):
    """
    Calcola i nodal prices [€/MWh] risolvendo un LP
    con la decisione di investimento x fissata (0 o 1).

    NOTA: qui NON moltiplichiamo per H, così le duali
    hanno unità €/MWh (marginal cost per MWh).
    """
    m = gp.Model("LP_for_LMP")
    m.Params.OutputFlag = 0

    # Variabili
    p = m.addVars(data.nodes, lb=0.0, name="p")
    delta = m.addVars(data.nodes, lb=-GRB.INFINITY, name="delta")

    # Obiettivo: solo costi marginali (no CAPEX, no H)
    m.setObjective(gp.quicksum(data.c[i] * p[i] for i in data.nodes),
                   GRB.MINIMIZE)

    # Limiti di generazione
    for i in data.nodes:
        m.addConstr(p[i] <= data.Gmax[i], name=f"GenCap_{i}")

    # Bilanci nodali (salviamo i vincoli per leggere le duali)
    balance_constr = {}
    for i in data.nodes:
        expr = p[i] - data.d[i]
        for (u, v) in data.lines:
            if u == i:
                j = v
                expr -= data.B[(u, v)] * (delta[i] - delta[j])
            elif v == i:
                j = u
                expr -= data.B[(u, v)] * (delta[i] - delta[j])
        balance_constr[i] = m.addConstr(expr == 0, name=f"Balance_{i}")

    # Limiti di capacità linee (x fissato)
    for (i, j) in data.lines:
        if (i, j) == (1, 2):
            eff_cap = data.Pmax[(i, j)] + data.DeltaP[(i, j)] * x_value
        else:
            eff_cap = data.Pmax[(i, j)]
        m.addConstr(data.B[(i, j)] * (delta[i] - delta[j]) <= eff_cap,
                    name=f"LineCapUp_{i}_{j}")
        m.addConstr(data.B[(i, j)] * (delta[i] - delta[j]) >= -eff_cap,
                    name=f"LineCapLo_{i}_{j}")

    # Riferimento angolo
    m.addConstr(delta[1] == 0.0, name="RefAngle")

    m.optimize()

    if m.status != GRB.OPTIMAL:
        return None

    # Duali dei bilanci = LMP
    lmp = {i: balance_constr[i].Pi for i in data.nodes}
    return lmp


def plot_dispatch(data: NetworkData, results: dict, lmp=None):
    """
    Plotly figure:
    - nodi con generation e demand (+ opzionale LMP)
    - linee con spessore proporzionale a |flow|
    - testo sul segmento con il valore del flusso [MW]
    """
    from math import fabs

    fig = go.Figure()

    if results.get("status") != GRB.OPTIMAL:
        fig.add_annotation(text="No optimal solution", x=0.5, y=0.5, showarrow=False)
        fig.update_xaxes(visible=False)
        fig.update_yaxes(visible=False)
        fig.update_layout(height=400, margin=dict(l=10, r=10, t=30, b=10))
        return fig

    p = results["p"]
    flows = results["flows"]
    x_sol = results["x"]

    # Node positions and hover text
    xs, ys, texts, labels = [], [], [], []
    for i in data.nodes:
        x, y = data.node_pos[i]
        xs.append(x)
        ys.append(y)
        labels.append(f"N{i}")
        txt = f"N{i}<br>G: {p[i]:.1f} MW<br>D: {data.d[i]:.1f} MW"
        if lmp is not None:
            txt += f"<br>λ: {lmp[i]:.1f} €/MWh"
        texts.append(txt)

    # Line flows: thickness + testo centrale
    if flows:
        max_flow = max(fabs(v) for v in flows.values())
    else:
        max_flow = 1.0

    for (i, j), f_ij in flows.items():
        x0, y0 = data.node_pos[i]
        x1, y1 = data.node_pos[j]
        width = 2 + 8 * (abs(f_ij) / max_flow if max_flow > 0 else 0)

        # segmento
        fig.add_trace(go.Scatter(
            x=[x0, x1],
            y=[y0, y1],
            mode="lines",
            line=dict(width=width),
            hoverinfo="text",
            text=[f"flow {i}-{j}: {f_ij:.1f} MW"],
            showlegend=False,
        ))

        # testo al centro del segmento con il valore del flusso
        xm = 0.5 * (x0 + x1) 
        ym = 0.5 * (y0 + y1) - 0.14
        fig.add_trace(go.Scatter(
            x=[xm],
            y=[ym],
            mode="text",
            text=[f"{f_ij:.1f} MW"],
            hoverinfo="skip",
            showlegend=False,
        ))

    # Nodi
    fig.add_trace(go.Scatter(
        x=xs,
        y=ys,
        mode="markers+text",
        marker=dict(size=20, line=dict(width=2, color="black")),
        text=labels,
        textposition="top center",
        hoverinfo="text",
        hovertext=texts,
        showlegend=False,
    ))

    invest_text = "built" if x_sol > 0.5 else "not built"
    fig.update_layout(
        title=f"Model 1 – Dispatch (line 1–2 {invest_text})",
        xaxis=dict(visible=False),
        yaxis=dict(visible=False),
        height=450,
        margin=dict(l=10, r=10, t=40, b=10),
    )
    return fig


# ========================
# Istanza dati condivisa
# ========================
data = NetworkData()

# Output widget per plot e testo
out_plot = Output()
out_text = Output()

# ---------------- Generators tab ----------------
g_sliders = {}
for i in data.nodes:
    g_sliders[f"Gmax_{i}"] = FloatSlider(
        value=data.Gmax[i],
        min=0.0,
        max=800.0,
        step=10.0,
        description=f"G{i} cap",
        continuous_update=False,
        layout=Layout(width="95%"),
    )
    g_sliders[f"C_{i}"] = FloatSlider(
        value=data.c[i],
        min=0.0,
        max=150.0,
        step=5.0,
        description=f"G{i} cost",
        continuous_update=False,
        layout=Layout(width="95%"),
    )

gen_tab_box = VBox(
    [g_sliders[f"Gmax_{i}"] for i in data.nodes]
    + [g_sliders[f"C_{i}"] for i in data.nodes]
)

# ---------------- Loads tab ----------------
d_sliders = {}
for i in data.nodes:
    d_sliders[f"D_{i}"] = FloatSlider(
        value=data.d[i],
        min=0.0,
        max=800.0,
        step=10.0,
        description=f"D{i}",
        continuous_update=False,
        layout=Layout(width="95%"),
    )

load_tab_box = VBox([d_sliders[f"D_{i}"] for i in data.nodes])

# ---------------- Lines tab ----------------
line_sliders = {}
for (i, j) in data.lines:
    key = f"Pmax_{i}{j}"
    line_sliders[key] = FloatSlider(
        value=data.Pmax[(i, j)],
        min=50.0,
        max=600.0,
        step=10.0,
        description=f"Pmax {i}-{j}",
        continuous_update=False,
        layout=Layout(width="95%"),
    )

DeltaP_slider = FloatSlider(
    value=data.DeltaP[(1, 2)],
    min=0.0,
    max=400.0,
    step=10.0,
    description="ΔP 1-2",
    continuous_update=False,
    layout=Layout(width="95%"),
)

Cinv_slider = FloatSlider(
    value=data.C_inv,
    min=0.0,
    max=20_000_000.0,
    step=500_000.0,
    description="C_inv (annual)",
    continuous_update=False,
    layout=Layout(width="95%"),
)

line_tab_box = VBox(list(line_sliders.values()) + [DeltaP_slider, Cinv_slider])

# ---------------- Tabs container ----------------
tab = Tab(children=[gen_tab_box, load_tab_box, line_tab_box])
tab.set_title(0, "Generators")
tab.set_title(1, "Loads")
tab.set_title(2, "Lines")


def run_model_and_update(change=None):
    """
    Legge i valori degli slider, aggiorna i dati,
    esegue Model 1, calcola LMP e aggiorna plot + testo.
    """
    # Aggiorna generatori
    for i in data.nodes:
        data.Gmax[i] = g_sliders[f"Gmax_{i}"].value
        data.c[i] = g_sliders[f"C_{i}"].value

    # Aggiorna load
    for i in data.nodes:
        data.d[i] = d_sliders[f"D_{i}"].value

    # Aggiorna capacità linee
    for (i, j) in data.lines:
        key = f"Pmax_{i}{j}"
        data.Pmax[(i, j)] = line_sliders[key].value

    # Aggiorna rinforzo
    data.DeltaP[(1, 2)] = DeltaP_slider.value
    data.C_inv = Cinv_slider.value

    # Esegui Model 1
    model = Model1(data)
    res = model.run()

    # Calcola LMP risolvendo LP con x fissato
    lmp = None
    if res.get("status") == GRB.OPTIMAL:
        x_val = int(round(res["x"]))
        lmp = compute_nodal_prices_lp(data, x_val)

    # Plot
    with out_plot:
        clear_output(wait=True)
        fig = plot_dispatch(data, res, lmp=lmp)
        fig.show()

    # Testo
    with out_text:
        clear_output(wait=True)
        if res.get("status") == GRB.OPTIMAL:
            print(f"Optimal objective value: {res['obj']:.2f} €")
            print("\nGeneration p_i [MW]:")
            for i in data.nodes:
                print(f"  Node {i}: {res['p'][i]:.1f}")
            print("\nLine flows [MW]:")
            for (i, j), f_ij in res["flows"].items():
                print(f"  {i}-{j}: {f_ij:.1f}")

            if lmp is not None:
                print("\nNodal prices λ_i [€/MWh]:")
                for i in data.nodes:
                    print(f"  Node {i}: {lmp[i]:.2f}")
            print(f"\nInvestment decision line 1-2 (0: NO, 1: YES): {int(round(res['x']))}")
        else:
            print("Model not optimal. Status:", res.get("status"))


# Collega tutti gli slider alla funzione di update
for w in list(g_sliders.values()) + list(d_sliders.values()) \
         + list(line_sliders.values()) + [DeltaP_slider, Cinv_slider]:
    w.observe(run_model_and_update, names="value")

# Primo run
run_model_and_update()

# Mostra dashboard
display(tab)
display(out_plot)
display(out_text)


Restricted license - for non-production use only - expires 2026-11-23


Tab(children=(VBox(children=(FloatSlider(value=500.0, continuous_update=False, description='G1 cap', layout=La…

Output()

Output()

Model 1 – Threshold analysis


In [8]:
def invests_model1(results):
    """
    Returns True if Model 1 invests in the reinforcement.
    Assumes results['x'] is the investment binary variable.
    """
    if results.get("status") != GRB.OPTIMAL:
        return False
    return results.get("x", 0.0) > 0.5


def scan_load_threshold_model1(base_data, node, load_values):
    """
    Uses parameter_scan for Model 1, varying the demand at 'node'.
    """
    def update_fn(d, val):
        d.d[node] = val

    return parameter_scan(
        model_cls=Model1,
        invest_indicator_fn=invests_model1,
        base_data=base_data,
        update_fn=update_fn,
        values=load_values,
    )


In [9]:
# Esempio: scan over load at node 2 in Model 1
data_M1 = NetworkData()

load_values = list(np.linspace(100, 1000, 200))  # da 100 a 1000 MW

scan_M1 = scan_load_threshold_model1(data_M1, node=2, load_values=load_values)

# finds where to invest
threshold_M1 = None
for r in scan_M1:
    if r["invests"]:
        threshold_M1 = r["value"]
        break

print("Model 1 – first load value where investment occurs at node 2:", threshold_M1)

fig_M1 = plot_investment_scan(
    scan_M1,
    xlabel="Demand at node 2 [MW]",
    title="Model 1 – Investment vs demand at node 2",
)
fig_M1.show()


Model 1 – first load value where investment occurs at node 2: 253.76884422110552


Model 2

In [10]:
# =====================
# MODEL 2
# =====================

class Model2(BaseModel):
    """
    Model 2: Intertemporal / Anticipative Expansion Model
    Multi-period DC-OPF with a single candidate reinforcement (line 1-2).
    Decides WHEN to invest (y_t) and accounts for its availability x_t over time.
    """
    def __init__(self, data: NetworkData):
        super().__init__(data)
        self.T = data.periods  # list of periods (e.g. [1,2,3])

        # Decision variables
        self.p = None       # p[i,t]
        self.delta = None   # delta[i,t]
        self.y = None       # y[t]
        self.x = None       # x[t]

    def build(self):
        d = self.data
        m = gp.Model("Model2_IntertemporalExpansion")
        m.Params.OutputFlag = 0  # silent

        # Decision variables 
        # Generation and angles for each (i,t)
        self.p = m.addVars(d.nodes, self.T, lb=0.0, name="p")
        self.delta = m.addVars(d.nodes, self.T, lb=-GRB.INFINITY, name="delta")

        # Investment decision (when to build)
        self.y = m.addVars(self.T, vtype=GRB.BINARY, name="y")

        # Availability of reinforcement over time
        self.x = m.addVars(self.T, vtype=GRB.BINARY, name="x")

        #  Objective: sum over t of (H * gen cost + C_inv * y_t) 
        H = d.H  # hours per period (e.g. 8760)

        gen_cost = gp.quicksum(
            H * d.c_t[(i, t)] * self.p[i, t]
            for i in d.nodes for t in self.T
        )

        inv_cost = gp.quicksum(
            d.C_inv * self.y[t] for t in self.T
        )

        m.setObjective(gen_cost + inv_cost, GRB.MINIMIZE)

        # ---------- Constraints ----------

        # 1. Generation capacity limits: 0 <= p_{i,t} <= Gmax_{i,t}
        for i in d.nodes:
            for t in self.T:
                m.addConstr(self.p[i, t] <= d.Gmax_t[(i, t)],
                            name=f"GenCap_{i}_{t}")

        # 2. Nodal power balance for each node and period:
        #    p_{i,t} - d_{i,t} = sum_j B_ij (delta_{i,t} - delta_{j,t})
        for i in d.nodes:
            for t in self.T:
                expr = self.p[i, t] - d.d_t[(i, t)]
                for (u, v) in d.lines:
                    if u == i:
                        j = v
                        expr -= d.B[(u, v)] * (self.delta[i, t] - self.delta[j, t])
                    elif v == i:
                        j = u
                        expr -= d.B[(u, v)] * (self.delta[i, t] - self.delta[j, t])
                m.addConstr(expr == 0, name=f"Balance_{i}_{t}")

        # 3. Intertemporal investment logic:
        #    at most one investment, x_t = sum_{tau <= t} y_tau
        m.addConstr(gp.quicksum(self.y[t] for t in self.T) <= 1,
                    name="AtMostOneInvestment")

        for t in self.T:
            m.addConstr(
                self.x[t] == gp.quicksum(self.y[tau] for tau in self.T if tau <= t),
                name=f"InvLogic_{t}"
            )

        # 4. Line capacity limits with time-dependent availability:
        #    - (P_ij + DeltaP_ij x_t) <= B_ij (delta_{i,t} - delta_{j,t}) <= (P_ij + DeltaP_ij x_t)
        for (i, j) in d.lines:
            for t in self.T:
                # solo 1–2 rinforzabile; per le altre DeltaP può anche essere 0
                eff_cap = d.Pmax[(i, j)] + d.DeltaP[(i, j)] * self.x[t]
                m.addConstr(
                    d.B[(i, j)] * (self.delta[i, t] - self.delta[j, t]) <= eff_cap,
                    name=f"LineCapUp_{i}_{j}_{t}"
                )
                m.addConstr(
                    d.B[(i, j)] * (self.delta[i, t] - self.delta[j, t]) >= -eff_cap,
                    name=f"LineCapLo_{i}_{j}_{t}"
                )

        # 5. Reference angle in each period: delta_{1,t} = 0
        for t in self.T:
            m.addConstr(self.delta[1, t] == 0.0, name=f"RefAngle_{t}")

        self.model = m

    def postprocess(self):
        """
        Extracts p_{i,t}, delta_{i,t}, y_t, x_t and computes flows f_{ij,t} = B_ij (delta_{i,t} - delta_{j,t}).
        """
        if self.model.status != GRB.OPTIMAL:
            self.results = {"status": self.model.status}
            return

        d = self.data

        p_sol = {(i, t): self.p[i, t].X for i in d.nodes for t in self.T}
        delta_sol = {(i, t): self.delta[i, t].X for i in d.nodes for t in self.T}
        y_sol = {t: self.y[t].X for t in self.T}
        x_sol = {t: self.x[t].X for t in self.T}

        flows = {}
        for (i, j) in d.lines:
            for t in self.T:
                flows[(i, j, t)] = d.B[(i, j)] * (delta_sol[(i, t)] - delta_sol[(j, t)])

        # Optionally, identify investment period (if any)
        invest_period = None
        for t in sorted(self.T):
            if y_sol[t] > 0.5:
                invest_period = t
                break

        self.results = {
            "status": self.model.status,
            "obj": self.model.objVal,
            "p": p_sol,
            "delta": delta_sol,
            "y": y_sol,
            "x": x_sol,
            "flows": flows,
            "invest_period": invest_period,
        }


plot model 2

In [11]:
# =====================
# PLOT MODEL 2
# =====================

def compute_nodal_prices_model2_lp(data: NetworkData, t_sel, x_t_value):
    """
    Calcola i nodal prices λ_i [€/MWh] per un dato periodo t_sel,
    risolvendo un LP con la decisione di investimento x_t fissata (0 o 1).

    NOTA: non moltiplichiamo per H, così le duali restano in €/MWh.
    """
    m = gp.Model(f"LP_LMP_Model2_t{t_sel}")
    m.Params.OutputFlag = 0

    # Variabili per il solo periodo t_sel
    p = m.addVars(data.nodes, lb=0.0, name="p")
    delta = m.addVars(data.nodes, lb=-GRB.INFINITY, name="delta")

    # Obiettivo: solo costi marginali in t_sel
    m.setObjective(
        gp.quicksum(data.c_t[(i, t_sel)] * p[i] for i in data.nodes),
        GRB.MINIMIZE
    )

    # Limiti di generazione
    for i in data.nodes:
        m.addConstr(
            p[i] <= data.Gmax_t[(i, t_sel)],
            name=f"GenCap_{i}_{t_sel}"
        )

    # Bilanci nodali (salviamo i vincoli per leggere le duali)
    balance_constr = {}
    for i in data.nodes:
        expr = p[i] - data.d_t[(i, t_sel)]
        for (u, v) in data.lines:
            if u == i:
                j = v
                expr -= data.B[(u, v)] * (delta[i] - delta[j])
            elif v == i:
                j = u
                expr -= data.B[(u, v)] * (delta[i] - delta[j])
        balance_constr[i] = m.addConstr(expr == 0, name=f"Balance_{i}_{t_sel}")

    # Limiti di capacità linea per t_sel con x_t fissato
    for (i, j) in data.lines:
        if (i, j) == (1, 2):   # solo la linea 1-2 è rinforzabile
            eff_cap = data.Pmax[(i, j)] + data.DeltaP[(i, j)] * x_t_value
        else:
            eff_cap = data.Pmax[(i, j)]
        m.addConstr(
            data.B[(i, j)] * (delta[i] - delta[j]) <= eff_cap,
            name=f"LineCapUp_{i}_{j}_{t_sel}"
        )
        m.addConstr(
            data.B[(i, j)] * (delta[i] - delta[j]) >= -eff_cap,
            name=f"LineCapLo_{i}_{j}_{t_sel}"
        )

    # Riferimento angolo
    m.addConstr(delta[1] == 0.0, name=f"RefAngle_{t_sel}")

    m.optimize()
    if m.status != GRB.OPTIMAL:
        return None

    # Duali dei bilanci = LMP
    lmp = {i: balance_constr[i].Pi for i in data.nodes}
    return lmp


def plot_dispatch_model2(data: NetworkData, results: dict, t_view, lmp_t=None):
    """
    Plot per Model 2 (periodo t_view):
    - nodi con G, D e (opzionale) λ_i
    - linee con spessore ∝ |flow|
    - testo vicino a ciascuna linea con il valore del flusso [MW]
    """
    from math import fabs

    fig = go.Figure()

    if results.get("status") != GRB.OPTIMAL:
        fig.add_annotation(text="No optimal solution", x=0.5, y=0.5, showarrow=False)
        fig.update_xaxes(visible=False)
        fig.update_yaxes(visible=False)
        fig.update_layout(height=400, margin=dict(l=10, r=10, t=30, b=10))
        return fig

    p = results["p"]
    flows = results["flows"]
    x_sol = results["x"]

    # --- Node positions and hover text for selected period ---
    xs, ys, texts, labels = [], [], [], []
    for i in data.nodes:
        x, y = data.node_pos[i]
        xs.append(x)
        ys.append(y)
        labels.append(f"N{i}")
        txt = (
            f"N{i}<br>"
            f"G: {p[(i, t_view)]:.1f} MW<br>"
            f"D: {data.d_t[(i, t_view)]:.1f} MW"
        )
        if lmp_t is not None:
            txt += f"<br>λ: {lmp_t[i]:.1f} €/MWh"
        texts.append(txt)

    # --- Draw lines for selected period ---
    flows_t = {(i, j): flows[(i, j, t_view)] for (i, j) in data.lines}
    max_flow = max(fabs(v) for v in flows_t.values()) if flows_t else 1.0

    for (i, j), f_ij in flows_t.items():
        x0, y0 = data.node_pos[i]
        x1, y1 = data.node_pos[j]
        width = 2 + 8 * (abs(f_ij) / max_flow if max_flow > 0 else 0)

        # segmento
        fig.add_trace(go.Scatter(
            x=[x0, x1],
            y=[y0, y1],
            mode="lines",
            line=dict(width=width),
            hoverinfo="text",
            text=[f"flow {i}-{j}: {f_ij:.1f} MW"],
            showlegend=False,
        ))

        # testo vicino alla linea (leggermente spostato verso il basso)
        xm = 0.5 * (x0 + x1)
        ym = 0.5 * (y0 + y1) - 0.15  # offset verticale per non sovrapporsi
        fig.add_trace(go.Scatter(
            x=[xm],
            y=[ym],
            mode="text",
            text=[f"{f_ij:.1f} MW"],
            hoverinfo="skip",
            showlegend=False,
        ))

    # --- Draw nodes ---
    fig.add_trace(go.Scatter(
        x=xs,
        y=ys,
        mode="markers+text",
        marker=dict(size=20, line=dict(width=2, color="black")),
        text=labels,
        textposition="top center",
        hoverinfo="text",
        hovertext=texts,
        showlegend=False,
    ))

    invest_active = "active" if x_sol[t_view] > 0.5 else "inactive"
    fig.update_layout(
        title=f"Model 2 – Dispatch in period t={t_view} (reinforcement {invest_active})",
        xaxis=dict(visible=False),
        yaxis=dict(visible=False),
        height=450,
        margin=dict(l=10, r=10, t=40, b=10),
    )
    return fig


Dashboard model 2

In [None]:
# =====================
# DASHBOARD MODEL 2
# =====================

# Instanza dati per Model 2
data2 = NetworkData()  # puoi usare lo stesso di Model 1, ma qui ne istanzio uno separato

out_plot2 = Output()
out_text2 = Output()

# ---------------- Per-period sliders ----------------
g_sliders_2 = {}
d_sliders_2 = {}
period_boxes = []

for t in data2.periods:
    g_sliders_2[t] = {}
    d_sliders_2[t] = {}

    for i in data2.nodes:
        g_sliders_2[t][f"Gmax_{i}"] = FloatSlider(
            value=data2.Gmax_t[(i, t)],
            min=0.0,
            max=1500.0,
            step=10.0,
            description=f"G{i} cap (t={t})",
            continuous_update=False,
            layout=Layout(width="95%"),
        )
        g_sliders_2[t][f"C_{i}"] = FloatSlider(
            value=data2.c_t[(i, t)],
            min=0.0,
            max=700.0,
            step=5.0,
            description=f"G{i} cost (t={t})",
            continuous_update=False,
            layout=Layout(width="95%"),
        )

    for i in data2.nodes:
        d_sliders_2[t][f"D_{i}"] = FloatSlider(
            value=data2.d_t[(i, t)],
            min=0.0,
            max=800.0,
            step=10.0,
            description=f"D{i} (t={t})",
            continuous_update=False,
            layout=Layout(width="95%"),
        )

    # ordine: tutti i cap, poi tutti i cost, poi tutti i load
    box_t = VBox(
        [g_sliders_2[t][f"Gmax_{i}"] for i in data2.nodes] +
        [g_sliders_2[t][f"C_{i}"] for i in data2.nodes] +
        [d_sliders_2[t][f"D_{i}"] for i in data2.nodes]
    )
    period_boxes.append(box_t)

period_tab = Tab(children=period_boxes)
for idx, t in enumerate(data2.periods):
    period_tab.set_title(idx, f"Period t={t}")

# ---------------- Lines & investment sliders ----------------
line_sliders_2 = {}
for (i, j) in data2.lines:
    key = f"Pmax_{i}{j}"
    line_sliders_2[key] = FloatSlider(
        value=data2.Pmax[(i, j)],
        min=50.0,
        max=600.0,
        step=10.0,
        description=f"Pmax {i}-{j}",
        continuous_update=False,
        layout=Layout(width="95%"),
    )

DeltaP_slider_2 = FloatSlider(
    value=data2.DeltaP[(1, 2)],
    min=0.0,
    max=400.0,
    step=10.0,
    description="ΔP 1-2",
    continuous_update=False,
    layout=Layout(width="95%"),
)

Cinv_slider_2 = FloatSlider(
    value=data2.C_inv,
    min=0.0,
    max=10_000_000_000.0,
    step=1_000_000.0,
    description="C_inv (annual)",
    continuous_update=False,
    layout=Layout(width="95%"),
)

lines_box_2 = VBox(list(line_sliders_2.values()) + [DeltaP_slider_2, Cinv_slider_2])

# ---------------- Main tabs: per-period data vs lines ----------------
main_tab_2 = Tab(children=[period_tab, lines_box_2])
main_tab_2.set_title(0, "Per-period data")
main_tab_2.set_title(1, "Lines & Investment")

# ---------------- Period selector for visualization ----------------
period_view = Dropdown(
    options=data2.periods,
    value=data2.periods[0],
    description="View period t",
    layout=Layout(width="40%"),
)


def run_model2_and_update(change=None):
    # 1) Aggiorna data2 da tutti gli slider
    for t in data2.periods:
        for i in data2.nodes:
            data2.Gmax_t[(i, t)] = g_sliders_2[t][f"Gmax_{i}"].value
            data2.c_t[(i, t)] = g_sliders_2[t][f"C_{i}"].value
            data2.d_t[(i, t)] = d_sliders_2[t][f"D_{i}"].value

    for (i, j) in data2.lines:
        key = f"Pmax_{i}{j}"
        data2.Pmax[(i, j)] = line_sliders_2[key].value

    data2.DeltaP[(1, 2)] = DeltaP_slider_2.value
    data2.C_inv = Cinv_slider_2.value

    # 2) Risolvi Model 2
    model2 = Model2(data2)
    res2 = model2.run()

    t_sel = period_view.value

    # 3) Calcola LMP per il periodo selezionato
    lmp_t = None
    if res2.get("status") == GRB.OPTIMAL:
        x_t_val = int(round(res2["x"][t_sel]))
        lmp_t = compute_nodal_prices_model2_lp(data2, t_sel, x_t_val)

    # 4) Aggiorna plot
    with out_plot2:
        clear_output(wait=True)
        fig = plot_dispatch_model2(data2, res2, t_sel, lmp_t=lmp_t)
        fig.show()

    # 5) Aggiorna testo
    with out_text2:
        clear_output(wait=True)
        if res2.get("status") == GRB.OPTIMAL:
            print(f"TOTAL objective value (sum over t): {res2['obj']:.2f} DKK")
            print(f"Investment period (if any): {res2['invest_period']}")
            print("\nInvestment decisions y_t and x_t:")
            for t in data2.periods:
                print(f"  t={t}: y_t = {res2['y'][t]:.0f}, x_t = {res2['x'][t]:.0f}")

            print(f"\nDispatch in viewed period t={t_sel}:")
            for i in data2.nodes:
                print(f"  Node {i}: p[i,t] = {res2['p'][(i, t_sel)]:.1f} MW, "
                      f"d[i,t] = {data2.d_t[(i, t_sel)]:.1f} MW")

            print(f"\nLine flows in t={t_sel}:")
            for (i, j) in data2.lines:
                f_ij = res2["flows"][(i, j, t_sel)]
                print(f"  {i}-{j}: {f_ij:.1f} MW")

            if lmp_t is not None:
                print(f"\nNodal prices λ_i in t={t_sel} [€/MWh]:")
                for i in data2.nodes:
                    print(f"  Node {i}: {lmp_t[i]:.2f}")
        else:
            print("Model 2 not optimal. Status:", res2.get("status"))


# Collega tutti gli slider + il period_view a run_model2_and_update
widgets_to_link = []
for t in data2.periods:
    widgets_to_link.extend(list(g_sliders_2[t].values()))
    widgets_to_link.extend(list(d_sliders_2[t].values()))

widgets_to_link.extend(list(line_sliders_2.values()))
widgets_to_link.extend([DeltaP_slider_2, Cinv_slider_2, period_view])

for w in widgets_to_link:
    w.observe(run_model2_and_update, names="value")

# Primo run
run_model2_and_update()

# Mostra dashboard
display(main_tab_2)
display(period_view)
display(out_plot2)
display(out_text2)


Tab(children=(Tab(children=(VBox(children=(FloatSlider(value=200.0, continuous_update=False, description='G1 c…

Dropdown(description='View period t', layout=Layout(width='40%'), options=(1, 2, 3), value=1)

Output()

Output()

Model 2 – Threshold analysis


In [13]:
def invests_model2_in_period(target_period):
    """
    Returns a function that checks if Model 2 invests in the given target_period.
    """
    def indicator(results):
        if results.get("status") != GRB.OPTIMAL:
            return False
        # invest_period must be stored in Model2.postprocess()
        return results.get("invest_period") == target_period
    return indicator


def scan_load_threshold_model2_in_period(base_data, node, period, load_values, target_period):
    """
    Scans demand at (node, period) and checks if the optimal solution
    invests in target_period.
    """
    def update_fn(d, val):
        # modify demand at given node and period
        d.d_t[(node, period)] = val

    invest_indicator_fn = invests_model2_in_period(target_period)

    return parameter_scan(
        model_cls=Model2,
        invest_indicator_fn=invest_indicator_fn,
        base_data=base_data,
        update_fn=update_fn,
        values=load_values,
    )

In [14]:


# Range of demand values at node 2 in period 1
load_values_2_t1 = list(np.linspace(0, 2000, 100))

scan_M2_t1 = scan_load_threshold_model2_in_period(
    base_data=data2,      # IMPORTANT: same data as dashboard
    node=2,
    period=1,             # perturb demand in t=1
    load_values=load_values_2_t1,
    target_period=1,      # we want investment to occur in t=1
)

threshold_M2_t1 = None
for r in scan_M2_t1:
    if r["invests"]:
        threshold_M2_t1 = r["value"]
        break

print("Model 2 – first load at node 2 in period 1 where investment occurs in t=1:",
      threshold_M2_t1)

fig_M2_t1 = plot_investment_scan(
    scan_M2_t1,
    xlabel="Demand at node 2 in period 1 [MW]",
    title="Model 2 – Investment in t=1 vs demand at node 2 (t=1)",
)
fig_M2_t1.show()




# Range of demand values at node 2 in period 2
load_values_2_t2 = list(np.linspace(0, 2000, 100))

scan_M2_t2 = scan_load_threshold_model2_in_period(
    base_data=data2,
    node=2,
    period=2,             # perturb demand in t=2
    load_values=load_values_2_t2,
    target_period=2,      # we want investment to occur in t=2
)

threshold_M2_t2 = None
for r in scan_M2_t2:
    if r["invests"]:
        threshold_M2_t2 = r["value"]
        break

print("Model 2 – first load at node 2 in period 2 where investment occurs in t=2:",
      threshold_M2_t2)

fig_M2_t2 = plot_investment_scan(
    scan_M2_t2,
    xlabel="Demand at node 2 in period 2 [MW]",
    title="Model 2 – Investment in t=2 vs demand at node 2 (t=2)",
)
fig_M2_t2.show()




# Range of demand values at node 2 in period 3
load_values_2_t3 = list(np.linspace(100, 2000, 100))

scan_M2_t3 = scan_load_threshold_model2_in_period(
    base_data=data2,
    node=2,
    period=3,             # perturb demand in t=3
    load_values=load_values_2_t3,
    target_period=3,      # we want investment to occur in t=3
)

threshold_M2_t3 = None
for r in scan_M2_t3:
    if r["invests"]:
        threshold_M2_t3 = r["value"]
        break

print("Model 2 – first load at node 2 in period 3 where investment occurs in t=3:",
      threshold_M2_t3)

fig_M2_t3 = plot_investment_scan(
    scan_M2_t3,
    xlabel="Demand at node 2 in period 3 [MW]",
    title="Model 2 – Investment in t=3 vs demand at node 2 (t=3)",
)
fig_M2_t3.show()


Model 2 – first load at node 2 in period 1 where investment occurs in t=1: None


Model 2 – first load at node 2 in period 2 where investment occurs in t=2: 262.6262626262626


Model 2 – first load at node 2 in period 3 where investment occurs in t=3: None


Model 3

In [15]:
# =====================
# MODEL 3
# =====================

class Model3(BaseModel):
    """
    Model 3: Single-period market equilibrium (KKT with SOS1).
    Same 3-node network and single investment x on line (1,2),
    but the market-clearing DC-OPF is represented via its KKT conditions.
    """
    def __init__(self, data: NetworkData):
        super().__init__(data)
        # primal variables
        self.p = None       # generation at each node i
        self.delta = None   # voltage angles at nodes
        self.f = None       # line flows f_{ij}
        self.x = None       # investment decision (line 1-2)
        # dual variables
        self.lmbda = None   # nodal prices (dual of nodal balance)
        self.phi_min = None # dual of lower gen bound
        self.phi_max = None # dual of upper gen bound
        self.mu_min = None  # dual of lower line limit
        self.mu_max = None  # dual of upper line limit
        self.sigma = None   # dual of DC flow-angle constraint
        self.rho = None     # dual of reference angle
        # slack / auxiliary variables for complementarity
        self.s_pmin = None
        self.s_pmax = None
        self.s_lmin = None
        self.s_lmax = None

    def build(self):
        d = self.data
        m = gp.Model("Model3_Equilibrium_KKT_SOS1")
        m.Params.OutputFlag = 0  # silent

        N = d.nodes
        L = d.lines
        H = d.H  # hours represented by the single period

        # ---------- Primal variables ----------
        self.p = m.addVars(N, lb=0.0, name="p")  # 0 <= p_i
        self.delta = m.addVars(N, lb=-GRB.INFINITY, name="delta")
        self.f = m.addVars(L, lb=-GRB.INFINITY, name="f")
        self.x = m.addVar(vtype=GRB.BINARY, name="x")

        # ---------- Dual variables ----------
        self.lmbda = m.addVars(N, lb=-GRB.INFINITY, name="lambda")
        self.phi_min = m.addVars(N, lb=0.0, name="phi_min")
        self.phi_max = m.addVars(N, lb=0.0, name="phi_max")
        self.mu_min = m.addVars(L, lb=0.0, name="mu_min")
        self.mu_max = m.addVars(L, lb=0.0, name="mu_max")
        self.sigma = m.addVars(L, lb=-GRB.INFINITY, name="sigma")
        self.rho = m.addVar(lb=-GRB.INFINITY, name="rho")

        # ---------- Slack / auxiliary variables for complementarity ----------
        # For generator bounds
        self.s_pmin = m.addVars(N, lb=0.0, name="s_pmin")  # slack for p_i >= 0
        self.s_pmax = m.addVars(N, lb=0.0, name="s_pmax")  # slack for p_i <= Gmax_i

        # For line limits
        self.s_lmin = m.addVars(L, lb=0.0, name="s_lmin")  # slack for lower line capacity
        self.s_lmax = m.addVars(L, lb=0.0, name="s_lmax")  # slack for upper line capacity

        # ---------- Objective (planner) ----------
        gen_cost = H * gp.quicksum(d.c[i] * self.p[i] for i in N)
        inv_cost = d.C_inv * self.x
        m.setObjective(gen_cost + inv_cost, GRB.MINIMIZE)

        # ---------- Primal feasibility constraints ----------

        # Generation capacity: 0 <= p_i <= Gmax_i
        for i in N:
            # upper bound
            m.addConstr(self.p[i] <= d.Gmax[i], name=f"GenCapUp_{i}")
            # define slack variables for bounds:
            # s_pmin[i] = p_i  >= 0
            m.addConstr(self.s_pmin[i] == self.p[i], name=f"S_pmin_def_{i}")
            # s_pmax[i] = Gmax_i - p_i >= 0
            m.addConstr(self.s_pmax[i] == d.Gmax[i] - self.p[i], name=f"S_pmax_def_{i}")

        # Nodal power balance: p_i - d_i = sum of inflows (using oriented flows f_ij)
        for i in N:
            expr = self.p[i] - d.d[i]
            for (u, v) in L:
                if u == i:
                    # flow leaves node i: inflow contribution -f_{uv}
                    expr -= self.f[(u, v)]
                elif v == i:
                    # flow enters node i: inflow contribution +f_{uv}
                    expr += self.f[(u, v)]
            m.addConstr(expr == 0.0, name=f"Balance_{i}")

        # DC flow-angle relation: f_{ij} = B_ij (delta_i - delta_j)
        for (i, j) in L:
            m.addConstr(
                self.f[(i, j)] == d.B[(i, j)] * (self.delta[i] - self.delta[j]),
                name=f"DC_{i}_{j}"
            )

        # Line capacity with investment: -Pcap <= f_ij <= Pcap
        for (i, j) in L:
            cap_ij = d.Pmax[(i, j)] + d.DeltaP[(i, j)] * self.x
            # lower bound: f_ij >= -cap_ij  ->  -cap_ij - f_ij <= 0
            m.addConstr(
                -cap_ij - self.f[(i, j)] <= 0.0,
                name=f"LineCapLo_{i}_{j}"
            )
            # upper bound: f_ij <= cap_ij -> f_ij - cap_ij <= 0
            m.addConstr(
                self.f[(i, j)] - cap_ij <= 0.0,
                name=f"LineCapUp_{i}_{j}"
            )
            # define slacks:
            # s_lmin = cap_ij + f_ij >= 0   (slack for lower bound)
            m.addConstr(
                self.s_lmin[(i, j)] == d.Pmax[(i, j)] + d.DeltaP[(i, j)] * self.x + self.f[(i, j)],
                name=f"S_lmin_def_{i}_{j}"
            )
            # s_lmax = cap_ij - f_ij >= 0   (slack for upper bound)
            m.addConstr(
                self.s_lmax[(i, j)] == d.Pmax[(i, j)] + d.DeltaP[(i, j)] * self.x - self.f[(i, j)],
                name=f"S_lmax_def_{i}_{j}"
            )

        # Reference angle
        ref_node = 1
        m.addConstr(self.delta[ref_node] == 0.0, name="RefAngle")

        # ---------- Stationarity conditions (KKT) ----------

        # dL/dp_i = 0: H c_i + lambda_i + phi_max_i - phi_min_i = 0
        for i in N:
            m.addConstr(
                d.H * d.c[i] + self.lmbda[i] + self.phi_max[i] - self.phi_min[i] == 0.0,
                name=f"Stat_p_{i}"
            )

        # dL/df_ij = 0: -lambda_i + lambda_j + sigma_ij - mu_min_ij + mu_max_ij = 0
        for (i, j) in L:
            m.addConstr(
                -self.lmbda[i] + self.lmbda[j] + self.sigma[(i, j)] - self.mu_min[(i, j)] + self.mu_max[(i, j)] == 0.0,
                name=f"Stat_f_{i}_{j}"
            )

        # dL/ddelta_i = 0:
        # sum over incident lines of B_ij * sigma terms + rho for reference node
        for i in N:
            expr = 0.0
            for (u, v) in L:
                if u == i:
                    expr -= self.sigma[(u, v)] * d.B[(u, v)]
                elif v == i:
                    expr += self.sigma[(u, v)] * d.B[(u, v)]
            if i == ref_node:
                expr += self.rho  # derivative of rho * delta_ref
            m.addConstr(expr == 0.0, name=f"Stat_delta_{i}")

        # ---------- Complementarity via SOS1 ----------

        # For each generator bound:
        #   phi_min[i] * s_pmin[i] = 0,  s_pmin[i] >= 0, phi_min[i] >= 0
        #   phi_max[i] * s_pmax[i] = 0,  s_pmax[i] >= 0, phi_max[i] >= 0
        for i in N:
            m.addSOS(GRB.SOS_TYPE1, [self.phi_min[i], self.s_pmin[i]], [1.0, 1.0])
            m.addSOS(GRB.SOS_TYPE1, [self.phi_max[i], self.s_pmax[i]], [1.0, 1.0])

        # For each line capacity:
        #   mu_min[ij] * s_lmin[ij] = 0
        #   mu_max[ij] * s_lmax[ij] = 0
        for (i, j) in L:
            m.addSOS(GRB.SOS_TYPE1, [self.mu_min[(i, j)], self.s_lmin[(i, j)]], [1.0, 1.0])
            m.addSOS(GRB.SOS_TYPE1, [self.mu_max[(i, j)], self.s_lmax[(i, j)]], [1.0, 1.0])

        self.model = m

    def postprocess(self):
        """
        Estrae p, delta, f, x e i moltiplicatori duali.
        Le lambda raw sono in [€/MW·year]; salviamo anche lambda_MWh = lambda/H in [€/MWh].
        """
        if self.model.status != GRB.OPTIMAL:
            self.results = {"status": self.model.status}
            return

        d = self.data
        N = d.nodes
        L = d.lines

        p_sol = {i: self.p[i].X for i in N}
        delta_sol = {i: self.delta[i].X for i in N}
        f_sol = {(i, j): self.f[(i, j)].X for (i, j) in L}
        x_sol = self.x.X

        lambda_raw = {i: self.lmbda[i].X for i in N}
        # scala per ottenere LMP in €/MWh
        lambda_MWh = {i: lambda_raw[i] / d.H for i in N}

        mu_min_raw = {(i, j): self.mu_min[(i, j)].X for (i, j) in L}
        mu_max_raw = {(i, j): self.mu_max[(i, j)].X for (i, j) in L}

        # scala per ottenere valori orari (analoghi alle λ in €/MWh)
        mu_min = {(i, j): mu_min_raw[(i, j)] / d.H for (i, j) in L}
        mu_max = {(i, j): mu_max_raw[(i, j)] / d.H for (i, j) in L}

        self.results = {
            "status": self.model.status,
            "obj": self.model.objVal,
            "p": p_sol,
            "delta": delta_sol,
            "f": f_sol,
            "x": x_sol,
            "lambda": lambda_MWh,      # LMP in €/MWh
            "lambda_raw": lambda_raw,  # opzionale
            "mu_min": mu_min,          # €/MW·h ~ €/MWh
            "mu_max": mu_max,
            "mu_min_raw": mu_min_raw,  # opzionale se ti servono annuali
            "mu_max_raw": mu_max_raw,
        }


Plot Model 3

In [16]:
# =====================
# PLOT MODEL 3
# =====================

def plot_dispatch_model3(data: NetworkData, results: dict):
    """
    Plotly figure for Model 3:
    - nodes with generation, demand and LMP (in €/MWh)
    - line flows with thickness proportional to |flow|
    - testo vicino ad ogni linea con il valore del flusso [MW]
    """
    from math import fabs

    fig = go.Figure()

    if results.get("status") != GRB.OPTIMAL:
        fig.add_annotation(text="No optimal solution", x=0.5, y=0.5, showarrow=False)
        fig.update_xaxes(visible=False)
        fig.update_yaxes(visible=False)
        fig.update_layout(height=400, margin=dict(l=10, r=10, t=30, b=10))
        return fig

    p = results.get("p", {})
    flows = results.get("f", {})
    x_sol = results.get("x", 0.0)
    lmp = results.get("lambda", {})  # in €/MWh

    # --- Draw nodes ---
    xs, ys, texts, labels = [], [], [], []
    for i in data.nodes:
        x, y = data.node_pos[i]
        xs.append(x)
        ys.append(y)
        labels.append(f"N{i}")
        lam = lmp.get(i, 0.0)
        txt = (
            f"N{i}<br>"
            f"G: {p.get(i, 0.0):.1f} MW<br>"
            f"D: {data.d[i]:.1f} MW<br>"
            f"λ: {lam:.2f} €/MWh"
        )
        texts.append(txt)

    fig.add_trace(
        go.Scatter(
            x=xs,
            y=ys,
            mode="markers+text",
            text=labels,
            textposition="top center",
            marker=dict(size=18, line=dict(width=2, color="black")),
            hoverinfo="text",
            hovertext=texts,
            showlegend=False,
        )
    )

    # --- Draw lines ---
    if flows:
        max_flow = max(fabs(v) for v in flows.values())
    else:
        max_flow = 1.0

    for (i, j), f_ij in flows.items():
        x0, y0 = data.node_pos[i]
        x1, y1 = data.node_pos[j]
        width = 2 + 8 * (abs(f_ij) / max_flow if max_flow > 1e-6 else 0.0)

        # linea
        fig.add_trace(
            go.Scatter(
                x=[x0, x1],
                y=[y0, y1],
                mode="lines",
                line=dict(width=width),
                hoverinfo="text",
                hovertext=f"{i}-{j}: {f_ij:.1f} MW",
                showlegend=False,
            )
        )

        # testo vicino alla linea (leggermente spostato verso il basso)
        xm = 0.5 * (x0 + x1)
        ym = 0.5 * (y0 + y1) - 0.15
        fig.add_trace(
            go.Scatter(
                x=[xm],
                y=[ym],
                mode="text",
                text=[f"{f_ij:.1f} MW"],
                hoverinfo="skip",
                showlegend=False,
            )
        )

    invest_text = "built" if x_sol > 0.5 else "not built"
    fig.update_layout(
        title=f"Model 3 – Dispatch and LMPs (line 1–2 {invest_text})",
        xaxis=dict(visible=False),
        yaxis=dict(visible=False),
        height=450,
        margin=dict(l=10, r=10, t=40, b=10),
    )
    return fig


Dashboard Model 3

In [17]:
# =====================
# DASHBOARD MODEL 3
# =====================

# Istanza dati per Model 3
data3 = NetworkData()

# Output per plot e testo
out_plot3 = Output()
out_text3 = Output()

# ---------------- Generators tab ----------------
g_sliders_3 = {}
for i in data3.nodes:
    g_sliders_3[f"Gmax_{i}"] = FloatSlider(
        value=data3.Gmax[i],
        min=0.0,
        max=800.0,
        step=10.0,
        description=f"G{i} cap",
        continuous_update=False,
        layout=Layout(width="95%"),
    )
    g_sliders_3[f"C_{i}"] = FloatSlider(
        value=data3.c[i],
        min=0.0,
        max=150.0,
        step=5.0,
        description=f"G{i} cost",
        continuous_update=False,
        layout=Layout(width="95%"),
    )

gen_tab_3 = VBox(
    [g_sliders_3[f"Gmax_{i}"] for i in data3.nodes]   # tutti i cap
    + [g_sliders_3[f"C_{i}"] for i in data3.nodes]    # poi tutti i cost
)


# ---------------- Load tab ----------------
d_sliders_3 = {}
for i in data3.nodes:
    d_sliders_3[f"D_{i}"] = FloatSlider(
        value=data3.d[i],
        min=0.0,
        max=1000.0,
        step=10.0,
        description=f"D{i}",
        continuous_update=False,
        layout=Layout(width="95%"),
    )

load_tab_3 = VBox(list(d_sliders_3.values()))

# ---------------- Lines tab ----------------
line_sliders_3 = {}
for (i, j) in data3.lines:
    key = f"Pmax_{i}{j}"
    line_sliders_3[key] = FloatSlider(
        value=data3.Pmax[(i, j)],
        min=50.0,
        max=600.0,
        step=10.0,
        description=f"Pmax {i}-{j}",
        continuous_update=False,
        layout=Layout(width="95%"),
    )

DeltaP_slider_3 = FloatSlider(
    value=data3.DeltaP[(1, 2)],
    min=0.0,
    max=400.0,
    step=10.0,
    description="ΔP 1-2",
    continuous_update=False,
    layout=Layout(width="95%"),
)

Cinv_slider_3 = FloatSlider(
    value=data3.C_inv,
    min=0.0,
    max=10_000_000_000.0,
    step=1_000_000.0,
    description="C_inv (annual)",
    continuous_update=False,
    layout=Layout(width="95%"),
)

lines_tab_3 = VBox(list(line_sliders_3.values()) + [DeltaP_slider_3, Cinv_slider_3])

# ---------------- Tab container ----------------
main_tab_3 = Tab(children=[gen_tab_3, load_tab_3, lines_tab_3])
main_tab_3.set_title(0, "Generators")
main_tab_3.set_title(1, "Loads")
main_tab_3.set_title(2, "Lines & investment")

# ---------------- Callback ----------------
def run_model3_and_update(change=None):
    # 1) Aggiorna parametri da slider
    # Generatori
    for i in data3.nodes:
        data3.Gmax[i] = g_sliders_3[f"Gmax_{i}"].value
        data3.c[i] = g_sliders_3[f"C_{i}"].value

    # Domanda
    for i in data3.nodes:
        data3.d[i] = d_sliders_3[f"D_{i}"].value

    # Linee
    for (i, j) in data3.lines:
        key = f"Pmax_{i}{j}"
        data3.Pmax[(i, j)] = line_sliders_3[key].value

    data3.DeltaP[(1, 2)] = DeltaP_slider_3.value
    data3.C_inv = Cinv_slider_3.value

    # 2) Risolvi Model 3
    model3 = Model3(data3)
    res3 = model3.run()

    # 3) Plot
    with out_plot3:
        clear_output(wait=True)
        fig = plot_dispatch_model3(data3, res3)
        fig.show()

    # 4) Testo
    with out_text3:
        clear_output(wait=True)
        if res3.get("status") == GRB.OPTIMAL:
            print(f"Optimal objective value: {res3['obj']:.2f} €")
            print(f"Investment decision line 1-2 (0: NO, 1: YES): {int(round(res3['x']))}")

            print("\nGeneration p_i [MW]:")
            for i in data3.nodes:
                print(f"  Node {i}: {res3['p'][i]:.1f}")

            print("\nLine flows f_ij [MW]:")
            for (i, j), f_ij in res3["f"].items():
                print(f"  {i}-{j}: {f_ij:.1f}")

            print("\nLocational marginal prices λ_i [€/MWh]:")
            for i in data3.nodes:
                print(f"  Node {i}: {res3['lambda'][i]:.2f}")

            print("\nCongestion shadow prices (μ_min, μ_max) on line limits [€/MWh]:")
            for (i, j) in data3.lines:
                print(f"  {i}-{j}: μ_min={res3['mu_min'][(i, j)]:.2f}, "
                    f"μ_max={res3['mu_max'][(i, j)]:.2f}")
        else:
            print("Model 3 not optimal. Status:", res3.get("status"))

# Collega tutti gli slider alla funzione di update
widgets_3 = (
    list(g_sliders_3.values())
    + list(d_sliders_3.values())
    + list(line_sliders_3.values())
    + [DeltaP_slider_3, Cinv_slider_3]
)

for w in widgets_3:
    w.observe(run_model3_and_update, names="value")

# Primo run
run_model3_and_update()

# Mostra dashboard
display(main_tab_3)
display(out_plot3)
display(out_text3)


Tab(children=(VBox(children=(FloatSlider(value=500.0, continuous_update=False, description='G1 cap', layout=La…

Output()

Output()

Model 3 Trheshold analytics

In [18]:
def invests_model3(results: dict) -> bool:
    """
    Returns True if Model 3 decides to invest (x ~ 1).
    """
    if results.get("status") != GRB.OPTIMAL:
        return False
    return results.get("x", 0.0) > 0.5

def scan_load_threshold_model3(base_data: NetworkData, node: int, load_values):
    """
    Uses the generic parameter_scan for Model 3, varying the demand at 'node'.

    base_data: istanza di NetworkData usata come base
    node: indice del nodo su cui variare il carico (1, 2 o 3)
    load_values: lista di valori di domanda [MW] da testare
    """
    def update_fn(d: NetworkData, val):
        d.d[node] = val

    return parameter_scan(
        model_cls=Model3,
        invest_indicator_fn=invests_model3,
        base_data=base_data,
        update_fn=update_fn,
        values=load_values,
    )


In [19]:
# Threshold analysis per Model 3:
# quanto deve crescere la domanda al nodo 2 perché convenga rinforzare la linea 1-2?

data_M3 = NetworkData()

# lista di valori di domanda al nodo 2 (ad esempio da 100 a 2000 MW)
load_values_M3 = list(np.linspace(100, 1000, 100))

scan_M3 = scan_load_threshold_model3(
    base_data=data_M3,
    node=2,
    load_values=load_values_M3,
)

# Trova il primo valore di load per cui il modello investe (x=1)
threshold_M3 = None
for r in scan_M3:
    if r["invests"]:
        threshold_M3 = r["value"]
        break

print("Model 3 – first load value at node 2 where investment occurs:", threshold_M3)

fig_M3 = plot_investment_scan(
    scan_M3,
    xlabel="Demand at node 2 [MW]",
    title="Model 3 – Investment vs demand at node 2",
)
fig_M3.show()


Model 3 – first load value at node 2 where investment occurs: 254.54545454545456


Model 4

In [20]:
# =====================
# MODEL 4
# =====================

class Model4(BaseModel):
    """
    Model 4: Two-stage stochastic intertemporal expansion model.
    First-stage (here-and-now): investment timing y_t and availability x_t.
    Second-stage (wait-and-see): scenario-dependent dispatch p_{i,t}^ω,
    voltage angles δ_{i,t}^ω, and flows f_{ij,t}^ω.
    """

    def __init__(self, data: NetworkData):
        super().__init__(data)

        # First-stage variables
        self.y = None   # y[t] ∈ {0,1}: investment in period t
        self.x = None   # x[t] ∈ {0,1}: reinforcement available in period t

        # Second-stage variables (scenario dependent)
        self.p = None       # p[i, t, omega]
        self.delta = None   # delta[i, t, omega]
        self.f = None       # f[i, j, t, omega] only for (i,j) in lines

    def build(self):
        d = self.data
        m = gp.Model("Model4_StochasticIntertemporal")
        m.Params.OutputFlag = 0  # silent

        N = d.nodes
        L = d.lines
        T = d.periods
        Omega = d.scenarios
        pi = d.prob
        H = d.H

        # -------------------- First-stage variables --------------------
        self.y = m.addVars(T, vtype=GRB.BINARY, name="y")
        self.x = m.addVars(T, vtype=GRB.BINARY, name="x")

        # -------------------- Second-stage variables --------------------
        # p[i,t,omega] ≥ 0
        self.p = m.addVars(N, T, Omega, lb=0.0, name="p")
        # delta[i,t,omega] free
        self.delta = m.addVars(N, T, Omega, lb=-GRB.INFINITY, name="delta")
        # f[i,j,t,omega] free solo per le linee in L
        self.f = m.addVars(L, T, Omega, lb=-GRB.INFINITY, name="f")

        # -------------------- Objective: investment + expected operating cost --------------------
        inv_cost = gp.quicksum(d.C_inv * self.y[t] for t in T)

        exp_oper_cost = gp.quicksum(
            pi[omega] * H * gp.quicksum(
                d.c_omega[(i, t, omega)] * self.p[i, t, omega]
                for i in N for t in T
            )
            for omega in Omega
        )

        m.setObjective(inv_cost + exp_oper_cost, GRB.MINIMIZE)

        # -------------------- 1. Intertemporal investment logic --------------------
        # At most one investment period
        m.addConstr(gp.quicksum(self.y[t] for t in T) <= 1, name="AtMostOneInvestment")

        # Availability: x_t = sum_{τ ≤ t} y_τ
        for idx, t in enumerate(T):
            m.addConstr(
                self.x[t] == gp.quicksum(self.y[T[k]] for k in range(idx + 1)),
                name=f"Availability_{t}"
            )

        # -------------------- 2. Second-stage constraints (for all t, omega) --------------------
        for omega in Omega:
            for t in T:

                # 2.a Generation limits: 0 <= p_{i,t}^ω <= G_{i,t}^ω
                for i in N:
                    m.addConstr(
                        self.p[i, t, omega] <= d.Gmax_omega[(i, t, omega)],
                        name=f"GenCap_{i}_{t}_{omega}"
                    )

                # 2.b Nodal power balance: p_{i,t}^ω - d_{i,t}^ω = sum_j f_{ij,t}^ω
                for i in N:
                    expr = self.p[i, t, omega] - d.d_omega[(i, t, omega)]
                    for (u, v) in L:
                        if u == i:
                            expr -= self.f[u, v, t, omega]   # flow leaving i
                        elif v == i:
                            expr += self.f[u, v, t, omega]   # flow entering i
                    m.addConstr(expr == 0.0, name=f"Balance_{i}_{t}_{omega}")

                # 2.c DC power flow: f_{ij,t}^ω = B_ij (delta_i - delta_j)
                for (i, j) in L:
                    m.addConstr(
                        self.f[i, j, t, omega]
                        == d.B[(i, j)] * (self.delta[i, t, omega] - self.delta[j, t, omega]),
                        name=f"DC_{i}_{j}_{t}_{omega}"
                    )

                # 2.d Line capacity with time-dependent availability:
                # - (P_ij + ΔP_ij x_t) <= f_ij <= P_ij + ΔP_ij x_t
                for (i, j) in L:
                    cap = d.Pmax[(i, j)] + d.DeltaP[(i, j)] * self.x[t]
                    m.addConstr(
                        self.f[i, j, t, omega] <= cap,
                        name=f"CapUp_{i}_{j}_{t}_{omega}"
                    )
                    m.addConstr(
                        self.f[i, j, t, omega] >= -cap,
                        name=f"CapLo_{i}_{j}_{t}_{omega}"
                    )

                # 2.e Reference angle: δ_{1,t}^ω = 0
                m.addConstr(
                    self.delta[1, t, omega] == 0.0,
                    name=f"RefAngle_{t}_{omega}"
                )

        self.model = m

    def postprocess(self):
        if self.model.status != GRB.OPTIMAL:
            self.results = {"status": self.model.status}
            return

        d = self.data
        N = d.nodes
        L = d.lines
        T = d.periods
        Omega = d.scenarios

        # First-stage solutions
        y_sol = {t: self.y[t].X for t in T}
        x_sol = {t: self.x[t].X for t in T}

        # Second-stage solutions
        p_sol = {(i, t, omega): self.p[i, t, omega].X
                 for i in N for t in T for omega in Omega}
        delta_sol = {(i, t, omega): self.delta[i, t, omega].X
                     for i in N for t in T for omega in Omega}
        f_sol = {(i, j, t, omega): self.f[i, j, t, omega].X
                 for (i, j) in L for t in T for omega in Omega}

        # Identify investment period (if any)
        invest_period = None
        for t in sorted(T):
            if y_sol[t] > 0.5:
                invest_period = t
                break

        self.results = {
            "status": self.model.status,
            "obj": self.model.objVal,
            "y": y_sol,
            "x": x_sol,
            "p": p_sol,
            "delta": delta_sol,
            "f": f_sol,
            "invest_period": invest_period,
        }


Plot model 4

In [21]:
# =====================
# PLOT MODEL 4
# =====================

def plot_dispatch_model4(data: NetworkData, results: dict, t_view, omega_view):
    """
    Plot per Model 4 (stile coerente con Model 1–2–3):
    - nodi con G, D (scenario-dependent)
    - linee con spessore proporzionale a |flow|
    - testo vicino a ciascuna linea con il valore del flusso [MW]
    """
    from math import fabs
    fig = go.Figure()

    if results.get("status") != GRB.OPTIMAL:
        fig.add_annotation(text="No optimal solution", x=0.5, y=0.5, showarrow=False)
        fig.update_xaxes(visible=False)
        fig.update_yaxes(visible=False)
        fig.update_layout(height=400, margin=dict(l=10, r=10, t=30, b=10))
        return fig

    N = data.nodes
    L = data.lines
    pos = data.node_pos

    p_sol = results["p"]
    f_sol = results["f"]
    x_sol = results["x"]

    # --- Nodes ---
    xs, ys, texts, labels = [], [], [], []
    for i in N:
        x_i, y_i = pos[i]
        xs.append(x_i)
        ys.append(y_i)
        labels.append(f"N{i}")

        p_itw = p_sol[(i, t_view, omega_view)]
        d_itw = data.d_omega[(i, t_view, omega_view)]

        txt = (
            f"N{i}<br>"
            f"G: {p_itw:.1f} MW<br>"
            f"D: {d_itw:.1f} MW"
        )
        texts.append(txt)

    # --- Line flows in selected (t, ω) ---
    flows_t = {(i, j): f_sol[(i, j, t_view, omega_view)] for (i, j) in L}
    max_flow = max(fabs(v) for v in flows_t.values()) if flows_t else 1.0

    for (i, j), f_ij in flows_t.items():
        x0, y0 = pos[i]
        x1, y1 = pos[j]
        width = 2 + 8 * (abs(f_ij) / max_flow if max_flow > 1e-6 else 0.0)

        # linea
        fig.add_trace(go.Scatter(
            x=[x0, x1],
            y=[y0, y1],
            mode="lines",
            line=dict(width=width),
            hoverinfo="text",
            hovertext=f"Line {i}-{j}: {f_ij:.1f} MW",
            showlegend=False,
        ))

        # testo vicino alla linea (leggermente spostato verso il basso)
        xm = 0.5 * (x0 + x1)
        ym = 0.5 * (y0 + y1) - 0.14
        fig.add_trace(go.Scatter(
            x=[xm],
            y=[ym],
            mode="text",
            text=[f"{f_ij:.1f} MW"],
            hoverinfo="skip",
            showlegend=False,
        ))

    # --- Nodes as markers + labels ---
    fig.add_trace(go.Scatter(
        x=xs,
        y=ys,
        mode="markers+text",
        marker=dict(size=20, line=dict(width=2, color="black")),
        text=labels,
        textposition="top center",
        hoverinfo="text",
        hovertext=texts,
        showlegend=False,
    ))

    invest_active = "active" if x_sol[t_view] > 0.5 else "inactive"

    fig.update_layout(
        title=(f"Model 4 – Dispatch in period t={t_view}, scenario ω={omega_view} "
               f"(reinforcement {invest_active})"),
        xaxis=dict(visible=False),
        yaxis=dict(visible=False),
        height=450,
        margin=dict(l=10, r=10, t=40, b=10),
    )

    return fig


Dashboard model 4

In [None]:
# =====================
# DASHBOARD MODEL 4
# =====================

# Istanza dati per Model 4
data4 = NetworkData()

out_plot4 = Output()
out_text4 = Output()

# ---------------- Scenario probability sliders ----------------
prob_sliders_4 = {}
for omega in data4.scenarios:
    prob_sliders_4[omega] = FloatSlider(
        value=data4.prob[omega],
        min=0.0,
        max=1.0,
        step=0.05,
        description=f"π_{omega}",
        continuous_update=False,
        layout=Layout(width="30%"),
    )

prob_box_4 = HBox(list(prob_sliders_4.values()))

# ---------------- Line capacity & investment sliders ----------------
line_sliders_4 = {}
for (i, j) in data4.lines:
    key = f"Pmax_{i}{j}"
    line_sliders_4[key] = FloatSlider(
        value=data4.Pmax[(i, j)],
        min=50.0,
        max=600.0,
        step=10.0,
        description=f"Pmax {i}-{j}",
        continuous_update=False,
        layout=Layout(width="95%"),
    )

DeltaP_slider_4 = FloatSlider(
    value=data4.DeltaP[(1, 2)],
    min=0.0,
    max=400.0,
    step=10.0,
    description="ΔP 1-2",
    continuous_update=False,
    layout=Layout(width="95%"),
)

Cinv_slider_4 = FloatSlider(
    value=data4.C_inv,
    min=0.0,
    max=50_000_000.0,
    step=1_000_000.0,
    description="C_inv (annual)",
    continuous_update=False,
    layout=Layout(width="95%"),
)

lines_box_4 = VBox(list(line_sliders_4.values()) + [DeltaP_slider_4, Cinv_slider_4])

# ---------------- Period & scenario selector for visualization ----------------
period_view_4 = Dropdown(
    options=data4.periods,
    value=data4.periods[0],
    description="View t",
    layout=Layout(width="30%"),
)

scenario_view_4 = Dropdown(
    options=data4.scenarios,
    value=data4.scenarios[0],
    description="View ω",
    layout=Layout(width="30%"),
)

view_box_4 = HBox([period_view_4, scenario_view_4])

# ---------------- Overall controls layout ----------------
controls_box_4 = VBox([
    Label("Model 4 – Scenario probabilities & investment parameters"),
    prob_box_4,
    lines_box_4,
    view_box_4,
])


def run_model4_and_update(change=None):
    # 1) Update scenario probabilities (normalized)
    raw = {omega: prob_sliders_4[omega].value for omega in data4.scenarios}
    total = sum(raw.values())
    if total <= 0:
        # Fallback: equal probabilities
        for omega in data4.scenarios:
            data4.prob[omega] = 1.0 / len(data4.scenarios)
    else:
        for omega in data4.scenarios:
            data4.prob[omega] = raw[omega] / total

    # 2) Update line capacities and investment cost
    for (i, j) in data4.lines:
        key = f"Pmax_{i}{j}"
        data4.Pmax[(i, j)] = line_sliders_4[key].value

    data4.DeltaP[(1, 2)] = DeltaP_slider_4.value
    data4.C_inv = Cinv_slider_4.value

    # 3) Solve Model 4
    model4 = Model4(data4)
    res4 = model4.run()

    t_sel = period_view_4.value
    omega_sel = scenario_view_4.value

    # 4) Update plot
    with out_plot4:
        clear_output(wait=True)
        fig = plot_dispatch_model4(data4, res4, t_sel, omega_sel)
        fig.show()

    # 5) Update textual output
    with out_text4:
        clear_output(wait=True)
        if res4.get("status") == GRB.OPTIMAL:
            print(f"Stochastic objective (investment + expected operating cost): {res4['obj']:.2f} €")
            print(f"Investment period (if any): {res4['invest_period']}")
            print("\nInvestment decisions y_t and x_t:")
            for t in data4.periods:
                print(f"  t={t}: y_t = {res4['y'][t]:.0f}, x_t = {res4['x'][t]:.0f}")

            print("\nScenario probabilities (normalized):")
            for omega in data4.scenarios:
                print(f"  π_{omega} = {data4.prob[omega]:.3f}")

            print(f"\nDispatch in viewed (t={t_sel}, ω={omega_sel}):")
            for i in data4.nodes:
                p_itw = res4["p"][(i, t_sel, omega_sel)]
                d_itw = data4.d_omega[(i, t_sel, omega_sel)]
                print(f"  Node {i}: p={p_itw:.1f} MW, d={d_itw:.1f} MW")

            print(f"\nLine flows in (t={t_sel}, ω={omega_sel}):")
            for (i, j) in data4.lines:
                f_ij = res4["f"][(i, j, t_sel, omega_sel)]
                print(f"  {i}-{j}: {f_ij:.1f} MW")
        else:
            print("Model 4 not optimal. Status:", res4.get("status"))


# Attacca i callback
widgets_to_link_4 = list(prob_sliders_4.values())
widgets_to_link_4.extend(list(line_sliders_4.values()))
widgets_to_link_4.extend([DeltaP_slider_4, Cinv_slider_4, period_view_4, scenario_view_4])

for w in widgets_to_link_4:
    w.observe(run_model4_and_update, names="value")

# Primo run
run_model4_and_update()

# Mostra dashboard
display(controls_box_4)
display(out_plot4)
display(out_text4)


VBox(children=(Label(value='Model 4 – Scenario probabilities & investment parameters'), HBox(children=(FloatSl…

Output()

Output()

Model 4 Trheshold analytics

In [23]:
# ============================
# MODEL 4 THRESHOLD ANALYTICS
# ============================

def invests_model4_in_period(target_period: int):
    """
    Ritorna una funzione che verifica se Model 4 investe
    nel periodo target_period (cioè invest_period == target_period).
    """
    def indicator(results: dict) -> bool:
        if results.get("status") != GRB.OPTIMAL:
            return False
        return results.get("invest_period") == target_period
    return indicator


def scan_load_threshold_model4_in_period(
    base_data: NetworkData,
    node: int,
    period: int,
    scenario: str,
    load_values,
    target_period: int,
):
    """
    Usa la funzione generica parameter_scan per Model 4,
    variando la domanda d_omega[(node, period, scenario)].

    base_data : istanza di NetworkData usata come base (es. data4)
    node      : nodo su cui perturbare la domanda (1, 2 o 3)
    period    : periodo t in cui perturbare la domanda
    scenario  : scenario ω in cui perturbare la domanda ("high","base","low")
    load_values : lista/array di valori [MW] da testare
    target_period : periodo in cui vogliamo che avvenga l'investimento (1,2,3)
    """
    def update_fn(d: NetworkData, val):
        # Modifichiamo solo la domanda nello scenario scelto
        d.d_omega[(node, period, scenario)] = val

    invest_indicator_fn = invests_model4_in_period(target_period)

    return parameter_scan(
        model_cls=Model4,
        invest_indicator_fn=invest_indicator_fn,
        base_data=base_data,
        update_fn=update_fn,
        values=load_values,
    )


In [24]:
# ============================
# THRESHOLD PRINT – MODEL 4
# ============================

# Usiamo come base la stessa istanza dati della dashboard
data_M4 = data4  # oppure NetworkData() se vuoi partire da zero

# Range di valori di domanda al nodo 2 nello scenario "base"
load_values_M4_t1 = list(np.linspace(0, 2000, 80))
load_values_M4_t2 = list(np.linspace(0, 2000, 80))
load_values_M4_t3 = list(np.linspace(0, 2000, 80))

# ---- Periodo 1 ----
scan_M4_t1 = scan_load_threshold_model4_in_period(
    base_data=data_M4,
    node=2,
    period=1,           # perturbiamo d_omega(2,1,"base")
    scenario="base",
    load_values=load_values_M4_t1,
    target_period=1,    # vogliamo che l'investimento avvenga in t=1
)

threshold_M4_t1 = None
for r in scan_M4_t1:
    if r["invests"]:
        threshold_M4_t1 = r["value"]
        break

print("Model 4 – first load at node 2 in period 1 (scenario base) ",
      "where investment occurs in t=1:", threshold_M4_t1)

fig_M4_t1 = plot_investment_scan(
    scan_M4_t1,
    xlabel="Demand at node 2 in period 1 (scenario base) [MW]",
    title="Model 4 – Investment in t=1 vs demand at node 2 (t=1, ω=base)",
)
fig_M4_t1.show()


# ---- Periodo 2 ----
scan_M4_t2 = scan_load_threshold_model4_in_period(
    base_data=data_M4,
    node=2,
    period=2,
    scenario="base",
    load_values=load_values_M4_t2,
    target_period=2,
)

threshold_M4_t2 = None
for r in scan_M4_t2:
    if r["invests"]:
        threshold_M4_t2 = r["value"]
        break

print("Model 4 – first load at node 2 in period 2 (scenario base) ",
      "where investment occurs in t=2:", threshold_M4_t2)

fig_M4_t2 = plot_investment_scan(
    scan_M4_t2,
    xlabel="Demand at node 2 in period 2 (scenario base) [MW]",
    title="Model 4 – Investment in t=2 vs demand at node 2 (t=2, ω=base)",
)
fig_M4_t2.show()


# ---- Periodo 3 ----
scan_M4_t3 = scan_load_threshold_model4_in_period(
    base_data=data_M4,
    node=2,
    period=3,
    scenario="base",
    load_values=load_values_M4_t3,
    target_period=3,
)

threshold_M4_t3 = None
for r in scan_M4_t3:
    if r["invests"]:
        threshold_M4_t3 = r["value"]
        break

print("Model 4 – first load at node 2 in period 3 (scenario base) ",
      "where investment occurs in t=3:", threshold_M4_t3)

fig_M4_t3 = plot_investment_scan(
    scan_M4_t3,
    xlabel="Demand at node 2 in period 3 (scenario base) [MW]",
    title="Model 4 – Investment in t=3 vs demand at node 2 (t=3, ω=base)",
)
fig_M4_t3.show()


Model 4 – first load at node 2 in period 1 (scenario base)  where investment occurs in t=1: 0.0


Model 4 – first load at node 2 in period 2 (scenario base)  where investment occurs in t=2: None


Model 4 – first load at node 2 in period 3 (scenario base)  where investment occurs in t=3: None
