# Trabalho de Análise de Redes
Autor: Gabriel Halfeld Limp de Carvalho

## Importando as bibliotecas

In [39]:
import numpy as np
import math
from dataclasses import dataclass, field
from typing import ClassVar, List, Optional, Dict
from __future__ import annotations
import pandas as pd

## Classes do Projeto

### Bus

In [40]:
@dataclass
class Bus:
    """
    A node of the network, with its voltage state and attached devices.

    On creation the bus gets an id (taken from a class-wide counter when none
    is supplied), a default name of the form ``"Bus <id>"`` when none is
    supplied, and appends itself to ``network.buses``.
    """
    network: "Network"
    id: Optional[int] = None
    name: Optional[str] = None
    bus_type: str = "PQ"  # "Slack", "PQ" or "PV"
    v: float = 1.0  # voltage magnitude, in pu
    theta: float = 0.0  # voltage angle, in degrees
    Sb: float = 1.0  # base power, in MVA
    Sh: float = 0.0  # shunt value; `shunt` turns it into 1j * Sh / Sb

    # Devices attached to this bus
    loads: List["Load"] = field(default_factory=list)
    generators: List["Generator"] = field(default_factory=list)

    # Next id handed out automatically (shared by every Bus instance)
    _id_counter: ClassVar[int] = 0

    def __post_init__(self):
        if self.id is not None:
            # Explicit id: normalise it and keep the counter ahead of it.
            self.id = int(self.id)
            Bus._id_counter = max(Bus._id_counter, self.id + 1)
        else:
            self.id = Bus._id_counter
            Bus._id_counter += 1

        if self.name is None:
            self.name = f"Bus {self.id}"

        # Register the bus in its network
        self.network.buses.append(self)

    @property
    def theta_rad(self) -> float:
        """Voltage angle converted from degrees to radians."""
        return np.deg2rad(self.theta)

    @property
    def p(self) -> float:
        """Net active power injection (pu): generation minus load."""
        generated = sum(g.p for g in self.generators)
        consumed = sum(l.p for l in self.loads)
        return generated - consumed

    @property
    def q(self) -> float:
        """Net reactive power injection (pu): generation minus load."""
        generated = sum(g.q for g in self.generators)
        consumed = sum(l.q for l in self.loads)
        return generated - consumed

    @property
    def shunt(self) -> complex:
        """Shunt admittance connected to the bus (pu), purely imaginary."""
        susceptance = self.Sh * 1j
        return susceptance / self.Sb

    # add_generator and add_load are called automatically by Generator and Load
    # when they are created; you only need to tell those classes which bus the
    # device is connected to.
    def add_generator(self, generator: 'Generator'):
        """Attach a generator to this bus unless it is already attached."""
        if generator in self.generators:
            return
        self.generators.append(generator)

    def add_load(self, load: 'Load'):
        """Attach a load to this bus unless it is already attached."""
        if load in self.loads:
            return
        self.loads.append(load)

    def __repr__(self):
        state = f"Bus(id={self.id}, type={self.bus_type}, v={self.v:.3f} pu, "
        power = f"theta={self.theta_rad:.3f} rad, p={self.p:.3f} pu, q={self.q:.3f} pu, "
        shunt = f"shunt={self.shunt:.3f} pu, "
        devices = f"gen={len(self.generators)}, load={len(self.loads)})"
        return state + power + shunt + devices
    

### PowerDevice

#### Generator

In [None]:
@dataclass
class Generator:
    """
    A generator connected to a bus.

    Values are supplied in the ``*_input`` fields and exposed through
    properties scaled by the base ``pb``: powers and limits are divided by
    ``pb``, cost coefficients are multiplied by ``pb``.

    On creation the generator gets an id (from a class-wide counter when none
    is supplied) and a default name, attaches itself to ``bus`` via
    ``bus.add_generator`` and appends itself to ``bus.network.generators``.
    """
    bus: 'Bus'
    name: Optional[str] = None
    id: Optional[int] = None

    pb: float = 1.0  # base used to scale every *_input value
    p_input: float = 0.0
    q_input: float = 0.0
    p_max_input: float = float('inf')
    p_min_input: float = 0.0
    q_max_input: Optional[float] = None  # None means "no limit given"
    q_min_input: Optional[float] = None  # None means "no limit given"
    cost_a_input: float = 0.0
    cost_b_input: float = 0.0
    cost_c_input: float = 0.0

    # Next id handed out automatically (shared by every Generator instance)
    _id_counter: ClassVar[int] = 0

    def __post_init__(self):
        if self.id is None:
            self.id = Generator._id_counter
            Generator._id_counter += 1
        else:
            # Explicit id: normalise it and keep the counter ahead of it.
            self.id = int(self.id)
            if self.id >= Generator._id_counter:
                Generator._id_counter = self.id + 1

        if self.name is None:
            self.name = f"Generator {self.id}"

        # Register on the bus first, then on the bus's network.
        self.bus.add_generator(self)
        self.network = self.bus.network
        self.network.generators.append(self)

    @property
    def p(self) -> float:
        """Active power, ``p_input / pb``."""
        return self.p_input / self.pb

    @property
    def q(self) -> float:
        """Reactive power, ``q_input / pb``."""
        return self.q_input / self.pb

    @property
    def p_max(self) -> float:
        """Upper active power limit, ``p_max_input / pb``."""
        return self.p_max_input / self.pb

    @property
    def p_min(self) -> float:
        """Lower active power limit, ``p_min_input / pb``."""
        return self.p_min_input / self.pb

    @property
    def q_max(self) -> Optional[float]:
        """Upper reactive power limit scaled by ``pb``, or None when unset."""
        return self.q_max_input / self.pb if self.q_max_input is not None else None

    @property
    def q_min(self) -> Optional[float]:
        """Lower reactive power limit scaled by ``pb``, or None when unset."""
        return self.q_min_input / self.pb if self.q_min_input is not None else None

    # NOTE(review): the three cost coefficients are all scaled by pb alike;
    # confirm this is the intended conversion for every term of the cost curve.
    @property
    def cost_a(self) -> float:
        """Cost coefficient a, ``cost_a_input * pb``."""
        return self.cost_a_input * self.pb

    @property
    def cost_b(self) -> float:
        """Cost coefficient b, ``cost_b_input * pb``."""
        return self.cost_b_input * self.pb

    @property
    def cost_c(self) -> float:
        """Cost coefficient c, ``cost_c_input * pb``."""
        return self.cost_c_input * self.pb

    def __repr__(self):
        # The closing parenthesis ends the text (no trailing comma), matching Load.
        return (f"Generator(id={self.id}, bus={self.bus.id}, p={self.p:.3f}, q={self.q:.3f}, "
                f"p_range=[{self.p_min:.3f},{self.p_max:.3f}], q_range=[{self.q_min},{self.q_max}])")

#### Load

