In [1]:
import copy

import numpy as np
import pandas as pd
import pyomo.environ as pyo

# pandapower: `pn`, `runpp` and `runopp` are needed by the Model classes and
# the pandapower cells further down.
# import pandapower as pp  # kept disabled: the `pp` alias is taken by pypower below
import pandapower.networks as pn
from pandapower import runpp, runopp

import pypower as pp
from pypower.api import runopf, ppoption
# PYPOWER solver options: silence console output and select OPF algorithm 560.
# ppoption takes option names directly as keyword arguments; wrapping OPF_ALG
# in an `OPT` dict only adds an unrecognised "OPT" entry and leaves OPF_ALG at
# its default value.
ppopt = ppoption(VERBOSE=0, OUT_ALL=0, OPF_ALG=560)

from pypower import case9, case14, case30, case57, case118, case300
# Registry of available test systems, keyed by case name. Each value is the
# object imported from pypower under that name; the case data itself is obtained
# by calling the same-named callable found in its __dict__ (see the cell below).
cases = {
    "case9": case9,
    "case14": case14,
    "case30": case30,
    "case57": case57,
    "case118": case118,
    "case300": case300,
}

In [14]:
# Test system to work with (must be a key of `cases`)
case_name = "case14"

# Every entry of `cases` exposes a loader with the same name as the case;
# calling it produces the case data
case = getattr(cases[case_name], case_name)()

# Baseline: solve the OPF on the unmodified case and report the objective value
results = runopf(case, ppopt)
cost = results["f"]
print(cost)

8081.526392989466


In [18]:
# Inspect columns 2 and 3 of the bus matrix for every bus: these are the two
# columns that run_opf (defined below) overwrites with Pd and Qd.
case["bus"][:, 2:4]

array([[ 0. ,  0. ],
       [21.7, 12.7],
       [94.2, 19. ],
       [47.8, -3.9],
       [ 7.6,  1.6],
       [11.2,  7.5],
       [ 0. ,  0. ],
       [ 0. ,  0. ],
       [29.5, 16.6],
       [ 9. ,  5.8],
       [ 3.5,  1.8],
       [ 6.1,  1.6],
       [13.5,  5.8],
       [14.9,  5. ]])

In [8]:
def run_opf(case, Pd, Qd):
    """Solve the OPF for `case` with the loads at its load buses replaced.

    A load bus is any row of ``case["bus"]`` whose column 2 or column 3 is
    non-zero. Those rows get ``Pd`` written to column 2 and ``Qd`` written to
    column 3 before the case is solved with the module-level ``ppopt`` options.

    Parameters
    ----------
    case : dict
        Case data with a 2-D array under ``"bus"``. It is not modified.
    Pd, Qd : scalar or array-like
        New column-2 / column-3 values for the load buses. Anything that numpy
        can assign to the selected rows is accepted (a scalar, or one value
        per load bus).

    Returns
    -------
    float
        ``results["f"]`` when the solver reports success, otherwise ``nan``.
    """
    # Work on a private copy so the caller's case keeps its original loads.
    # Editing it in place would also change which buses are detected as load
    # buses on the next call as soon as any load had been set to zero.
    case = copy.deepcopy(case)
    bus = case["bus"]

    # Rows with a non-zero entry in column 2 or column 3
    load_buses_idx = np.flatnonzero((bus[:, 2] != 0) | (bus[:, 3] != 0))

    # Overwrite the load columns of the copy
    bus[load_buses_idx, 2] = Pd
    bus[load_buses_idx, 3] = Qd

    # Solve; an unsuccessful solve is reported as NaN rather than raised
    results = runopf(case, ppopt)
    return results["f"] if results["success"] else np.nan

In [9]:
# Success flag of the solve stored in `results` (set by the baseline cell above)
results['success']

True

