In [None]:
# PTDF-Rechnung auf dem gegebenen Mini-Graphen
# ------------------------------------------------------------
# Annahmen:
# - DC-Power-Flow (Spannungen = 1 p.u., nur Wirkleistung, Leitungswiderstände ~ 0)
# - Reaktanzen X_total_ohm werden über Basisgrößen in p.u. umgerechnet
# - Slack-Bus: hier "N1" (kann unten geändert werden)
# - Beispiel-Injektionen P (MW) werden unten gesetzt; Summe != 0 wird via Slack ausgeglichen
#
# Ausgaben:
# - Knoten-/Leitungs-Tabellen
# - Inzidenzmatrix A
# - Leitungs-Suszeptanzen b_l (pu)
# - Bus-Suszeptanzmatrix B, reduzierte Brr, Inverse
# - PTDF (Zeilen: Leitungen, Spalten: Knoten ohne Slack)
# - Spannungswinkel θ (rad)
# - Leitungsflüsse (pu und MW) aus Beispiel-ΔP


### Einlesen des Graphen und base definition:

In [None]:

import json
import numpy as np
from pprint import pprint

# -------- Eingabe: Graph (JSON) ------------------------------------------------
json_path = r"C:\Users\M97947\OneDrive - E.ON\Dokumente\Thesis\Code\fca_leistungsbaender\exploratory\graph\toyexamplegraph.json"

with open(json_path, "r", encoding="utf-8") as f:
        graph_json =json.load(f)

# -------- Basisgrößen (für Umrechnung Ohm -> p.u.) -----------------------------
S_base_MVA = 100.0           # System Base Power in MVA
V_base_kV  = 110.0           # (angenommene) System Base Voltage (L-L) in kV

# Ohm -> p.u.:  X_pu = X_ohm * (S_base / V_base^2)
# mit S_base in MVA, V_base in kV, ergibt p.u. kompatibel
def ohm_to_pu(X_ohm, S_base_MVA, V_base_kV):
    return X_ohm * (S_base_MVA / (V_base_kV**2))

### Knoten und Kanten extrahieren

In [None]:
nodes = []
edges = []

for item in graph_json:
    data = item["data"]
    if "source" in data and "target" in data:
        edges.append({
            "id": data["id"],
            "label": data.get("label", data["id"]),
            "source": data["source"],
            "target": data["target"],
            "X_total_ohm": data["features"]["X_total_ohm"],
            "limit_A": data["features"]["Strom_Limit_in_A"]
        })
    else:
        nodes.append({
            "id": data["id"],
            "label": data.get("label", data["id"]),
            "type": data.get("type", "node"),
            "limit_A": data.get("features", {}).get("Strom_Limit_in_A", None)
        })

# Sortiere Knoten stabil nach ID für reproduzierbare Matrizen
nodes = sorted(nodes, key=lambda x: x["id"])
edges = sorted(edges, key=lambda x: x["id"])

print("\n=== Knoten (Buses) ===")
pprint(nodes)

print("\n=== Leitungen (Edges) ===")
pprint(edges)


# Knotenindex (Mapping) 
bus_ids = [n["id"] for n in nodes]
bus_index = {bus_id: idx for idx, bus_id in enumerate(bus_ids)}


=== Knoten (Buses) ===
[{'id': 'K1', 'label': 'K1', 'limit_A': None, 'type': 'junction'},
 {'id': 'N1', 'label': 'N1', 'limit_A': 800, 'type': 'uw_field'},
 {'id': 'N2', 'label': 'N2', 'limit_A': 800, 'type': 'uw_field'},
 {'id': 'N3', 'label': 'N3', 'limit_A': 800, 'type': 'uw_field'}]

=== Leitungen (Edges) ===
[{'X_total_ohm': 1,
  'id': 'E1',
  'label': 'E1',
  'limit_A': 400,
  'source': 'N1',
  'target': 'K1'},
 {'X_total_ohm': 2,
  'id': 'E2',
  'label': 'E2',
  'limit_A': 400,
  'source': 'K1',
  'target': 'N2'},
 {'X_total_ohm': 1.5,
  'id': 'E3',
  'label': 'E3',
  'limit_A': 500,
  'source': 'K1',
  'target': 'N3'}]


