# Spatiotemporal Explainable AI for Power System Contingency Classification and Ranking

In [1]:
# Step 1: Install pandapower
%pip install pandapower -q

Note: you may need to restart the kernel to use updated packages.


In [3]:
#%%
# Step 2: Import required libraries
import pandapower as pp
import pandapower.networks as pn
import pandas as pd
import numpy as np
import copy
import os # Import the os module

# Output location for every file this notebook generates.
# !!! IMPORTANT: CHANGE THIS PATH TO A DIRECTORY ON YOUR LOCAL MACHINE !!!
# A relative path such as "./" is resolved against the current working directory;
# an absolute path such as "/path/to/your/desired/directory" works as well.
save_directory = "./"

# Make sure the target exists before anything is written; exist_ok keeps re-runs from failing.
os.makedirs(name=save_directory, exist_ok=True)


# Step 3: Function to generate random load variation
def vary_loads(net, scale_min=0.7, scale_max=1.0):
    """Randomly scale every load of ``net`` in place and return the same object.

    One factor per row of ``net.load`` is drawn with ``np.random.uniform`` between
    ``scale_min`` and ``scale_max``. The same factor is applied to ``p_mw`` and
    ``q_mvar`` of a row, so the P/Q ratio of each load is unchanged.

    The draw uses NumPy's global random state; seed it beforehand for repeatable results.
    """
    load_count = len(net.load)
    factors = np.random.uniform(scale_min, scale_max, size=load_count)
    # Active and reactive power share the factors drawn above.
    for column in ('p_mw', 'q_mvar'):
        net.load[column] *= factors
    return net

# Step 4: Accumulators filled while looping over the scenarios
all_results = []      # one record per (scenario, outaged line) power-flow run
load_scenarios = []   # one record per (scenario, load) holding the scaled P/Q values

# Step 5: Generate 1000 random load scenarios
np.random.seed(42)  # For reproducibility of the load scaling draws

for scenario_id in range(1000):
    # Start from a fresh case30 network so the scaling never compounds across scenarios.
    net = vary_loads(pn.case30(), 0.7, 1.0)  # 30% variation range in loads

    # Record the scaled load values of this scenario (one entry per load row).
    load_scenarios.extend(
        {
            'Scenario': scenario_id,
            'Load_Bus': load_row['bus'],
            'Load_ID': load_id,
            'P_mw': load_row['p_mw'],
            'Q_mvar': load_row['q_mvar'],
        }
        for load_id, load_row in net.load.iterrows()
    )

    # Adjust line ratings (custom stress for line 8).
    # NOTE(review): nothing else in this cell reads max_loading_percent; the severity
    # label below uses a fixed 98% threshold instead. Confirm this is intended.
    net.line['max_loading_percent'] = 115.0
    net.line.at[8, 'max_loading_percent'] = 100.0

    # N-1 contingency simulation: take exactly one line out of service per run.
    for outaged_line in net.line.index:
        net_copy = copy.deepcopy(net)  # keep the base scenario untouched
        net_copy.line.at[outaged_line, 'in_service'] = False

        # Any exception raised by the power flow marks the case as 'Unstable'.
        status = 'Stable'
        try:
            pp.runpp(net_copy)
        except Exception:
            status = 'Unstable'

        solved = status == 'Stable'
        result = {
            'Scenario': scenario_id,
            'Outaged_Line': outaged_line,
            'Status': status,
        }

        # Bus voltage magnitudes (None when the power flow failed)
        result.update({
            f'V_bus_{bus}': net_copy.res_bus.vm_pu.at[bus] if solved else None
            for bus in net_copy.bus.index
        })

        # Line loadings in percent (None when the power flow failed)
        result.update({
            f'Loading_line_{line}': net_copy.res_line.loading_percent.at[line] if solved else None
            for line in net_copy.line.index
        })

        all_results.append(result)

# Step 6: Compile all results into one table (one row per contingency run)
df_all = pd.DataFrame(all_results)

# Step 7: Severity is 1 when any line loading in the row exceeds 98%, else 0.
# Missing loadings never compare greater than the threshold, so they do not trigger severity.
loading_cols = [name for name in df_all.columns if name.startswith("Loading_line_")]
overloaded = df_all[loading_cols].gt(98.0)
df_all['Severity'] = overloaded.any(axis=1).astype(int)

# Step 8: Save contingency results to CSV
csv_output_path = os.path.join(save_directory, "n1_contingency_balanced.csv")
df_all.to_csv(csv_output_path, index=False)
print(f"Contingency results saved to: {csv_output_path}")


# Step 9: Save load scenarios to Excel
df_loads = pd.DataFrame(load_scenarios)
excel_output_path = os.path.join(save_directory, "load_scenarios.xlsx")
df_loads.to_excel(excel_output_path, index=False)
print(f"Load scenarios saved to: {excel_output_path}")


# Step 10: Print summary
print("✅ Contingency analysis complete.")
print("Final Severity Counts:\n", df_all['Severity'].value_counts())
print(f"Total load scenarios saved: {df_loads['Scenario'].nunique()} scenarios, {len(df_loads)} rows")

Contingency results saved to: ./n1_contingency_balanced.csv
Load scenarios saved to: ./load_scenarios.xlsx
✅ Contingency analysis complete.
Final Severity Counts:
 Severity
0    23395
1    17605
Name: count, dtype: int64
Total load scenarios saved: 1000 scenarios, 20000 rows


In [4]:
#%%
#Task 1.2: Missing Value Detection and Cleaning
import pandas as pd
import numpy as np
import os # Import the os module

# Define the directory where files are saved and will be loaded from
# !!! IMPORTANT: Ensure this matches the save_directory in the previous cell !!!
data_directory = "./" # Example: if you saved to 'generated_files'
# Or use the absolute path: data_directory = "/path/to/your/desired/directory"

# Load the CSV with pandas' default NA markers disabled, so nothing is converted to NaN
# at read time; empty cells arrive as empty strings and are handled explicitly below.
file_path = os.path.join(data_directory, "n1_contingency_balanced.csv")
df = pd.read_csv(file_path, keep_default_na=False)

# Show original dimensions
print(f"Original dataset shape: {df.shape}")

# Replace only empty strings with NaN
df_replaced = df.replace("", np.nan)

# Fill missing values: forward fill first, then backward fill for any leading gaps.
# DataFrame.ffill()/bfill() replace the deprecated fillna(method=...) spelling and
# produce the same result without the FutureWarning.
# NOTE(review): both fills run down the rows, so a gap is filled with the value of a
# neighbouring row (another contingency). Confirm this imputation is intended.
df_filled = df_replaced.ffill().bfill()

# Show filled dataset dimensions (should be the same)
print(f"Filled dataset shape: {df_filled.shape}")

# Save the result to a new CSV
output_path = os.path.join(data_directory, "n1_contingency_balanced_filled_complete.csv")
df_filled.to_csv(output_path, index=False)

print(f"\n✅ Empty cells filled and saved to: {output_path}")

Original dataset shape: (41000, 75)
Filled dataset shape: (41000, 75)


  df_filled = df_replaced.fillna(method='ffill').fillna(method='bfill')



✅ Empty cells filled and saved to: ./n1_contingency_balanced_filled_complete.csv


In [2]:
%pip install torch torchvision torchaudio -q
#%%

Note: you may need to restart the kernel to use updated packages.




In [None]:
#%%
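# Install PyTorch Geometric with its compiled extension packages, plus xlsxwriter for Excel export.
# NOTE: the wheel index below targets torch 2.0.0 (CPU) while the torch install above is unpinned;
# confirm the installed torch version matches these wheels.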
%pip install torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.0.0+cpu.html
%pip install torch-geometric
%pip install xlsxwriter

Looking in links: https://data.pyg.org/whl/torch-2.0.0+cpu.html
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [None]:
#%%
# GCN-LSTM, LSTM, GRU, and GCN Multi-Task Learning Models for Phase 2 Contingency Prediction

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, TensorDataset
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch_geometric.nn import GCNConv
import xlsxwriter
import os # Import the os module

# Directory holding the files produced by the earlier cells.
# !!! IMPORTANT: Ensure this matches the save_directory in the first cell !!!
# A relative path ("./") or an absolute one ("/path/to/your/desired/directory") both work.
data_directory = "./"

# Read the two input tables: per-scenario loads and the cleaned contingency results.
load_file_path = os.path.join(data_directory, "load_scenarios.xlsx")
cont_file_path = os.path.join(data_directory, "n1_contingency_balanced_filled_complete.csv")

load_df = pd.read_excel(load_file_path)
cont_df = pd.read_csv(cont_file_path)
within_range = cont_df['Scenario'] < 1000
cont_df = cont_df[within_range].reset_index(drop=True)

# Load features: flatten the (P, Q) pairs into 40 values per scenario (1000 scenarios),
# then repeat each scenario row 41 times so it lines up with that scenario's contingency rows.
load_features = load_df[["P_mw", "Q_mvar"]].values
load_features = load_features.reshape(1000, 40)
repeat_factor = 41
load_features_expanded = np.repeat(load_features, repeat_factor, axis=0)

# Bus-voltage and line-loading columns are picked by their name prefix.
bus_cols = [name for name in cont_df.columns if name.startswith("V_bus_")]
line_cols = [name for name in cont_df.columns if name.startswith("Loading_line_")]
voltages = cont_df[bus_cols].values.astype(np.float32)
line_flows = cont_df[line_cols].values.astype(np.float32)

# NOTE(review): line_flows are part of the model input and also the source of labels_rank
# below; confirm that this overlap between inputs and targets is intended.
combined_input = np.concatenate([load_features_expanded, voltages, line_flows], axis=1)

# Targets: full state vector, severity class, and line loadings scaled from percent to per-unit.
features_out = cont_df[bus_cols + line_cols].values.astype(np.float32)
labels_class = cont_df['Severity'].values.astype(np.int64)
labels_rank = cont_df[line_cols].values.astype(np.float32) / 100

# Sanity checks
print("Input shapes:")
for caption, array in (
    ("- Combined input:", combined_input),
    ("- Target features:", features_out),
    ("- Severity labels:", labels_class),
    ("- Ranking shape:", labels_rank),
):
    print(caption, array.shape)

# All arrays must describe the same number of samples.
row_counts = {combined_input.shape[0], features_out.shape[0], labels_class.shape[0], labels_rank.shape[0]}
assert len(row_counts) == 1

# Train-test split: the first 990 blocks of 41 rows train, the remaining rows test.
split_at = 990*41
X_train, X_test = combined_input[:split_at], combined_input[split_at:]
y_class_train, y_class_test = labels_class[:split_at], labels_class[split_at:]
y_rank_train, y_rank_test = labels_rank[:split_at], labels_rank[split_at:]
Y_train, Y_test = features_out[:split_at], features_out[split_at:]

print("Train shape:", X_train.shape, Y_train.shape)
print("Test shape:", X_test.shape, Y_test.shape)

# DataLoaders: only the training loader shuffles; the test loader keeps row order.
train_dataset = TensorDataset(
    torch.tensor(X_train).float(),
    torch.tensor(y_class_train),
    torch.tensor(y_rank_train),
)
test_dataset = TensorDataset(
    torch.tensor(X_test).float(),
    torch.tensor(y_class_test),
    torch.tensor(y_rank_test),
)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64)

# Model Definitions (These are standard PyTorch modules, no changes needed for environment)
class FeedForward(nn.Module):
    """Small perceptron encoder: Linear -> ReLU -> Linear.

    Maps ``input_dim`` features to a ``hidden_size``-wide representation.
    """

    def __init__(self, input_dim, hidden_size):
        super().__init__()
        stack = [
            nn.Linear(input_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Return the encoded representation of ``x``."""
        encoded = self.layers(x)
        return encoded

class LSTM_MTL(nn.Module):
    """Multi-task LSTM with a classification head and a regression (ranking) head.

    Args:
        input_dim: number of features per time step.
        hidden_size: LSTM hidden width, also the input width of both heads.
        num_classes: number of logits produced by the classification head (default 2).
        num_rank_outputs: width of the regression head output (default 41).
    """

    def __init__(self, input_dim, hidden_size, num_classes=2, num_rank_outputs=41):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_size, batch_first=True)
        self.fc_cls = nn.Linear(hidden_size, num_classes)
        self.fc_rank = nn.Linear(hidden_size, num_rank_outputs)

    def forward(self, x):
        """Return ``(class_logits, rank_scores)`` for a batch.

        ``x`` may be ``(batch, features)``, which is treated as a length-1 sequence,
        or ``(batch, seq_len, features)``.
        """
        if x.dim() == 2:
            # Flat samples become sequences of length 1 (batch_first layout).
            x = x.unsqueeze(1)
        _, (h_n, _) = self.lstm(x)
        h = h_n[-1]  # final hidden state of the last layer, shape (batch, hidden_size)
        return self.fc_cls(h), self.fc_rank(h)

class GRU_MTL(nn.Module):
    """Multi-task GRU with a classification head and a regression (ranking) head.

    Args:
        input_dim: number of features per time step.
        hidden_size: GRU hidden width, also the input width of both heads.
        num_classes: number of logits produced by the classification head (default 2).
        num_rank_outputs: width of the regression head output (default 41).
    """

    def __init__(self, input_dim, hidden_size, num_classes=2, num_rank_outputs=41):
        super().__init__()
        self.gru = nn.GRU(input_dim, hidden_size, batch_first=True)
        self.fc_cls = nn.Linear(hidden_size, num_classes)
        self.fc_rank = nn.Linear(hidden_size, num_rank_outputs)

    def forward(self, x):
        """Return ``(class_logits, rank_scores)`` for a batch.

        ``x`` may be ``(batch, features)``, which is treated as a length-1 sequence,
        or ``(batch, seq_len, features)``.
        """
        if x.dim() == 2:
            # Flat samples become sequences of length 1 (batch_first layout).
            x = x.unsqueeze(1)
        _, h_n = self.gru(x)
        h = h_n[-1]  # final hidden state of the last layer, shape (batch, hidden_size)
        return self.fc_cls(h), self.fc_rank(h)

class BaseMTL(nn.Module):
    """Wrap an encoder with a classification head and a regression (ranking) head.

    Args:
        base: module mapping the input to a ``hidden_size``-wide representation.
        hidden_size: width of ``base``'s output, used as the input width of both heads.
        num_classes: number of logits produced by the classifier (default 2).
        num_rank_outputs: width of the regressor output (default 41).
    """

    def __init__(self, base, hidden_size, num_classes=2, num_rank_outputs=41):
        super().__init__()
        self.base = base
        self.classifier = nn.Linear(hidden_size, num_classes)
        self.regressor = nn.Linear(hidden_size, num_rank_outputs)

    def forward(self, x):
        """Encode ``x`` once and return ``(class_logits, rank_scores)``."""
        x = self.base(x)
        return self.classifier(x), self.regressor(x)

# Training and Evaluation
# Seed torch's global RNG so weight initialisation and the shuffled training batches
# are repeatable across runs, in line with the NumPy seed used for data generation.
torch.manual_seed(42)

# Containers filled by train_and_evaluate, keyed by model name
all_results = []         # one metrics dict per trained model
rank_matrix = {}         # argsort-based ordering of the regression-head outputs
class_matrix = {}        # raw regression-head outputs, one row per test sample
class_pred_matrix = {}   # predicted severity classes, reshaped to rows of 41

# Ordering of the true line loadings, highest first, as 1-based column positions.
# NOTE(review): argsort returns an ordering (which column comes k-th), not the rank of
# each column; confirm this matches how the matrix is labelled when it is exported.
true_rank_matrix = np.argsort(-y_rank_test.reshape(-1, 41), axis=1) + 1

input_dim = combined_input.shape[1]
hidden_size = 64

def train_and_evaluate(model_name, model, train_loader, test_loader, epochs=5, lr=0.001, rank_weight=0.5):
    """Train a two-headed model, score it on the test loader, and record the results.

    The training loss is ``CrossEntropy(class) + rank_weight * MSE(rank)``, optimised with Adam.

    Args:
        model_name: key under which the results are stored and printed.
        model: module returning ``(class_logits, rank_scores)`` for a batch of inputs.
        train_loader: iterable of ``(inputs, class_labels, rank_targets)`` batches for training.
        test_loader: iterable of the same triples for evaluation.
        epochs: number of passes over ``train_loader`` (default 5).
        lr: Adam learning rate (default 0.001).
        rank_weight: weight of the regression loss in the total loss (default 0.5).

    Side effects:
        Appends a metrics dict to ``all_results`` and stores per-model arrays in
        ``class_matrix``, ``class_pred_matrix`` and ``rank_matrix``.
    """
    print(f"\nTraining model: {model_name}")
    criterion_class = nn.CrossEntropyLoss()
    criterion_rank = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for xb, yb_cls, yb_rank in train_loader:
            out_cls, out_rank = model(xb)
            loss_cls = criterion_class(out_cls, yb_cls)
            loss_rank = criterion_rank(out_rank, yb_rank)
            loss = loss_cls + rank_weight * loss_rank
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"{model_name} - Epoch {epoch+1} Loss: {total_loss/len(train_loader):.4f}")

    # Evaluation: collect true labels, predicted classes, and regression-head outputs.
    model.eval()
    all_true, all_pred, pred_scores = [], [], []
    with torch.no_grad():
        for xb, yb_cls, yb_rank in test_loader:
            out_cls, out_rank = model(xb)
            preds = torch.argmax(out_cls, dim=1)
            all_true.extend(yb_cls.cpu().numpy())
            all_pred.extend(preds.cpu().numpy())
            pred_scores.extend(out_rank.cpu().numpy())

    acc = accuracy_score(all_true, all_pred)
    prec = precision_score(all_true, all_pred, zero_division=0)
    rec = recall_score(all_true, all_pred, zero_division=0)
    f1 = f1_score(all_true, all_pred, zero_division=0)

    # Stack the per-sample score vectors once and reuse the matrix below.
    score_matrix = np.vstack(pred_scores)

    all_results.append({"Model": model_name, "Accuracy": acc, "Precision": prec, "Recall": rec, "F1": f1})
    # Despite its name, class_matrix holds the regression-head outputs.
    class_matrix[model_name] = score_matrix
    # Rows of 41 predictions; requires the number of test samples to be a multiple of 41.
    class_pred_matrix[model_name] = np.array(all_pred).reshape(-1, 41)
    # Column positions ordered by descending score, 1-based.
    rank_matrix[model_name] = np.argsort(-score_matrix, axis=1) + 1

    print(f"{model_name} - Accuracy: {acc:.4f}, Precision: {prec:.4f}, Recall: {rec:.4f}, F1: {f1:.4f}")

# Models to train, in order. Each entry pairs a result name with a factory so that every
# model is built immediately before it is trained.
# NOTE(review): the four "GCN*" entries all build the same BaseMTL(FeedForward(...)) model;
# none of them uses a graph convolution or a recurrent layer. Confirm the names are intended.
model_factories = [
    ("LSTM", lambda: LSTM_MTL(input_dim, hidden_size)),
    ("GRU", lambda: GRU_MTL(input_dim, hidden_size)),
    ("GCN", lambda: BaseMTL(FeedForward(input_dim, hidden_size), hidden_size)),
    ("GCN_LSTM", lambda: BaseMTL(FeedForward(input_dim, hidden_size), hidden_size)),
    ("GCN_GRU", lambda: BaseMTL(FeedForward(input_dim, hidden_size), hidden_size)),
    ("GCN_GRU_LSTM", lambda: BaseMTL(FeedForward(input_dim, hidden_size), hidden_size)),
]

for run_name, build_model in model_factories:
    train_and_evaluate(run_name, build_model(), train_loader, test_loader)


# Saving results to Excel
# One workbook: a metrics summary, two sheets per model, and the two ground-truth sheets.
main_results_excel_path = os.path.join(data_directory, "phase2_model_results.xlsx")

# Row labels shared by every matrix sheet.
line_labels = [f"Line_{i}" for i in range(41)]


def _line_by_scenario_frame(matrix):
    """Transpose ``matrix`` into a frame with Line_* rows and one Scenario_* column per input row."""
    scenario_labels = [f"Scenario_{j}" for j in range(matrix.shape[0])]
    return pd.DataFrame(matrix.T, index=line_labels, columns=scenario_labels)


with pd.ExcelWriter(main_results_excel_path, engine='xlsxwriter') as writer:
    pd.DataFrame(all_results).to_excel(writer, sheet_name="Summary", index=False)

    # Per-model sheets: predicted ordering and predicted severity classes.
    for model in rank_matrix:
        df_rank = _line_by_scenario_frame(rank_matrix[model])
        df_rank.to_excel(writer, sheet_name=f"{model}_Ranking")
        df_cls = _line_by_scenario_frame(class_pred_matrix[model])
        df_cls.to_excel(writer, sheet_name=f"{model}_Classify")

    # Ground truth in the same layout as the prediction sheets.
    df_true_severity = _line_by_scenario_frame(np.array(y_class_test).reshape(-1, 41))
    df_true_severity.to_excel(writer, sheet_name="True_Severity")
    df_true_rank = _line_by_scenario_frame(true_rank_matrix)
    df_true_rank.to_excel(writer, sheet_name="True_Ranking")

print(f"Excel file '{main_results_excel_path}' updated with true rankings.")


# Saving line flow comparison file
# Every frame is built before the workbook is opened, so a failure while assembling
# the data cannot leave a partially written file behind.
flow_comparison_excel_path = os.path.join(data_directory, "line_flow_comparison.xlsx")

flow_line_labels = [f"Line_{i}" for i in range(41)]

# --- True line loadings for the test scenarios (990-999) ---
test_df = cont_df[cont_df['Scenario'] >= 990].reset_index(drop=True)

true_flow_matrix = []
true_columns = []
for scenario_id in range(990, 1000):
    scenario_data = test_df[test_df['Scenario'] == scenario_id].reset_index(drop=True)
    for outage_id in range(41):
        # The loading of the outaged line is forced to 0.0.
        # NOTE(review): this assumes the k-th row of a scenario is the outage of line k;
        # the 'Outaged_Line' column is not consulted. Confirm against the CSV row order.
        flow_row = [
            0.0 if line_id == outage_id else scenario_data.loc[outage_id, f"Loading_line_{line_id}"]
            for line_id in range(41)
        ]
        true_flow_matrix.append(flow_row)
        true_columns.append(f"Scenario_{(scenario_id-990)*41 + outage_id}")

df_true_flows = pd.DataFrame(np.array(true_flow_matrix).T, index=flow_line_labels, columns=true_columns)

# --- Predicted line loadings per model (regression outputs scaled back to percent) ---
pred_flow_frames = {}
for model_name, preds in class_matrix.items():
    pred_flow_matrix = []
    pred_columns = []
    for idx in range(preds.shape[0]):
        outage_idx = idx % 41  # position of the sample inside its block of 41 contingencies
        flow_row = [
            0.0 if line_id == outage_idx else preds[idx][line_id] * 100
            for line_id in range(41)
        ]
        pred_flow_matrix.append(flow_row)
        pred_columns.append(f"Scenario_{idx}")

    pred_flow_frames[model_name] = pd.DataFrame(
        np.array(pred_flow_matrix).T, index=flow_line_labels, columns=pred_columns
    )

# --- Write all sheets: ground truth first, then one sheet per model ---
with pd.ExcelWriter(flow_comparison_excel_path, engine='xlsxwriter') as writer:
    df_true_flows.to_excel(writer, sheet_name="True_Line_Flows")
    for model_name, df_pred_flows in pred_flow_frames.items():
        df_pred_flows.to_excel(writer, sheet_name=f"Pred_{model_name}_Flows")

print(f"Separate line flow comparison file '{flow_comparison_excel_path}' created.")



Input shapes:
- Combined input: (41000, 111)
- Target features: (41000, 71)
- Severity labels: (41000,)
- Ranking shape: (41000, 41)
Train shape: (40590, 111) (40590, 71)
Test shape: (410, 111) (410, 71)

Training model: LSTM
LSTM - Epoch 1 Loss: 0.2586
LSTM - Epoch 2 Loss: 0.1315
LSTM - Epoch 3 Loss: 0.1055
LSTM - Epoch 4 Loss: 0.0932
LSTM - Epoch 5 Loss: 0.0938
LSTM - Accuracy: 0.9488, Precision: 0.9908, Recall: 0.8438, F1: 0.9114

Training model: GRU
GRU - Epoch 1 Loss: 0.2749
GRU - Epoch 2 Loss: 0.1201
GRU - Epoch 3 Loss: 0.0991
GRU - Epoch 4 Loss: 0.0899
GRU - Epoch 5 Loss: 0.0930
GRU - Accuracy: 0.9683, Precision: 0.9323, Recall: 0.9688, F1: 0.9502

Training model: GCN
GCN - Epoch 1 Loss: 0.2687
GCN - Epoch 2 Loss: 0.1268
GCN - Epoch 3 Loss: 0.1104
GCN - Epoch 4 Loss: 0.1032
GCN - Epoch 5 Loss: 0.0997
GCN - Accuracy: 0.9707, Precision: 0.9462, Recall: 0.9609, F1: 0.9535

Training model: GCN_LSTM
GCN_LSTM - Epoch 1 Loss: 0.2562
GCN_LSTM - Epoch 2 Loss: 0.1192
GCN_LSTM - Epoch 3 Lo