In [None]:
class Component:
    """Basic component in a power system optimization model.

    Holds the power base used for per-unit conversion and declares no-op hooks
    (parameters, time-series updates, variables, expressions) that subclasses
    may override.
    """

    # Power base (MVA) used when the caller does not supply one
    DEFAULT_BASE_MVA = 100

    def __init__(self, **kwargs):
        """Set the power base of the component.

        Parameters
        ----------
        **kwargs
            ``baseMVA`` (optional) overrides the default power base of 100.
            Any other keyword is accepted and ignored.
        """
        self.baseMVA = kwargs.get("baseMVA", self.DEFAULT_BASE_MVA)

    def create_parameters(self):
        """Hook for creating model parameters; does nothing here."""
        pass

    def update_timeseries_parameters(self, date):
        """Hook for updating time-dependent parameters for `date`; does nothing here."""
        pass

    def create_variables(self):
        """Hook for creating decision variables; does nothing here."""
        pass

    def create_expressions(self):
        """Hook for creating model expressions; does nothing here."""
        pass

In [None]:
class Bus(Component):
    """A network bus: voltage data, demand scaled by the power base, and attached elements."""

    def __init__(self, name: str, data: pd.Series, load_data: pd.Series, shunt_data: pd.Series):
        """Populate the bus from its table rows.

        Parameters
        ----------
        name
            Identifier of the bus.
        data
            Bus record; reads "type", "vn_kv", "max_vm_pu" and "min_vm_pu".
        load_data
            Load record; reads "p_mw" and "q_mvar".
        shunt_data
            Shunt record; reads "q_mvar".
        """
        super().__init__()
        base = self.baseMVA

        # Identity and voltage data, copied unchanged from the bus record
        self.name = name
        self.type = data["type"]
        self.vbase = data["vn_kv"]
        self.vmax = data["max_vm_pu"]
        self.vmin = data["min_vm_pu"]

        # Demand and shunt injection, divided by the power base
        self.pD = load_data["p_mw"] / base
        self.qD = load_data["q_mvar"] / base
        self.qShunt = shunt_data["q_mvar"] / base

        # Generation cost for the external grid (set for the slack bus only)
        self.slack_cost = None

        # Attached elements; filled by Generator.link_bus and Line.link_buses
        self.generators = []
        self.in_lines = []
        self.out_lines = []

    @classmethod
    def from_series(cls, data: pd.Series, load_data: pd.Series, shunt_data: pd.Series):
        """Build a Bus named after the bus record's own ``name``."""
        bus_name = data.name
        return cls(bus_name, data, load_data, shunt_data)

class Generator(Component):
    """A generator: output limits scaled by the power base, cost coefficients, host bus."""

    def __init__(self, name: str, data: pd.Series, cost_data: pd.Series):
        """Populate the generator from its table rows.

        Parameters
        ----------
        name
            Identifier of the generator.
        data
            Generator record; reads "bus", "min_p_mw", "max_p_mw",
            "min_q_mvar" and "max_q_mvar".
        cost_data
            Cost record; reads "cp0_eur", "cp1_eur_per_mw" and "cp2_eur_per_mw2".
        """
        super().__init__()
        base = self.baseMVA

        self.name = name
        self.bus_ID = data["bus"]

        # Active / reactive output limits, divided by the power base
        self.pmin = data["min_p_mw"] / base
        self.pmax = data["max_p_mw"] / base
        self.qmin = data["min_q_mvar"] / base
        self.qmax = data["max_q_mvar"] / base

        # Cost coefficients as a flat float64 array: [constant, linear, quadratic]
        cost_columns = ["cp0_eur", "cp1_eur_per_mw", "cp2_eur_per_mw2"]
        coefficients = cost_data[cost_columns].astype(np.float64)
        self.cost = coefficients.values.flatten()

        # Host bus object; set by link_bus
        self.bus = None

    @classmethod
    def from_series(cls, data: pd.Series, cost_data: pd.Series):
        """Build a Generator named after the generator record's own ``name``."""
        gen_name = data.name
        return cls(gen_name, data, cost_data)

    def link_bus(self, buses: dict):
        """Connect this generator and its host bus in both directions.

        `buses` maps bus IDs to Bus objects and must contain ``self.bus_ID``.
        """
        host = buses[self.bus_ID]
        # Generator -> bus
        self.bus = host
        # Bus -> generator
        host.generators.append(self)

