In [None]:
import os
import re
import sys
import json
import networkx as nx
import numpy as np

powerfactory_path = r'C:\Program Files\DIgSILENT\PowerFactory 2024 SP1\Python\3.10'
if powerfactory_path not in sys.path:
    sys.path.append(powerfactory_path)
import powerfactory as pf

if '..' not in sys.path:
    sys.path.append('..')
from pfcommon import make_full_object_name, get_objects

Start PowerFactory and activate the project:

In [None]:
project_name = '\\Terna_Inerzia\\V2020_Rete_Sardegna_2021_06_03cr'
app = pf.GetApplication()
if app is None:
    raise Exception('Cannot get PowerFactory application')
print('Got PowerFactory application.')
err = app.ActivateProject(project_name)
if err:
    raise Exception(f'Cannot activate project {project_name}')
print(f'Activated project "{project_name}".')

Collect all (in service) sites, lines, terminals (i.e., buses), and loads of the network:

In [None]:
sites = get_objects(app, 'ElmSite', sort=True, keep_out_of_service=False)
n_sites = len(sites)
lines = get_objects(app, 'ElmLne', sort=True, keep_out_of_service=False)
n_lines = len(lines)
obj_types = 'ElmTerm','ElmLod','ElmSym'
obj_type_names = {'ElmTerm': 'terminal', 'ElmLod': 'load', 'ElmSym': 'SM'}
OBJECTS = {typ: get_objects(app, typ, sort=True, keep_out_of_service=typ=='ElmSym') for typ in obj_types}

print(f'Found {n_sites} sites.')
print(f'Found {n_lines} lines.')
for typ in obj_types:
    print('Found {} {}s.'.format(len(OBJECTS[typ]), obj_type_names[typ]))

Here we define two classes that will be used in the following:
1. `SiteNode` contains information about a site and will be a node in the graph.
1. `LineSiteEdge` contains information about the line connecting two sites.

In [None]:
class SiteNode (object):
    def __init__(self, site):
        self.name = site.loc_name
        self.full_name = make_full_object_name(site)
        self._coords = [site.GPSlat, site.GPSlon]
        self._lat,self._lon = self._coords
    @property
    def coords(self):
        return self._coords
    @coords.setter
    def coords(self, c):
        assert len(c) == 2
        self._coords = np.squeeze(np.array(c))
        self._lat,self._lon = self._coords
    @property
    def lat(self):
        return self._lat
    @property
    def lon(self):
        return self._lon
    def __eq__(self, o):
        if o is None:
            return False
        return self.full_name == o.full_name and self.lat == o.lat and self.lon == o.lon
    def __hash__(self):
        return hash(repr(self))

class LineSiteEdge (object):
    def __init__(self, line, sites, nodes):
        self.name = line.loc_name
        self.full_name = make_full_object_name(line)
        self.coords = line.GPScoords
        self.length = line.dline if line.HasAttribute('dline') else 1e-3
        self.vrating = line.typ_id.uline
        self.terminals = [line.bus1.cterm, line.bus2.cterm]
        self.nodes = [None,None]
        for site in sites:
            for subst in site.GetContents():
                for obj in subst.GetContents():
                    try:
                        idx = self.terminals.index(obj)
                        self.nodes[idx] = nodes[site.loc_name]
                    except:
                        pass
                    if self.nodes.count(None) == 0:
                        break
                else:
                    continue
                break
            else:
                continue
            break
        self.node1,self.node2 = self.nodes

This dictionary contains a mapping between site name and the objects it contains (i.e., all objects contained in every substation of the site):

In [None]:
site_contents = {site.loc_name: [obj for subst in site.GetContents() for obj in subst.GetContents()] for site in sites}

Instantiate all nodes and edges (this takes a while):

In [None]:
nodes = {site.loc_name: SiteNode(site) for site in sites}
edges = [LineSiteEdge(line, sites, nodes) for line in lines]
n_nodes = len(nodes)
n_edges = len(edges)

Build a graph using all the nodes' and edges' information, and make sure it is connected:

In [None]:
G = nx.MultiGraph()
for e in edges:
    G.add_edge(e.node1, e.node2, coords=e.coords, length=e.length, vrating=e.vrating, label=e.name)
assert n_nodes == len(G.nodes)
assert n_edges == len(G.edges)
assert nx.is_connected(G)

print('No. of nodes: {}'.format(n_nodes))
print('No. of edges: {}'.format(n_edges))
if nx.is_connected(G):
    print('The graph is connected.')
else:
    print('The graph is not connected. No. of connected components: {}'.format(nx.number_connected_components(G)))

Add GPS coordinates to those nodes that don't have them. To do that, we assign to a node either:
1. the coordinates of one of the endpoints of a line it is connected to; or,
1. the coordinates of the closest node (i.e., site).

Notice that in the code below we perform more than one pass, since a node might have connected neighbors with GPS coordinates.