In [42]:
@dataclass
class Load:
    """
    A load connected to a bus.

    Values are supplied in the ``*_input`` fields and exposed through
    properties scaled by the base ``pb``: powers and limits are divided by
    ``pb``, cost coefficients are multiplied by ``pb``.

    On creation the load gets an id (from a class-wide counter when none is
    supplied) and a default name, attaches itself to ``bus`` via
    ``bus.add_load`` and appends itself to ``bus.network.loads``.
    """
    bus: 'Bus'
    name: Optional[str] = None
    id: Optional[int] = None

    pb: float = 1.0  # base used to scale every *_input value
    p_input: float = 0.0
    q_input: float = 0.0
    p_max_input: float = float('inf')
    p_min_input: float = 0.0
    q_max_input: Optional[float] = None  # None means "no limit given"
    q_min_input: Optional[float] = None  # None means "no limit given"
    cost_a_input: float = 0.0
    cost_b_input: float = 0.0
    cost_c_input: float = 0.0

    # Next id handed out automatically (shared by every Load instance)
    _id_counter: ClassVar[int] = 0

    def __post_init__(self):
        if self.id is not None:
            # Explicit id: normalise it and keep the counter ahead of it.
            self.id = int(self.id)
            Load._id_counter = max(Load._id_counter, self.id + 1)
        else:
            self.id = Load._id_counter
            Load._id_counter += 1

        if self.name is None:
            self.name = f"Load {self.id}"

        # Register on the bus first, then on the bus's network.
        owner = self.bus
        owner.add_load(self)
        self.network = owner.network
        self.network.loads.append(self)

    def _divided_by_base(self, value):
        """Scale a power-like quantity by the base ``pb``."""
        return value / self.pb

    def _times_base(self, value):
        """Scale a cost coefficient by the base ``pb``."""
        return value * self.pb

    @property
    def p(self) -> float:
        """Active power, ``p_input / pb``."""
        return self._divided_by_base(self.p_input)

    @property
    def q(self) -> float:
        """Reactive power, ``q_input / pb``."""
        return self._divided_by_base(self.q_input)

    @property
    def p_max(self) -> float:
        """Upper active power limit, ``p_max_input / pb``."""
        return self._divided_by_base(self.p_max_input)

    @property
    def p_min(self) -> float:
        """Lower active power limit, ``p_min_input / pb``."""
        return self._divided_by_base(self.p_min_input)

    @property
    def q_max(self) -> Optional[float]:
        """Upper reactive power limit scaled by ``pb``, or None when unset."""
        if self.q_max_input is None:
            return None
        return self._divided_by_base(self.q_max_input)

    @property
    def q_min(self) -> Optional[float]:
        """Lower reactive power limit scaled by ``pb``, or None when unset."""
        if self.q_min_input is None:
            return None
        return self._divided_by_base(self.q_min_input)

    @property
    def cost_a(self) -> float:
        """Cost coefficient a, ``cost_a_input * pb``."""
        return self._times_base(self.cost_a_input)

    @property
    def cost_b(self) -> float:
        """Cost coefficient b, ``cost_b_input * pb``."""
        return self._times_base(self.cost_b_input)

    @property
    def cost_c(self) -> float:
        """Cost coefficient c, ``cost_c_input * pb``."""
        return self._times_base(self.cost_c_input)

    def __repr__(self):
        head = f"Load(id={self.id}, bus={self.bus.id}, p={self.p:.3f}, q={self.q:.3f}, "
        tail = f"p_range=[{self.p_min:.3f},{self.p_max:.3f}], q_range=[{self.q_min},{self.q_max}])"
        return head + tail

### Line

In [196]:
@dataclass
class Line:
    """
    A branch between two buses of the same network.

    ``r``, ``x`` and ``b_half`` are divided by the impedance base
    ``zb = vb**2 / pb`` to obtain the values used in the admittance matrix.
    ``tap_ratio`` and ``tap_phase`` (degrees) describe an optional
    off-nominal / phase-shifting tap.

    On creation the line gets an id (from a class-wide counter when none is
    supplied) and a default name, and appends itself to the network's
    ``lines`` list.

    Raises:
        ValueError: if the two buses do not belong to the same network object.
    """
    from_bus: "Bus"
    to_bus: "Bus"
    id: Optional[int] = None
    name: Optional[str] = None
    pb: float = 1.0  # base power used to form the impedance base
    vb: float = 1.0  # base voltage used to form the impedance base
    r: float = 0.0
    x: float = 0.01
    b_half: float = 0.0  # half of the line charging, applied at each end
    flux_max: float = float('inf')
    tap_ratio: float = 1.0
    tap_phase: float = 0.0  # in degrees, see tap_phase_rad

    _id_counter: ClassVar[int] = 0  # ID counter for lines

    def __post_init__(self):
        # Assign defaults for anything not provided
        if self.id is None:
            self.id = Line._id_counter
            Line._id_counter += 1
        else:
            self.id = int(self.id)
            if self.id >= Line._id_counter:
                Line._id_counter = self.id + 1

        if self.name is None:
            self.name = f"Line {self.id}"
        else:
            self.name = str(self.name)

        # Identity check on purpose: `!=` would run a field-by-field dataclass
        # comparison of the two networks, which walks their bus lists (and
        # from there back into the networks) and would also accept two
        # distinct networks that merely compare equal.
        if self.from_bus.network is not self.to_bus.network:
            raise ValueError("Both buses must belong to the same network.")

        self.network = self.from_bus.network  # remember the owning network
        self.network.lines.append(self)  # register the line in the network

    @property
    def zb(self) -> float:
        """Impedance base, ``vb**2 / pb`` (ohms when vb is in kV and pb in MVA)."""
        return self.vb**2 / self.pb

    @property
    def resistance(self) -> float:
        """Series resistance divided by the impedance base (pu)."""
        return self.r / self.zb

    @property
    def reactance(self) -> float:
        """Series reactance divided by the impedance base (pu)."""
        return self.x / self.zb

    @property
    def shunt_admittance_half(self) -> float:
        """Half-line shunt value ``b_half / zb``, as applied at each end."""
        return self.b_half / self.zb

    @property
    def impedance(self) -> complex:
        """Series impedance (pu): ``resistance + j * reactance``."""
        return complex(self.resistance, self.reactance)

    @property
    def admittance(self) -> complex:
        """Series admittance (pu): reciprocal of ``impedance``."""
        return 1 / self.impedance

    @property
    def tap_phase_rad(self) -> float:
        """Tap phase shift converted from degrees to radians."""
        return np.deg2rad(self.tap_phase)

    def get_admittance_elements(self, bus_index: Dict[int, int]):
        """
        Return this line's contributions to the bus admittance matrix.

        Args:
            bus_index: mapping from bus id to its row/column in the matrix.

        Returns:
            A list of four ``((row, col), value)`` pairs: the two diagonal
            terms (from-from, to-to) and the two off-diagonal terms
            (from-to, to-from).
        """
        y = self.admittance
        b = self.shunt_admittance_half * 1j
        a = self.tap_ratio * np.exp(1j * self.tap_phase_rad)  # complex tap
        i = bus_index[self.from_bus.id]
        j = bus_index[self.to_bus.id]
        if self.tap_ratio != 1.0 or self.tap_phase != 0.0:
            # Off-nominal tap: the complex tap scales the from-side terms.
            Yff = y / (a * np.conj(a)) + b
            Yft = -y / np.conj(a)
            Ytf = -y / a
            Ytt = y + b
        else:
            # Plain line: symmetric pi model.
            Yff = y + b
            Yft = -y
            Ytf = -y
            Ytt = y + b
        return [((i, i), Yff), ((i, j), Yft), ((j, i), Ytf), ((j, j), Ytt)]

    def __repr__(self):
        # "Barra de" is the from-bus and "Barra para" the to-bus.
        return (f"Line(id={self.name}, Barra de:{self.from_bus.id}, Barra para:{self.to_bus.id}, r={self.resistance:.4f}, x={self.reactance:.4f}, tap_ratio={self.tap_ratio:.4f}, tap_phase={self.tap_phase:.4f}, b_half={self.shunt_admittance_half:.4f})")