### Inzidenzmatrix A (m x n) 


In [6]:
# Konvention: +1 am "source"-Bus, -1 am "target"-Bus (konsistent beibehalten)
m = len(edges)
n = len(nodes)
A = np.zeros((m, n), dtype=float)

for ell, e in enumerate(edges):
    i = bus_index[e["source"]]
    j = bus_index[e["target"]]
    A[ell, i] =  1.0
    A[ell, j] = -1.0

print("\n=== Inzidenzmatrix A (m x n) ===")
print(A)


=== Inzidenzmatrix A (m x n) ===
[[-1.  1.  0.  0.]
 [ 1.  0. -1.  0.]
 [ 1.  0.  0. -1.]]


### Leitungsreaktanzen (Ohm) -> (p.u.) & Suszeptanzen b_l 


In [7]:
X_ohm = np.array([e["X_total_ohm"] for e in edges], dtype=float)
X_pu  = ohm_to_pu(X_ohm, S_base_MVA, V_base_kV)
b_l   = 1.0 / X_pu  # Leitungs-Suszeptanz (p.u.)

print("\n=== Reaktanzen X (Ohm) ===")
print(X_ohm)
print("\n=== Reaktanzen X (p.u.) ===")
print(X_pu)
print("\n=== Leitungs-Suszeptanzen b_l (1/p.u.) ===")
print(b_l)



=== Reaktanzen X (Ohm) ===
[1.  2.  1.5]

=== Reaktanzen X (p.u.) ===
[0.00826446 0.01652893 0.01239669]

=== Leitungs-Suszeptanzen b_l (1/p.u.) ===
[121.          60.5         80.66666667]


### Bus-Suszeptanzmatrix B = A^T * diag(b_l) * A 

In [None]:
B = A.T @ np.diag(b_l) @ A

print("\n=== Bus-Suszeptanzmatrix B (n x n) ===")
print(B)


=== Bus-Suszeptanzmatrix B (n x n) ===
[[ 262.16666667 -121.          -60.5         -80.66666667]
 [-121.          121.            0.            0.        ]
 [ -60.5           0.           60.5           0.        ]
 [ -80.66666667    0.            0.           80.66666667]]


### Slack wählen & reduzieren 

In [9]:
slack_bus = "N1"                    
slack_idx = bus_index[slack_bus]

non_slack_indices = [k for k in range(n) if k != slack_idx]
Brr = B[np.ix_(non_slack_indices, non_slack_indices)]

print(f"\n=== Slack-Bus: {slack_bus} (Index {slack_idx}) ===")
print("\n=== Reduzierte Suszeptanzmatrix Brr ===")
print(Brr)

# Inverse von Brr
Brr_inv = np.linalg.inv(Brr)
print("\n=== Inverse von Brr (Brr^{-1}) ===")
print(Brr_inv)



=== Slack-Bus: N1 (Index 1) ===

=== Reduzierte Suszeptanzmatrix Brr ===
[[262.16666667 -60.5        -80.66666667]
 [-60.5         60.5          0.        ]
 [-80.66666667   0.          80.66666667]]

=== Inverse von Brr (Brr^{-1}) ===
[[0.00826446 0.00826446 0.00826446]
 [0.00826446 0.02479339 0.00826446]
 [0.00826446 0.00826446 0.02066116]]


### PTDF berechnen 

In [10]:

# PTDF = diag(b_l) @ A[:, non_slack] @ Brr_inv
A_nz = A[:, non_slack_indices]
PTDF = (np.diag(b_l) @ A_nz) @ Brr_inv  # (m x (n-1))

# Für lesbare Ausgabe: Leitungen als Zeilen, Nicht-Slack-Knoten als Spalten
ptdf_cols = [bus_ids[i] for i in non_slack_indices]
print("\n=== PTDF (Zeilen: Leitungen, Spalten: Knoten ohne Slack) ===")
print("Spalten:", ptdf_cols)
print(PTDF)