class Line(Component):
    """A line between two buses; stores the endpoint IDs and the current limit."""

    def __init__(self, name: str, data: pd.Series):
        """Populate the line from its table row.

        Parameters
        ----------
        name
            Identifier of the line.
        data
            Line record; reads "from_bus", "to_bus" and "max_i_ka".
        """
        super().__init__()

        self.name = name

        # Endpoint bus IDs; the bus objects themselves are attached by link_buses
        self.from_bus_ID = data["from_bus"]
        self.to_bus_ID = data["to_bus"]
        self.from_bus = None
        self.to_bus = None

        self.imax = data["max_i_ka"]

        # Disabled: per-line electrical parameters (see calculate_admittance below)
        # self.r_ohm = data["length_km"] * data["r_ohm_per_km"]
        # self.x_ohm = data["length_km"] * data["x_ohm_per_km"]
        # self.c_nf = data["length_km"] * data["c_nf_per_km"] # Line shunt admittance
        # # Per unit line impedance / admittance values
        # self.z, self.r, self.x, self.y, self.g, self.b = None, None, None, None, None, None

    # Disabled: per-unit admittance computed from the line's own r/x values.
    # def calculate_admittance(self):
    #     # Calculate base impedance (ohms)
    #     vbase = self.from_bus.vbase * 1e3 # convert kV to V
    #     sbase = self.baseMVA * 1e6 # convert MVA to VA
    #     zbase = vbase**2 / sbase

    #     assert abs(self.from_bus.vbase - self.to_bus.vbase) < 1e-3, "Voltage base mismatch across line ends."

    #     # Convert r and x to per unit
    #     self.r = self.r_ohm / zbase
    #     self.x = self.x_ohm / zbase
    #     self.z = complex(self.r, self.x) # Impedance

    #     # Calculate y, g and b
    #     self.y = 1 / self.z if abs(self.z) > 0 else 0 # Admittance
    #     self.g, self.b = np.real(self.y), np.imag(self.y)

    @classmethod
    def from_series(cls, data: pd.Series):
        """Build a Line named after the line record's own ``name``."""
        line_name = data.name
        return cls(line_name, data)

    def link_buses(self, buses: dict):
        """Connect this line and its two endpoint buses in both directions.

        `buses` maps bus IDs to Bus objects and must contain both endpoint IDs.
        The line is added to the from-bus's ``out_lines`` and to the to-bus's
        ``in_lines``.
        """
        # Line -> buses
        self.from_bus = origin = buses[self.from_bus_ID]
        self.to_bus = destination = buses[self.to_bus_ID]
        # Buses -> line
        origin.out_lines.append(self)
        destination.in_lines.append(self)
        