### Network

In [44]:
@dataclass
class Network:
    """
    Container for the buses, lines, loads and generators of one system.

    The component lists are filled by the components themselves when they are
    created; this class assembles the bus admittance matrix from them.
    """
    id: Optional[int] = None
    name: Optional[str] = None
    buses: List[Bus] = field(default_factory=list)
    lines: List[Line] = field(default_factory=list)
    loads: List[Load] = field(default_factory=list)
    generators: List[Generator] = field(default_factory=list)

    def y_bus(self) -> np.ndarray:
        """
        Build the complex bus admittance matrix.

        Rows and columns follow the order of ``self.buses``. Each line adds
        its four admittance elements, then each bus adds its shunt to the
        diagonal.
        """
        size = len(self.buses)

        # bus id -> position in the matrix
        position = {}
        for k, node in enumerate(self.buses):
            position[node.id] = k

        matrix = np.zeros((size, size), dtype=complex)

        # Branch contributions
        for branch in self.lines:
            for (row, col), value in branch.get_admittance_elements(position):
                matrix[row, col] += value

        # Bus shunts go on the diagonal
        for k, node in enumerate(self.buses):
            matrix[k, k] += node.shunt

        return matrix

    def get_G(self):
        """Real part of the bus admittance matrix."""
        admittance = self.y_bus()
        return admittance.real

    def get_B(self):
        """Imaginary part of the bus admittance matrix."""
        admittance = self.y_bus()
        return admittance.imag

    def __repr__(self):
        return "Network(id=" + f"{self.id}" + ", name=" + f"{self.name}" + ")"

### AC_PF

In [45]:
class AC_PF:
    """
    Newton-Raphson AC power flow for a ``Network``.

    The constructor snapshots the network (admittance matrix, bus types,
    connectivity, initial voltages and specified injections); ``solve`` runs
    the iterations and stores the result in:

    * ``V`` (also available under the legacy name ``V_``): voltage magnitudes;
    * ``theta``: voltage angles, in degrees;
    * ``converged``: whether the last ``solve`` call met the tolerances.

    Bus types are matched case-insensitively ("Slack"/"slack", "PV", "PQ").
    Buses of any other type get no special treatment, i.e. both of their
    mismatch equations stay active.
    """

    def __init__(self, network: "Network"):
        """
        Initializes the AC Power Flow class.
        """
        self.network = network  # Network object
        buses = self.network.buses

        # YBUS
        self.G = self.network.get_G()  # Real part of YBUS
        self.B = self.network.get_B()  # Imaginary part of YBUS

        # Number of buses
        self.nbus = len(buses)

        # Organize bus types
        self.pq_buses = [bus for bus in buses if self._bus_kind(bus) == 'PQ']
        self.pv_buses = [bus for bus in buses if self._bus_kind(bus) == 'PV']
        self.slack_bus = [bus for bus in buses if self._bus_kind(bus) == 'SLACK']

        # Bus maps (key: bus id, value: bus index in the matrices)
        self.bus_idx = {bus.id: i for i, bus in enumerate(buses)}
        self.pq_idx = [self.bus_idx[bus.id] for bus in self.pq_buses]
        self.pv_idx = [self.bus_idx[bus.id] for bus in self.pv_buses]
        self.slack_idx = [self.bus_idx[bus.id] for bus in self.slack_bus]
        self.K = self.get_K_set()  # neighbours of each bus, including itself
        self.omega = self.get_omega_set()  # neighbours of each bus, excluding itself

        # Initial voltage angles (radians) and magnitudes
        self.theta_0 = np.array([bus.theta_rad for bus in buses])
        self.V_0 = np.array([bus.v for bus in buses])
        self.X_0 = np.concatenate((self.theta_0, self.V_0))  # State vector

        # Specified injections
        self.P_esp = np.array([bus.p for bus in buses])
        self.Q_esp = np.array([bus.q for bus in buses])
        self.PQ_esp = np.concatenate((self.P_esp, self.Q_esp))

        # Result vectors, overwritten by solve(). `V` and `V_` are the same
        # array: `V_` is the name originally created here, `V` the name
        # solve() writes to.
        self.theta = np.zeros(self.nbus)
        self.V = np.ones(self.nbus)
        self.V_ = self.V
        self.converged = False

    @staticmethod
    def _bus_kind(bus):
        """Return the bus type as an upper-case string, ignoring case and padding."""
        return str(bus.bus_type).strip().upper()

    def _neighbour_sets(self):
        """Map each bus index to the set of indices it shares a line with (one pass over the lines)."""
        neighbours = {self.bus_idx[bus.id]: set() for bus in self.network.buses}
        for line in self.network.lines:
            i = self.bus_idx[line.from_bus.id]
            j = self.bus_idx[line.to_bus.id]
            neighbours[i].add(j)
            neighbours[j].add(i)
        return neighbours

    def get_K_set(self):
        """
        Returns the K set, which is the set of buses connected to each bus, including itself.
        """
        return {i: connected | {i} for i, connected in self._neighbour_sets().items()}

    def get_omega_set(self):
        """
        Returns the omega set, which is the set of buses connected to each bus, excluding itself.
        """
        return self._neighbour_sets()

    def pq_calc(self, theta, V):
        """
        Evaluate the power equations.

        Args:
            theta: voltage angles of all buses, in radians.
            V: voltage magnitudes of all buses.

        Returns:
            ``(P, Q)``: calculated active and reactive injections per bus.
        """
        P = np.zeros(self.nbus)
        Q = np.zeros(self.nbus)
        for i in range(self.nbus):
            Vi = V[i]
            theta_i = theta[i]
            for j in self.K[i]:
                Vj = V[j]
                sin_diff = np.sin(theta_i - theta[j])
                cos_diff = np.cos(theta_i - theta[j])
                Gij = self.G[i, j]
                Bij = self.B[i, j]

                P[i] += Vi * Vj * (Gij * cos_diff + Bij * sin_diff)
                Q[i] += Vi * Vj * (Gij * sin_diff - Bij * cos_diff)
        return P, Q

    def power_mismatch(self, P, Q):
        """
        Return ``(dP, dQ)``, specified minus calculated injections.

        Entries that are not equations of the problem are zeroed: both
        mismatches of slack buses and the Q mismatch of PV buses.
        """
        dP = self.P_esp - P
        dQ = self.Q_esp - Q

        # Set the mismatch to zero for slack bus:
        for i in self.slack_idx:
            dP[i] = 0
            dQ[i] = 0

        # Set the Q mismatch to zero for PV buses:
        for i in self.pv_idx:
            dQ[i] = 0
        return dP, dQ

    def jacobian(self, theta, V, P, Q):
        """
        Build the ``2n x 2n`` Jacobian ``[[H, N], [M, L]]``.

        ``P`` and ``Q`` are the injections calculated at ``(theta, V)``; they
        are reused for the diagonal terms. Rows of equations that do not exist
        (P and Q of slack buses, Q of PV buses) are replaced by identity rows,
        so the corresponding corrections come out as the zeroed mismatch.
        """
        n = self.nbus
        G = self.G
        B = self.B
        H = np.zeros((n, n))  # dP/dtheta
        N = np.zeros((n, n))  # dP/dV
        M = np.zeros((n, n))  # dQ/dtheta
        L = np.zeros((n, n))  # dQ/dV

        for i in range(n):
            V_i = V[i]
            theta_i = theta[i]
            B_ii = B[i, i]
            G_ii = G[i, i]
            P_i = P[i]
            Q_i = Q[i]
            for j in self.omega[i]:
                V_j = V[j]
                sin_diff = np.sin(theta_i - theta[j])
                cos_diff = np.cos(theta_i - theta[j])
                Gij = G[i, j]
                Bij = B[i, j]

                H[i, j] = V_i * V_j * (Gij * sin_diff - Bij * cos_diff)
                N[i, j] = V_i * (Gij * cos_diff + Bij * sin_diff)
                M[i, j] = -V_i * V_j * (Gij * cos_diff + Bij * sin_diff)
                L[i, j] = V_i * (Gij * sin_diff - Bij * cos_diff)

            # Diagonal elements, written after the neighbours so they always win
            H[i, i] = -V_i ** 2 * B_ii - Q_i
            N[i, i] = (P_i + V_i ** 2 * G_ii) / V_i
            M[i, i] = P_i - V_i ** 2 * G_ii
            L[i, i] = (Q_i - V_i ** 2 * B_ii) / V_i

        J = np.block([[H, N], [M, L]])

        for i in self.slack_idx:
            J[i, :] = 0   # P row equation set to zero
            J[i, i] = 1   # except the diagonal element, which is 1

            J[n + i, :] = 0  # Q row equation set to zero
            J[n + i, n + i] = 1  # except the diagonal element, which is 1

        for i in self.pv_idx:
            J[n + i, :] = 0  # Q row equation set to zero
            J[n + i, n + i] = 1  # except the diagonal element, which is 1

        return J

    def solve(self, tol_P = 1e-6, tol_Q = 1e-6, max_iter = 100, verbose = False):
        """
        Solves the power flow problem using the Newton-Raphson method.
        If verbose is True, prints detailed iteration information.

        Args:
            tol_P: tolerance on the largest absolute active power mismatch.
            tol_Q: tolerance on the largest absolute reactive power mismatch.
            max_iter: maximum number of iterations.
            verbose: print the state of every bus at each iteration.

        Returns:
            None. The solution is stored in ``self.V`` (alias ``self.V_``),
            ``self.theta`` (degrees) and ``self.converged``.
        """
        V = self.V_0
        theta = self.theta_0
        nbus = self.nbus
        self.converged = False

        for iteration in range(max_iter):
            P, Q = self.pq_calc(theta, V)
            dP, dQ = self.power_mismatch(P, Q)

            if verbose:
                print(f" \n=== Iteration {iteration} === ")
                for i, bus in enumerate(self.network.buses):
                    print(f"{bus.name}: P = {P[i]:.4f}pu, Q = {Q[i]:.4f}pu, V = {V[i]:.4f}pu, theta = {np.rad2deg(theta[i]):.4f}°")

            if np.linalg.norm(dP, np.inf) < tol_P and np.linalg.norm(dQ, np.inf) < tol_Q:
                self.converged = True
                print("Converged in", iteration, "iterations.")
                break

            # Newton step: J * correction = mismatch
            J = self.jacobian(theta, V, P, Q)
            correction = np.linalg.solve(J, np.concatenate((dP, dQ)))
            theta = theta + correction[:nbus]
            V = V + correction[nbus:]

        else:
            print("Failed to converge in", max_iter, "iterations.")

        # Update state variables (both names point at the same result)
        self.V = V
        self.V_ = V
        self.theta = np.rad2deg(theta)

        return None