=== PTDF (Zeilen: Leitungen, Spalten: Knoten ohne Slack) ===
Spalten: ['K1', 'N2', 'N3']
[[-1.00000000e+00 -1.00000000e+00 -1.00000000e+00]
 [-1.99493200e-17 -1.00000000e+00 -1.38777878e-17]
 [-9.46595113e-17 -9.46595113e-17 -1.00000000e+00]]


### Beispiel

In [1]:
import json
import numpy as np
from collections import defaultdict

# ============== Eingaben / Pfade (wie zuvor) ===================================
graph_json_path = r"C:\Users\M97947\OneDrive - E.ON\Dokumente\Thesis\Code\fca_leistungsbaender\exploratory\graph\example2_withBess.json"
S_base_MVA = 100.0
slack_id = "N1"

# Knoten-Leistungen in MW (K1 = Verbindungsknoten → 0 MW)
P_MW = {"N1": +50.0, "N2": -30.0, "N3": -20.0, "K1": 0.0}

# Default-Leiterspannung, wenn keine Angaben im JSON (kV, Leiterspannung L-L)
V_kV_default = 110.0


# ============== Graph laden =====================================================
with open(graph_json_path, "r", encoding="utf-8") as f:
    graph = json.load(f)

nodes = []
edges = []
node_types = {}
node_features = {}
for item in graph:
    if "data" in item:
        d = item["data"]
        if "source" in d and "target" in d:
            edges.append(d)
        else:
            nid = d["id"]
            nodes.append(nid)
            node_types[nid] = d.get("type", "")
            node_features[nid] = d.get("features", {}) or {}

# Fehlende P auf 0 setzen
for n in nodes:
    P_MW.setdefault(n, 0.0)

# ============== Reaktanzen je Leitung (Ohm) ====================================
def edge_X_total_ohm(e):
    feat = e.get("features", {}) or {}
    if "X_total_ohm" in feat and feat["X_total_ohm"] is not None:
        return float(feat["X_total_ohm"])
    x_per_km = float(feat.get("X_ohm_per_km", 0.0))
    length_km = float(feat.get("length_km", 0.0))
    return x_per_km * length_km

lines = []
for e in edges:
    u, v = e["source"], e["target"]
    X = edge_X_total_ohm(e)
    if X <= 0:
        raise ValueError(f"Nichtpositive Reaktanz auf Leitung {e.get('id','?')}: X={X}")
    lines.append((u, v, X, e.get("id", f"{u}-{v}"), e.get("features", {}) or {}))

# ============== B-Matrix aufbauen ==============================================
n = len(nodes)
node_index = {nid: i for i, nid in enumerate(nodes)}
B = np.zeros((n, n), dtype=float)

# Off-Diagonale
for (u, v, X, eid, feat) in lines:
    i, j = node_index[u], node_index[v]
    b = 1.0 / X
    B[i, j] -= b
    B[j, i] -= b

# Diagonale
for i in range(n):
    B[i, i] = -np.sum(B[i, :])

print("=== Knotenreihenfolge ===")
print(nodes)
print("\n=== B-Matrix ===")
print(B)

# ============== Reduzierte B_rr & P_r (p.u.) ===================================
if slack_id not in node_index:
    raise ValueError(f"Slack-Knoten '{slack_id}' fehlt in nodes.")

slack_idx = node_index[slack_id]
non_slack_idx = [i for i in range(n) if i != slack_idx]
non_slack_nodes = [nodes[i] for i in non_slack_idx]

B_rr = B[np.ix_(non_slack_idx, non_slack_idx)]
P_r_pu = np.array([P_MW[nid] / S_base_MVA for nid in non_slack_nodes], dtype=float)

print("\n=== Slack-Knoten ===")
print(slack_id)
print("\n=== Nicht-Slack-Knoten (Reihenfolge) ===")
print(non_slack_nodes)
print("\n=== P_r (MW) in Nicht-Slack-Reihenfolge ===")
print({k: P_MW[k] for k in non_slack_nodes})
print("\n=== P_r (p.u.) ===")
print(P_r_pu)
print("\n=== B_rr ===")
print(B_rr)

