In [1]:
import numpy as np
from numpy import linalg as LA

We start with building the linearized voltage dynmiacs model.

In [2]:
# Build the adjecent matrix for IEEE 13-bus sytem (except the root bus):
A = -np.eye(12)
A[1,0]=1
A[2,0]=1
A[3,0]=1
A[4,1]=1
A[5,2]=1
A[6,2]=1
A[7,2]=1
A[8,3]=1
A[9,5]=1
A[10,5]=1
A[11,7]=1
# how this adjecent matrix is built:
# lines(rows):0-1,1-2,1-5,1-3,2-4,5-7,5-8,5-9,3-6,7-10,7-11,9-12
# buses (columns):1,2,5,3,4,7,8,9,6,10,11,12

# Impedence values for each lines
X = np.diag([0.3856,0.1276,0.3856,0.1119,0.0765,0.0771,0.1928,0.0423,0.1119,0.0766,0.0766,0.0423])

F = -np.linalg.inv(A)
X =2*F@X@F.T
# Thus the linearized power flow equation is
# v = Xq + v_{env}, note here v is squared voltage magniture.

# Data generation

In [3]:
np.min(np.linalg.eigvals(X))
# X is a positive definite matrix

0.025732389209492255

In [4]:
# Obviously, v = Xq + v_{env} is monotone w.r.t. q
# For simplicity, we set v_{env} to one.
q_data = (np.random.rand(12,10000).astype(np.float32) - 0.5)*0.2 #range [-0.1,0.1]
v_data = X@q_data + 1
print(v_data.T.shape)

(10000, 12)


# Build the monotone models

In [5]:
# Monotone Neural Network, Codes from paper 'Monotone, Bi-Lipschitz, and Polyak-{\L}ojasiewicz Networks'
import torch
import math
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from typing import Sequence, Callable

def cayley(W: torch.Tensor) -> torch.Tensor:
    cout, cin = W.shape
    if cin > cout:
        return cayley(W.T).T
    U, V = W[:cin, :], W[cin:, :]
    I = torch.eye(cin, dtype=W.dtype, device=W.device)
    A = U - U.T + V.T @ V
    iIpA = torch.inverse(I + A)
    return torch.cat((iIpA @ (I - A), -2 * V @ iIpA), axis=0)

class MonLipNet(nn.Module):
    def __init__(self,
                 features: int,
                 unit_features: Sequence[int],
                 mu: float = 0.1, #lower bound for monotonicity
                 nu: float = 10., #upper bound for monotonicity
                 act_fn: Callable = None):
        super().__init__()
        use_cuda = torch.cuda.is_available()
        self.device   = torch.device("cuda" if use_cuda else "cpu")
        self.mu = mu
        self.nu = nu
        self.units = unit_features
        self.act_fn = act_fn
        self.Fq = nn.Parameter(torch.empty(sum(self.units), features)).to(self.device)
        nn.init.xavier_normal_(self.Fq)
        self.fq = nn.Parameter(torch.empty((1,))).to(self.device)
        nn.init.constant_(self.fq, self.Fq.norm())
        self.by = nn.Parameter(torch.zeros(features)).to(self.device)
        Fr, fr, b = [], [], []
        if act_fn is not None:
            d = []
        nz_1 = 0
        for nz in self.units:
            R = nn.Parameter(torch.empty((nz, nz+nz_1))).to(self.device)
            nn.init.xavier_normal_(R)
            r = nn.Parameter(torch.empty((1,))).to(self.device)
            nn.init.constant_(r, R.norm())
            Fr.append(R)
            fr.append(r)
            b.append(nn.Parameter(torch.zeros(nz)))
            if act_fn is not None:
                d.append(nn.Parameter(torch.zeros(nz)).to(self.device))
            nz_1 = nz
        self.Fr = nn.ParameterList(Fr).to(self.device)
        self.fr = nn.ParameterList(fr).to(self.device)
        self.b = nn.ParameterList(b).to(self.device)
        if act_fn is not None:
            self.d = nn.ParameterList(d)
        # cached weights
        self.Q = None
        self.R = None

    def forward(self, x):
        sqrt_gam = math.sqrt(self.nu - self.mu)
        sqrt_2 = math.sqrt(2.)
        if self.training:
            self.Q, self.R = None, None
            Q = cayley(self.fq * self.Fq / self.Fq.norm()).to(self.device)
            R = [cayley(fr * Fr / Fr.norm()) for Fr, fr in zip(self.Fr, self.fr)]
        else:
            if self.Q is None:
                with torch.no_grad():
                    self.Q = cayley(self.fq * self.Fq / self.Fq.norm())
                    self.R = [cayley(fr * Fr / Fr.norm()) for Fr, fr in zip(self.Fr, self.fr)]
            Q, R = self.Q, self.R

        xh = sqrt_gam * x.to(self.device) @ Q.T
        yh = []
        hk_1 = xh[..., :0]
        idx = 0
        for k, nz in enumerate(self.units):
            xk = xh[..., idx:idx+nz]
            gh = torch.cat((xk, hk_1), dim=-1).to(self.device)
            if self.act_fn is None:
                gh = sqrt_2 * F.relu(sqrt_2 * gh @ R[k].T + self.b[k]) @ R[k]
            else:
                gh = sqrt_2 * (gh @ R[k].T) + self.b[k]
                gh = self.act_fn(gh * torch.exp(self.d[k])) * torch.exp(-self.d[k])
                gh = sqrt_2 * gh @ R[k]
            hk = gh[..., :nz] - xk
            gk = gh[..., nz:]
            yh.append(hk_1-gk)
            idx += nz
            hk_1 = hk
        yh.append(hk_1)

        yh = torch.cat(yh, dim=-1).to(self.device)
        y = 0.5 * ((self.mu + self.nu) * x.to(self.device) + sqrt_gam * yh @ Q) + self.by

        return y