In [None]:
class Model:
    """Base optimisation model built from a pandapower network.

    ``create_system`` turns the network tables into linked ``Bus``,
    ``Generator`` and ``Line`` objects and stores the bus admittance matrix.
    ``build_model`` and ``solve_model`` are empty hooks for subclasses.
    """

    def __init__(self, network, settings=None):
        """Store the network and create empty component registries.

        Parameters
        ----------
        network
            pandapower network. Only ``sn_mva`` is read here; the tables used
            by ``create_system`` are read later.
        settings : optional
            Stored as-is; not read anywhere in this class.
        """
        # pandapower network and model settings
        self.net = network
        self.settings = settings
        # NOTE(review): component objects use their own baseMVA (set in
        # Component), not this value - confirm both agree for the network.
        self.baseMVA = network.sn_mva

        # Power system components, keyed by their index in the network tables
        self.buses = {}
        self.generators = {}
        self.lines = {}

    @property
    def components(self):
        """All component objects in one list: buses, then generators, then lines."""
        # dict views cannot be concatenated with `+`, so unpack each of them
        return [*self.buses.values(), *self.generators.values(), *self.lines.values()]

    def create_system(self):
        """Create component objects and the admittance matrix from ``self.net``.

        Populates ``self.buses``, ``self.generators`` and ``self.lines``,
        links generators and lines to their buses, and sets ``self.Y``,
        ``self.G``, ``self.B`` and ``self.slack_bus``.
        """
        # Run power flow (presumably what populates net._ppc and
        # net._pd2ppc_lookups, which are read below)
        runpp(self.net)

        # Pre-process data on a shallow copy so the re-indexed load / shunt
        # tables are not assigned back onto self.net
        net = copy.copy(self.net)
        # Add loads to all buses (buses without a load get zero demand)
        net.load = net.load.set_index("bus").reindex(net.bus.index)
        net.load[["p_mw", "q_mvar"]] = net.load[["p_mw", "q_mvar"]].fillna(0)
        # Add shunts to all buses (buses without a shunt get zero q_mvar)
        net.shunt = net.shunt.set_index("bus").reindex(net.bus.index)
        net.shunt["q_mvar"] = net.shunt["q_mvar"].fillna(0)

        # Get generator / slack bus cost data, indexed by element ID
        gen_cost = net.poly_cost.loc[net.poly_cost["et"] == "gen"].set_index("element")
        slack_cost = net.poly_cost.loc[net.poly_cost["et"] == "ext_grid"].set_index("element")

        # Create buses, generators, and line instances from network data
        for bus in net.bus.index:
            self.buses[bus] = Bus.from_series(net.bus.loc[bus], net.load.loc[bus], net.shunt.loc[bus])

        for gen in net.gen.index:
            # Look the cost up by generator element; the raw poly_cost row
            # labels also cover ext_grid entries and need not match gen IDs
            self.generators[gen] = Generator.from_series(net.gen.loc[gen], gen_cost.loc[gen])
            self.generators[gen].link_bus(self.buses)  # Link generator to node

        for line in net.line.index:
            self.lines[line] = Line.from_series(net.line.loc[line])
            self.lines[line].link_buses(self.buses)  # Link line to nodes

        # Create admittance matrix: re-order the internal Ybus so that rows and
        # columns follow the external bus index.
        # NOTE(review): Y is indexed directly by bus index, which assumes the
        # bus index runs 0..n-1 - confirm for the networks used.
        Y_int = np.array(net._ppc["internal"]["Ybus"].todense())
        ext_to_int = net._pd2ppc_lookups["bus"]
        Y = np.zeros(Y_int.shape, dtype=np.complex128)
        for i in net.bus.index:
            for j in net.bus.index:
                Y[i, j] = Y_int[ext_to_int[i], ext_to_int[j]]
        self.Y = Y

        # Split real and imaginary components
        self.G = np.real(self.Y)
        self.B = np.imag(self.Y)

        # Determine slack bus: the bus of the first external grid entry
        self.slack_bus = self.buses[int(self.net.ext_grid.bus.values[0])]
        self.slack_bus.slack_cost = slack_cost[["cp0_eur", "cp1_eur_per_mw", "cp2_eur_per_mw2"]].values.flatten()

    def build_model(self):
        """Hook for building the optimisation model; does nothing here."""
        pass

    def solve_model(self):
        """Hook for solving the optimisation model; does nothing here."""
        pass


class SDP_AC_OPF(Model):
    """Placeholder subclass that adds nothing to Model yet.

    NOTE(review): the name suggests an SDP formulation of AC OPF - confirm intent.
    """