# ============== Winkel lösen ====================================================
theta_r = np.linalg.solve(B_rr, P_r_pu)
theta = np.zeros(n, dtype=float)
for idx, ang in zip(non_slack_idx, theta_r):
    theta[idx] = ang
theta[slack_idx] = 0.0

theta_dict = {nodes[i]: theta[i] for i in range(n)}
print("\n=== Spannungswinkel θ (rad) je Knoten (Slack=0) ===")
print(theta_dict)

# ============== Leitungsflüsse (MW / p.u.) =====================================
flows_pu, flows_MW = {}, {}
for (u, v, X, eid, feat) in lines:
    i, j = node_index[u], node_index[v]
    F_pu = (theta[i] - theta[j]) / X
    F_MW = F_pu * S_base_MVA
    flows_pu[eid] = F_pu
    flows_MW[eid] = F_MW

print("\n=== Leitungsflüsse ΔF (p.u.) ===")
print(flows_pu)
print("\n=== Leitungsflüsse ΔF (MW) ===")
print(flows_MW)

# ============== Knotenbilanz-Check =============================================
P_recon_MW = defaultdict(float)
for (u, v, X, eid, feat) in lines:
    i, j = node_index[u], node_index[v]
    F_MW = (theta[i] - theta[j]) / X * S_base_MVA
    P_recon_MW[u] += F_MW
    P_recon_MW[v] -= F_MW

P_recon_ordered = {k: P_recon_MW[k] for k in nodes}
print("\n=== Check: Aus Flüssen rekonstruierte Knoten-P (MW) ===")
print(P_recon_ordered)
print("\nSumme rekonstruierter P (MW):", sum(P_recon_ordered.values()))

# ============== Hilfsfunktionen: Spannungen & Ströme ============================
def edge_voltage_kV(u, v, edge_feat):
    """
    Ermittelt die zu verwendende Spannung (kV) für eine Leitung:
      1) edge.features["Voltage_kV"] falls vorhanden
      2) falls beide Endknoten dieselbe node.features["Voltage_kV"] besitzen → diese
      3) sonst V_kV_default
    """
    if "Voltage_kV" in edge_feat and edge_feat["Voltage_kV"] is not None:
        try:
            return float(edge_feat["Voltage_kV"])
        except:
            pass

    uV = node_features.get(u, {}).get("Voltage_kV", None)
    vV = node_features.get(v, {}).get("Voltage_kV", None)
    try:
        uV = float(uV) if uV is not None else None
    except:
        uV = None
    try:
        vV = float(vV) if vV is not None else None
    except:
        vV = None

    if (uV is not None) and (vV is not None) and abs(uV - vV) < 1e-6:
        return uV
    return V_kV_default

def mw_to_amp(P_MW, V_kV):
    """
    Schätzt Leiterstrom [A] aus Wirkleistung [MW] und Leiterspannung [kV] (Dreiphasig, cosφ≈1):
        I ≈ P / (√3 * U_LL)
      mit P in W, U_LL in V → I in A
    """
    if V_kV is None or V_kV <= 0:
        V_kV = V_kV_default
    P_W = abs(P_MW) * 1e6
    U_V = V_kV * 1e3
    I_A = P_W / (np.sqrt(3.0) * U_V)
    return I_A

# ============== Leitungs-Auslastung ============================================
edge_util = []  # Liste von dicts: id, P_MW, I_A, limit_A, util_%, voltage_kV
for (u, v, X, eid, feat) in lines:
    V_kV = edge_voltage_kV(u, v, feat)
    P_MW_abs = abs(flows_MW[eid])
    I_A = mw_to_amp(P_MW_abs, V_kV)
    limit_A = feat.get("Strom_Limit_in_A", None)
    util = None
    if limit_A is not None:
        try:
            limit_A = float(limit_A)
            util = (I_A / limit_A * 100.0) if limit_A > 0 else None
        except:
            limit_A = None

    edge_util.append({
        "edge_id": eid,
        "u": u,
        "v": v,
        "V_kV": V_kV,
        "P_MW_abs": P_MW_abs,
        "I_A": I_A,
        "limit_A": limit_A,
        "util_percent": util
    })

