# Sandbox
For testing and developing new Cyber Security Assessment tools in an interactive and persistent development environment.

In [None]:
import itertools
import json
import copy
import random
import warnings
import math
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import scipy.stats.distributions as distr
import seaborn as sns
import pandapower
from pathlib import Path as p

from cyber.assets import Defence, Vulnerability, CommmonDefences, CyberDevice
from communication.graph import CommNode, CommEdge
from communication.network import Aggregator, Device, CommNetwork
from attackers.random_attacker import RandomAttacker
from cyber.analysis import Analyzer
from visualization.network import plot_communication_network
from visualization.grid import plot_physical_grid

## Procedural Generation
### Abstract Tree
Consists of Devices and Aggregators. 
* Aggregators (internal nodes) require a **Hard** amount of effort to compromise and have a 50% chance of being compromised if the necessary effort is spent
* Devices (leaf nodes) require an **Easy** amount of effort to compromise and also have a 50% chance of being compromised if the necesssary effort is spent
* Control Center (root node) is **Very Hard** to compromise

Controllable parameters include:
* Number of devices (leaf nodes)
* Number of Entrypoints (points where cyberattacks can originate)
* Number of children per parent node (inversely proportional to redundancy)
* Random deviation in number of children
* Sibling to Sibling communication (lateral edges between nodes on the same level)

In [None]:
seed = np.random.randint(low=0, high=52600)
np.random.seed(seed); random.seed(seed)
print(f"Seed: {np.random.get_state()[1][0]}")

with warnings.catch_warnings():
    warnings.filterwarnings(action="ignore", category=FutureWarning)
    grid=pandapower.networks.create_cigre_network_mv(with_der="all")
    pandapower.runpp(grid, algorithm="nr", calculate_voltage_angles=True, init="dc", trafo_model="t", trafo_loading="power",
                enforce_q_lims=True, voltage_depend_loads=True, numba=False, consider_line_temperature=False)
    print(grid)
    criticality = criticality_by_power_flow(grid, verbose=False)[0] if "criticality_by_power_flow" in locals() else None
    spec_path = p.cwd() / "specifications" / "Default_specifications.json"
    network = CommNetwork(n_devices=3, n_entrypoints=1, children_per_parent=0, child_no_deviation=3, enable_sibling_to_sibling_comm=True,
                    network_specs=spec_path, 
                    # "Default_specifications.json", "SmartMeter_specifications.json", "SCADA_specifications.json", ...
                    # "WAMS_specifications.json", "Protection_specifications.json"
                    grid=grid, criticality=criticality
                    )
print(CommNetwork.show_tree(network.root))
print(f"Number of Components: {network.n_components}")

analyzer = Analyzer(network)
attacker = RandomAttacker(budget=52, verbose=False)

## Visualization
Plot the structure of the communication network and physical grid (if present).

In [None]:
from matplotlib.patches import ConnectionPatch
from visualization.patches import ElectricalPatchHandler

