# PyPSA to SMS++: build a PyPSA model and solve it as an SMS++ UCBlock (dispatch)

This notebook aims to showcase the interface between PyPSA and SMS++. It demonstrates how to convert a PyPSA model into a UCBlock problem that can be solved using the SMS++ solver. The notebook also validates the objective function of the SMS++ solver and compares it to the results obtained from SMS++.

This notebook executes the following procedure:
1. Create and optimize a pypsa model
2. Converts the PyPSA model into a UCBlock problem to be read by SMS++
3. Launches the optimization using the UCBlock solver of SMS++
4. Validate the objective function of SMS++ and compares it to SMS++

## 1. Creation of the PyPSA model

We create a simple PyPSA model arbitrary number of buses, few generators, storages and loads.

Main assumptions are:
- The network is purely radial in the form bus1 -> bus2 -> bus3 -> ... busN
- A load is added to each bus of the network
- The following technologies are supported:
  - fuel-fired diesel generator
  - pv generator
  - wind generator
  - battery
  - hydro unit

The following notebook, performs the following SMS++ to PyPSA conversion:

|Physical object|PyPSA object|SMS++ object|
|----------------|------------|------------|
|diesel generator|Generator|ThermalUnitBlock|
|pv generator|Generator|IntermittentUnitBlock|
|wind generator|Generator|IntermittentUnitBlock|
|battery|StorageUnit|BatteryUnitBlock|
|hydro unit|StorageUnit|HydroUnitBlock|

In [None]:
n_snapshots = 1*24  # number of snapshots of the model

buses_demand = [0]  # list of buses where demand is located
bus_PV = None # Bus where PV is located; if none, no PV is considered
bus_wind = None # Bus where wind is located; if none, no wind is considered
bus_storage = None # Bus where storage is located; if none, no storage is considered
bus_hydro = 0 # Bus where hydro is located; if none, no hydro is considered
bus_diesel = 0 # Bus where diesel is located; if none, no pv is considered


#### Preliminary imports

In [None]:
folder_builder = "../data/SMSpp/UCBlockSolver"
path_SMSpp_pypsa = folder_builder + "/pypsa2SMSpp.nc4"

renewable_carriers = ["pv", "wind"]

In [None]:
import pypsa
from helpers import build_microgrid_model
import netCDF4 as nc
import pandas as pd
import numpy as np
import re

NC_DOUBLE = "f8"
NP_DOUBLE = np.float64
NC_UINT = "u4"
NP_UINT = np.uint32

#### The following code creates the desired pypsa model.

In [None]:
n = build_microgrid_model(
    n_snapshots = n_snapshots,
    buses_demand = buses_demand,
    bus_PV = bus_PV,
    bus_wind = bus_wind,
    bus_storage = bus_storage,
    bus_diesel = bus_diesel,
    bus_hydro=bus_hydro,
    x = 10.389754,
    y = 43.720810,
    hydro_factor=0.4,
)

# n.storage_units.capital_cost *= 2
# n.storage_units.p_min_pu = 0.
# n.storage_units.p_nom_max = 10
# n.storage_units.max_hours = 1

# n.snapshot_weightings["stores"] = 1.0

# n.generators.p_nom = 100

In [None]:
n.optimize(solver_name="gurobi")

#### Analyze the results

In [None]:
# Auxiliary function
def get_bus_idx(n, bus_series, dtype="uint32"):
    """
    Returns the numeric index of the bus in the network n for each element of the bus_series.
    """
    return bus_series.map(n.buses.index.get_loc).astype(dtype)

Calculate the marginal costs. Note: as we are running a UCBlock, only marginal costs can be compared.

In [None]:
marg_cost = pd.concat(
    [
        n.generators_t.p.mul(n.snapshot_weightings.objective, axis=0).sum().mul(n.generators.marginal_cost),
        n.storage_units_t.p.mul(n.snapshot_weightings.objective, axis=0).sum().mul(n.storage_units.marginal_cost),
        n.links_t.p0.mul(n.snapshot_weightings.objective, axis=0).sum().mul(n.links.marginal_cost),
        n.stores_t.p.mul(n.snapshot_weightings.objective, axis=0).sum().mul(n.stores.marginal_cost),
    ],
)
marg_cost