# Sortiert nach höchster Auslastung
edge_util_sorted = sorted(edge_util, key=lambda d: (d["util_percent"] if d["util_percent"] is not None else -1), reverse=True)

print("\n=== Leitungs-Auslastung (nach % sortiert) ===")
for d in edge_util_sorted:
    eid = d["edge_id"]
    util_str = f"{d['util_percent']:.1f}%" if d["util_percent"] is not None else "n/a"
    lim_str = f"{d['limit_A']:.1f} A" if d["limit_A"] is not None else "n/a"
    print(f"{eid:>6s}: |P|={d['P_MW_abs']:.2f} MW,  U≈{d['V_kV']:.1f} kV,  I≈{d['I_A']:.1f} A,  Limit={lim_str},  Util={util_str}")

# Warnungen für Überschreitungen
viol_edges = [d for d in edge_util if (d["util_percent"] is not None and d["util_percent"] > 100.0)]
if viol_edges:
    print("\n!!! Warnung: Leitungs-Limits überschritten auf:")
    for d in viol_edges:
        print(f"  - {d['edge_id']} ({d['u']}–{d['v']}): {d['util_percent']:.1f}% von {d['limit_A']:.1f} A")

# ============== Feld-/Knoten-Auslastung ========================================
# Annahme: Feldlimit meint den maximal zul. Strom an diesem UW-Feld.
# Wir nehmen dazu das Maximum der anliegenden Leitungsströme am Knoten.
# (Im Beispiel N1,N2,N3 jeweils nur eine Anbindung → entspricht der Leitungsstromstärke.)
node_util = []
# Map von (u,v)->Strom an u bzw. v
edge_current_A = {}
for (u, v, X, eid, feat) in lines:
    V_kV = edge_voltage_kV(u, v, feat)
    I_A = mw_to_amp(flows_MW[eid], V_kV)  # Vorzeichen egal; nehmen Betrag je Knoten separat
    edge_current_A[(u, eid)] = abs(I_A)
    edge_current_A[(v, eid)] = abs(I_A)

for nid in nodes:
    feat = node_features.get(nid, {}) or {}
    limit_A = feat.get("Strom_Limit_in_A", None)
    # sammle alle anliegenden Kanten
    incident_I = []
    for (u, v, X, eid, efeat) in lines:
        if nid == u and (u, eid) in edge_current_A:
            incident_I.append(edge_current_A[(u, eid)])
        elif nid == v and (v, eid) in edge_current_A:
            incident_I.append(edge_current_A[(v, eid)])

    max_I = max(incident_I) if incident_I else 0.0
    util = None
    if limit_A is not None:
        try:
            limit_A = float(limit_A)
            util = (max_I / limit_A * 100.0) if limit_A > 0 else None
        except:
            limit_A = None

    node_util.append({
        "node": nid,
        "type": node_types.get(nid, ""),
        "max_incident_I_A": max_I,
        "limit_A": limit_A,
        "util_percent": util
    })

node_util_sorted = sorted(node_util, key=lambda d: (d["util_percent"] if d["util_percent"] is not None else -1), reverse=True)

print("\n=== Feld-/Knoten-Auslastung (max. anliegender Strom je Knoten, nach % sortiert) ===")
for d in node_util_sorted:
    util_str = f"{d['util_percent']:.1f}%" if d["util_percent"] is not None else "n/a"
    lim_str = f"{d['limit_A']:.1f} A" if d["limit_A"] is not None else "n/a"
    print(f"{d['node']:>6s} ({d['type'] or 'n/a'}): I_max≈{d['max_incident_I_A']:.1f} A,  Limit={lim_str},  Util={util_str}")