In [None]:
class PWF_Reader:
    """
    Reader for power-flow case files in the fixed-width PWF text format.

    Parses the TITU (system title), DBAR (bus data) and DLIN (line data)
    blocks. Bus and line records are exposed as pandas DataFrames
    (``df_bar`` / ``df_lin``); a block that is absent leaves its attribute as
    None. ``df_dcte``, ``df_titu`` and ``df_dglt`` are declared but not
    filled by this reader.

    In DLIN, ``R%``, ``X%`` and ``B`` are divided by 100 on read.
    """

    def __init__(self, caminho_arquivo):
        self.caminho = caminho_arquivo
        self.df_bar = None
        self.df_lin = None
        self.df_dcte = None
        self.df_titu = None
        self.df_dglt = None
        # Stays None when the file has no TITU block, so that
        # get_nome_sistema() never hits a missing attribute.
        self.nome_sistema = None
        self._ler_arquivo()

    @staticmethod
    def _num(linha, ini, fim, padrao=0.0, divisor=1.0):
        """Return the float in ``linha[ini:fim]`` divided by ``divisor``, or ``padrao`` when blank.

        The same slice is used for the blank test and for the value, so the
        two can never disagree.
        """
        texto = linha[ini:fim].strip()
        if not texto:
            return padrao
        return float(texto) / divisor

    @staticmethod
    def _registros(linhas, inicio):
        """Collect the data lines of a block.

        Starts at ``inicio`` and stops at the ``99999`` terminator or at the
        end of the file. Blank lines and lines starting with ``(`` (column
        headers) are skipped. Returns ``(data_lines, stop_index)``.
        """
        registros = []
        i = inicio
        while i < len(linhas):
            texto = linhas[i].strip()
            if texto.startswith('99999'):
                break
            if texto and not texto.startswith('('):
                registros.append(linhas[i])
            i += 1
        return registros, i

    def _barra(self, l):
        """Parse one DBAR record into a dict keyed by column name."""
        num = self._num
        tipo = l[7:8].strip()
        return {
            'NUM': int(l[0:5]),
            # Blank type is read as 0 — presumably the format's default bus
            # type; confirm against the format manual.
            'TPO': int(tipo) if tipo else 0,
            'NOME': l[10:22].strip(),
            'V': float(l[24:28]) / 1000,
            'A': num(l, 28, 32),
            'PG': num(l, 32, 37),
            'QG': num(l, 37, 42),
            'QN': num(l, 42, 47),
            'QM': num(l, 47, 52),
            'BC': num(l, 52, 58),
            'PL': num(l, 58, 63),
            'QL': num(l, 63, 68),
            'SH': num(l, 68, 73),
            'ARE': num(l, 73, 76),
        }

    def _linha(self, l):
        """Parse one DLIN record into a dict keyed by column name."""
        num = self._num
        return {
            'DE': int(l[0:5]),                       # from bus
            'PARA': int(l[10:15]),                   # to bus (5 columns, like DE)
            'R%': num(l, 20, 26, divisor=100),       # resistance (% -> p.u.)
            'X%': num(l, 26, 32, divisor=100),       # reactance (% -> p.u.)
            'B': num(l, 32, 38, divisor=100),        # line charging field, /100
            'TAP': num(l, 38, 43, padrao=1.0),       # tap
            'TAP_MIN': num(l, 43, 48),               # minimum tap
            'TAP_MAX': num(l, 48, 53),               # maximum tap
            'PHS': num(l, 53, 58),                   # phase angle
            # NOTE(review): the four fields below follow the header names
            # (Bc)(Cn)(Ce)Ns; their meaning should be confirmed in the manual.
            'Bc': num(l, 58, 64),
            'Cn': num(l, 64, 68),
            'Ce': num(l, 68, 72),
            'Ns': num(l, 72, 74),
        }

    def _ler_arquivo(self):
        """Read the file and fill ``nome_sistema``, ``df_bar`` and ``df_lin``."""
        with open(self.caminho, 'r') as f:
            linhas = f.readlines()

        total = len(linhas)
        i = 0
        while i < total:
            marcador = linhas[i].strip()

            # Header block: the system name is on the following line
            if marcador.startswith('TITU'):
                if i + 1 < total:
                    # strip surrounding asterisks, then whitespace
                    self.nome_sistema = linhas[i + 1].strip('*').strip()
                i += 2
                continue

            # DBAR block: marker line, one header line, then records
            if marcador.startswith('DBAR'):
                registros, i = self._registros(linhas, i + 2)
                self.df_bar = pd.DataFrame([self._barra(l) for l in registros])

            # DLIN block: marker line, one header line, then records
            elif marcador.startswith('DLIN'):
                registros, i = self._registros(linhas, i + 2)
                self.df_lin = pd.DataFrame([self._linha(l) for l in registros])

            i += 1  # next line (or step over a block terminator)

    def get_barras(self):
        """Return the DBAR DataFrame (None if the block was not found)."""
        return self.df_bar

    def get_linhas(self):
        """Return the DLIN DataFrame (None if the block was not found)."""
        return self.df_lin

    def get_nome_sistema(self):
        """Return the system name from TITU (None if the block was not found)."""
        return self.nome_sistema