In [6]:
# ICNN
class ReHU(nn.Module):
    """ Rectified Huber unit"""
    def __init__(self, d):
        super().__init__()
        self.a = 1/d
        self.b = -d/2

    def forward(self, x):
        return torch.max(torch.clamp(torch.sign(x)*self.a/2*x**2,min=0,max=-self.b),x+self.b)


class ICNN(nn.Module):
    def __init__(self, layer_sizes, activation=F.relu_):
        super().__init__()
        self.W = nn.ParameterList([nn.Parameter(torch.Tensor(l, layer_sizes[0]))
                                   for l in layer_sizes[1:]])
        self.U = nn.ParameterList([nn.Parameter(torch.Tensor(layer_sizes[i+1], layer_sizes[i]))
                                   for i in range(1,len(layer_sizes)-1)])
        self.bias = nn.ParameterList([nn.Parameter(torch.Tensor(l)) for l in layer_sizes[1:]])
        self.act = activation
        self.reset_parameters()
        self.rehu = ReHU(1.0)
        self.act2 = torch.nn.Tanh()
        self.beta = nn.Parameter(torch.tensor(5.0), requires_grad=False)
        self.act_sp = torch.nn.Softplus()

    def reset_parameters(self):
        # copying from PyTorch Linear
        for W in self.W:
            nn.init.kaiming_uniform_(W, a=0.1**0.5)
        for U in self.U:
            nn.init.kaiming_uniform_(U, a=0.1**0.5)
        for i,b in enumerate(self.bias):
            fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.W[i])
            bound = 1 / (fan_in**0.5)
            nn.init.uniform_(b, -bound, bound)

    def forward(self, x):
        beta = torch.abs(self.beta)
        z = F.linear(x, self.W[0], self.bias[0])
        z = self.act_sp(z*beta)/beta

        for W,b,U in zip(self.W[1:-1], self.bias[1:-1], self.U[:-1]):
            z = F.linear(x, W, b) + F.linear(z, F.softplus(U)) / U.shape[0]
            z = self.act_sp(z*beta)/beta

        V_res = F.linear(x, self.W[-1], self.bias[-1]) + F.linear(z, F.softplus(self.U[-1])) / self.U[-1].shape[0]
        return V_res * 0.1 #here 0.1 is only for scalaring purpose

