# Quantum Neural Networks for Overload Detection and Power Flow Optimization

## UTK REU 2025
### Gianna Baker

<div>
<img src="tennlogo.jpeg", width="500"/>
</div>

# Import Statements

In [None]:
import pandapower.networks as pn
import pandapower as pp
from pandapower.pypower.makeYbus import makeYbus
import numpy as np
import pandas as pd
from itertools import product
from sklearn.model_selection import train_test_split
import pennylane as qml
from pennylane import numpy as pnp
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from collections import defaultdict
import torch
from torch import nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from collections import Counter

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Data Generation

In [None]:
def calcOverload_from_net(net, capacities):
    loading = net.res_line.loading_percent.values
    return (loading > capacities).astype(int)

# --- Helper to generate random P & Q (in per unit) ---
def gen_signed_pq():
    gen_P = np.random.uniform(0.5, 3.5)     # reduced upper bound
    gen_Q = np.random.uniform(0.2, 2.0)

    p2 = -np.random.uniform(2.0, 4.5)        # smaller loads
    q2 = -np.random.uniform(0.5, 1.8)

    p3 = -np.random.uniform(2.0, 4.5)
    q3 = -np.random.uniform(0.5, 1.8)

    return np.array([gen_P, gen_Q, p2, q2, p3, q3])

# --- Create a new net and update bus injections ---
def inject_and_run_pf(pq_vector, capacities = 100):
    gen_P, gen_Q, p2, q2, p3, q3 = pq_vector
    net = pn.case4gs()


    try:
        # Scale back to MW/MVar assuming 100 baseMVA
        net.gen.at[0, 'p_mw'] = gen_P * 100
        net.gen.at[0, 'q_mvar'] = gen_Q * 100

        net.load.at[0, 'p_mw'] = -p2 * 100
        net.load.at[0, 'q_mvar'] = -q2 * 100
        net.load.at[1, 'p_mw'] = -p3 * 100
        net.load.at[1, 'q_mvar'] = -q3 * 100

        # Optional: push line capacity limits lower to provoke overloads
        net.line['max_loading_percent'] = capacities

        pp.runpp(net)
        #print(net.res_line.loading_percent)

        result = calcOverload_from_net(net, capacities)

        return result

    except:
        return None


# --- Main loop for pattern matching ---
all_patterns = list(product([0, 1], repeat=4))
pattern_weights = {
    (0, 0, 0, 0): 1.3,
    (0, 0, 0, 1): 1.3,
    (0, 0, 1, 0): 1.5,
    (0, 0, 1, 1): 2.0,  # lower accuracy → more emphasis
    (0, 1, 0, 0): 1.0,
    (0, 1, 0, 1): 0.8,
    (0, 1, 1, 0): 1.3,
    (0, 1, 1, 1): 2.0,
    (1, 0, 0, 0): 1.0,
    (1, 0, 0, 1): 1.7,
    (1, 0, 1, 0): 1.1,
    (1, 0, 1, 1): 2.0,
    (1, 1, 0, 0): 1.0,
    (1, 1, 0, 1): 1.2,
    (1, 1, 1, 0): 2.0,
    (1, 1, 1, 1): 1.3,
} # Use your preferred weight dict here

base_target = 100
X = np.load('/content/drive/My Drive/XFINAL.npy').tolist()
Y = np.load('/content/drive/My Drive/YFINAL.npy').tolist()
Y_tuples = [tuple(y) for y in Y]
existing_counts = Counter(Y_tuples)

for target in all_patterns:
    target_np = np.array(target)
    count = existing_counts.get(target, 0)  # START with already existing
    tries = 0
    adjusted_target = int(base_target * pattern_weights.get(target, 1.0))
    adjusted_target = max(adjusted_target, 10)

    print(f"🎯 Target pattern {target} → samples: {adjusted_target}")

    while count < adjusted_target and tries < 1000:
        pq_vec = gen_signed_pq()
        result = inject_and_run_pf(pq_vec, [70,72,63,36])

        if result is None:
            tries += 1
            continue


        #print(result)
        if np.array_equal(result, target_np):
            X.append(pq_vec)
            Y.append(result)
            count += 1
            print(f"  ✔ Match {count}/{adjusted_target} → {pq_vec}")

        tries += 1

    if count < 10:
        print(f"⚠️ Only found {count}/10 for pattern {target} after {tries} tries.")

