<a href="https://colab.research.google.com/github/DarleneJD/ACOPF/blob/main/13barrasOPF_Reg.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import shutil
import sys
import os.path

if not shutil.which("pyomo"):
    !pip install -q pyomo
    assert(shutil.which("pyomo"))

if not (shutil.which("ipopt") or os.path.isfile("ipopt")):
    if "google.colab" in sys.modules:
        !wget -N -q "https://matematica.unipv.it/gualandi/solvers/ipopt-linux64.zip"
        !unzip -o -q ipopt-linux64
    else:
        try:
            !conda install -c conda-forge ipopt
        except:
            pass

In [2]:
import shutil
import sys
import os.path

if not shutil.which("pyomo"):
    !pip install -q pyomo
    assert(shutil.which("pyomo"))

if not (shutil.which("ipopt") or os.path.isfile("ipopt")):
    if "google.colab" in sys.modules:
        !wget -N -q "https://matematica.unipv.it/gualandi/solvers/ipopt-linux64.zip"
        !unzip -o -q ipopt-linux64
    else:
        try:
            !conda install -c conda-forge ipopt
        except:
            pass

In [3]:
!rm -rf ACOPF
!git clone https://github.com/DarleneJD/ACOPF.git

Cloning into 'ACOPF'...
remote: Enumerating objects: 171, done.[K
remote: Counting objects: 100% (18/18), done.[K
remote: Compressing objects: 100% (18/18), done.[K
remote: Total 171 (delta 10), reused 0 (delta 0), pack-reused 153 (from 1)[K
Receiving objects: 100% (171/171), 1.16 MiB | 10.13 MiB/s, done.
Resolving deltas: 100% (101/101), done.


In [4]:
import math
from pathlib import Path
import pandas as pd
import pyomo.environ as pyo
from collections import defaultdict
# Solver (HiGHS)
from pyomo.environ import *
import pandas as pd
import numpy as np


# Loading and processing data

In [5]:
base_path = "/content/ACOPF"

In [6]:
def normalize_phases(value):
    """
    Converts inputs like:

    1
    1.0
    '1,2,3'
    '1.0,2.0,3.0'
    '1.2.3'
    into a list of integers [1], [1,2,3]
    """
    if pd.isna(value):
        return []

    s = str(value).strip()

    # unifies separators
    s = s.replace(".", ",")
    parts = [p.strip() for p in s.split(",") if p.strip() != ""]

    phases = []
    for p in parts:
        try:
            ph = int(float(p))
            if ph in [1, 2, 3]:
                phases.append(ph)
        except ValueError:
            pass

    return sorted(set(phases))




In [None]:
branches_path = f"{base_path}/branches_1.xlsx"
buses_path = f"{base_path}/buses_1.xlsx"

# MT - Medium Voltage
df_branches_mt = pd.read_excel(branches_path, sheet_name="MT")
df_buses_mt = pd.read_excel(buses_path, sheet_name="MT")
df_buses_mt = df_buses_mt[~df_buses_mt["name"].astype(str).str.contains("Source", case=False)]

# BT - Low Voltage
df_branches_bt = pd.read_excel(branches_path, sheet_name="BT")
df_buses_bt = pd.read_excel(buses_path, sheet_name="BT")

# Transformers
df_trafos = pd.read_excel(branches_path, sheet_name="Trafos")
df_trafos.columns = df_trafos.columns.str.strip()

# PV
df_pv = pd.read_excel(buses_path, sheet_name="PV")

#
# NORMALIZATION OF PHASES

df_branches_mt['phase_list'] = df_branches_mt['phase'].apply(normalize_phases)
df_branches_bt['phase_list'] = df_branches_bt['phase'].apply(normalize_phases)
df_trafos['phase_list'] = df_trafos['phases'].apply(normalize_phases)
df_buses_mt['phase_list'] = df_buses_mt['phases'].apply(normalize_phases)
df_buses_bt['phase_list'] = df_buses_bt['phases'].apply(normalize_phases)
df_pv['phase_list'] = df_pv['phases'].apply(normalize_phases)


# MAPPING BARRAS MV

bus_id_mt = dict(zip(df_buses_mt['name'], df_buses_mt['N']))
slack_bus = df_buses_mt.loc[df_buses_mt["tb"] == 3, "N"].iloc[0]
buses_mt = df_buses_mt["N"].astype(int).unique().tolist()


# MAPPING BARRAS LV

bus_id_bt = dict(zip(df_buses_bt['name'], df_buses_bt['N']))
buses_bt = df_buses_bt["N"].astype(int).unique().tolist()


# BRANCHES PREPARATION MV

# Remove branch MT 650->RG60 (It's not a branch, it's an SVR)
df_branches_mt = df_branches_mt[
    ~((df_branches_mt["l"].astype(str) == "650") & (df_branches_mt["k"].astype(str) == "RG60"))
].copy()

# Convert l,k bars to numeric identifier
df_branches_mt['from_bus'] = df_branches_mt['l'].map(bus_id_mt)
df_branches_mt['to_bus'] = df_branches_mt['k'].map(bus_id_mt)

# Remove poorly mapped branches
df_branches_mt = df_branches_mt.dropna(subset=['from_bus', 'to_bus'])

df_branches_mt['from_bus'] = df_branches_mt['from_bus'].astype(int)
df_branches_mt['to_bus'] = df_branches_mt['to_bus'].astype(int)
df_branches_mt['m'] = df_branches_mt['m'].astype(int)
df_branches_mt['Imax'] = df_branches_mt['Imax'].astype(int)

branches_mt = df_branches_mt["m"].unique().tolist()


# BRANCHES PREPARATION LV

df_branches_bt['from_bus'] = df_branches_bt['l'].map(bus_id_bt)
df_branches_bt['to_bus'] = df_branches_bt['k'].map(bus_id_bt)
df_branches_bt = df_branches_bt.dropna(subset=['from_bus', 'to_bus'])

df_branches_bt['from_bus'] = df_branches_bt['from_bus'].astype(int)
df_branches_bt['to_bus'] = df_branches_bt['to_bus'].astype(int)
df_branches_bt['m'] = df_branches_bt['m'].astype(int)

branches_bt = df_branches_bt["m"].unique().tolist()


# SETS BY PHASE

PHASES = [1, 2, 3]

# Branch MV per phase
branch_phase_mt = []
for _, row in df_branches_mt.iterrows():
    m = int(float(row["m"]))
    for ph in row["phase_list"]:
        branch_phase_mt.append((m, ph))
branch_phase_mt = sorted(set(branch_phase_mt))

# Branch LV per phase
branch_phase_bt = []
for _, row in df_branches_bt.iterrows():
    m = int(float(row["m"]))
    for ph in row["phase_list"]:
        branch_phase_bt.append((m, ph))
branch_phase_bt = sorted(set(branch_phase_bt))

# Bus LV per phase
bus_phase_bt = sorted({
    (int(row["N"]), ph)
    for _, row in df_buses_bt.iterrows()
    for ph in row["phase_list"]
})


# Branches MV PARAMETERS
R_mt, X_mt, Imax_mt = {}, {}, {}
from_bus_mt, to_bus_mt, Length_mt, Vbase_mt, Smax_mt = {}, {}, {}, {}, {}

for _, row in df_branches_mt.iterrows():
    m = int(float(row["m"]))
    from_bus_mt[m] = int(row["from_bus"])
    to_bus_mt[m] = int(row["to_bus"])
    Imax_mt[m] = float(row["Imax"])
    Length_mt[m] = float(row["Length_m"])

    # V_base
    fb = from_bus_mt[m]
    Vbase_mt[m] = float(df_buses_mt.loc[df_buses_mt["N"] == fb, "v_nom_kv"].iloc[0])
    Smax_mt[m] = Vbase_mt[m] * Imax_mt[m]  # kVA por fase

    # R,x per phase
    for ph in row["phase_list"]:
        R_mt[(m, ph)] = float(row["R"])
        X_mt[(m, ph)] = float(row["X"])


# Branch LV param
R_bt, X_bt, Imax_bt = {}, {}, {}
from_bus_bt, to_bus_bt, Vbase_bt, Smax_bt = {}, {}, {}, {}

for _, row in df_branches_bt.iterrows():
    m = int(float(row["m"]))
    from_bus_bt[m] = int(row["from_bus"])
    to_bus_bt[m] = int(row["to_bus"])
    Imax_bt[m] = float(row["Imax"])

    # Vbase
    fb = from_bus_bt[m]
    Vbase_bt[m] = float(df_buses_bt.loc[df_buses_bt["N"] == fb, "v_nom_kv"].iloc[0])
    Smax_bt[m] = Vbase_bt[m] * Imax_bt[m]


# Load LV
Pload_bt, Qload_bt = {}, {}

for _, row in df_buses_bt.iterrows():
    b = int(row["N"])
    phs = row["phase_list"]
    if len(phs) == 0:
        continue

    Pd = float(row["P_D"]) if not pd.isna(row["P_D"]) else 0.0
    Qd = float(row["Q_D"]) if not pd.isna(row["Q_D"]) else 0.0

    share = 1.0 / len(phs)

    for ph in phs:
        Pload_bt[(b, ph)] = Pd * share
        Qload_bt[(b, ph)] = Qd * share


# PV

Ppv_bt = {}
Qpv_max = {}

# Initializes with zero for all buses/phases
for (b, ph) in bus_phase_bt:
    Ppv_bt[(b, ph)] = 0.0
    Qpv_max[(b, ph)] = 0.0

for _, row in df_pv.iterrows():
    bus_name = row["Bus"]
    if bus_name not in bus_id_bt:
        continue
    b = int(bus_id_bt[bus_name])

    for ph in row["phase_list"]:
        pcol = f"p_pv_{ph}"
        qcol = f"q_pv_{ph}"
        Ppv_bt[(b, ph)] = float(row[pcol]) if pcol in row and not pd.isna(row[pcol]) else 0.0
        Qpv_max[(b, ph)] = float(row[qcol]) if qcol in row and not pd.isna(row[qcol]) else 0.0


# TRANSFORMERS
# Filter the transformer DataFrame BEFORE creating the sets
df_trafos = df_trafos[df_trafos["trafo_id"] != "Sub"].copy()

# With the filtered DataFrame, create all the sets.
TR = sorted(df_trafos["trafo_id"].unique().tolist())
TR_PH = sorted({
    (tid, ph)
    for _, row in df_trafos.iterrows()
    for tid, phs in [(row["trafo_id"], row["phase_list"])]
    for ph in phs
})

#
# Number of phases per transformer
nph_TR = {row.trafo_id: len(row.phase_list) for _, row in df_trafos.iterrows()}

# Total nominal power (kVA)
Pnom_TR = {row.trafo_id: float(row.kva) for _, row in df_trafos.iterrows()}

# No-load losses (kW)
Pfe_TR = {row.trafo_id: float(row["Perdas Vazio kW"]) for _, row in df_trafos.iterrows()}

# Total losses (kW)
Pt_TR = {row.trafo_id: float(row["Perdas Totais kW"]) for _, row in df_trafos.iterrows()}

# Total copper losses
Pcu_TR = {tid: Pt_TR[tid] - Pfe_TR[tid] for tid in Pnom_TR}

# Copper losses by phase
Pcu_TR_PH = {}
Snom_TR_PH = {}

for _, row in df_trafos.iterrows():
    tid = row.trafo_id
    for ph in row.phase_list:
        Pcu_TR_PH[(tid, ph)] = Pcu_TR[tid] / nph_TR[tid]
        Snom_TR_PH[(tid, ph)] = Pnom_TR[tid] / nph_TR[tid]

# No-load loss per phase
Pfe_TR_PH = {
    (row.trafo_id, ph): Pfe_TR[row.trafo_id] / nph_TR[row.trafo_id]
    for _, row in df_trafos.iterrows()
    for ph in row.phase_list
}

# CLoss coefficients
alpha_tr = {}  # coef perdas P
beta_tr = {}   # coef perdas Q

for _, row in df_trafos.iterrows():
    tid = row["trafo_id"]
    r_per = float(row["r_per"])/100.0 if ("r_per" in row and not pd.isna(row["r_per"])) else 0.0
    xhl = float(row["XHL"])/100.0 if ("XHL" in row and not pd.isna(row["XHL"])) else 0.0

    for ph in row["phase_list"]:
        alpha_tr[(tid, ph)] = r_per
        beta_tr[(tid, ph)] = xhl

# Rated power of transformers
Snom_TR = {
    row["trafo_id"]: float(row["kva"])
    for _, row in df_trafos.iterrows()
}

# Create a phase dictionary per transformer.
trafo_phases_dict = {row["trafo_id"]: row["phase_list"] for _, row in df_trafos.iterrows()}

# Create virtual buses only for valid transformers
base_tr_bus = 10_000
trafo_MT_bus = {}
trafo_BT_bus = {}

for i, (_, row) in enumerate(df_trafos.iterrows()):
    tid = row["trafo_id"]
    trafo_MT_bus[tid] = base_tr_bus + 2 * i
    trafo_BT_bus[tid] = base_tr_bus + 2 * i + 1

# Mapping between transformers and actual busbars
trafo_to_mt_bus = {}
trafo_to_bt_bus = {}

for _, row in df_trafos.iterrows():
    tid = row["trafo_id"]
    mt_bus_name = row["mv_bus"]
    bt_bus_name = row["lv_bus"]

    if mt_bus_name in bus_id_mt:
        trafo_to_mt_bus[tid] = bus_id_mt[mt_bus_name]
    else:
        print(f"AVISO: Barra MT '{mt_bus_name}' não encontrada para trafo {tid}")
        # Set a default value
        trafo_to_mt_bus[tid] = buses_mt[0] if buses_mt else -1

    if bt_bus_name in bus_id_bt:
        trafo_to_bt_bus[tid] = bus_id_bt[bt_bus_name]
    else:
        print(f"AVISO: Barra BT '{bt_bus_name}' não encontrada para trafo {tid}")
       # Set a default value
        trafo_to_bt_bus[tid] = buses_bt[0] if buses_bt else -1

# Create sets of bars including virtual ones
all_mt_buses = buses_mt + [trafo_MT_bus[tid] for tid in TR]
all_bt_buses = buses_bt + [trafo_BT_bus[tid] for tid in TR]

# guarantees numeric values ​​(avoids strings like '4,16', etc.)
df_trafos["kv_mv"] = pd.to_numeric(df_trafos["kv_mv"], errors="coerce")
df_trafos["kv_lv"] = pd.to_numeric(df_trafos["kv_lv"], errors="coerce")

# dictionaries by trafo_id
kv_mv_dict = {
    row["trafo_id"]: float(row["kv_mv"])
    for _, row in df_trafos.iterrows()
}
kv_lv_dict = {
    row["trafo_id"]: float(row["kv_lv"])
    for _, row in df_trafos.iterrows()
}


# Regulador

In [None]:
# Mapeamento explícito regulador → fase
reg_phase_map = {
    "Reg1": 1,
    "Reg2": 2,
    "Reg3": 3,
}

reg_phase = [(rid, ph) for rid, ph in reg_phase_map.items()]

model.REG_PH = Set(dimen=2, initialize=reg_phase)



In [None]:
reg_mv_bus = {row["trafo_id"]: int(bus_id_map[row["mv_bus"]]) for _, row in df_reg.iterrows()}
reg_lv_bus = {row["trafo_id"]: int(bus_id_map[row["lv_bus"]]) for _, row in df_reg.iterrows()}

vreg = {}
band = {}
step = {}
tap_min = {}
tap_max = {}

NTAPS = 16  # <<< DEFINIÇÃO CORRETA (engenharia)

for _, row in df_reg.iterrows():
    rid = row["trafo_id"]
    if rid not in reg_phase_map:
        continue

    ph = reg_phase_map[rid]

    vreg[(rid,ph)] = row["vreg_V"] / (row["kv_mv"] * 1000)
    band[(rid,ph)] = row["band_V"] / (row["kv_mv"] * 1000)

    # Step (% → pu)
    step[(rid,ph)] = float(row["Step"]) / 100.0

    tap_min[(rid,ph)] = -NTAPS
    tap_max[(rid,ph)] =  NTAPS


In [None]:
model.tap = Var(model.REG_PH, model.T, domain=Integers)


In [None]:
def tap_bounds(m, rid, ph, t):
    return (tap_min[(rid,ph)], tap_max[(rid,ph)])

model.tap = Var(model.REG_PH, model.T, domain=Integers, bounds=tap_bounds)


AttributeError: 'ConcreteModel' object has no attribute 'REG_PH'

In [None]:
# NÃO aplicar regulador se j == mv_bus do trafo
def reg_voltage(mm, rid, ph, t):
    i = reg_mv_bus[rid]
    j = reg_lv_bus[rid]
    if j in trafo_mv_bus.values():
        return Constraint.Skip
    return mm.v_MT[j,ph,t] == mm.v_MT[i,ph,t] + 2*step[(rid,ph)]*mm.tap[rid,ph,t]


model.RegVoltage = Constraint(model.REG_PH, model.T, rule=reg_voltage_rule)


In [None]:
model.over_v = Var(model.REG_PH, model.T, domain=Binary)
model.under_v = Var(model.REG_PH, model.T, domain=Binary)


In [None]:
M = 10.0

def over_voltage_rule(m, rid, ph, t):
    j = reg_lv_bus[rid]
    return m.v[j,ph] - (vreg[(rid,ph)] + band[(rid,ph)]/2) <= M * m.over_v[rid,ph,t]

def under_voltage_rule(m, rid, ph, t):
    j = reg_lv_bus[rid]
    return (vreg[(rid,ph)] - band[(rid,ph)]/2) - m.v[j,ph] <= M * m.under_v[rid,ph,t]

model.OverV = Constraint(model.REG_PH, model.T, rule=over_voltage_rule)
model.UnderV = Constraint(model.REG_PH, model.T, rule=under_voltage_rule)


AttributeError: 'ConcreteModel' object has no attribute 'REG_PH'

In [None]:
t0 = T_list[0]
for (rid,ph) in m.REG_PH:
    m.tap[rid,ph,t0].fix(0)
    m.tap_up[rid,ph,t0].fix(0)
    m.tap_down[rid,ph,t0].fix(0)



def tap_evolution_rule(m, rid, ph, t):
    if t == 0:
        return Constraint.Skip
    return (
        m.tap[rid,ph,t]
        == m.tap[rid,ph,t-1]
        + m.tap_up[rid,ph,t]
        - m.tap_down[rid,ph,t]
    )

model.TapEvolution = Constraint(model.REG_PH, model.T, rule=tap_evolution_rule)


In [None]:
def reg_energy_conservation(m, rid, ph, t):
    # Regulador não gera potência
    return m.Preg[rid,ph,t] == 0
m.REG_NoGenP = pyo.Constraint(m.REG_PH, m.T, rule=reg_energy_conservation)


In [None]:
def tap_up_rule(m, rid, ph, t):
    return m.tap_up[rid,ph,t] <= m.under_v[rid,ph,t]

def tap_down_rule(m, rid, ph, t):
    return m.tap_down[rid,ph,t] <= m.over_v[rid,ph,t]

model.TapUpRule = Constraint(model.REG_PH, model.T, rule=tap_up_rule)
model.TapDownRule = Constraint(model.REG_PH, model.T, rule=tap_down_rule)


AttributeError: 'ConcreteModel' object has no attribute 'REG_PH'