if network.grid is not None:
    for attempt_no in range(20):
        attacker.attack_network(network)

    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(12,20))
    handles1, labels1, tree_pos = plot_communication_network(network, attacker=None, palette="tab10", save_name=None, ax=axes[0], show=False, show_legend=False, invert=True) 
    handles2, labels2, coords = plot_physical_grid(network, size=0.2, distance=0.5, displace=False, ax=axes[1], show=False, show_legend=False)

    # Visualize connections between communication devices and physical equipment
    for device, pos in tree_pos.items():
        if isinstance(device, Device):
            kind, idx = device.equipment.kind, int(device.equipment.name)
            con = ConnectionPatch(xyA=coords[kind][idx], xyB=pos, coordsA="data", coordsB="data",
                                axesA=axes[1], axesB=axes[0], color="red" if device.is_compromised else "purple", alpha=0.6, linestyle=(0, (3, 10, 1, 10, 1, 10)), lw=1, zorder=-10)
            axes[1].add_artist(con)

    # Combine the 2 legends
    handles = list(handles1) + list(handles2)
    labels = list(labels1) + list(labels2)
    ITEMS_PER_ROW = 5
    ncol = min(ITEMS_PER_ROW, len(labels))
    nrows = 1 + (len(labels) // ITEMS_PER_ROW)
    fig.legend(labels=labels, handles=handles, loc="lower center", bbox_to_anchor=(0.5, 0.05-0.01*nrows), ncol=ncol,
            handler_map={patch_maker:ElectricalPatchHandler() for patch_maker in handles2},
            title="Legend", fancybox=True, fontsize='large', title_fontsize='larger')
    plt.tight_layout()
    file_name = "Test" # input('Name of file')
    fig.savefig(p.cwd() / "media" / f"{file_name}.pdf")
    fig.savefig(p.cwd() / "media" / f"{file_name}.png")
    plt.show()
    network.reset()
else:
     plot_communication_network(network,
                                attacker=None, # Visualize compromise
                                palette="tab10", # "tab10" (default), "Set2", "Paired", "flare"
                                save_name="Test", # input('Name of file'), # If provided, saves to media / SAVE_NAME.pdf 
                                figsize=(10,20),
                                show_legend=False, invert=True) 

### Electrical Grid
Visualize the electrical grid that the communication network is connected to.

In [None]:
from visualization.patches import ElectricalPatchMaker

symbols = ["trafo", "ext_grid", "switch", "sgen", "load", "bus"]
nrows = len(symbols) // 2 + (1 if len(symbols) % 2 != 0 else 0)
ncols = min(2, 1 + (len(symbols) // 2))
fig, axes = plt.subplots(figsize=(ncols*5,nrows*5), sharex=True, sharey=True,
                         nrows=nrows, ncols=ncols)
for i, patch_maker in enumerate(symbols):
    pm = ElectricalPatchMaker(symbol=patch_maker, edgecolor="black", facecolor="None", lw=3)
    ax = axes[i] if nrows == 1 else axes[i // 2, i % 2]
    ax.add_patch(pm.patch)
    
    ax.plot([pm.centroid[0]], [pm.centroid[1]], marker="*", markersize=5, color="black", zorder=10)
    ax.set(xlim=(-2, 2), ylim=(-2, 2), title=patch_maker, xticks=[], yticks=[])
plt.show()

## Analysis

### Monte Carlo
Build an approximate profile of the network's cyber security by launching many cyber attacks. The higher N_ATTACKS the more precise the resulting distribution is, however this comes at the cost of increased computation time.
The more nodes are compromised, the more successful the attack.

#### Active Graph Only
Only perform Monte Carlo simulation on the currently active network.

In [None]:
import pandas as pd

N_ATTACKS = 1000
BUDGET = 52
ATTACKER_VARIANT = RandomAttacker
compromised_array, effort_array, critical_array = analyzer.monte_carlo_analysis(n_attacks=N_ATTACKS, attacker_variant=ATTACKER_VARIANT, budget=BUDGET, device_only=False, vary_entrypoints=False, criticality=criticality)
# compromised_array, effort_array, critical_array = analyzer.monte_carlo_multi_analysis(0, param_name="children_per_parent",param_values=[int(network.n_components // 2), network.n_components], criticality=criticality)

In [None]:
comp_vals, comp_counts = np.unique(compromised_array, return_counts=True)
comp_percents = 100.0 * (comp_counts / np.sum(comp_counts))
comp_map = ", ".join([("\n" if (i+1) % 6 == 0 else "") + f"{val}: {count} ({comp_percents[i]:.1f}%)" for i, (val, count) in enumerate(zip(comp_vals, comp_counts))])
print(f"No. of Attacks: {N_ATTACKS}\n{'Compromise Distribution'.center(max(len(line) for line in comp_map.split("\n")))}\n{comp_map}")

In [None]:
import matplotlib as mpl

def plot_monte(results, network, info:bool=False, palette:str="Dark2"):
    sns.set_context('paper', font_scale=2.0)
    if results != {}:
        # Multiple Monte Carlo Processes
        if "param_values" in results:
            fig = plt.figure(figsize=(16,12))
            ax = fig.add_subplot()
            df = pd.DataFrame(np.squeeze(results["compromised"]), columns=results["param_values"])
            df = df.melt(var_name=results["param_name"])
            sns.histplot(df, x="value", hue=results["param_name"], discrete=True, stat="probability", common_norm=False, ax=ax)
            sns.move_legend(ax, "upper right", ncols=4, title=" ".join(results["param_name"].split("_")).capitalize())
            ax.set(xlabel="No. of Components Compromised", xlim=(-0.5, np.max(results["compromised"])+0.5))
            plt.show()
        # Single Monte Carlo Process
        else:
            fig = plt.figure(figsize=(14,12))
            has_varied_entrypoints = True if results["compromised"].shape[1] > 1 else False
            gs = mpl.gridspec.GridSpec(nrows=3, ncols=2, figure=fig, width_ratios=(0.95, 0.05))
            
            if info:
                print(f"Attacker: {results['attacker_variant'].__name__}, Budget: {results['budget']}\n" + 
                      f"Network Size: {network.n_components}, No. of Devices: {network.n_devices}, " + 
                      f"No. of Entrypoints: {network.n_entrypoints}")
            
            N = results["compromised"].shape[1]
            palette = sns.color_palette(palette=palette, n_colors=N, as_cmap=False)
            cmap = mpl.colors.ListedColormap(palette)

            # Compromise Distribution
            ax = fig.add_subplot(gs[0, 0] if has_varied_entrypoints else gs[0, :])
            sns.histplot(results["compromised"], discrete=True, stat="probability", palette=palette, ax=ax)
            ax.set(xlabel="No. of Components Compromised", xlim=(-0.5, np.max(results["compromised"])+0.5),
                        yscale="log")
            ax.get_legend().remove()

            # Effort Distribution
            ax = fig.add_subplot(gs[1, 0] if has_varied_entrypoints else gs[1, :])
            sns.histplot(results["effort"], binwidth=1, stat="percent", palette=palette, ax=ax)
            ax.set(xlabel="Effort Spent", xlim=(0, np.max(results["effort"])))
            
            # Criticality Distribution
            if "criticality" in results:
                ax = fig.add_subplot(gs[2, 0] if has_varied_entrypoints else gs[2, :])
                sns.histplot(results["criticality"], stat="probability", palette=palette, ax=ax)
                ax.set(xlabel="Criticality")

            if has_varied_entrypoints:
                norm = mpl.colors.BoundaryNorm(np.linspace(0, N, N+1), cmap.N)
                sm = mpl.cm.ScalarMappable(cmap=cmap, norm=norm)
                fig.colorbar(sm, cax=fig.add_subplot(gs[:, 1]), label="Entrypoint",
                             ticks=np.arange(1, N+1))
            plt.tight_layout()
            plt.show()

# analyzer.plot_monte()
plot_monte(analyzer.res_monte,network)

#### Varied Parameter
Perform monte carlo simulation while varying particular parameter, such as the level of redundancy in the network. 

In [None]:
import os
import multiprocess as mp
N_ATTACKS = 1000
N_DEVICES = 30
BUDGET = 52
SPEC = p.cwd() / "specifications" / "SmartMeter_specifications.json" 
SEED = np.random.randint(low=0, high=52600)
N_ENTRYPOINTS = 1 # Total budget is multiplied by this!
MIN_CHILDREN = 2
MAX_CHILDREN = N_DEVICES
CHILD_NO_STEP = 2
CHILD_NO_DEVIATION = 0
no_of_children = np.arange(MIN_CHILDREN, MAX_CHILDREN, CHILD_NO_STEP)
network_specs = dict(n_devices=N_DEVICES,
                     n_entrypoints=N_ENTRYPOINTS,
                     network_specs=SPEC,
                     child_no_deviation=CHILD_NO_DEVIATION,
                     enable_sibling_to_sibling_comm=True)

compromised_array, effort_array = analyzer.monte_carlo_multi_analysis(seed, "children_per_parent", no_of_children, budget=BUDGET, n_attacks=N_ATTACKS, **network_specs)
analyzer.plot_monte()

### Static Analysis
Given an infinite budget, breaksdown the probability of compromising components in the network. The resulting probabilities are exact (except for floating point precision issues) but do not scale well to larger communication networks (> 5 nodes). Useful as a static feature of a communication network. 

In [None]:
n_probs = analyzer.static_analysis(show_paths=False, verbose=True)
analyzer.plot_static()

In [None]:
sns.barplot(n_probs)
plt.gca().set(xlabel="No. of nodes compromised", ylabel="Probability")
plt.gcf().savefig(p.cwd() / "media" / f"StaticAnalysis.pdf")

In [None]:
# Adjacency Matrix
# Does not handle self-loops / backtracking
# Consequently, probabilities will differ from combinatorial approach

def superscript(num:int):
    sup_map = {0: f"\N{SUPERSCRIPT ZERO}", 1: f"\N{SUPERSCRIPT ONE}", 2: f"\N{SUPERSCRIPT TWO}", 3: f"\N{SUPERSCRIPT THREE}", 4: f"\N{SUPERSCRIPT FOUR}", 
           5: f"\N{SUPERSCRIPT FIVE}",  6: f"\N{SUPERSCRIPT SIX}", 7: f"\N{SUPERSCRIPT SEVEN}", 8: f"\N{SUPERSCRIPT EIGHT}", 9: f"\N{SUPERSCRIPT NINE}"}
    return "".join(sup_map[digit] for digit in map(int, str(num)))

np.set_printoptions(precision=2, floatmode="maxprec_equal")
nodes = sorted(network.graph.nodes(), key=lambda node: node.id)
prob_lookup = [node.get_prob_to_compromise() for node in nodes]
print(f"Probabilities: {prob_lookup}")


A = nx.adjacency_matrix(network.graph, nodelist=nodes, weight="p").todense()
n_probs = {}
oldA = np.eye(A.shape[0])
for i in range(len(nodes)):
    newA = oldA@A
    # np.fill_diagonal(newA, val=0)
    print(f"A{superscript(i+1)}\n", newA)
    n_probs[i+1] = np.triu(newA, k=1).sum()
    print(f"A{superscript(i+1)}: {n_probs[i+1]}")
    oldA = newA

In [None]:
# Mutually Exclusive Approach
# Assumes you can jump and independently attack any node (i.e. ignores communication connections!)

time_required = 0.0
nodes = network.graph.nodes()
node_probs = {node: node.get_prob_to_compromise() for node in nodes}

n_probs = {}
all_nodes = set(nodes)
cumulative = 0.0
for n_devices in range(network.n_components, 0, -1):
    n_probs[n_devices] = cumulative
    for combination in itertools.combinations(nodes, n_devices):
        probability_to_compromise = 1.0
        combination = set(combination)
        missing_nodes = all_nodes.difference(combination)
        for node in combination:
            probability_to_compromise *= node_probs[node]
        for node in missing_nodes:
            probability_to_compromise *= (1 - node_probs[node])
        n_probs[n_devices] += probability_to_compromise 
    cumulative += n_probs[n_devices]
print("\n".join(f"{k} devices: {v}" for k,v in sorted(n_probs.items(), key=lambda item: item[0])))
print("Sum:", sum(n_probs.values()))

In [None]:
# If the probability of compromising all components is the same,
# we can use the Binomial distribution function
# Takes: 12.6 µs
N = network.n_components
k = 2
p = 0.5
cumulative = 0.0
for k in range(N, 0, -1):
    prob = math.comb(N, k)*math.pow(p, k)*math.pow(1-p,N-k)
    print(f"{k} Devices: {cumulative + prob}")
    cumulative += prob

In [None]:
import scipy.stats.distributions as distr
distr_lookup = {
    "TruncNorm": distr.truncnorm, # Continuous, loc=mean (float), scale=standard deviation (float)
    "Exponential": distr.expon, # Continuous, scale = 1 / lambda (float)
    "Gamma": distr.gamma, # Continuous, a = shape parameter (integer)
    "Bernoulli": distr.bernoulli, # Discrete
}
n_attacks = 20
is_successful = distr.bernoulli(0.5).rvs(size=n_attacks).astype(bool)
time_taken = distr.expon(scale=0.0).rvs(size=n_attacks)[is_successful]
print(f"Successful Attacks {sum(is_successful)}/{n_attacks}\nTime Taken per Successful Attack: {time_taken}")

## Communication Network Specifications
Explores how we can supply structured information to our procedural network generation algorithm. Includes information such as the types of components and defences we expect to see in the communication network.

In [None]:
# seed = np.random.randint(low=0, high=52600)
seed = 27194
print(f"Seed: {seed}")
np.random.seed(seed)
pcn = CommNetwork(n_devices=15, n_entrypoints=1, children_per_parent=5, child_no_deviation=1,
                  network_specs="SmartMeterNetworkSpecifications.json",
                  enable_sibling_to_sibling_comm=True)


## Power System Component Association

In [None]:
import inspect
import warnings
import numpy as np
import pandapower as pp
import pandapower.networks as grids
grid = pp.create_empty_network()
grid_filter = lambda module: inspect.isfunction(module) and not module.__name__.startswith("_")
grid_map = {grid_name:grid_creator for grid_name, grid_creator in \
            inspect.getmembers(grids, predicate=grid_filter)}
grid_options = list(grid_map.keys())
print(", ".join(grid_map.keys()))
CHOSEN_GRID = "mv_oberrhein" # "create_cigre_network_mv" # Can be None
kwargs = dict(scenario="generation", include_substations=True) #  dict(with_der="all")
with warnings.catch_warnings():
    warnings.filterwarnings(action="ignore", category=FutureWarning)
    grid_name = np.random.choice(grid_options) if CHOSEN_GRID is None else CHOSEN_GRID
    print(f"Grid: {grid_name}")
    grid = grid_map[grid_name](**kwargs)
    print(grid)
    # Controllable
    n_controllable = sum(getattr(grid, attr).shape[0] for attr in ["gen", "shunt", "trafo", "switch"])
    print(f"No. of controllable elements: {n_controllable} (generators, shunts, transformers and switches)")
    # Sensor-Only
    n_sensor_only = sum(getattr(grid, attr).shape[0] for attr in ["bus", "load", "line"])
    print(f"No. of sensor-only elements: {n_sensor_only} (buses, loads and lines)")
    print(f"Total (possible) no. of devices: {n_sensor_only+n_controllable} (generators, shunts, transformers and switches, buses, loads and lines)")

# grid.switch.closed = True
pp.plotting.simple_plot(grid, respect_switches=True, plot_line_switches=True, plot_loads=True, plot_gens=True, plot_sgens=True, )


In [None]:
seed = np.random.randint(low=0, high=52600)
np.random.seed(seed); random.seed(seed)
print(f"Seed: {np.random.get_state()[1][0]}")

with warnings.catch_warnings():
    warnings.filterwarnings(action="ignore", category=FutureWarning)
    grid = pandapower.networks.mv_oberrhein(scenario="generation") # pandapower.networks.case14()
    print(grid)
    pcn = CommNetwork(n_devices=30, n_entrypoints=1, children_per_parent=0, child_no_deviation=5, 
                    network_specs=p.cwd() / "specifications" / "SCADA_specifications.json", 
                    # "Default_specifications.json", "SmartMeter_specifications.json", "SCADA_specifications.json", "WAMS_specifications.json"
                    grid=grid,
                    enable_sibling_to_sibling_comm=True)
print(CommNetwork.show_tree(pcn.root))
print(f"Number of Components: {pcn.n_components}")

## Criticality

### Parameters of the node 

In [None]:
r_ohm = grid.line.loc[:, "r_ohm_per_km"] * grid.line.loc[:, "length_km"] * grid.line.loc[:, "parallel"] # Ohms
max_p = r_ohm * (grid.line.loc[:, "max_i_ka"] * 1000)
x_ohm = grid.line.loc[:, "x_ohm_per_km"] * grid.line.loc[:, "length_km"] * grid.line.loc[:, "parallel"] # Ohms
max_q = x_ohm * (grid.line.loc[:, "max_i_ka"] * 1000)
max_s = np.sqrt(np.power(max_p, 2)+np.power(max_q,2))
print(max_s) # VA

Z = (grid.line.loc[:, "r_ohm_per_km"] + 1j * grid.line.loc[:, "x_ohm_per_km"]) * (grid.line.loc[:, "length_km"]/grid.line.loc[:, "parallel"])
Y = (grid.line.loc[:, "g_us_per_km"]*math.pow(10, -6) + 1j*(2*np.pi*50)*grid.line.loc[:, "c_nf_per_km"]*math.pow(10, -9)) * (grid.line.loc[:, "length_km"] * grid.line.loc[:, "parallel"])
max_s = np.power(grid.line.loc[:, "max_i_ka"] * 1000, 2) * Z
print(np.abs(max_s) / math.pow(10, 6))

In [None]:
# line: capacity
# generator: size / output
# load: size / output
# bus: voltage level
# storage: max output
# switch:
# transformer: vn_hv_kv

def minmax(values):
    return np.nan_to_num((values - values.min()) / (values.max() - values.min()), nan=0)

def meannorm(values):
    return np.nan_to_num((values - values.mean()) / (values.max() - values.min()), nan=0)

def znorm(values):
    return np.nan_to_num((values - values.mean()) / values.std(), nan=0)

def criticality_by_size(grid:pandapower.pandapowerNet, norm:[minmax,meannorm,znorm]=None):
    criticality = {}
    for kind, attribute in zip(["sgen", "gen", "storage", "load", "trafo", "line", "bus"], 
                               ["sn_mva", "sn_mva", "sn_mva", "sn_mva", "sn_mva", "max_i_ka", "vn_kv"]):
        values = getattr(getattr(grid, kind), attribute)
        criticality[kind] = values / values.min() if norm is None else norm(values)
    return criticality

def criticality_by_degree(grid:pandapower.pandapowerNet, degree=nx.degree, norm:[minmax,meannorm,znorm]=None):
    """
    Args:
        grid (pandapower.pandapowerNet): Power grid representation in PandaPower
        degree (function): Accepts Networkx graph as input and returns the degree of the node
        norm ([minmax,meannorm,znorm]): Normalization function to use for remapping the degree values
    Returns:
        dict<str:np.ndarray>: 1D array containing criticality values per PandaPower element type
        float: Lowest criticality value encountered 
        float: Highest criticality value encountered
    """
    criticality = {}
    G = pandapower.topology.create_nxgraph(grid, include_out_of_service=True, include_lines=True, respect_switches=True,
                                       include_impedances=False, include_dclines=True, include_trafos=True, include_trafo3ws=True)
    bus_to_degree = degree(G)
    degree = np.array([value for _, value in sorted(bus_to_degree, key=lambda x:x[0])])
    if norm is not None:
        degree = norm(degree)
        bus_to_degree = {i:val for i,val in enumerate(degree)}
    criticality["bus"] = degree # np.array([bus_to_degree[i] for i in G.nodes])
    for kind in ["sgen", "gen", "storage", "load", "trafo", "line"]:
        df = getattr(grid, kind)
        if hasattr(df, "bus"):
            criticality[kind] = getattr(df, "bus").map(bus_to_degree)
        elif hasattr(df, "from_bus"):
            criticality[kind] = getattr(df, "from_bus").map(bus_to_degree).add(getattr(df, "to_bus").map(bus_to_degree))
        elif hasattr(df, "hv_bus"):
            criticality[kind] = getattr(df, "hv_bus").map(bus_to_degree).add(getattr(df, "lv_bus").map(bus_to_degree))
    return criticality, np.min(degree), np.max(degree)

def criticality_by_power_flow(grid:pandapower.pandapowerNet, verbose:bool=True):
    """
    Determines the criticality of each component based on the apparent power flow. Requires a single
    power flow to be performed first. The apparent power is used as a proxy for criticality. Since it
    is based on a power flow, it only holds for that particular state of the power grid.
    It is assumed that greater apparent power corresponds to greater criticality. 

    Args:
        grid (pandapower.pandapowerNet): Power grid with valid parameters defined to perform power flow simulation.
        verbose (bool): Whether to print additional information

    Returns:
        dict<str:np.ndarray>: 1D array containing criticality values per PandaPower element type
        float: Lowest criticality value encountered 
        float: Highest criticality value encountered
    """
    criticality = {}
    if grid.res_bus.shape[0] == 0:
        if verbose: print("Running Power Flow")
        pandapower.runpp(grid, algorithm="nr", calculate_voltage_angles=True, init="dc", trafo_model="t", trafo_loading="power",
                        enforce_q_lims=True, voltage_depend_loads=True, numba=True, consider_line_temperature=False)

    def get_apparent_power(grid, attr):
        df = getattr(grid, f"res_{attr}") if hasattr(grid, f"res_{attr}") else getattr(grid, attr)
        if "p_mw" in df.columns:
            active_power = df.loc[:, "p_mw"]
            reactive_power =  df.loc[:, "q_mvar"]
            apparent_power = np.sqrt(np.power(active_power, 2) + np.power(reactive_power, 2))
        elif attr == "switch":
            apparent_power = np.zeros(grid.switch.shape[0])
            apparent_line_power = get_apparent_power(grid, "line")
            apparent_trafo_power = get_apparent_power(grid, "trafo")
            for idx, row in grid.switch.iterrows():
                if row.et == "l": # Line
                    apparent_power[idx] = apparent_line_power[row.element]
                elif row.et == "t": # Transformer
                    apparent_power[idx] = apparent_trafo_power[row.element]
        elif attr in ["measurement", "trafo3w"]:
            if verbose: print(f"'{attr}' apparent power calculation not supported")
            apparent_power = pd.Series([])
        else:
            start = "hv" if "trafo" in attr else "from"
            end = "lv" if "trafo" in attr else "to"
            active_power = np.max(np.abs(df.loc[:, [f"p_{start}_mw", f"p_{end}_mw"]]), axis=1)
            reactive_power = np.max(np.abs(df.loc[:, [f"q_{start}_mvar", f"q_{end}_mvar"]]), axis=1)
            apparent_power = np.sqrt(np.power(active_power, 2) + np.power(reactive_power, 2))
        return apparent_power

    lowest, highest = math.inf, -math.inf
    for attr in pandapower.pp_elements():
        apparent_power = get_apparent_power(grid, attr)
        low, high = np.min(apparent_power), np.max(apparent_power)
        lowest = low if low < lowest else lowest
        highest = high if high > highest else highest
        criticality[attr] = apparent_power
    return criticality, lowest, highest

def criticality_by_capacity(grid):
    criticality = {}
    for attr in pandapower.pp_elements():
        df = getattr(grid, attr)
        if "sn_mva" in df.columns:
            criticality[attr] = df.sn_mva
        elif attr == "bus":
            criticality[attr] =  np.zeros(shape=df.shape[0])
            for other_attr in pandapower.pp_elements():
                other_df = getattr(grid, other_attr)
                if hasattr(other_df, "bus") and hasattr(other_df, "sn_mva"):
                    added_power = other_df.groupby("bus").sn_mva.sum()
                    criticality[attr][added_power.index] = added_power
                elif hasattr(other_df, "bus"):
                    print(other_attr)
        else: # Not Implemented
            # print(attr)
            criticality[attr] = np.zeros(shape=df.shape[0])
        # if hasattr(df, "bus"):
        #     print(attr)
    return criticality

# crit = criticality_by_degree(grid, norm=None)
# crit = criticality_by_size(grid, norm=minmax)
# crit = criticality_by_power_flow(grid)
# crit = criticality_by_capacity(grid)

In [None]:
import warnings
import matplotlib
from collections import Counter
from pandapower.plotting import draw_collections, create_bus_collection, create_line_collection, \
                                create_trafo_collection, create_line_switch_collection, \
                                create_sgen_collection, create_load_collection, create_ext_grid_collection
from matplotlib import colormaps
from matplotlib.colors import Normalize
from matplotlib.lines import Line2D
from visualization.patches import ElectricalPatchHandler, ElectricalPatchMaker

def modal_value(lst):
    data = Counter(lst)
    return data.most_common(1)[0][0]


fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(12,12))
grid = network.grid
N = len(np.unique(grid.bus.vn_kv))
cmap_list = [(volt) for volt in np.unique(grid.bus.vn_kv)]
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", "", category=FutureWarning)

    bus_patch = Line2D([0], [0], marker="8", markersize=10, markerfacecolor="white",
                       markeredgecolor="black", color="None")
    colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
    bus_pc = create_bus_collection(grid, zorder=10,
        size=0.2, patch_type="poly8", 
        infofunc=lambda id:grid.bus.loc[id].name,
        # cbar_title="Criticality",
        #linewidth=2, # color=["black"]*len(grid.bus))
        #cmap="jet",# norm=Normalize(vmin=0, vmax=grid.bus.vn_kv.max()*1.1),
        z=crit["bus"])
    print({i:val for i,val in enumerate(crit["bus"])})
    bus_coords = list(zip(grid.bus_geodata.loc[:, "x"].values, grid.bus_geodata.loc[:, "y"].values))

    cmap = matplotlib.colormaps['jet'] # Note: Assumes values are normalized between 0 and 1!
    ext_patch = ElectricalPatchMaker(symbol="ext_grid", x0=8, size=10, lw=2, fc="white", ec="black")
    ext_grid_pc = create_ext_grid_collection(grid, size=0.3)
    
    trafo_patch = ElectricalPatchMaker(symbol="trafo", x0=12, y0=4, size=5, lw=2, fc="white", ec="black")
    trafo_pc = create_trafo_collection(grid, zorder=9,
        size=0.2, color=cmap(crit["trafo"]),
        infofunc=lambda id:grid.bus.loc[id].name)
    
    switch_patch = ElectricalPatchMaker(symbol="switch", x0=8, size=10, lw=2, fc="white", ec="black")
    line_switch_pc = create_line_switch_collection(grid, zorder=8,
        size=0.2, distance_to_bus=0.5)
    
    gen_patch = ElectricalPatchMaker(symbol="sgen", x0=12, y0=4, size=10, lw=1, fc="white", ec="black")
    gen_pc = create_sgen_collection(grid, zorder=9, linewidth=1,
        size=0.2, color="black", patch_facecolor="white", patch_edgecolor=cmap(crit["sgen"]), orientation=0)

    load_patch= ElectricalPatchMaker(symbol="load", size=10, x0=12, lw=1, fc="white", ec="black")
    load_pc = create_load_collection(grid, zorder=8, linewidth=1,
        size=0.2, color="black", patch_facecolor="white", patch_edgecolor=cmap(crit["load"]), orientation=np.pi)
    
    line_pc = create_line_collection(grid, zorder=0,
        use_bus_geodata=True, color="black")
    
    legend_map = {"Bus":bus_patch, "Generator":gen_patch, "Load":load_patch, "Transformer":trafo_patch,
                  "External Grid":ext_patch, "Switch":switch_patch}

draw_collections([bus_pc, ext_grid_pc, trafo_pc, line_switch_pc, gen_pc, load_pc, line_pc],
                 ax=ax, plot_colorbars=True, set_aspect=True, axes_visible=(True, True))

labels, handles = zip(*sorted(zip(*(legend_map.keys(), legend_map.values())), key=lambda t: t[0]))
fig.legend(labels=labels, handles=handles, loc="lower center", bbox_to_anchor=(0.5, -0.1), ncol=1 + (len(labels) // 3),
           handler_map={trafo_patch:ElectricalPatchHandler(), ext_patch:ElectricalPatchHandler(), switch_patch:ElectricalPatchHandler(),
                        gen_patch:ElectricalPatchHandler(), load_patch:ElectricalPatchHandler()},
           title="Legend", fancybox=True, fontsize='large', title_fontsize='larger')
ax.axis("off")
plt.show()