print("✅ Data generation complete!")
X = np.array(X)
Y = np.array(Y)
print("X shape:", X.shape)
print("Y shape:", Y.shape)
np.save('/content/drive/MyDrive/X.npy', X)
np.save('/content/drive/MyDrive/Y.npy', Y)

### If all 16 patterns not found:

In [None]:
seen_patterns = set(tuple(y) for y in Y)  # Y is your existing label array

all_patterns = list(product([0, 1], repeat=4))

while len(seen_patterns) < 16:
    pq_vec = gen_signed_pq()
    overload = inject_and_run_pf(pq_vec, capacities=[70,72,63,36]) # 69 68 63 36

    if overload is None:
        continue
    #print(overload)

    pattern = tuple(overload)
    if pattern not in seen_patterns:
        seen_patterns.add(pattern)
        X.append(pq_vec)
        Y.append(overload)
        print(f"✔ Found new pattern {pattern} ({len(seen_patterns)}/16)")

#### Double Check Shape (sanity check)

In [None]:
X = X[:-2]
Y = Y[:-2]

X = X[:, -6:]
print(np.shape(X))
print(np.shape(Y))

### Balance Data if Harsh Generation

In [None]:
# Convert Y rows to tuple keys for grouping
Y_tuples = [tuple(y) for y in Y]

# Group the Xs and Ys by label
grouped = defaultdict(list)
for x, y in zip(X, Y_tuples):
    grouped[y].append(x)

# Get max group size
max_len = max(len(v) for v in grouped.values())

# Oversample each group to match max size
X_balanced = []
Y_balanced = []

for label, x_group in grouped.items():
    x_array = np.array(x_group)
    needed = max_len - len(x_array)

    # Sample with replacement to pad group
    if needed > 0:
        extra = x_array[np.random.choice(len(x_array), needed, replace=True)]
        x_full = np.vstack([x_array, extra])
    else:
        x_full = x_array

    y_full = [np.array(label)] * len(x_full)

    X_balanced.append(x_full)
    Y_balanced.extend(y_full)

# Concatenate all groups together
X = np.vstack(X_balanced)
Y = np.array(Y_balanced)

# Optional: Shuffle before scaling
shuffle_indices = np.random.permutation(len(X))
X = X[shuffle_indices]
Y = Y[shuffle_indices]

## Change Size of Dataset

In [None]:
def select_balanced_subset(X, Y, samples_per_pattern=20):
    pattern_dict = defaultdict(list)

    # Group samples by unique overload pattern
    for idx, y in enumerate(Y):
        pattern_key = tuple(y)  # make it hashable
        pattern_dict[pattern_key].append(idx)

    selected_indices = []
    for indices in pattern_dict.values():
        if len(indices) > samples_per_pattern:
            chosen = np.random.choice(indices, samples_per_pattern, replace=False)
        else:
            chosen = indices  # Keep all if fewer than requested
        selected_indices.extend(chosen)

    # Convert to arrays
    X_selected = np.array(X)[selected_indices]
    Y_selected = np.array(Y)[selected_indices]
    return X_selected, Y_selected

In [None]:
# Change number for how many samples per overload pattern
X, Y = select_balanced_subset(X,Y,70)

# Scale Data, Train and Test Sets

In [None]:
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_train, X_test, Y_train, Y_test = train_test_split(X_scaled, Y, test_size=0.2, random_state=42)
X_train = torch.tensor(X_train, dtype=torch.float32)
Y_train = torch.tensor(Y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
Y_test = torch.tensor(Y_test, dtype=torch.float32)

# Overload Detection

## Classical Networks

In [None]:
# Same Scale Network
class SimpleNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(6, 10), # Input size is 8 (P and Q for 4 buses)
            nn.ReLU(),
            #nn.Dropout(0.2),
            nn.Linear(10, 10),
            nn.ReLU(),
            #nn.Dropout(0.2),
            nn.Linear(10, 10),
            nn.ReLU(),
            #nn.Dropout(0.2),
            nn.Linear(10, 4),# Output size is 4 (for the 4 overload flags)
            nn.Sigmoid()  # for multilabel classification
        )

    def forward(self, x):
        return self.layers(x)

model = SimpleNN()

# === Loss and optimizer ===
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# === Training loop ===
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    outputs = model(X_train)
    loss = criterion(outputs, Y_train)
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0 or epoch == num_epochs - 1:
        print(f"Epoch {epoch} | Avg Loss: {loss.item():.6f}")