In [None]:
class Poly_AC_OPF(Model):
    """AC optimal power flow written as a polynomial Pyomo model.

    Bus voltages are modelled by their real and imaginary parts (``V_re``,
    ``V_im``), so the power balance and voltage magnitude constraints are
    polynomial in the variables. Powers in the model are in per unit; the
    objective converts them back with each component's ``baseMVA``.

    Typical call order: ``create_system`` -> ``build_model`` ->
    ``initialize_model`` -> ``solve_model`` -> ``print_solution``.
    """

    def __init__(self, network, settings=None):
        """Store the network (see ``Model.__init__``) and reserve the Pyomo model slot."""
        super().__init__(network, settings)

        # Pyomo model; created by build_model()
        self.model = None

    def build_model(self):
        """Create the Pyomo model and store it in ``self.model``.

        Requires ``create_system`` to have run, since it reads ``self.buses``,
        ``self.generators``, ``self.lines``, ``self.slack_bus``, ``self.G``
        and ``self.B``.
        """
        # Instantiate model
        model = pyo.ConcreteModel()
        self.model = model

        # Create sets
        model.buses = pyo.Set(initialize=list(self.buses.keys()))
        model.generators = pyo.Set(initialize=list(self.generators.keys()))
        model.lines = pyo.Set(initialize=list(self.lines.keys()))
        model.slack = pyo.Set(initialize=[self.slack_bus.name])

        # Create parameters: bus loads, mutable so they can be changed after
        # the model has been built
        model.P_load = pyo.Param(model.buses, initialize={i: bus.pD for i, bus in self.buses.items()}, mutable=True)
        model.Q_load = pyo.Param(model.buses, initialize={i: bus.qD for i, bus in self.buses.items()}, mutable=True)

        # Create variables

        # Bus voltage (real / imaginary part)
        model.V_re = pyo.Var(model.buses, within=pyo.Reals, initialize=1.0)
        model.V_im = pyo.Var(model.buses, within=pyo.Reals, initialize=0.0)

        # Generation
        model.P_gen = pyo.Var(model.generators, within=pyo.Reals, initialize=0.5)
        model.Q_gen = pyo.Var(model.generators, within=pyo.Reals, initialize=0.0)

        # External grid (slack bus) generation
        model.P_slack = pyo.Var(model.slack, within=pyo.Reals, initialize=0.5)
        model.Q_slack = pyo.Var(model.slack, within=pyo.Reals, initialize=0.0)

        # Write constraints

        # Power flow balance constraints: generation at the bus (plus the
        # external grid at the slack bus) equals network injection plus load
        def P_balance_rule(m, k):
            bus = self.buses[k]
            P_gen = sum(m.P_gen[g.name] for g in bus.generators)
            if k in m.slack:
                P_gen += m.P_slack[k]
            P_flow = m.V_re[k] * sum(self.G[k, i] * m.V_re[i] - self.B[k, i] * m.V_im[i] for i in m.buses) \
                   + m.V_im[k] * sum(self.B[k, i] * m.V_re[i] + self.G[k, i] * m.V_im[i] for i in m.buses)
            return P_gen == P_flow + m.P_load[k]
        model.P_balance = pyo.Constraint(model.buses, rule=P_balance_rule)

        def Q_balance_rule(m, k):
            bus = self.buses[k]
            Q_gen = sum(m.Q_gen[g.name] for g in bus.generators)
            if k in m.slack:
                Q_gen += m.Q_slack[k]
            Q_flow = m.V_re[k] * sum(-self.B[k, i] * m.V_re[i] - self.G[k, i] * m.V_im[i] for i in m.buses) \
                   + m.V_im[k] * sum(self.G[k, i] * m.V_re[i] - self.B[k, i] * m.V_im[i] for i in m.buses)
            return Q_gen == Q_flow + m.Q_load[k]
        model.Q_balance = pyo.Constraint(model.buses, rule=Q_balance_rule)

        # Voltage magnitude constraints, written on the squared magnitude so
        # they stay polynomial
        def V_mag_rule(m, k):
            bus = self.buses[k]
            return pyo.inequality(bus.vmin**2, m.V_re[k]**2 + m.V_im[k]**2, bus.vmax**2)
        model.V_mag = pyo.Constraint(model.buses, rule=V_mag_rule)

        # Fix slack bus voltage to 1 + 0j (magnitude 1, angle 0)
        for slack in model.slack:
            model.V_re[slack].fix(1.0)
            model.V_im[slack].fix(0.0)

        # Generator limits
        def P_gen_limits_rule(m, g):
            gen = self.generators[g]
            return pyo.inequality(gen.pmin, m.P_gen[g], gen.pmax)
        model.P_gen_limits = pyo.Constraint(model.generators, rule=P_gen_limits_rule)

        def Q_gen_limits_rule(m, g):
            gen = self.generators[g]
            return pyo.inequality(gen.qmin, m.Q_gen[g], gen.qmax)
        model.Q_gen_limits = pyo.Constraint(model.generators, rule=Q_gen_limits_rule)

        # Set objective: quadratic cost of generator and external grid output
        def objective_rule(m):
            def poly_cost(coeffs, p_pu, base):
                # c0 + c1 * P + c2 * P^2 with P = base * p_pu
                return coeffs[0] + coeffs[1] * (base * p_pu) + coeffs[2] * (base * p_pu)**2

            cost = sum(poly_cost(gen.cost, m.P_gen[i], gen.baseMVA) for i, gen in self.generators.items())
            slack_bus, slack_bus_ID = self.slack_bus, self.slack_bus.name
            cost += poly_cost(slack_bus.slack_cost, m.P_slack[slack_bus_ID], slack_bus.baseMVA)
            return cost
        model.obj = pyo.Objective(rule=objective_rule, sense=pyo.minimize)

    def initialize_model(self):
        """Set the starting point of the Pyomo variables from a pandapower OPF run.

        Must be called after ``build_model``. Side effects on ``self.net``:
        the bus voltage limits are widened by 3% on *every* call (they are not
        restored), and ``runopp`` overwrites the network's result tables.
        """
        # Relax voltage constraints on this model's own network rather than on
        # whatever global `net` happens to be bound in the notebook
        self.net.bus["max_vm_pu"] *= 1.03
        self.net.bus["min_vm_pu"] *= 0.97

        # Run pandapower's optimal power flow
        runopp(self.net, numba=False)

        # Extract complex bus voltages
        vm = self.net.res_bus.vm_pu  # per unit voltage magnitude
        va = np.deg2rad(self.net.res_bus.va_degree)  # voltage angle, degrees -> radians
        v = vm * np.exp(1j * va)
        v_re, v_im = v.map(np.real), v.map(np.imag)

        # Extract generator power injections (MW / MVar)
        pg = self.net.res_gen.p_mw
        qg = self.net.res_gen.q_mvar

        # Initialize bus voltages
        for k in self.model.buses:
            self.model.V_re[k].value = v_re.loc[k]
            self.model.V_im[k].value = v_im.loc[k]

        # Initialize generator outputs. The model variables are in per unit
        # (their limits are pmin/pmax/qmin/qmax), so convert from MW / MVar
        # with the same base the Generator objects use.
        for g in self.model.generators:
            base = self.generators[g].baseMVA
            self.model.P_gen[g].value = pg.loc[g] / base
            self.model.Q_gen[g].value = qg.loc[g] / base

        # Re-fix slack bus voltage, which the loop above overwrote
        for slack in self.model.slack:
            self.model.V_re[slack].fix(1.0)
            self.model.V_im[slack].fix(0.0)

    def solve_model(self):
        """Solve ``self.model`` with the 'ipopt' solver, streaming the solver log."""
        solver = pyo.SolverFactory('ipopt')
        solver.solve(self.model, tee=True)

    def print_solution(self):
        """Print generator dispatch (MW / MVar) and bus voltages (magnitude, angle)."""
        # Display results
        print("\n=== Generator Dispatch (MW) ===")
        for gen in self.generators.values():
            print(f"Generator {gen.name} at bus {gen.bus.name}: P = {pyo.value(self.model.P_gen[gen.name]) * gen.baseMVA:.2f} MW, Q = {pyo.value(self.model.Q_gen[gen.name]) * gen.baseMVA:.2f} MVar")

        print("\n=== Bus Voltages (p.u.) ===")
        for bus in self.buses.values():
            V_re = pyo.value(self.model.V_re[bus.name])
            V_im = pyo.value(self.model.V_im[bus.name])
            V_mag = np.sqrt(V_re**2 + V_im**2)
            V_angle = np.degrees(np.arctan2(V_im, V_re))
            print(f"Bus {bus.name}: |V| = {V_mag:.4f} p.u., angle = {V_angle:.2f}°")