class Mono_ICNN_grad(nn.Module):
    def __init__(self, obs_dim, hidden_dim,distribued=False):
        super(Mono_ICNN_grad, self).__init__()
        use_cuda = torch.cuda.is_available()
        self.device   = torch.device("cuda" if use_cuda else "cpu")

        self.rehu = ReHU(float(7.0))
        self.icnn = ICNN([obs_dim, hidden_dim, hidden_dim, 1], activation=torch.nn.Softplus())
        self.distributed = distribued

    def forward(self, state):
        output = self.icnn(state)
        compute_batch_jacobian = torch.vmap(torch.func.jacrev(self.icnn))
        y = compute_batch_jacobian(state).squeeze()
        return y

In [7]:
def init_NNs():
    features = 12
    units = [200, 200]
    mu, nu = 0.01, 20
    FTN_net = MonLipNet(features, units, mu, nu)
    FTN_optimizer = torch.optim.Adam(FTN_net.parameters(), lr=1e-3)

    # define the monotone neural network with gradient of SCNN
    hidden_dim = 200
    cvx_net = Mono_ICNN_grad(12, hidden_dim)

    # params_to_optimize = [param for g in policy_net.g_list for param in g.parameters()]
    cvx_optimizer = torch.optim.Adam(cvx_net.parameters(), lr=1e-3)
    # Loss function
    criterion = nn.MSELoss()
    return FTN_net, FTN_optimizer, cvx_net,cvx_optimizer,criterion

In [8]:
from torch import nn, optim
from torch.autograd import Variable
from torch.utils.data import DataLoader

batch_size = 256
epochs = 100
v_data = np.float32(v_data)
dataset = torch.utils.data.TensorDataset(torch.tensor(q_data.T), torch.tensor(v_data.T))
train_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
FTN_net, FTN_optimizer, cvx_net,cvx_optimizer,criterion = init_NNs()

loss_cvx_list = []
loss_ftn_list = []
for epoch in range(epochs):
    epoch_loss = 0
    cvx_epoch_loss = 0
    for i, (q, v) in enumerate(train_dataloader):
        # Reset gradients
        FTN_optimizer.zero_grad()
        cvx_optimizer.zero_grad()

        # Forward pass: Compute predicted actions by passing states to the model
        predicted_v = FTN_net(q)
        cvx_predicted_v = cvx_net(q)

        # Calculate loss
        loss = criterion(predicted_v, v)
        cvx_loss = criterion(cvx_predicted_v, v)

        # Backward pass: Compute gradient of the loss with respect to model parameters
        loss.backward()
        cvx_loss.backward()

        # Perform a single optimization step (parameter update)
        FTN_optimizer.step()
        cvx_optimizer.step()

        # Accumulate loss
        epoch_loss += loss.item()
        cvx_epoch_loss += cvx_loss.item()
    loss_cvx_list.append(cvx_epoch_loss)
    loss_ftn_list.append(epoch_loss)

    # Print average loss for the epoch
    print(f"****FTN method****(Linear): Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/i:.5f}")
    print(f"****CVX method****(Linear): Epoch {epoch+1}/{epochs}, Loss: {cvx_epoch_loss/i:.5f}")


****FTN method****(Linear): Epoch 1/100, Loss: 0.36568
****CVX method****(Linear): Epoch 1/100, Loss: 0.81256
****FTN method****(Linear): Epoch 2/100, Loss: 0.02676
****CVX method****(Linear): Epoch 2/100, Loss: 0.22075
****FTN method****(Linear): Epoch 3/100, Loss: 0.00577
****CVX method****(Linear): Epoch 3/100, Loss: 0.06378
****FTN method****(Linear): Epoch 4/100, Loss: 0.00303
****CVX method****(Linear): Epoch 4/100, Loss: 0.04729
****FTN method****(Linear): Epoch 5/100, Loss: 0.00170
****CVX method****(Linear): Epoch 5/100, Loss: 0.04555
****FTN method****(Linear): Epoch 6/100, Loss: 0.00108
****CVX method****(Linear): Epoch 6/100, Loss: 0.04391
****FTN method****(Linear): Epoch 7/100, Loss: 0.00070
****CVX method****(Linear): Epoch 7/100, Loss: 0.04212
****FTN method****(Linear): Epoch 8/100, Loss: 0.00047
****CVX method****(Linear): Epoch 8/100, Loss: 0.04097
****FTN method****(Linear): Epoch 9/100, Loss: 0.00034
****CVX method****(Linear): Epoch 9/100, Loss: 0.03889
****FTN me

# Test the performance