# === Evaluation ===
model.eval()
with torch.no_grad():
    preds = model(X_test)
    preds_bin = (preds >= 0.5).int()
    correct = (preds_bin == Y_test.int()).all(dim=1).sum().item()
    total = Y_test.shape[0]

    print("\n--- Testing on unseen data ---")
    print(f"Accuracy: {correct} / {total} ({100 * correct / total:.2f}%)")
    loss_test = criterion(preds, Y_test).item()
    print(f"Average Test Loss (BCE): {loss_test:.6f}")
    print("-" * 40)

    print("\nSample predictions:")
    for i in range(10):
        print(f"Input:     {X_test[i].numpy()}")
        print(f"True Y:    {Y_test[i].numpy().astype(int)}")
        print(f"Predicted: {preds_bin[i].numpy()}")
        print("-" * 40)
from sklearn.metrics import confusion_matrix

# Convert predictions and ground truth to NumPy arrays
Y_pred_all = preds.numpy()
Y_pred_bin = (Y_pred_all >= 0.5).astype(int)
Y_true_all = Y_test.numpy().astype(int)

# Accuracy per output bit
bit_acc = (Y_pred_bin == Y_true_all).sum(axis=0) / Y_true_all.shape[0]
for i, acc in enumerate(bit_acc):
    print(f"Accuracy for output bit {i} (Line {i+1}): {acc:.2%}")

# Optional: Confusion matrix per output bit
for i in range(4):
    print(f"\nConfusion matrix for bit {i} (Line {i+1}):")
    print(confusion_matrix(Y_true_all[:, i], Y_pred_bin[:, i]))
pred_cases = [tuple(row) for row in Y_pred_bin]
true_cases = [tuple(row) for row in Y_true_all]

# Count how many times each pattern was predicted
pred_counter = Counter(pred_cases)
true_counter = Counter(true_cases)

# Get all unique patterns (present in either true or predicted)
all_patterns = sorted(set(pred_counter.keys()).union(set(true_counter.keys())))

print("\n--- Pattern Breakdown ---")
print(f"{'Pattern':<10} | {'True Count':>10} | {'Pred Count':>10}")
print("-" * 36)
for pattern in all_patterns:
    print(f"{list(pattern)!s:<10} | {true_counter.get(pattern, 0):>10} | {pred_counter.get(pattern, 0):>10}")

In [None]:
# Large Network
class SimpleNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(6, 148), # Input size is 8 (P and Q for 4 buses)
            nn.LeakyReLU(),
            nn.Dropout(0.1),
            nn.Linear(148, 84),
            nn.LeakyReLU(),
            nn.Dropout(0.1),
            nn.Linear(84, 24),
            nn.LeakyReLU(),
            nn.Dropout(0.1),
            nn.Linear(24, 4), # Output size is 4 (for the 4 overload flags)

        )

    def forward(self, x):
        return self.layers(x)

model = SimpleNN()

# === Loss and optimizer ===
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# === Training loop ===
num_epochs = 600
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    outputs = model(X_train)
    loss = criterion(outputs, Y_train)
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0 or epoch == num_epochs - 1:
        print(f"Epoch {epoch} | Avg Loss: {loss.item():.6f}")

# === Evaluation ===
model.eval()
with torch.no_grad():
    logits = model(X_test)
    preds = torch.sigmoid(logits)
    preds_bin = (preds >= 0.5).int()
    correct = (preds_bin == Y_test.int()).all(dim=1).sum().item()
    total = Y_test.shape[0]

    print("\n--- Testing on unseen data ---")
    print(f"Accuracy: {correct} / {total} ({100 * correct / total:.2f}%)")
    loss_test = criterion(preds, Y_test).item()
    print(f"Average Test Loss (BCE): {loss_test:.6f}")
    print("-" * 40)

    print("\nSample predictions:")
    for i in range(10):
        print(f"Input:     {X_test[i].numpy()}")
        print(f"True Y:    {Y_test[i].numpy().astype(int)}")
        print(f"Predicted: {preds_bin[i].numpy()}")
        print("-" * 40)
from sklearn.metrics import confusion_matrix

# Convert predictions and ground truth to NumPy arrays
Y_pred_all = preds.numpy()
Y_pred_bin = (Y_pred_all >= 0.5).astype(int)
Y_true_all = Y_test.numpy().astype(int)