viol_nodes = [d for d in node_util if (d["util_percent"] is not None and d["util_percent"] > 100.0)]
if viol_nodes:
    print("\n!!! Warnung: Feld-/Knoten-Limits überschritten auf:")
    for d in viol_nodes:
        print(f"  - {d['node']}: {d['util_percent']:.1f}% von {d['limit_A']:.1f} A")


=== Knotenreihenfolge ===
['N1', 'N2', 'N3', 'K1', 'K2', 'Batt']

=== B-Matrix ===
[[ 1.          0.          0.         -1.          0.          0.        ]
 [ 0.          0.5         0.         -0.5         0.          0.        ]
 [ 0.          0.          0.66666667 -0.66666667  0.          0.        ]
 [-1.         -0.5        -0.66666667  2.83333333 -0.66666667  0.        ]
 [ 0.          0.          0.         -0.66666667  1.66666667 -1.        ]
 [ 0.          0.          0.          0.         -1.          1.        ]]

=== Slack-Knoten ===
N1

=== Nicht-Slack-Knoten (Reihenfolge) ===
['N2', 'N3', 'K1', 'K2', 'Batt']

=== P_r (MW) in Nicht-Slack-Reihenfolge ===
{'N2': -30.0, 'N3': -20.0, 'K1': 0.0, 'K2': 0.0, 'Batt': 0.0}

=== P_r (p.u.) ===
[-0.3 -0.2  0.   0.   0. ]

