In [1]:
import sympy as sm

import networkx as nx

import matplotlib.pyplot as plt

import pprint
from pprint import pprint

import numpy as np
from numpy.linalg import det

import math

import copy
from copy import deepcopy

import json

import itertools

In [2]:
def disp_error_plot(errors=[], fig_size=10):
    
    plt.figure(figsize=(fig_size, fig_size))
    
    plt.plot(range(len(errors)), errors)
    plt.show()

In [3]:
def disp_graph_with_custom_labels(gr=None, 
                                  nodes_labels={}, 
                                  edges_labels={},
                                  fig_size=8,
                                  node_label=None, 
                                  edge_label=None):
    plt.figure(figsize=(fig_size, fig_size))
    
    if not edge_label == None and not edge_label == '':
        edges_labels = nx.get_edge_attributes(G=gr, name=edge_label)
        
    if not node_label == None and not node_label == '':
        nodes_labels = nx.get_node_attributes(G=gr, name=node_label)

    layout = nx.shell_layout(gr)
    
    nx.draw(G=gr, pos=layout, node_size=1500)
    nx.draw_networkx_edge_labels(G=gr, 
                                 pos=layout, 
                                 edge_labels=edges_labels, 
                                 font_family='sans-serif')
    nx.draw_networkx_labels(G=gr, 
                            pos=layout, 
                            labels=nodes_labels, 
                            font_family='sans-serif')
    plt.show()