Calculate the capital costs

In [None]:
cap_cost = pd.concat(
    [
        n.generators.eval("p_nom_opt * capital_cost"),
        n.storage_units.eval("p_nom_opt * capital_cost"),
        n.links.eval("p_nom_opt * capital_cost"),
        n.lines.eval("s_nom_opt * capital_cost"),
        n.stores.eval("e_nom_opt * capital_cost"),
    ]
)
cap_cost

## 2. Convert the PyPSA model into a UCBlock problem

The following code converts the pypsa model into a UCBlock problem.
The conversions adopts the covnersion matrix adopted in the example above.

### 2.1 Create the SMS++ problem file

In [None]:
ds = nc.Dataset(path_SMSpp_pypsa, "w")


ds.setncattr("SMS++_file_type", 1)  # Set file type to 1 for problem file

### 2.2 Creates the Inner UCBlock

In [None]:
# Create UCBlock

master = ds.createGroup("Block_0")  # Create the first main block

master.type = "UCBlock"  # mandatory attribute for all blocks

n_timesteps = len(n.snapshots)

master.createDimension("TimeHorizon", n_timesteps)  # Time horizon

n_units = len(n.generators) + len(n.storage_units)  # excluding links for now

master.createDimension("NumberUnits", n_units)  # Number of nodes
master.createDimension("NumberElectricalGenerators", n_units)  # Number of elec. units
# some unitblocks, do have multiple units inside [e.g. cascading hydro or CCGT], not considered now

Add the Network Data to the UCBlock

In [None]:
ndg = master.createGroup("NetworkData")
ndg.createDimension("NumberNodes", len(n.buses))  # Number of nodes


if len(n.buses) > 1: # when there are multiple buses, we need to integrate the network
    master.createDimension("NumberLines", len(n.lines))  # Number of lines

    # generators' node
    all_generators = [bus_PV, bus_wind, bus_storage, bus_hydro, bus_diesel]
    all_generators = [x for x in all_generators if x is not None]
    generator_node = ndg.createVariable("GeneratorNode", NC_UINT, ("NumberElectricalGenerators",))
    generator_node[:] = np.array(all_generators, dtype=NP_UINT)

    # start lines
    start_line = ndg.createVariable("StartLine", NC_UINT, ("NumberLines",))
    start_line[:] = get_bus_idx(n, n.lines.bus0).values
    # end lines
    end_line = ndg.createVariable("EndLine", NC_UINT, ("NumberLines",))
    end_line[:] = get_bus_idx(n, n.lines.bus1).values
    # Min power flow
    min_power_flow = ndg.createVariable("MinPowerFlow", NC_DOUBLE, ("NumberLines",))
    min_power_flow[:] = - n.lines.s_nom.values
    # Max power flow
    max_power_flow = ndg.createVariable("MaxPowerFlow", NC_DOUBLE, ("NumberLines",))
    max_power_flow[:] = n.lines.s_nom.values
    # Susceptance
    susceptance = ndg.createVariable("Susceptance", NC_DOUBLE, ("NumberLines",))
    susceptance[:] = 1 / n.lines.x.values

Add demand to the UCBlock

In [None]:
### Create variables

# demand by node
loads_t = n.loads_t.p_set
active_demand = master.createVariable("ActivePowerDemand", NC_DOUBLE, ("TimeHorizon",)) #("NumberNodes", "TimeHorizon"))
active_demand[:] = loads_t.values.transpose()  # indexing between python and SMS++ is different: transpose

### 2.3 Add the ThermalUnitBlock for each fuel-fired generator

In [None]:
id_thermal = 0

# select thermal generators
thermal_generators = n.generators[~n.generators.index.isin(renewable_carriers)]
n_thermal = len(thermal_generators)