In [None]:
n_nodes_with_coords = 0
for node in G.nodes:
    if node.lat > 0:
        n_nodes_with_coords += 1
print(f'{n_nodes_with_coords}/{n_nodes} nodes have GPS coordinates.')
while n_nodes_with_coords < n_nodes:
    for edge in G.edges:
        data = G.get_edge_data(*edge)
        coords = data['coords']
        line_name = data['label']
        loc1,loc2 = line_name[:6],line_name[6:12]
        node1,node2 = edge[:2] if edge[0].name==loc1 else edge[1::-1]
        if len(coords) > 0:
            if node1.lat == 0 or (hasattr(node1,'dst') and node1.dst > 0):
                node1.coords = coords[0]
                if hasattr(node1,'dst'):
                    node1.dst = 0
                else:
                    n_nodes_with_coords += 1
            elif abs(node1.lat-coords[0][0]) > 1e-1 or abs(node1.lon-coords[0][1]) > 1e-1:
                print('{} [1]: ({:6.3f},{:6.3f}) != ({:6.3f},{:6.3f})'.\
                      format(node1.name, node1.lat,node1.lon,coords[0][0],coords[0][1]))
            if node2.lat == 0 or (hasattr(node2,'dst') and node2.dst > 0):
                node2.coords = coords[-1]
                if hasattr(node2,'dst'):
                    node2.dst = 0
                else:
                    n_nodes_with_coords += 1
            elif abs(node2.lat-coords[-1][0]) > 1e-1 or abs(node2.lon-coords[-1][1]) > 1e-1:
                print('{} [2]: ({:6.3f},{:6.3f}) != ({:6.3f},{:6.3f})'.\
                      format(node2.name,node2.lat,node2.lon,coords[-1][0],coords[-1][1]))
        elif node1.lat != 0 and node2.lat == 0 and (not hasattr(node2,'dst') or data['length'] < node2.dst):
            node2.coords = [node1.lat, node1.lon]
            node2.dst = data['length']
            n_nodes_with_coords += 1
        elif node2.lat != 0 and node1.lat == 0 and (not hasattr(node1,'dst') or data['length'] < node1.dst):
            node1.coords = [node2.lat, node2.lon]
            node1.dst = data['length']
            n_nodes_with_coords += 1
    print(f'{n_nodes_with_coords}/{n_nodes} nodes have GPS coordinates.')

Build an `INFO` dictionary with information about sites, lines and all the objects contained in the `OBJECTS` dictionary:

In [None]:
sort_dict_keys = lambda D: {k: D[k] for k in sorted(D.keys())}

def find_containing_site(obj, site_contents):
    site_names = [k for k,v in site_contents.items() if obj in v]
    assert len(site_names) == 1
    return site_names[0]

INFO = {
    'ElmSite': {name: {'coords': [node.lat,node.lon]} for name,node in nodes.items()},
    'ElmLne': {edge.name: {'coords': edge.coords, 'vrating': edge.vrating} for edge in edges}
}
for k,v in INFO['ElmSite'].items():
    v['contents'] = [obj.loc_name for obj in site_contents[k]]
for typ in obj_types:
    INFO[typ] = {}
    for obj in OBJECTS[typ]:
        site_name = find_containing_site(obj, site_contents)
        lat,lon = INFO['ElmSite'][site_name]['coords']
        INFO[typ][obj.loc_name] = {'coords': [lat,lon]}
        if typ == 'ElmLod':
            INFO[typ][obj.loc_name]['P'] = obj.plini
            INFO[typ][obj.loc_name]['Q'] = obj.qlini
        elif typ == 'ElmSym':
            INFO[typ][obj.loc_name]['S'] = obj.typ_id.sgn
            INFO[typ][obj.loc_name]['H'] = obj.typ_id.h
    INFO[typ] = sort_dict_keys(INFO[typ])
outfile = os.path.join('..',project_name.split(os.path.sep)[-1] + '_FULL_INFO.json')
json.dump(INFO, open(outfile,'w'), indent=4)

In [None]:
import matplotlib.pyplot as plt

fig,ax = plt.subplots(1, 1, figsize=(4,6))
for edge in G.edges:
    data = G.get_edge_data(*edge)
    coords = np.array(data['coords'])
    lw = data['vrating'] / 100
    if coords.shape[0] > 0:
        x,y = coords[:,1],coords[:,0]
    else:
        x = [edge[0].lon, edge[1].lon]
        y = [edge[0].lat, edge[1].lat]
    ax.plot(x, y, 'k', lw=lw)
    if coords.shape[0] > 0:
        ax.plot(x[0], y[0], 'g.')
        ax.plot(x[-1], y[-1], 'r.')
for node in G.nodes:
    ax.plot(node.lon, node.lat, 'ko', markerfacecolor='w', markersize=5, markeredgewidth=1.5)
plt.axis('off')
fig.tight_layout()