In [210]:
class PWF_Reader:
    """
    Class for reading PWF (Power Flow) files and extracting system data into DataFrames.

    Recognised blocks: TITU (title), DCTE (constants), DBAR (buses),
    DLIN (lines) and DGLT (voltage limits). Each block that is present fills
    the matching ``df_*`` attribute; absent blocks leave it as None.

    Every ``_ler_*`` helper receives the full list of lines plus the index of
    the block's marker line, and returns the index of the first line after
    the block so the main loop always moves forward.
    """

    # DBAR type codes and the labels stored in the "TPO" column.
    _TIPOS_BARRA = {2: "Slack", 1: "PV", 0: "PQ"}

    def __init__(self, caminho_arquivo):
        self.caminho = caminho_arquivo
        self.df_bar = None      # DataFrame for DBAR block (bus data)
        self.df_lin = None      # DataFrame for DLIN block (line data)
        self.df_dcte = None     # DataFrame for DCTE block (system constants)
        self.df_titu = None     # DataFrame for TITU block (system title)
        self.df_dglt = None     # DataFrame for DGLT block (voltage limits)
        self.nome_sistema = None
        self._ler_arquivo()     # Read and parse the file

    @staticmethod
    def _num(linha, ini, fim, padrao=0.0):
        """Return the float in ``linha[ini:fim]``, or ``padrao`` when the field is blank.

        The same slice is used for the blank test and for the value, so the
        two can never disagree.
        """
        texto = linha[ini:fim].strip()
        return float(texto) if texto else padrao

    @staticmethod
    def _registros(linhas, inicio):
        """Collect the data lines of a fixed-width block.

        Starts at ``inicio`` and stops at the ``99999`` terminator or at the
        end of the file. Blank lines and lines starting with ``(`` (column
        headers) are skipped. Returns ``(data_lines, next_index)`` where
        ``next_index`` is the line after the terminator.
        """
        registros = []
        i = inicio
        while i < len(linhas):
            texto = linhas[i].strip()
            if texto.startswith('99999'):
                i += 1  # step over the terminator
                break
            if texto and not texto.startswith('('):
                registros.append(linhas[i])
            i += 1
        return registros, i

    def _ler_arquivo(self):
        """Read the file and dispatch each block marker to its reader."""
        with open(self.caminho, 'r') as f:
            linhas = f.readlines()

        leitores = (
            ('TITU', self._ler_titu),   # system name/title
            ('DCTE', self._ler_dcte),   # system parameters/constants
            ('DBAR', self._ler_dbar),   # bus data
            ('DLIN', self._ler_dlin),   # line data
            ('DGLT', self._ler_dglt),   # voltage limits
        )

        i = 0
        while i < len(linhas):
            marcador = linhas[i].strip()
            for prefixo, leitor in leitores:
                if marcador.startswith(prefixo):
                    proximo = leitor(linhas, i)
                    # Always advance, even if a reader reports no position:
                    # an empty block must not stall the loop.
                    i = proximo if isinstance(proximo, int) and proximo > i else i + 1
                    break
            else:
                i += 1

    def _ler_titu(self, linhas, i):
        """ Read TITU block (system title) """
        i += 1  # Skip to the line with the system name
        if i < len(linhas):
            self.nome_sistema = linhas[i].strip()  # Surrounding whitespace removed
        self.df_titu = pd.DataFrame({'Nome do Sistema': [self.nome_sistema]})
        return i + 1

    def _ler_dcte(self, linhas, i):
        """ Read DCTE block (system constants) """
        dados_dcte = {}
        i += 2  # Skip the marker and the header line with (Mn)(Val)

        while i < len(linhas):
            linha = linhas[i].strip()
            if linha.startswith("9999"):  # End of block
                i += 1
                break

            if not linha.startswith("("):  # extra header lines carry no data
                j = 0
                # Key-value pairs in fixed-width format: 4-char key, 1 gap,
                # value, repeated every 12 characters
                while j + 5 < len(linha):
                    chave = linha[j:j+4].strip()
                    valor = linha[j+5:j+12].strip()
                    if chave:
                        try:
                            dados_dcte[chave] = float(valor)
                        except ValueError:
                            dados_dcte[chave] = valor  # Keep as string if not a float
                    j += 12  # Move to next key-value pair

            i += 1

        self.df_dcte = pd.DataFrame([dados_dcte])
        return i

    def _ler_dbar(self, linhas, i):
        """ Read DBAR block (bus data) """
        registros, proximo = self._registros(linhas, i + 2)  # +2 skips marker and header
        num = self._num
        dados_barras = []
        for l in registros:
            texto_tipo = l[7:8].strip()
            # Blank type is read as 0 — presumably the format's default bus
            # type; confirm against the format manual.
            tipo = int(texto_tipo) if texto_tipo else 0
            dados_barras.append({
                # 5 columns, same width as DE/PARA in DLIN, so bus numbers
                # match; column 6 is not part of the number.
                'NUM': int(l[0:5]),
                # Codes without a label are kept as they are
                'TPO': self._TIPOS_BARRA.get(tipo, tipo),
                'NOME': l[10:22].strip(),
                'V': float(l[24:28]) / 1000,
                'A': num(l, 28, 32),
                'PG': num(l, 32, 37),
                'QG': num(l, 37, 42),
                'QN': num(l, 42, 47),
                'QM': num(l, 47, 52),
                'BC': num(l, 52, 58),
                'PL': num(l, 58, 63),
                'QL': num(l, 63, 68),
                'SH': num(l, 68, 73),
                'ARE': num(l, 73, 76),
            })
        self.df_bar = pd.DataFrame(dados_barras)
        return proximo

    def _ler_dlin(self, linhas, i):
        """ Read DLIN block (line data) """
        registros, proximo = self._registros(linhas, i + 2)  # +2 skips marker and header
        num = self._num
        dados_linhas = []
        for l in registros:
            dados_linhas.append({
                'DE': int(l[0:5]),
                'PARA': int(l[10:15]),  # 5 columns, like DE
                'R%': num(l, 20, 26),
                'X%': num(l, 26, 32),
                'B': num(l, 32, 38),
                'TAP': num(l, 38, 43, 1.0),
                'TAP_MIN': num(l, 43, 48),
                'TAP_MAX': num(l, 48, 53),
                'PHS': num(l, 53, 58),
                # NOTE(review): the four fields below follow the header names
                # (Bc)(Cn)(Ce)Ns; their meaning should be confirmed in the manual.
                'Bc': num(l, 58, 64),
                'Cn': num(l, 64, 68),
                'Ce': num(l, 68, 72),
                'Ns': num(l, 72, 74),
            })
        self.df_lin = pd.DataFrame(dados_linhas)
        return proximo

    def _ler_dglt(self, linhas, i):
        """ Read DGLT block (voltage limits per generator) """
        dados_dglt = []
        i += 1  # Skip the marker line

        while i < len(linhas):
            linha = linhas[i].strip()
            if linha.startswith("9999") or linha.upper().startswith("FIM"):  # End of block
                i += 1
                break

            partes = linha.split()
            if len(partes) >= 3:
                try:
                    g = int(partes[0])
                    vmn = float(partes[1])
                    vmx = float(partes[2])
                except ValueError:
                    # Best effort on purpose: header and comment lines do not
                    # parse as numbers and are simply not records.
                    pass
                else:
                    dados_dglt.append({'G': g, 'Vmn': vmn, 'Vmx': vmx})

            i += 1

        self.df_dglt = pd.DataFrame(dados_dglt)
        return i