min_power_pypsa = thermal_generators.eval("p_nom_opt * p_min_pu")
max_power_pypsa = thermal_generators.eval("p_nom_opt * p_max_pu")
linear_term_pypsa = thermal_generators.marginal_cost

for (idx_name, row) in thermal_generators.iterrows():

    tub = master.createGroup(f"UnitBlock_{id_thermal}")
    tub.id = str(id_thermal)
    tub.type = "ThermalUnitBlock"

    # Create variables

    # MinPower
    min_power = tub.createVariable("MinPower", NC_DOUBLE) #, ("TimeHorizon",))
    min_power[:] = min_power_pypsa.loc[idx_name]

    # MaxPower
    max_power = tub.createVariable("MaxPower", NC_DOUBLE) #, ("TimeHorizon",))
    max_power[:] = max_power_pypsa.loc[idx_name]

    # StartUpCost
    start_up_cost = tub.createVariable("StartUpCost", NC_DOUBLE)
    start_up_cost[:] = 0.0

    # LinearTerm
    linear_term = tub.createVariable("LinearTerm", NC_DOUBLE, ("TimeHorizon",))
    linear_term[:] = linear_term_pypsa.loc[idx_name] * n.snapshot_weightings.objective.values

    # ConstantTerm
    constant_term = tub.createVariable("ConstantTerm", NC_DOUBLE)
    constant_term[:] = 0.0

    # MinUpTime
    min_up_time = tub.createVariable("MinUpTime", NC_DOUBLE)
    min_up_time[:] = 0.0

    # MinDownTime
    min_down_time = tub.createVariable("MinDownTime", NC_DOUBLE)
    min_down_time[:] = 0.0

    # InitialPower
    initial_power = tub.createVariable("InitialPower", NC_DOUBLE)
    initial_power[:] = n.loads_t.p_set.iloc[0, id_thermal]

    # InitUpDownTime
    init_up_down_time = tub.createVariable("InitUpDownTime", NC_DOUBLE)
    init_up_down_time[:] = 1.0

    # InertiaCommitment
    inertia_commitment = tub.createVariable("InertiaCommitment", NC_DOUBLE)
    inertia_commitment[:] = 1.0

    id_thermal += 1

### 2.4 Add an IntermittentUnitBlock for each renewable generator

In [None]:
renewable_generators = n.generators[n.generators.index.isin(renewable_carriers)]

id_ren = id_thermal

if not renewable_generators.empty:
    for (idx_name, row) in renewable_generators.iterrows():

        tiub = master.createGroup(f"UnitBlock_{id_ren}")
        tiub.id = str(id_ren)
        tiub.type = "IntermittentUnitBlock"

        n_max_power = n.generators_t.p_max_pu.loc[:, idx_name] * row.p_nom_opt
        

        # max power
        max_power = tiub.createVariable("MaxPower", NC_DOUBLE, ("TimeHorizon",))
        max_power[:] = n_max_power

        # marginal cost
        linear_term = tiub.createVariable("ActivePowerCost", NC_DOUBLE, ("TimeHorizon",))
        linear_term[:] = row.marginal_cost * n.snapshot_weightings.objective.values

        # # max capacity
        # max_capacity = tiub.createVariable("MaxCapacity", NC_DOUBLE)
        # max_capacity[:] = row[1].p_nom_max 

        id_ren += 1

### 2.5 Add a BatteryUnitBlock for each battery

In [None]:
hydro_systems_i = n.storage_units_t.inflow.columns  # index of hydro systems (storage units with inflow)
batteries_i = n.storage_units.index.difference(hydro_systems_i)  # index of batteries (storage units without inflow)

hydro_systems = n.storage_units.loc[hydro_systems_i]
batteries = n.storage_units.loc[batteries_i]

id_batt = id_ren