=== B_rr ===
[[ 0.5         0.         -0.5         0.          0.        ]
 [ 0.          0.66666667 -0.66666667  0.          0.        ]
 [-0.5        -0.66666667  2.83333333 -0.66666667  0.        ]
 [ 0.    

In [2]:
# ============== PTDF-Analyse ====================================================
# Hinweis zu Einheiten:
# - Du hast P_r_pu = P_MW / S_base_MVA verwendet und f_pu = b*(θ_i-θ_j) mit b=1/X (X in Ohm).
# - Dann gilt:  Δf_MW = PTDF * ΔP_MW  (die Matrix PTDF ist dimensionslos "MW pro MW").

print("\n=== PTDF-Analyse ===")

# 1) Hilfsindizes
m_lines = len(lines)
k_nodes = len(non_slack_nodes)
node_pos = {nid: i for i, nid in enumerate(nodes)}  # Knoten → Index gesamt
ns_pos = {nid: j for j, nid in enumerate(non_slack_nodes)}  # Nicht-Slack-Knoten → Spaltenindex in A_r

# 2) Reduzierte Inzidenzmatrix A_r (Zeilen=Leitungen, Spalten=Knoten ohne Slack)
A_r = np.zeros((m_lines, k_nodes), dtype=float)
for ell, (u, v, X, eid, feat) in enumerate(lines):
    if u != slack_id:
        A_r[ell, ns_pos[u]] = +1.0
    if v != slack_id:
        A_r[ell, ns_pos[v]] = -1.0

# 3) Leitungs-Suszeptanzmatrix B_ell = diag(1/X)
b_vec = np.array([1.0 / X for (_, _, X, _, _) in lines], dtype=float)
B_ell = np.diag(b_vec)

# 4) PTDF-Matrix berechnen (m_lines x k_nodes)
#    Achtung: B_rr ist bereits (N-1 x N-1) und invertierbar.
PTDF = B_ell @ A_r @ np.linalg.inv(B_rr)

print("Formen:  B_ell", B_ell.shape, "  A_r", A_r.shape, "  B_rr", B_rr.shape, "  PTDF", PTDF.shape)
print("\n=== PTDF (MW pro MW) – Ausschnitt ===")
# Kleiner Ausschnitt: erste 5 Leitungen × alle Nicht-Slack-Knoten
for ell in range(min(5, m_lines)):
    row = {non_slack_nodes[j]: float(PTDF[ell, j]) for j in range(k_nodes)}
    print(f"Leitung {lines[ell][3]}:", row)

# 5) Komfort: Liste der BESS-Knoten (sofern im JSON als type 'battery'/'BESS' o.ä. markiert)
bess_like = {"battery", "BESS", "bess", "Battery"}
bess_nodes = [nid for nid in non_slack_nodes if (node_types.get(nid, "") in bess_like)]

print("\n=== BESS-Knoten (Nicht-Slack) ===")
print(bess_nodes if bess_nodes else "(keine markierten BESS-Knoten unter den Nicht-Slack-Knoten)")

# 6) PTDF-Spalten für BESS-Knoten ausgeben (Sensitivität je Leitung bei +1 MW am BESS gegen Slack)
if bess_nodes:
    for nid in bess_nodes:
        j = ns_pos[nid]
        col = PTDF[:, j]
        # Als Liste: [(edge_id, ptdf_ℓ,nid), ...]
        col_list = [(lines[ell][3], float(col[ell])) for ell in range(m_lines)]
        print(f"\n--- PTDF-Spalte für BESS {nid} (MW pro MW) ---")
        for eid, val in col_list:
            print(f"{eid:>10s}: {val:+.4f}")
else:
    # Falls du alle Knoten sehen willst:
    pass

# 7) Funktion: Δf für beliebiges ΔP am (Nicht-Slack-)Knoten (ΔP in MW, Ergebnis in MW je Leitung)
def delta_flows_for(node_id: str, deltaP_MW: float):
    """
    Liefert dict {edge_id: Δf_MW} für eine Leistungsänderung am Knoten (gegen Slack).
    Nur für Nicht-Slack-Knoten definiert.
    """
    if node_id == slack_id:
        raise ValueError("ΔP am Slack ist in dieser Konvention nicht definiert (Slack kompensiert).")
    if node_id not in ns_pos:
        raise ValueError(f"Knoten {node_id} ist kein Nicht-Slack-Knoten in dieser Rechnung.")
    j = ns_pos[node_id]
    delta_f_MW = PTDF[:, j] * deltaP_MW   # (m_lines,) MW
    return {lines[ell][3]: float(delta_f_MW[ell]) for ell in range(m_lines)}

# Beispiel: Δf bei +5 MW am ersten verfügbaren BESS (falls vorhanden)
if bess_nodes:
    test_bess = bess_nodes[0]
    df = delta_flows_for(test_bess, +5.0)
    print(f"\n=== Beispiel Δf bei +5 MW am BESS {test_bess} (MW je Leitung) ===")
    for eid, dfmw in df.items():
        print(f"{eid:>10s}: {dfmw:+.3f} MW")

# 8) (Optional) Aus PTDF + Stromlimits eine grobe zulässige ΔP-Bandbreite ableiten
#    Wir projizieren Δf_MW → ΔI_A über I ≈ P/(√3 U). Das ist grob (cosφ≈1), genügt für eine erste Bandabschätzung.
def headroom_mw_to_amp(deltaP_MW_per_line):
    """Hilfsfunktion: map Δf_MW je Leitung-id -> ΔI_A je Leitung-id (Betrag, Näherung)."""
    out = {}
    for (u, v, X, eid, feat) in lines:
        V_kV = edge_voltage_kV(u, v, feat)
        dP = abs(deltaP_MW_per_line.get(eid, 0.0))
        out[eid] = mw_to_amp(dP, V_kV)
    return out

def max_deltaP_for_node_wrt_line_limits(node_id: str, direction: float = +1.0):
    """
    Bestimmt eine konservative maximal zulässige ΔP (MW) für den Knoten, sodass keine Leitungs-Stromlimits verletzt werden.
    direction: +1.0 (Einspeisen/Entladen) oder -1.0 (Aufnahme/Laden)
    Rückgabe: (ΔP_max_MW, begrenzende_Leitungen[list])
    """
    # Sammle pro Leitung: aktueller Strom (aus flows_MW), Limit_A
    limiting = []
    # PTDF-Koeffizienten (MW/MW) je Leitung für diesen Knoten
    if node_id not in ns_pos:
        raise ValueError(f"Knoten {node_id} ist kein Nicht-Slack-Knoten.")
    j = ns_pos[node_id]
    ptdf_col = PTDF[:, j] * direction  # wir betrachten nur diese Richtung

    # Für jede Leitung berechnen wir, wie groß ΔP max sein darf, bevor I_limit erreicht wird.
    candidates = []
    for (u, v, X, eid, feat) in lines:
        limit_A = feat.get("Strom_Limit_in_A", None)
        if limit_A is None:
            continue  # keine Begrenzung hinterlegt
        # aktueller Strom
        V_kV = edge_voltage_kV(u, v, feat)
        I_now = mw_to_amp(abs(flows_MW[eid]), V_kV)
        I_free = max(0.0, float(limit_A) - I_now)  # Rest-Spielraum in Ampere

        # Δf pro 1 MW an node_id
        df_per_MW = ptdf_col[[e[3] for e in lines].index(eid)]  # MW pro MW
        # ΔI pro 1 MW
        dI_per_MW = mw_to_amp(abs(df_per_MW), V_kV)  # Ampere pro MW (Näherung)
        if dI_per_MW <= 0:
            continue  # diese Leitung begrenzt nicht in dieser Richtung
        # erlaubte ΔP, bevor Limit erreicht wird
        dP_max_line = I_free / dI_per_MW  # MW
        candidates.append((eid, dP_max_line))

    if not candidates:
        return float("inf"), []

    # globales Minimum ist die System-Grenze
    dP_max = min(val for _, val in candidates)
    limiting = [eid for eid, val in candidates if abs(val - dP_max) < 1e-9]
    return dP_max, limiting

# Beispiel: ΔP_max (Entladen +) für ersten BESS (falls Limits gepflegt sind)
if bess_nodes:
    dP_max_plus, crit_plus = max_deltaP_for_node_wrt_line_limits(bess_nodes[0], direction=+1.0)
    dP_max_minus, crit_minus = max_deltaP_for_node_wrt_line_limits(bess_nodes[0], direction=-1.0)
    print(f"\n=== Zulässige ΔP am BESS {bess_nodes[0]} (grob, aus Stromlimits) ===")
    print(f"Entladen  (+): ΔP_max ≈ {dP_max_plus:.2f} MW, begrenzend: {crit_plus or '-'}")
    print(f"Aufnahme (−): ΔP_max ≈ {dP_max_minus:.2f} MW, begrenzend: {crit_minus or '-'}")



=== PTDF-Analyse ===
Formen:  B_ell (5, 5)   A_r (5, 5)   B_rr (5, 5)   PTDF (5, 5)

=== PTDF (MW pro MW) – Ausschnitt ===
Leitung E1: {'N2': -1.0000000000000002, 'N3': -1.0000000000000002, 'K1': -1.0000000000000002, 'K2': -1.0, 'Batt': -1.0}
Leitung E2: {'N2': -0.9999999999999999, 'N3': 0.0, 'K1': 0.0, 'K2': 0.0, 'Batt': 0.0}
Leitung E3: {'N2': 1.4802973661668753e-16, 'N3': -0.9999999999999998, 'K1': 1.4802973661668753e-16, 'K2': 0.0, 'Batt': 0.0}
Leitung E4: {'N2': 3.7007434154171876e-17, 'N3': 3.7007434154171876e-17, 'K1': 3.7007434154171876e-17, 'K2': 1.0, 'Batt': 1.0}
Leitung E5: {'N2': 0.0, 'N3': 0.0, 'K1': 0.0, 'K2': 0.0, 'Batt': 1.0}

=== BESS-Knoten (Nicht-Slack) ===
['Batt']

--- PTDF-Spalte für BESS Batt (MW pro MW) ---
        E1: -1.0000
        E2: +0.0000
        E3: +0.0000
        E4: +1.0000
        E5: +1.0000

=== Beispiel Δf bei +5 MW am BESS Batt (MW je Leitung) ===
        E1: -5.000 MW
        E2: +0.000 MW
        E3: +0.000 MW
        E4: +5.000 MW
        E5

# Example smallest actual graph