In [201]:
class PWF_Network_Builder:
    """
    Assemble a ``Network`` from the tables parsed out of a PWF file.

    The constructor runs ``PWF_Reader`` on the file and keeps references to
    its DataFrames. The ``build_*`` methods then create the Bus, Line,
    Generator and Load objects attached to ``self.net``.
    """

    def __init__(self, caminho_arquivo):
        reader = PWF_Reader(caminho_arquivo)
        self.reader = reader
        self.net_name = reader.nome_sistema
        self.df_bar = reader.df_bar
        self.df_lin = reader.df_lin
        self.df_dcte = reader.df_dcte
        self.df_titu = reader.df_titu
        self.df_dglt = reader.df_dglt
        # Base power: first entry of the 'BASE' column of the DCTE table.
        self.Sb = self.df_dcte['BASE'].values[0]
        # Maps bus id -> Bus object; filled by build_buses().
        self.bus_dict = {}
        self.net = Network(name=self.net_name)

    def build_buses(self):
        """
        Create one Bus per row of ``self.df_bar`` and index it by id.

        Returns ``self.net``.
        """
        self.buses = []
        for _, barra in self.df_bar.iterrows():
            novo = Bus(
                network=self.net,
                id=barra['NUM'],
                name=barra['NOME'],
                bus_type=barra['TPO'],
                v=barra['V'],
                theta=barra['A'],
                Sb=self.Sb,
                Sh=barra['SH'],
            )
            self.buses.append(novo)
            self.bus_dict[novo.id] = novo

        return self.net

    def build_lines(self):
        """
        Create one Line per row of ``self.df_lin``.

        Lines are numbered sequentially from 1. The end buses are looked up in
        ``self.bus_dict``, so build_buses() must have run first; an unknown
        bus number raises KeyError. Returns ``self.net``.
        """
        self.lines = []
        for numero, (_, ramo) in enumerate(self.df_lin.iterrows(), start=1):
            # NOTE(review): 'R%' and 'X%' are divided by the base power; if
            # these columns are percentages the divisor only matches when
            # BASE is 100 -- confirm.
            self.lines.append(Line(
                from_bus=self.bus_dict[ramo['DE']],
                to_bus=self.bus_dict[ramo['PARA']],
                id=numero,
                name=f"Line {numero}",
                r=ramo['R%']/self.Sb,
                x=ramo['X%']/self.Sb,
                b_half=(ramo['B'] * 0.5)/self.Sb,
                tap_ratio=ramo['TAP'],
                tap_phase=ramo['PHS'],
            ))

        return self.net

    def build_generators(self):
        """
        Create a Generator for every row of ``self.df_bar`` with generation data.

        A row counts as having generation when any of 'PG', 'QG', 'QM' or 'QN'
        is non-zero. Generators are numbered sequentially from 1 in row order.
        Returns ``self.net``.
        """
        campos_geracao = ('PG', 'QG', 'QM', 'QN')
        self.generators = []

        for _, barra in self.df_bar.iterrows():
            if all(barra[campo] == 0 for campo in campos_geracao):
                continue  # No generation on this bus

            alvo = self.bus_dict[barra['NUM']]
            numero = len(self.generators) + 1
            self.generators.append(Generator(
                bus=alvo,
                id=numero,
                name=f"Generator {numero}",
                pb=self.Sb,
                p_input=barra['PG'],
                q_input=barra['QG'],
                q_max_input=barra['QM'],
                q_min_input=barra['QN'],
            ))
        return self.net

    def build_loads(self):
        """
        Create a Load for every row of ``self.df_bar`` with load data.

        A row counts as having load when 'PL' or 'QL' is non-zero. Loads are
        numbered sequentially from 1 in row order. Returns ``self.net``.
        """
        self.loads = []

        for _, barra in self.df_bar.iterrows():
            if barra['PL'] == 0 and barra['QL'] == 0:
                continue  # No load on this bus

            alvo = self.bus_dict[barra['NUM']]
            numero = len(self.loads) + 1
            self.loads.append(Load(
                bus=alvo,
                id=numero,
                name=f"Load {numero}",
                pb=self.Sb,
                p_input=barra['PL'],
                q_input=barra['QL'],
            ))
        return self.net

    def build_network(self):
        """
        Run the four build steps in order: buses, lines, generators, loads.

        Buses go first because the other steps look them up in
        ``self.bus_dict``. Returns ``self.net``.
        """
        for etapa in (self.build_buses, self.build_lines,
                      self.build_generators, self.build_loads):
            etapa()
        return self.net

In [214]:
# Build the network from the IEEE 14-bus PWF file and run the AC power flow.
caminho = 'pwf_systems/IEEE14.pwf'  # relative path to the PWF case file
builder = PWF_Network_Builder(caminho)
net = builder.build_network()  # buses, lines, generators and loads, in that order
solver = AC_PF(net)
solver.solve(verbose=True)  # verbose=True prints the per-iteration bus values

 
=== Iteration 0 === 
BARRA-1: P = 0.1447pu, Q = 0.4547pu, V = 1.0600pu, theta = 0.0000°
BARRA-2: P = 0.1224pu, Q = 0.3287pu, V = 1.0450pu, theta = 0.0000°
BARRA-3: P = -0.0201pu, Q = -0.1578pu, V = 1.0100pu, theta = 0.0000°
BARRA-4: P = -0.0957pu, Q = -0.1540pu, V = 1.0000pu, theta = 0.0000°
BARRA-5: P = -0.1381pu, Q = -0.5232pu, V = 1.0000pu, theta = 0.0000°
BARRA-6: P = 0.4928pu, Q = 0.9890pu, V = 1.0700pu, theta = 0.0000°
BARRA-7: P = 0.0000pu, Q = -0.6185pu, V = 1.0000pu, theta = 0.0000°
BARRA-8: P = 0.0000pu, Q = 0.5569pu, V = 1.0900pu, theta = 0.0000°
BARRA-9: P = 0.0000pu, Q = -0.2475pu, V = 1.0000pu, theta = 0.0000°
BARRA-10: P = -0.0000pu, Q = 0.0000pu, V = 1.0000pu, theta = 0.0000°
BARRA-11: P = -0.1369pu, Q = -0.2866pu, V = 1.0000pu, theta = 0.0000°
BARRA-12: P = -0.1068pu, Q = -0.2223pu, V = 1.0000pu, theta = 0.0000°
BARRA-13: P = -0.2169pu, Q = -0.4272pu, V = 1.0000pu, theta = 0.0000°
BARRA-14: P = 0.0000pu, Q = 0.0000pu, V = 1.0000pu, theta = 0.0000°
 