if not batteries.empty:
    for (idx_name, row) in batteries.iterrows():

        tiub = master.createGroup(f"UnitBlock_{id_batt}")
        tiub.id = str(id_batt)
        tiub.type = "BatteryUnitBlock"
        

        # max power
        max_power = tiub.createVariable("MaxPower", NC_DOUBLE)
        max_power[:] = row.p_nom_opt * row.p_max_pu
        
        # min power
        min_power = tiub.createVariable("MinPower", NC_DOUBLE)
        min_power[:] = row.p_nom_opt * row.p_min_pu

        # max energy
        max_storage = tiub.createVariable("MaxStorage", NC_DOUBLE)
        max_storage[:] = row.p_nom_opt * row.max_hours

        # max energy
        min_storage = tiub.createVariable("MinStorage", NC_DOUBLE)
        min_storage[:] = 0.0

        # Initial storage
        initial_storage = tiub.createVariable("InitialStorage", NC_DOUBLE)
        initial_storage[:] = row.state_of_charge_initial * row.p_nom_opt * row.max_hours

        # Storing battery efficiency
        storing_efficiency = tiub.createVariable("StoringBatteryRho", NC_DOUBLE)
        storing_efficiency[:] = row.efficiency_store

        # Discharge battery efficiency
        storing_efficiency = tiub.createVariable("ExtractingBatteryRho", NC_DOUBLE)
        storing_efficiency[:] = row.efficiency_dispatch

        # # max capacity
        # max_capacity = tiub.createVariable("MaxCapacity", NC_DOUBLE)
        # max_capacity[:] = row[1].p_nom_max 

        id_batt += 1

### 2.6 Add a HydroUnitBlock for each hydro unit

In [None]:
id_hydro = id_batt

energy_factor = 8760/n_snapshots

if not hydro_systems.empty:
    for (idx_name, row) in hydro_systems.iterrows():

        tiub = master.createGroup(f"UnitBlock_{id_hydro}")
        tiub.id = str(id_hydro)
        tiub.type = "HydroUnitBlock"

        tiub.createDimension("NumberReservoirs", 1)  # optional, the number of reservoirs
        N_ARCS = 2
        tiub.createDimension("NumberArcs", N_ARCS)  # optional, the number of arcs connecting the reservoirs
        # No NumberIntervals
        
        MAX_FLOW = 100*n.storage_units_t.inflow.loc[:, idx_name].max()
        P_MAX = row.p_nom_opt * row.p_max_pu
        P_MIN = row.p_nom_opt * row.p_min_pu

        # StartArc
        start_arc = tiub.createVariable("StartArc", NC_UINT, ("NumberArcs",))
        start_arc[:] = np.full((N_ARCS,), 0, dtype=NP_UINT)

        # EndArc
        end_arc = tiub.createVariable("EndArc", NC_UINT, ("NumberArcs",))
        end_arc[:] = np.full((N_ARCS,), 1, dtype=NP_UINT)

        # MaxPower
        max_power = tiub.createVariable("MaxPower", NC_DOUBLE, ("NumberArcs",)) #, ("NumberArcs",)) #, ("TimeHorizon",)) #"NumberArcs"))
        max_power[:] = np.array([P_MAX, 0.], dtype=NP_DOUBLE)

        # MinPower
        min_power = tiub.createVariable("MinPower", NC_DOUBLE, ("NumberArcs",)) #, ("NumberArcs",)) #, ("TimeHorizon",)) #"NumberArcs"))
        min_power[:] = np.array([0., P_MIN], dtype=NP_DOUBLE)

        # MinFlow
        min_flow = tiub.createVariable("MinFlow", NC_DOUBLE, ("NumberArcs",)) #, ("TimeHorizon",))
        min_flow[:] = np.array([0., -MAX_FLOW], dtype=NP_DOUBLE)

        # MaxFlow
        max_flow = tiub.createVariable("MaxFlow", NC_DOUBLE, ("NumberArcs",)) #, ("TimeHorizon",))
        max_flow[:] = np.array([P_MAX * 100., 0.], dtype=NP_DOUBLE)
        
        # MinVolumetric
        min_volumetric = tiub.createVariable("MinVolumetric", NC_DOUBLE) #, ("TimeHorizon",))
        min_volumetric[:] = 0.0

        # MaxVolumetric
        max_volumetric = tiub.createVariable("MaxVolumetric", NC_DOUBLE)
        max_volumetric[:] = row.p_nom_opt * row.max_hours

        # ActivePowerCost
        active_power_cost = tiub.createVariable("ActivePowerCost", NC_DOUBLE)
        active_power_cost[:] = row.marginal_cost * n.snapshot_weightings.objective.iloc[0]

        
        # Inflows
        inflows = tiub.createVariable("Inflows", NC_DOUBLE, ("NumberReservoirs", "TimeHorizon")) #,"NumberReservoirs",))  #"NumberReservoirs", 
        inflows[:] = np.array([n.storage_units_t.inflow.loc[:, idx_name]])

        # InitialVolumetric
        initial_volumetric = tiub.createVariable("InitialVolumetric", NC_DOUBLE) #, ("NumberReservoirs",))
        initial_volumetric[:] = row.state_of_charge_initial * row.max_hours * row.p_nom_opt

        # NumberPieces
        pieces = np.full((N_ARCS,), 1, dtype=NP_UINT)
        number_pieces = tiub.createVariable("NumberPieces", NC_UINT, ("NumberArcs",))
        number_pieces[:] = pieces

        # TotalNumberPieces
        tiub.createDimension("TotalNumberPieces", pieces.sum())

        # LinearTerm
        linear_term = tiub.createVariable("LinearTerm", NC_DOUBLE, ("TotalNumberPieces",))
        # linear_term[:] = np.array([1/n.storage_units.loc[idx_name, "efficiency_dispatch"], 0., n.storage_units.loc[idx_name, "efficiency_store"]], dtype=NP_DOUBLE)
        linear_term[:] = np.array([n.storage_units.loc[idx_name, "efficiency_dispatch"], 1/n.storage_units.loc[idx_name, "efficiency_store"]], dtype=NP_DOUBLE)

        # ConstTerm
        const_term = tiub.createVariable("ConstantTerm", NC_DOUBLE, ("TotalNumberPieces",))
        const_term[:] = np.full((N_ARCS,), 0.0, dtype=NP_DOUBLE)