In [None]:
# Load case300 through `pn`; `net` is the network handed to Poly_AC_OPF below
net = pn.case300()

In [None]:
# Build, warm-start and solve the Pyomo AC-OPF for `net`.
# Note: the loop below widens the voltage limits stored on the Bus objects,
# and initialize_model() separately scales the max_vm_pu / min_vm_pu columns
# of the network's bus table by the same factors.
model = Poly_AC_OPF(net) # Initialize model object
model.create_system() # Create power system component objects from pandapower case data
for bus in model.buses.values(): # Update bus voltage parameters
    bus.vmax *= 1.03
    bus.vmin *= 0.97
model.build_model() # Write Pyomo model constraints
model.initialize_model() # Initialize values (set starting point)
model.solve_model() # Solve with ipopt (solver log is streamed)
model.print_solution() # Print generator dispatch and bus voltages

In [None]:
# Largest value in the generator table's "vm_pu" column
net.gen["vm_pu"].max()

In [None]:
# "max_vm_pu" column of the bus table (initialize_model scales this column by 1.03)
net.bus["max_vm_pu"]

In [None]:
# Reference run: solve the OPF with pandapower's runopp on a freshly loaded case.
# This rebinds `net`; the `model` built earlier keeps its own network in model.net.
net = pn.case300()
# Overwrite the "vm_pu" column of every generator with 1.0
net.gen["vm_pu"] = 1.0
# Voltage-limit relaxation, currently disabled for this reference run
#net.bus["max_vm_pu"] *= 1.03
#net.bus["min_vm_pu"] *= 0.97
runopp(net, numba=False)