In [9]:
# Test
q_test = (np.random.rand(12,1000).astype(np.float32) - 0.5)*0.2 #range [-0.1,0.1]
v_test = X@q_test + 1

In [10]:
predicted_v = FTN_net(torch.tensor(q_test.T))
cvx_predicted_v = cvx_net(torch.tensor(q_test.T))

In [11]:
# check the mean square error
print(np.mean(np.square(predicted_v.detach().numpy() - v_test.T)))
print(np.mean(np.square(cvx_predicted_v.detach().numpy() - v_test.T)))

1.3519971514479501e-05
0.00035759874583684176


# Test with nonlinear power system

In [12]:
! pip install pandapower



In [13]:
from scipy.io import loadmat
import pandapower as pp
import pandapower.networks as pn

In [14]:
import gym

def create_13bus():
    pp_net = pp.converter.from_mpc('case_13.mat', casename_mpc_file='case_mpc')

    pp_net.sgen['p_mw'] = 0.0
    pp_net.sgen['q_mvar'] = 0.0

    pp.create_sgen(pp_net, 2, p_mw = 0, q_mvar=0)
    pp.create_sgen(pp_net, 7, p_mw = 0, q_mvar=0)
    pp.create_sgen(pp_net, 9, p_mw = 0, q_mvar=0)

    pp.create_sgen(pp_net, 1, p_mw = 0, q_mvar=0)
    pp.create_sgen(pp_net, 3, p_mw = 0, q_mvar=0)
    pp.create_sgen(pp_net, 4, p_mw = 0, q_mvar=0)
    pp.create_sgen(pp_net, 5, p_mw = 0, q_mvar=0)
    pp.create_sgen(pp_net, 6, p_mw = 0, q_mvar=0)
    pp.create_sgen(pp_net, 8, p_mw = 0, q_mvar=0)
    pp.create_sgen(pp_net, 10, p_mw = 0, q_mvar=0)
    pp.create_sgen(pp_net, 11, p_mw = 0, q_mvar=0)
    pp.create_sgen(pp_net, 12, p_mw = 0, q_mvar=0)

    # In the original IEEE 13 bus system, there is no load in bus 3, 7, 8.
    # Add the load to corresponding node for dimension alignment in RL training
    pp.create_load(pp_net, 3, p_mw = 0, q_mvar=0)
    pp.create_load(pp_net, 7, p_mw = 0, q_mvar=0)
    pp.create_load(pp_net, 8, p_mw = 0, q_mvar=0)

    return pp_net


class IEEE13bus(gym.Env):
    def __init__(self, pp_net, injection_bus, v0=1, vmax=1.05, vmin=0.95, all_bus=False):
        self.network =  pp_net
        self.obs_dim = 1
        self.action_dim = 1
        self.injection_bus = injection_bus
        self.agentnum = len(injection_bus)
        # if self.agentnum == 12:  # comment out for mpc experiments
        #     all_bus=True
        self.v0 = v0
        self.vmax = vmax
        self.vmin = vmin

        self.load0_p = np.copy(self.network.load['p_mw'])
        self.load0_q = np.copy(self.network.load['q_mvar'])

        self.gen0_p = np.copy(self.network.sgen['p_mw'])
        self.gen0_q = np.copy(self.network.sgen['q_mvar'])
        self.all_bus = all_bus

        self.state = np.ones(self.agentnum, )

    def step(self, action):
        # state-transition dynamics
        for i in range(len(self.injection_bus)):
            self.network.sgen.at[i, 'q_mvar'] = action[i]

        pp.runpp(self.network, algorithm='bfsw', init = 'dc')

        self.state = self.network.res_bus.iloc[self.injection_bus].vm_pu.to_numpy()
        return self.state

    def reset0(self, seed=1): #reset voltage to nominal value

        self.network.load['p_mw'] = 0*self.load0_p
        self.network.load['q_mvar'] = 0*self.load0_q

        self.network.sgen['p_mw'] = 0*self.gen0_p
        self.network.sgen['q_mvar'] = 0*self.gen0_q

        pp.runpp(self.network, algorithm='bfsw')
        self.state = self.network.res_bus.iloc[self.injection_bus].vm_pu.to_numpy()
        return self.state