# Accuracy per output bit
bit_acc = (Y_pred_bin == Y_true_all).sum(axis=0) / Y_true_all.shape[0]
for i, acc in enumerate(bit_acc):
    print(f"Accuracy for output bit {i} (Line {i+1}): {acc:.2%}")

# Optional: Confusion matrix per output bit
for i in range(4):
    print(f"\nConfusion matrix for bit {i} (Line {i+1}):")
    print(confusion_matrix(Y_true_all[:, i], Y_pred_bin[:, i]))
pred_cases = [tuple(row) for row in Y_pred_bin]
true_cases = [tuple(row) for row in Y_true_all]

# Count how many times each pattern was predicted
pred_counter = Counter(pred_cases)
true_counter = Counter(true_cases)

# Get all unique patterns (present in either true or predicted)
all_patterns = sorted(set(pred_counter.keys()).union(set(true_counter.keys())))

print("\n--- Pattern Breakdown ---")
print(f"{'Pattern':<10} | {'True Count':>10} | {'Pred Count':>10}")
print("-" * 36)
for pattern in all_patterns:
    print(f"{list(pattern)!s:<10} | {true_counter.get(pattern, 0):>10} | {pred_counter.get(pattern, 0):>10}")

## Quantum Network 

In [None]:
train_loader = DataLoader(TensorDataset(X_train_tensor, Y_train_tensor), batch_size=16, shuffle=True)

# ==== Quantum Circuit ====
n_qubits = 10
n_layers = 3
n_features = 6

dev = qml.device("default.qubit", wires=n_qubits)

@qml.qnode(dev, interface="torch", diff_method="backprop")
def qnode(inputs, weights):
    for i in range(n_features):
        qml.Hadamard(wires=i)
    qml.AngleEmbedding(inputs, wires=range(n_features), rotation='Z')


    qml.templates.StronglyEntanglingLayers(weights, wires=range(n_qubits))

    # Custom entanglement
    qml.CNOT(wires=[0,1])
    qml.CNOT(wires=[2,3])
    qml.CNOT(wires=[4,5])
    qml.CNOT(wires=[6,7])
    qml.CNOT(wires=[0,4])
    qml.CNOT(wires=[1,5])
    qml.CNOT(wires=[0,2])
    qml.CNOT(wires=[1,3])
    qml.CNOT(wires=[2,6])
    qml.CNOT(wires=[3,7])
    qml.CNOT(wires=[4,6])
    qml.CNOT(wires=[5,7])

    return [qml.expval(qml.PauliZ(i)) for i in range(n_features)]

# ==== QNN Model Class ====
class QNNModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.q_layer = qml.qnn.TorchLayer(qnode, {"weights": (n_layers, n_qubits, 3)})
        self.linear = nn.Linear(n_features, 4)  # Reduce qubits → 4 outputs
        self.tune = nn.Parameter(torch.tensor(1.0))

    def forward(self, x):
        z = self.q_layer(x)  # shape: (batch_size, 8)
        z = self.linear(z)   # shape: (batch_size, 4)
        self.tune = nn.Parameter(torch.tensor(5.0))
        return self.tune * z  # shape: (batch_size, 4)



# ==== Training ====
model = QNNModel()
loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# ==== Training Loop ====
n_epochs = 100
for epoch in range(n_epochs):
    total_loss = 0
    for x_batch, y_batch in train_loader:
        optimizer.zero_grad()
        preds = model(x_batch)
        loss = loss_fn(preds, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1:02d} | Avg Loss: {total_loss / len(train_loader):.6f}")


# ==== Evaluation ====
with torch.no_grad():
    logits = model(X_test_tensor)
    preds = torch.sigmoid(logits)
    bin_preds = (preds > 0.5).int()
    correct = (bin_preds == Y_test_tensor.int()).all(dim=1).sum().item()
    print(f"\nExact Match Accuracy: {correct} / {len(Y_test_tensor)} = {100 * correct / len(Y_test_tensor):.2f}%")

    print("\nSample Predictions:")
    for i in range(3):
        x = X_test_tensor[i]
        y_true = Y_test_tensor[i]
        y_pred = model(x.unsqueeze(0)).squeeze().detach().numpy()
        y_bin = (y_pred > 0.5).astype(int)
        print(f"Input:     {x.numpy()}")
        print(f"True Y:    {y_true.numpy()}")
        print(f"Predicted: {y_bin}")
        print("-" * 40)