In [29]:
class GPA:
    
    def __init__(self, params={}):
        self.gpa = nx.DiGraph()
        self.N_v = params['nodes']['count']
        self.N_e = params['edges']['count']
        
        self.known_p_nodes = list()
        self.known_q_nodes = list()
        self.unknown_p = list()
        self.unknown_q = list()
        
        self.knowns = dict()
        
        self.order = list()
        
        self.nodes_info = params['nodes']['nodes_info']
        self.edges_info = params['edges']['edges_info']
        
        self.variables = list()
        self.var_literals = list()
        
        self._read_nodes(self.nodes_info)
        self._read_edges(self.edges_info)
        
        self._init_matrixes()
        self._init_equations()
        self._init_jacobi_matrix()
        
        self.solving_errors = list()
        
    
    def _read_nodes(self, node_part={}):
        
        self.nodes_numeration = dict()
        node_index = 0
        for node, value in sorted(node_part.items(), key=lambda x: x[0]):
            
            self.nodes_numeration[node] = node_index
            node_index += 1
            
            name = value['name']
            P_info = value['P']
            Q_info = value['Q']
            
            P = P_info['P']
            P_max = P_info['P_max']
            P_min = P_info['P_min']
            
            Q = Q_info['Q']
            Q_max = Q_info['Q_max']
            Q_min = Q_info['Q_min']
            
            
            var_p = sm.symbols('p' + str(node))
            var_q = sm.symbols('q' + str(node))
            
            self.unknown_p.append(var_p)
            
            if P == None:
                P = sm.symbols('p' + str(node))
                self.variables.append(P)
                self.var_literals.append(str(P))
            else:
                self.knowns[str(var_p)] = P
                self.known_p_nodes.append(node)
                
            if Q == None:
                Q = sm.symbols('q' + str(node))
                self.unknown_q.append(Q)
            else:
                self.knowns[str(var_q)] = Q
                self.known_q_nodes.append(node)
            
            self.gpa.add_node(node, P=P, P_min=P_min, 
                              P_max=P_max, Q=Q, Q_min=Q_min, Q_max=Q_max, name=name, var=var_p)
            
        self.nodes = list(self.gpa.nodes())
        
        for node in self.known_q_nodes + self.known_p_nodes:
            self.order.append(self.nodes.index(node))
            
        tmp_p = list()
        for node in self.known_p_nodes:
            tmp_p.append(self.nodes.index(node))
        
        tmp_q = list()
        for node in self.known_q_nodes:
            tmp_q.append(self.nodes.index(node))
        
        self.known_p_nodes, self.known_q_nodes = tmp_p, tmp_q
            
    def _read_edges(self, edge_part={}):
        internal_nodes = np.zeros((2, self.N_v))
        
        for node_index, value in sorted(edge_part.items(), key=lambda v: int(v[0][0])):
            u = value['u']
            v = value['v']
            A = value['A']
            x = value['x']
            
            u_index = self.nodes_numeration[u]
            v_index = self.nodes_numeration[v]
            
            internal_nodes[0, u_index] += 1
            internal_nodes[1, v_index] += 1
            
            if x == None:
                x = sm.symbols('x' + str(node_index))
                self.variables.append(x)
                self.var_literals.append(str(x))
            
            self.gpa.add_edge(u_of_edge=u, v_of_edge=v, A=A, x=x)
            
        internal_nodes = np.logical_and(internal_nodes[0, :], internal_nodes[1, :])
        self.internal_nodes = set()
        for node_index, is_internal in enumerate(internal_nodes):
            if is_internal:
                self.internal_nodes.add(str(node_index + 1))
           
        
    def _init_matrixes(self):
        self.X = sm.zeros(rows=self.N_e, cols=1)
        for edge in self.gpa.edges():
            edge_index = list(self.gpa.edges()).index(edge)
            edge_params = self.gpa.edges[edge]

            self.X[edge_index] = edge_params['x']
        
        self.X = np.array(self.X)
        self.A = np.array(nx.incidence_matrix(self.gpa, oriented=True).todense())
        self.Q = sm.Matrix(np.dot(self.A, self.X))
        
        self.P = sm.zeros(rows=self.N_v, cols=1)
        for node in self.gpa.nodes():
            node_index = list(self.gpa.nodes()).index(node)
            node_params = self.gpa.nodes[node]
            
            self.P[node_index] = node_params['P']

        self.X = sm.Matrix(self.X)
        self.Q = sm.Matrix(self.Q)
        self.A = sm.Matrix(self.A)
        
        self.AF = deepcopy(self.A)
        
        for row in range(self.AF.shape[0]):
            for col in range(self.AF.shape[1]):
                if self.AF[row, col] == -1:
                    self.AF[row, col] = 0
        
        self.AL = -(self.A - self.AF)
        
        self.DF = sm.zeros(len(self.gpa.edges()), 1)
        self.DL = sm.zeros(len(self.gpa.edges()), 1)
        
        for eq_index, edge in enumerate(self.gpa.edges()):
            u = edge[0]
            v = edge[1]
            
            p_s = self.gpa.nodes[u]['var']
            p_f = self.gpa.nodes[v]['var']
            
            eq = p_s ** 2 - p_f ** 2
            
            self.DF[eq_index] = sm.diff(eq, p_s).subs(self.knowns.items())
            self.DL[eq_index] = -sm.diff(eq, p_f).subs(self.knowns.items())
        
        self.DF = sm.diag(*self.DF)
        self.DL = sm.diag(*self.DL)
        
    def _init_equations(self):
        edges = self.gpa.edges()
        nodes = self.gpa.nodes()

        self.equations = list()
        
        pos = 0
        
        for u, v in sorted(self.gpa.edges(), key=lambda x: int(x[0])):
            P_s = nodes[u]['P']
            P_f = nodes[v]['P']
            Q = edges[(u, v)]['x']
            A = edges[(u, v)]['A']

            eq = P_s ** 2 - P_f ** 2 - A * Q ** 2
            
            self.equations.append(eq)
        
        for el in self.internal_nodes:
            eq_index = list(self.gpa.nodes()).index(el)
            eq = deepcopy(self.Q[eq_index])
            
            self.equations.append(eq)
            self.Q[eq_index] = self.gpa.nodes[el]['Q']
        self.equations = sm.Matrix(self.equations)
        
    
    def _init_jacobi_matrix(self):
        self.Jacobi_matrixes = dict()
        self.Jacobi = sm.zeros(len(self.equations), len(self.variables))
        for eq_index, eq in enumerate(self.equations):
            for var_index, var in enumerate(self.variables):        
                eq_d = sm.diff(eq, var)
                self.Jacobi[eq_index, var_index] = eq_d
                
        for var_index, var in enumerate(self.variables):
            jacobi_var = deepcopy(self.Jacobi)
            jacobi_var[:, var_index] = self.equations
            self.Jacobi_matrixes[str(var)] = jacobi_var
            
            
    def _solve_step(self, approximation):
        solve_system = dict()
        for var, jacobi_var in self.Jacobi_matrixes.items():
            jacobi = deepcopy(self.Jacobi)
            
            jacobi = jacobi.subs(approximation.items()).det()
            jacobi_var = jacobi_var.subs(approximation.items()).det()
            
            solve_system[var] = jacobi_var / jacobi
            
        for var, value in solve_system.items():
            approximation[var] = approximation[var] - value
        
        return approximation

    
    def subs_approximation(self, approximation):
        for node in self.gpa.nodes():
            node_info = self.gpa.nodes[node]
            for var, value in approximation.items():
                if str(node_info['P']) == var:
                    node_info['P'] = value

        for edge in self.gpa.edges():
            edge_info = self.gpa.edges[edge]
            for var, value in approximation.items():
                if str(edge_info['x']) == var:
                    edge_info['x'] = value
    
    
    def solve_equations(self, start_approximation=None, eps=0.001, iterations=25):
        
        if not start_approximation:
            raise KeyError('Начальное приближение не задано!')
        
        start_approximation_vars = list(start_approximation.keys())
        
        for var in self.var_literals:
            if not var in start_approximation_vars:
                raise KeyError('Не задано начальное значение для ' + str(var))
        
        approximation = start_approximation
        errors = list()
        
        for _ in range(iterations):
            approximation = self._solve_step(approximation)
            error = deepcopy(self.equations.subs(approximation.items())).norm()
            errors.append(error)
            
            if error < eps:
                break
        
        points = -int(math.log10(eps))
        
        for var in approximation:
            approximation[var] = round(approximation[var], points)
        
        self.solve = approximation
        self.solving_errors = errors
        self.subs_approximation(self.solve)
        
        return approximation
    
    
    def construct_sense_matrix(self):
        self.p_var = self.P[self.known_q_nodes, :]
        self.p_fix = self.P[self.known_p_nodes, :]
        self.q_var = self.Q[self.known_p_nodes, :]
        self.q_fix = self.Q[self.known_q_nodes, :]
        
        aq = self.A[self.known_q_nodes, :]
        ap = self.A[self.known_p_nodes, :]
        
        aqf = self.AF[self.known_q_nodes, :]
        apf = self.AF[self.known_p_nodes, :]
        
        aql = self.AL[self.known_q_nodes, :]
        apl = self.AL[self.known_p_nodes, :]
        
        df = self.DF
        dl = self.DL
        
        self.M = sm.Matrix(sm.MatMul(aq, sm.Matrix(sm.MatAdd(sm.Matrix(sm.MatMul(df, sm.Matrix(aqf.T))),
                                                        sm.Matrix(sm.MatMul(dl, sm.Matrix(aql.T)))))))
        
        self.M_PQ = sm.Matrix(sm.MatMul(ap, sm.Matrix(sm.MatAdd(sm.Matrix(sm.MatMul(df, sm.Matrix(aqf.T))),
                                                           sm.Matrix(sm.MatMul(dl, sm.Matrix(aql.T)))))))
        
        self.M_QP = sm.Matrix(sm.MatMul(aq, sm.Matrix(sm.MatAdd(sm.Matrix(sm.MatMul(df, sm.Matrix(apf.T))), 
                                                 sm.Matrix(sm.MatMul(dl, sm.Matrix(apl.T)))))))
        
        self.M_PP = sm.Matrix(sm.MatMul(ap, sm.Matrix(sm.MatAdd(sm.Matrix(sm.MatMul(df, sm.Matrix(apf.T))), 
                                                           sm.Matrix(sm.MatMul(dl, sm.Matrix(apl.T)))))))
        