Finally close the netcdf problem file

In [None]:
ds.close()

## 3. Execute UCBlockSolver

In [None]:
import subprocess
import os

PROJECT_PATH = "../../smspp-project"   # Path of the SMS++ project
COMPILE_MODE = "Release"              # Compilation mode (Debug/Release)

DATA_FOLDER = folder_builder  # Folder where all data inputs are contained (cwd will be moved here)
BSC_NAME = "uc_solverconfig.txt"          # Name of the file describing the BlockSolverConfig
UCFILE_NAME = "pypsa2smspp.nc4"      # Name of the UC-file to test

PARENT_ABSPATH_UCS = os.path.abspath(PROJECT_PATH + "/build/tools/ucblock_solver/" + COMPILE_MODE)
UCS_ABSPATH = os.path.abspath(PARENT_ABSPATH_UCS + "/ucblock_solver.exe")

result = subprocess.run(
    [UCS_ABSPATH, UCFILE_NAME, "-S", BSC_NAME],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    cwd=DATA_FOLDER,
)
result_ascii = result.stdout.decode('ascii')
print(result_ascii)

## 4. Validate the objective function of SMS++ and compare it to SMS++

We read the output of the ucblock_solver and compare it to the marginal costs obtained from PyPSA

In [None]:
res = re.search("Upper bound = (.*)\n", result_ascii)
SMSpp_obj = float(res.group(1).replace("\r", ""))
print("SMS++ obj         : %.6f" % SMSpp_obj)
print("PyPSA dispatch obj: %.6f" % marg_cost.sum())
print("Error SMS++ - PyPSA dispatch [%%]: %.5f" % (100*(SMSpp_obj - marg_cost.sum())/marg_cost.sum()))