=== Iteration 1 =

In [208]:
# Show the solver's V and theta arrays for the PWF-built network
# (the printed output holds one entry per bus).
print(solver.V)
print(solver.theta)

[1.06       1.045      1.01       1.01862342 1.02026375 1.07
 1.06195078 1.09       1.056346   1.05132789 1.05708185 1.05522013
 1.05044256 1.03579477]
[-2.52011255e-14 -4.98094880e+00 -1.27179685e+01 -1.03242222e+01
 -8.78257554e+00 -1.42226532e+01 -1.33682479e+01 -1.33682479e+01
 -1.49466010e+01 -1.51043244e+01 -1.47952641e+01 -1.50774237e+01
 -1.51589390e+01 -1.60389345e+01]


## Exemplo de Uso

### Redes Teste

#### 3 Bus

In [50]:
# 3-bus test network, built by hand
net1 = Network()
# Create the buses (each Bus registers itself on net1 when constructed)
buses = [
    Bus(net1, id=1, bus_type='Slack', v=1.0, theta=0.0),
    Bus(net1, id=2, bus_type='PV'), 
    Bus(net1, id=3)  # bus_type defaults to "PQ"
]
# Create the lines (list indices are 0-based: buses[0] is bus id 1)
lines = [
    Line(id=1, from_bus=buses[0], to_bus=buses[1], r=0.01938, x=0.05917, b_half=0.00264),
    Line(id=2, from_bus=buses[0], to_bus=buses[2], r=0.05403, x=0.22304, b_half=0.00264),
    Line(id=3, from_bus=buses[1], to_bus=buses[2], r=0.04699, x=0.19797, b_half=0.00219),
]
# Create the generators:
generators = [
    Generator(id=1, bus=buses[0]), # Slack (V-theta) generator
    Generator(id=2, bus=buses[1]), # Synchronous condenser
]

# Create the loads (p_input/q_input are given together with the base pb)
loads = [
    Load(id=1, bus=buses[1], pb=1000, p_input=240, q_input=120),
    Load(id=2, bus=buses[2], pb=1000, p_input=240, q_input=120)
]

#### IEEE 14 BUS SYSTEM

In [51]:
# IEEE 14-bus test network, built by hand
net = Network()

# Buses: list indices are 0-based, so buses[k] is the bus with id k+1.
buses = [
    Bus(net, id= 1, bus_type='Slack', v=1.060, theta=0.0),
    Bus(net, id= 2, bus_type=   'PV', v=1.045, theta=0.0),
    Bus(net, id= 3, bus_type=   'PV', v=1.010, theta=0.0),
    Bus(net, id= 4, bus_type=   'PQ', v=1.000, theta=0.0),
    Bus(net, id= 5, bus_type=   'PQ', v=1.000, theta=0.0),
    Bus(net, id= 6, bus_type=   'PV', v=1.070, theta=0.0),
    Bus(net, id= 7, bus_type=   'PQ', v=1.000, theta=0.0),
    Bus(net, id= 8, bus_type=   'PV', v=1.090, theta=0.0),
    Bus(net, id= 9, bus_type=   'PQ', v=1.000, theta=0.0, Sh=19, Sb=100), ### Missing shunt!! -- NOTE(review): Sh and Sb are set on this line, so this remark may be stale; confirm
    Bus(net, id=10, bus_type=   'PQ', v=1.000, theta=0.0),
    Bus(net, id=11, bus_type=   'PQ', v=1.000, theta=0.0),
    Bus(net, id=12, bus_type=   'PQ', v=1.000, theta=0.0),
    Bus(net, id=13, bus_type=   'PQ', v=1.000, theta=0.0), 
    Bus(net, id=14, bus_type=   'PQ', v=1.000, theta=0.0) 
]

# Loads: p_input/q_input are given together with the base pb=100.
loads = [
    Load(id= 1, bus=buses[ 1], pb=100, p_input=21.70, q_input=12.70),
    Load(id= 2, bus=buses[ 2], pb=100, p_input=94.20, q_input=19.00),
    Load(id= 3, bus=buses[ 3], pb=100, p_input=47.80, q_input=-3.90),
    Load(id= 4, bus=buses[ 4], pb=100, p_input= 7.60, q_input= 1.60),
    Load(id= 5, bus=buses[ 5], pb=100, p_input=11.20, q_input= 7.50),
    Load(id= 6, bus=buses[ 8], pb=100, p_input=29.50, q_input=16.60),
    Load(id= 7, bus=buses[ 9], pb=100, p_input= 9.00, q_input= 5.80),
    Load(id= 8, bus=buses[10], pb=100, p_input= 3.50, q_input= 1.80),
    Load(id= 9, bus=buses[11], pb=100, p_input= 6.10, q_input= 1.60),
    Load(id=10, bus=buses[12], pb=100, p_input=13.50, q_input= 5.80),
    Load(id=11, bus=buses[13], pb=100, p_input=14.90, q_input= 5.00)
]

# Create the generators:
# NOTE(review): Generator id=4 sits on buses[4] (bus id 5), which is declared
# 'PQ' above -- confirm that a generator is intended on that bus.
generators = [
    Generator(id=1, bus=buses[0]),
    Generator(id=2, bus=buses[1], pb=100, p_input=40.00),
    Generator(id=3, bus=buses[2]),
    Generator(id=4, bus=buses[4]),
    Generator(id=5, bus=buses[5]),
    Generator(id=6, bus=buses[7]) 
]