In [15]:
net = create_13bus()
injection_bus = np.array([1,2,5,3,4,7,8,9,6,10,11,12])
env = IEEE13bus(net, injection_bus)

  and should_run_async(code)


In [16]:
# generate data
v_list = []
q_list = []
env.reset0()
for i in range(10000):
  q = (np.random.rand(12).astype(np.float32) - 0.5)*0.4 #range [-0.1,0.1]
  env.step(q)
  v_list.append(env.state)
  q_list.append(q)
v_data = np.array(v_list)
q_data = np.array(q_list)

  and should_run_async(code)


In [17]:
batch_size = 256
epochs = 100
v_data = np.float32(v_data)
dataset = torch.utils.data.TensorDataset(torch.tensor(q_data), torch.tensor(v_data))
train_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
FTN_net, FTN_optimizer, cvx_net,cvx_optimizer,criterion = init_NNs()

loss_cvx_list = []
loss_ftn_list = []
for epoch in range(epochs):
    epoch_loss = 0
    cvx_epoch_loss = 0
    for i, (q, v) in enumerate(train_dataloader):
        # Reset gradients
        FTN_optimizer.zero_grad()
        cvx_optimizer.zero_grad()

        # Forward pass: Compute predicted actions by passing states to the model
        predicted_v = FTN_net(q)
        cvx_predicted_v = cvx_net(q)

        # Calculate loss
        loss = criterion(predicted_v, v)
        cvx_loss = criterion(cvx_predicted_v, v)

        # Backward pass: Compute gradient of the loss with respect to model parameters
        loss.backward()
        cvx_loss.backward()

        # Perform a single optimization step (parameter update)
        FTN_optimizer.step()
        cvx_optimizer.step()

        # Accumulate loss
        epoch_loss += loss.item()
        cvx_epoch_loss += cvx_loss.item()
    loss_cvx_list.append(cvx_epoch_loss)
    loss_ftn_list.append(epoch_loss)

    # Print average loss for the epoch
    print(f"****FTN method****(Linear): Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/i:.5f}")
    print(f"****CVX method****(Linear): Epoch {epoch+1}/{epochs}, Loss: {cvx_epoch_loss/i:.5f}")

  and should_run_async(code)


****FTN method****(Linear): Epoch 1/100, Loss: 0.51592
****CVX method****(Linear): Epoch 1/100, Loss: 0.97385
****FTN method****(Linear): Epoch 2/100, Loss: 0.13363
****CVX method****(Linear): Epoch 2/100, Loss: 0.34439
****FTN method****(Linear): Epoch 3/100, Loss: 0.03372
****CVX method****(Linear): Epoch 3/100, Loss: 0.15213
****FTN method****(Linear): Epoch 4/100, Loss: 0.01463
****CVX method****(Linear): Epoch 4/100, Loss: 0.11556
****FTN method****(Linear): Epoch 5/100, Loss: 0.00833
****CVX method****(Linear): Epoch 5/100, Loss: 0.10291
****FTN method****(Linear): Epoch 6/100, Loss: 0.00505
****CVX method****(Linear): Epoch 6/100, Loss: 0.09239
****FTN method****(Linear): Epoch 7/100, Loss: 0.00330
****CVX method****(Linear): Epoch 7/100, Loss: 0.08267
****FTN method****(Linear): Epoch 8/100, Loss: 0.00225
****CVX method****(Linear): Epoch 8/100, Loss: 0.07376
****FTN method****(Linear): Epoch 9/100, Loss: 0.00163
****CVX method****(Linear): Epoch 9/100, Loss: 0.06568
****FTN me

In [18]:
# Test
v_list = []
q_list = []
env.reset0()
for i in range(1000):
  q = (np.random.rand(12).astype(np.float32) - 0.5)*0.4 #range [-0.1,0.1]
  env.step(q)
  v_list.append(env.state)
  q_list.append(q)
v_test = np.array(v_list)
q_test = np.array(q_list)

In [19]:
predicted_v = FTN_net(torch.tensor(q_test))
cvx_predicted_v = cvx_net(torch.tensor(q_test))

  and should_run_async(code)


In [20]:
# check the mean square error
print(np.mean(np.square(predicted_v.detach().numpy() - v_test)))
print(np.mean(np.square(cvx_predicted_v.detach().numpy() - v_test)))

3.103201279504628e-05
0.0003054466196824061