# Accuracy Breakdown Analysis

In [None]:
# run this to see the accuracy per pattern
# Convert tensors to numpy arrays
y_true_np = Y_test_tensor.int().numpy()
y_pred_np = bin_preds.numpy()

# Create pattern strings for true and predicted labels for easy comparison
true_patterns = ["".join(map(str, row)) for row in y_true_np]
pred_patterns = ["".join(map(str, row)) for row in y_pred_np]

# Get all unique true patterns in test set
unique_patterns = set(true_patterns)

# Store counts per pattern
pattern_counts = defaultdict(int)
pattern_correct = defaultdict(int)

# Loop through all samples
for true_patt, pred_patt in zip(true_patterns, pred_patterns):
    pattern_counts[true_patt] += 1
    if true_patt == pred_patt:
        pattern_correct[true_patt] += 1

# Print accuracy per pattern
for patt in sorted(unique_patterns):
    count = pattern_counts[patt]
    correct = pattern_correct[patt]
    accuracy = correct / count if count > 0 else 0
    print(f"Pattern {patt}: Accuracy {accuracy:.2%} ({correct}/{count})")

# Correction Network

## Generate y-true values

In [None]:
def correct(inputVector, capacities=None):
    step_size = 0.01
    gen_P, gen_Q, p2, q2, p3, q3 = inputVector
    delta_P_range = np.linspace(-5.0, 5.0, 70)  # Try P changes from -5 to +5
    delta_Q_range = np.linspace(-5.0, 5.0, 70)

    for dP in delta_P_range:
      for dQ in delta_Q_range:
            if dP == 0 and dQ == 0:
                continue  # Skip the original point

            trial_P = gen_P + dP
            trial_Q = gen_Q + dQ
            vdelta_vec = np.empty(6)

            if trial_P <= 0 or trial_Q <= 0:
              continue

            net = pn.case4gs()


            try:
                # Solve power flow with new generator settings
                # Update generator P & Q
              net.gen.at[0, 'p_mw'] = trial_P * 100
              net.gen.at[0, 'q_mvar'] = trial_Q * 100

              if capacities is not None:
              # Set tight line loading limits
                net.line['max_loading_percent'] = capacities



              # Update loads
              net.load.at[0, 'p_mw'] = -p2 * 100
              net.load.at[0, 'q_mvar'] = -q2 * 100
              net.load.at[1, 'p_mw'] = -p3 * 100
              net.load.at[1, 'q_mvar'] = -q3 * 100

              pp.runpp(net)
              ppc = net._ppc
              #Ybus, _, _ = makeYbus(ppc["baseMVA"], ppc["bus"], ppc["branch"])

              # Pull voltage magnitudes and angles
              V = net.res_bus.vm_pu.values
              delta = np.deg2rad(net.res_bus.va_degree.values)

              # Strip slack bus (bus 0) and interleave V & δ
              vdelta_vec = np.array([v for pair in zip(V[1:], delta[1:]) for v in pair])



            except:
                print("Didnt work")
                continue  # Skip if power flow doesn't converge

            loading = net.res_line.loading_percent.values
            overload_flags = (loading > capacities).astype(int)
            if np.all(overload_flags == 0):

                output_vector = [dP, dQ]

                return output_vector

    return None  # No fix found

X_fix = []
Y_fix = []

for x, y in zip(X,Y):

    # Skip samples that aren't overloaded to begin with
    if all(y == 0):
      print("Skip")
      continue

    else:
      result = correct(x, [70,72,63,36])

      if result:
          print(x)
          X_fix.append(x)
          Y_fix.append(result)
np.save('/content/drive/MyDrive/Xfix.npy', X_fix)
np.save('/content/drive/MyDrive/Yfix.npy', Y_fix)

In [None]:
# Scale for networks
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_fix)
X_train, X_test, Y_train, Y_test = train_test_split(X_scaled, Y_fix, test_size=0.2, random_state=42)
X_train = torch.tensor(X_train, dtype=torch.float32)
Y_train = torch.tensor(Y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
Y_test = torch.tensor(Y_test, dtype=torch.float32)

## Classical Network

In [None]:
# === Define the model ===
class SmallNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(6, 10), # Input size is 8 (P and Q for 4 buses)
            nn.ReLU(),
            #nn.Dropout(0.2),
            nn.Linear(10, 10),
            nn.ReLU(),
            #nn.Dropout(0.2),
            nn.Linear(10, 10),
            nn.ReLU(),
            #nn.Dropout(0.2),
            nn.Linear(10, 2),# Output size is 4 (for the 4 overload flags)
            nn.Sigmoid()  # for multilabel classification
        )

    def forward(self, x):
        return self.layers(x)