In [42]:
path = 'models/gpa_1.json'

with open(path, 'r') as fp:
    gpa1 = json.load(fp)

In [43]:
path = 'models/gpa_2.json'

with open(path, 'r') as fp:
    gpa2 = json.load(fp)

In [44]:
g1 = GPA(gpa1)
g1.construct_sense_matrix()

In [45]:
g2 = GPA(gpa2)
g2.construct_sense_matrix()

In [48]:
pprint(g1.M)
pprint(g2.M)

Matrix([[-4.0]])
Matrix([
[-2.0*p3 - 2.0*p4 + 16.0, -2.0*p2, -2.0*p2],
[                 2.0*p3,  2.0*p2,       0],
[                 2.0*p4,       0,  2.0*p2]])


In [49]:
pprint(g1.M_PQ)
pprint(g2.M_PQ)

Matrix([
[-16.0],
[ 10.0],
[ 10.0]])
Matrix([[-16.0, 0, 0]])


In [50]:
pprint(g1.M_QP)
pprint(g2.M_QP)

Matrix([[2.0*p2, -2.0*p2, -2.0*p2]])
Matrix([
[2.0*p2],
[     0],
[     0]])


In [51]:
pprint(g1.M_PP)
pprint(g2.M_PP)

Matrix([
[-2.0*p2,      0,      0],
[      0, 2.0*p2,      0],
[      0,      0, 2.0*p2]])
Matrix([[-2.0*p2]])


In [36]:
g1.p_var

Matrix([[p2]])

In [37]:
g1.p_fix

Matrix([
[8],
[5],
[5]])

In [38]:
g1.q_var

Matrix([
[-1.0*x1],
[ 1.0*x2],
[ 1.0*x3]])

In [39]:
g1.q_fix

Matrix([[0]])