# Lines: ids 8-10 are the only entries that set tap_ratio.
lines = [
    Line(id= 1, from_bus=buses[ 0], to_bus=buses[ 1], r=0.01938, x=0.05917, b_half=0.0264),
    Line(id= 2, from_bus=buses[ 0], to_bus=buses[ 4], r=0.05403, x=0.22304, b_half=0.0246),
    Line(id= 3, from_bus=buses[ 1], to_bus=buses[ 2], r=0.04699, x=0.19797, b_half=0.0219),
    Line(id= 4, from_bus=buses[ 1], to_bus=buses[ 3], r=0.05811, x=0.17632, b_half=0.0187),
    Line(id= 5, from_bus=buses[ 1], to_bus=buses[ 4], r=0.05695, x=0.17388, b_half=0.0170),
    Line(id= 6, from_bus=buses[ 2], to_bus=buses[ 3], r=0.06701, x=0.17103, b_half=0.0173),
    Line(id= 7, from_bus=buses[ 3], to_bus=buses[ 4], r=0.01335, x=0.04211, b_half=0.0064),
    Line(id= 8, from_bus=buses[ 3], to_bus=buses[ 6], r=0.0    , x=0.20912, b_half=0.0    ,tap_ratio=0.978),
    Line(id= 9, from_bus=buses[ 3], to_bus=buses[ 8], r=0.0    , x=0.55618, b_half=0.0    ,tap_ratio=0.969),
    Line(id=10, from_bus=buses[ 4], to_bus=buses[ 5], r=0.0    , x=0.25202, b_half=0.0    ,tap_ratio=0.932),
    Line(id=11, from_bus=buses[ 5], to_bus=buses[10], r=0.09498, x=0.19890, b_half=0.0),
    Line(id=12, from_bus=buses[ 5], to_bus=buses[11], r=0.12291, x=0.25581, b_half=0.0),
    Line(id=13, from_bus=buses[ 5], to_bus=buses[12], r=0.06615, x=0.13027, b_half=0.0),
    Line(id=14, from_bus=buses[ 6], to_bus=buses[ 7], r=0.0    , x=0.17615, b_half=0.0),
    Line(id=15, from_bus=buses[ 6], to_bus=buses[ 8], r=0.0    , x=0.11001, b_half=0.0),
    Line(id=16, from_bus=buses[ 8], to_bus=buses[ 9], r=0.03181, x=0.08450, b_half=0.0),
    Line(id=17, from_bus=buses[ 8], to_bus=buses[13], r=0.12711, x=0.27038, b_half=0.0),
    Line(id=18, from_bus=buses[ 9], to_bus=buses[10], r=0.08205, x=0.19207, b_half=0.0),
    Line(id=19, from_bus=buses[11], to_bus=buses[12], r=0.22092, x=0.19988, b_half=0.0),
    Line(id=20, from_bus=buses[12], to_bus=buses[13], r=0.17093, x=0.34802, b_half=0.0),
]

### Rodando o Fluxo de Potência

In [52]:
# Create one AC_PF solver per hand-built network
solver_3bus = AC_PF(net1) # 3 Bus: named net1
solver_14bus = AC_PF(net) #14 Bus: named net

# Solving the power flow (the 3-bus case runs first, then the 14-bus case)
solver_3bus.solve(verbose=True) # verbose=True to print the results
solver_14bus.solve(verbose=True)

 
=== Iteration 0 === 
Bus 1: P = 0.0000pu, Q = -0.0053pu, V = 1.0000pu, theta = 0.0000°
Bus 2: P = -0.0000pu, Q = -0.0048pu, V = 1.0000pu, theta = 0.0000°
Bus 3: P = 0.0000pu, Q = -0.0048pu, V = 1.0000pu, theta = 0.0000°
 
=== Iteration 1 === 
Bus 1: P = 0.4790pu, Q = -0.0611pu, V = 1.0000pu, theta = -0.0000°
Bus 2: P = -0.2399pu, Q = 0.1767pu, V = 1.0000pu, theta = -1.2012°
Bus 3: P = -0.2350pu, Q = -0.1152pu, V = 0.9818pu, theta = -1.9128°
 
=== Iteration 2 === 
Bus 1: P = 0.4843pu, Q = -0.0595pu, V = 1.0000pu, theta = -0.0000°
Bus 2: P = -0.2400pu, Q = 0.1803pu, V = 1.0000pu, theta = -1.2107°
Bus 3: P = -0.2400pu, Q = -0.1200pu, V = 0.9812pu, theta = -1.9425°
 
=== Iteration 3 === 
Bus 1: P = 0.4843pu, Q = -0.0595pu, V = 1.0000pu, theta = -0.0000°
Bus 2: P = -0.2400pu, Q = 0.1803pu, V = 1.0000pu, theta = -1.2107°
Bus 3: P = -0.2400pu, Q = -0.1200pu, V = 0.9812pu, theta = -1.9425°
Converged in 3 iterations.
 
=== Iteration 0 === 
Bus 1: P = 0.1447pu, Q = 0.4547pu, V = 1.0600pu, thet

In [53]:
# Show the V and theta arrays of the hand-built 14-bus solution
# (the printed output holds one entry per bus).
print(solver_14bus.V)

print(solver_14bus.theta)

[1.06       1.045      1.01       1.01862342 1.02026375 1.07
 1.06195078 1.09       1.056346   1.05132789 1.05708185 1.05522013
 1.05044256 1.03579477]
[-2.47576988e-14 -4.98094880e+00 -1.27179685e+01 -1.03242222e+01
 -8.78257554e+00 -1.42226532e+01 -1.33682479e+01 -1.33682479e+01
 -1.49466010e+01 -1.51043244e+01 -1.47952641e+01 -1.50774237e+01
 -1.51589390e+01 -1.60389345e+01]


In [54]:
# Show the V array of the 3-bus solution
print(solver_3bus.V)


[1.         1.         0.98118448]


In [55]:
# Call y_bus() on the 14-bus network; presumably the bus admittance matrix
# (the rendered table shows complex entries) -- confirm against Network.y_bus.
Y = net.y_bus()

In [56]:
# Wrap Y in a DataFrame so the notebook renders it as a table;
# this rebinds the name Y to the DataFrame.
Y = pd.DataFrame(Y)
Y

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13
0,6.025029-19.447070j,-4.999132+15.263087j,0.000000+0.000000j,0.000000+ 0.000000j,-1.025897+ 4.234984j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.00000+0.00000j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+0.000000j
1,-4.999132+15.263087j,9.521324-30.270715j,-1.135019+4.781863j,-1.686033+ 5.115838j,-1.701140+ 5.193927j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.00000+0.00000j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+0.000000j
2,0.000000+ 0.000000j,-1.135019+ 4.781863j,3.120995-9.811480j,-1.985976+ 5.068817j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.00000+0.00000j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+0.000000j
3,0.000000+ 0.000000j,-1.686033+ 5.115838j,-1.985976+5.068817j,10.512990-38.635171j,-6.840981+21.578554j,0.000000+ 0.000000j,0.000000+ 4.889513j,0.00000+0.00000j,0.000000+ 1.855500j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+0.000000j
4,-1.025897+ 4.234984j,-1.701140+ 5.193927j,0.000000+0.000000j,-6.840981+21.578554j,9.568018-35.527539j,0.000000+ 4.257445j,0.000000+ 0.000000j,0.00000+0.00000j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+0.000000j
5,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+ 4.257445j,6.579923-17.340733j,0.000000+ 0.000000j,0.00000+0.00000j,0.000000+ 0.000000j,0.000000+ 0.000000j,-1.955029+4.094074j,-1.525967+3.175964j,-3.098927+ 6.102755j,0.000000+0.000000j
6,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+ 4.889513j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000-19.549006j,0.00000+5.67698j,0.000000+ 9.090083j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+0.000000j
7,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+ 5.676980j,0.00000-5.67698j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+0.000000j
8,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+ 1.855500j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+ 9.090083j,0.00000+0.00000j,5.326055-24.092506j,-3.902050+10.365394j,0.000000+0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,-1.424005+3.029050j
9,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.000000+ 0.000000j,0.00000+0.00000j,-3.902050+10.365394j,5.782934-14.768338j,-1.880885+4.402944j,0.000000+0.000000j,0.000000+ 0.000000j,0.000000+0.000000j