model = SmallNN()

# === Loss and optimizer ===
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# === Training loop ===
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    outputs = model(X_train)
    loss = criterion(outputs, Y_train)
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0 or epoch == num_epochs - 1:
        print(f"Epoch {epoch} | Avg Loss: {loss.item():.6f}")

# === Evaluation ===
correct = 0
total = len(X_test)

model.eval()
with torch.no_grad():
    preds = model(X_test)  # Shape: (N, 2)

for i in range(total):
    # Step 1: Modify the original X vector with the predicted values
    x_input = X_test[i].clone().numpy()
    x_input = scaler.inverse_transform(x_input.reshape(1, -1)).flatten()

    p_pred, q_pred = preds[i].numpy()

    # Insert predicted values into the first two positions (e.g., P2, Q2)
    x_input[0] += p_pred
    x_input[1] += q_pred
    fixed_vec = x_input

    # Step 2: Reconstruct your net (or copy it fresh if you have a template)
    net = pn.case4gs()

    net.gen.at[0, 'p_mw'] = fixed_vec[0] * 100
    net.gen.at[0, 'q_mvar'] = fixed_vec[1] * 100
    net.load.at[0, 'p_mw'] = -fixed_vec[2] * 100
    net.load.at[0, 'q_mvar'] = -fixed_vec[3] * 100
    net.load.at[1, 'p_mw'] = -fixed_vec[4] * 100
    net.load.at[1, 'q_mvar'] = -fixed_vec[5] * 100
    net.line['max_loading_percent'] = [70,72,63,36]

    # Step 4: Run power flow
    try:
        pp.runpp(net)
        line_loading = net.res_line.loading_percent.values
        overload_flags = line_loading > 100.0

        if not np.any(overload_flags):
            correct += 1

    except pp.LoadflowNotConverged:
        print(f"Power flow did not converge for sample {i}")

print(f"\nPost-simulation accuracy (no overloads after predicted fix): {correct} / {total} ({100 * correct / total:.2f}%)")

In [None]:
# Large classical network
# === Define the model ===
class CorrectionNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(6, 148), # Input size is 8 (P and Q for 4 buses)
            nn.LeakyReLU(),
            nn.Dropout(0.1),
            nn.Linear(148, 84),
            nn.LeakyReLU(),
            nn.Dropout(0.1),
            nn.Linear(84, 24),
            nn.LeakyReLU(),
            nn.Dropout(0.1),
            nn.Linear(24, 2), # Output size is 4 (for the 4 overload flags)

        )

    def forward(self, x):
        return self.layers(x)

model = CorrectionNN()

# === Loss and optimizer ===
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# === Training loop ===
num_epochs = 600
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    outputs = model(X_train)
    loss = criterion(outputs, Y_train)
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0 or epoch == num_epochs - 1:
        print(f"Epoch {epoch} | Avg Loss: {loss.item():.6f}")

# === Evaluation ===
correct = 0
total = len(X_test)

model.eval()
with torch.no_grad():
    preds = model(X_test)  # Shape: (N, 2)

for i in range(total):
    # Step 1: Modify the original X vector with the predicted values
    x_input = X_test[i].clone().numpy()
    x_input = scaler.inverse_transform(x_input.reshape(1, -1)).flatten()

    p_pred, q_pred = preds[i].numpy()

    # Insert predicted values into the first two positions (e.g., P2, Q2)
    x_input[0] += p_pred
    x_input[1] += q_pred
    fixed_vec = x_input

    # Step 2: Reconstruct your net (or copy it fresh if you have a template)
    net = pn.case4gs()

    net.gen.at[0, 'p_mw'] = fixed_vec[0] * 100
    net.gen.at[0, 'q_mvar'] = fixed_vec[1] * 100
    net.load.at[0, 'p_mw'] = -fixed_vec[2] * 100
    net.load.at[0, 'q_mvar'] = -fixed_vec[3] * 100
    net.load.at[1, 'p_mw'] = -fixed_vec[4] * 100
    net.load.at[1, 'q_mvar'] = -fixed_vec[5] * 100
    net.line['max_loading_percent'] = [70,72,63,36]

    # Step 4: Run power flow
    try:
        pp.runpp(net)
        line_loading = net.res_line.loading_percent.values
        overload_flags = line_loading > 100.0

        if not np.any(overload_flags):
            correct += 1

    except pp.LoadflowNotConverged:
        print(f"Power flow did not converge for sample {i}")