In [None]:
# Display the results of the pandapower OPF run stored in net.res_gen / net.res_bus
print("\n=== Generator Dispatch (MW) ===")
for gen in net.res_gen.index:
    # Q is read from the "q_mvar" column (it previously repeated "p_mw")
    print(f"Generator {gen} at bus {net.gen.loc[gen, 'bus']}: P = {net.res_gen.loc[gen, 'p_mw']:.2f} MW, Q = {net.res_gen.loc[gen, 'q_mvar']:.2f} MVar")

print("\n=== Bus Voltages (p.u.) ===")
for bus in net.res_bus.index:
    V_mag = net.res_bus.loc[bus, "vm_pu"]
    V_angle = net.res_bus.loc[bus, "va_degree"]
    print(f"Bus {bus}: |V| = {V_mag:.4f} p.u., angle = {V_angle:.2f}°")

In [None]:
# Cost stored on the network - presumably the objective value of the runopp
# call above (TODO confirm); compare with model.model.obj() in the next cell
net.res_cost

In [None]:
# Value of the Pyomo objective (model.obj, defined in Poly_AC_OPF.build_model)
# at the current variable values
model.model.obj()

In [None]:
# Exploratory: print the source code of pandapower's runopp for reference
import inspect
import pandapower

print(inspect.getsource(pandapower.run.runopp))

In [None]:
# Exploratory: print the source code of pandapower's internal _optimal_powerflow.
# Relies on `inspect` having been imported by the previous cell.
import pandapower.optimal_powerflow
print(inspect.getsource(pandapower.optimal_powerflow._optimal_powerflow))