print(f"\nPost-simulation accuracy (no overloads after predicted fix): {correct} / {total} ({100 * correct / total:.2f}%)")

## Quantum Network

In [None]:
# ==== Quantum Circuit ====
n_qubits = 10
n_layers = 3
n_features = 6

dev = qml.device("default.qubit", wires=n_qubits)

@qml.qnode(dev, interface="torch", diff_method="backprop")
def qnode(inputs, weights):
    for i in range(n_features):
        qml.Hadamard(wires=i)
    qml.AngleEmbedding(inputs, wires=range(n_features), rotation='Z')


    qml.templates.StronglyEntanglingLayers(weights, wires=range(n_qubits))

    # Custom entanglement
    qml.CNOT(wires=[0,1])
    qml.CNOT(wires=[2,3])
    qml.CNOT(wires=[4,5])
    qml.CNOT(wires=[6,7])
    qml.CNOT(wires=[6,0])
    qml.CNOT(wires=[7,1])
    qml.CNOT(wires=[6,2])
    qml.CNOT(wires=[7,3])

    qml.CNOT(wires=[0,4])
    qml.CNOT(wires=[1,5])
    qml.CNOT(wires=[2,4])
    qml.CNOT(wires=[3,5])

    return [qml.expval(qml.PauliZ(i)) for i in range(n_features)]

# ==== QNN Model Class ====
class QNNFix(nn.Module):
    def __init__(self):
        super().__init__()
        self.q_layer = qml.qnn.TorchLayer(qnode, {"weights": (n_layers, n_qubits, 3)})
        self.linear = nn.Linear(n_features, 2)  # Reduce qubits → 2 outputs
        self.tune = nn.Parameter(torch.tensor(1.0))

    def forward(self, x):
        z = self.q_layer(x)
        z = self.linear(z)   
        return self.tune * z  



# ==== Training ====
model = QNNFix()
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# ==== Training Loop ====
n_epochs = 100
for epoch in range(n_epochs):
    total_loss = 0
    for x_batch, y_batch in train_loader:
        optimizer.zero_grad()
        preds = model(x_batch)
        loss = loss_fn(preds, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1:02d} | Avg Loss: {total_loss / len(train_loader):.6f}")


# ==== Evaluation ====
correct = 0
total = len(X_test_tensor)
model.eval()
with torch.no_grad():
    preds = model(X_test_tensor).cpu().numpy()  # shape: (N, 2)

for i in range(total):
    x_input = X_test_tensor[i].numpy()
    x_input_unscaled = scaler.inverse_transform(x_input.reshape(1, -1)).flatten()

    # Get predicted correction
    p_pred, q_pred = preds[i]

    # Modify P2 and Q2
    x_input_unscaled[0] += p_pred  # P2
    x_input_unscaled[1] += q_pred  # Q2
    fixed_vec = x_input_unscaled

    # Rebuild your base network (e.g., 4-bus)
    net = pn.case4gs()
    net.gen.at[0, 'p_mw'] = fixed_vec[0] * 100
    net.gen.at[0, 'q_mvar'] = fixed_vec[1] * 100
    net.load.at[0, 'p_mw'] = -fixed_vec[2] * 100
    net.load.at[0, 'q_mvar'] = -fixed_vec[3] * 100
    net.load.at[1, 'p_mw'] = -fixed_vec[4] * 100
    net.load.at[1, 'q_mvar'] = -fixed_vec[5] * 100
    net.line['max_loading_percent'] = [70, 72, 63, 36]

    try:
        pp.runpp(net)
        line_loading = net.res_line.loading_percent.values
        overload_flags = line_loading > 100.0

        if not np.any(overload_flags):
            correct += 1

    except pp.LoadflowNotConverged:
        print(f"Quantum power flow failed for sample {i}")

print(f"\nQuantum Post-fix Accuracy (no overloads): {correct} / {total} ({100 * correct / total:.2f}%)")