In [None]:
from gym import Env
from gym.spaces import Discrete, Box
import gym
import random
import numpy as np
from stable_baselines3 import SAC
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import EvalCallback, BaseCallback
import py_dss_interface
import os
import pathlib

In [None]:
# NOTE: "__file__" is a plain string literal here, not the module attribute,
# so abspath() joins it onto the current working directory and dirname() then
# yields that working directory.
script_path = os.path.dirname(os.path.abspath("__file__"))

# OpenDSS master file, looked up next to wherever the kernel was started.
dss_file = pathlib.Path(script_path) / "Article SAC.dss"

# Single shared OpenDSS engine handle used by the environment defined below.
dss = py_dss_interface.DSSDLL()

In [None]:
class OpenD(Env):
    """Gym environment that drives an OpenDSS circuit through the module-level ``dss`` handle.

    An action sets the reactive power (kvar) of five PV systems. The
    observation is the list of per-unit bus voltage magnitudes followed by the
    five PV active powers and the summed circuit losses (123 values, matching
    ``observation_space``).

    ``reset`` must be called before ``step``: it compiles the circuit and
    records the baseline PV powers (``self.po``) and losses (``self.lo``) that
    the reward compares against.
    """

    # Names of the controlled PV systems, in action order.
    PV_NAMES = ('PV1', 'PV2', 'PV3', 'PV4', 'PV5')
    # An action component in [-1, 1] is scaled to +/- this many kvar.
    MAX_KVAR = 1500
    # Number of power-flow solves before an episode is reported as done.
    EPISODE_LENGTH = 100
    # Weights of the three reward terms (voltage deviation, PV power, losses).
    VOLTAGE_WEIGHT = 5
    PV_WEIGHT = 2
    LOSS_WEIGHT = 1

    def __init__(self):
        """Set default operating conditions and declare the Gym spaces."""
        self.irad = 0
        self.load = 0
        self.temp = 0
        self.action_space = Box(low=-1, high=1, shape=(5, 1))
        self.observation_space = Box(low=-1, high=3, shape=(123,), dtype=np.float64)

    def step(self, action):
        """Apply the kvar set-points, re-solve the circuit and score the result.

        Args:
            action: array-like with one value per PV system (the declared
                ``(5, 1)`` layout or any other layout holding five values).

        Returns:
            ``(observation, reward, done, info)`` in the 4-tuple Gym format.

        Raises:
            ValueError: if ``action`` does not hold exactly one value per PV system.
        """
        self.pv_list2 = list(self.PV_NAMES)

        # Flatten so both the declared (5, 1) layout and a flat (5,) vector work.
        setpoints = np.asarray(action, dtype=np.float64).reshape(-1)
        if setpoints.size != len(self.pv_list2):
            raise ValueError(
                f"expected {len(self.pv_list2)} action values, got {setpoints.size}"
            )

        for pv_name, setpoint in zip(self.pv_list2, setpoints):
            dss.pvsystems_write_name(pv_name)
            dss.pvsystems_write_kvar(float(setpoint) * self.MAX_KVAR)
        dss.text("Solve")
        self.calculator()

        # Observation layout: [bus voltages..., PV power x n_pv, losses].
        n_pv = len(self.pv_list2)
        voltages = self.state[:-(n_pv + 1)]
        pv_now = self.state[-(n_pv + 1):-1]
        loss_now = self.state[-1]

        # Voltage term: total deviation from 1.0 p.u. is a penalty, so it must
        # enter the reward with a negative sign like the two terms below.
        r1 = -sum(abs(volt - 1.0) for volt in voltages)

        # PV term: only a drop below the baseline recorded in reset() is penalised.
        r2 = sum(now - base for base, now in zip(self.po, pv_now) if base > now)

        # Loss term: only an increase above the baseline losses is penalised.
        r3 = self.lo - loss_now if self.lo < loss_now else 0

        reward = (
            r1 * self.VOLTAGE_WEIGHT
            + r2 * self.PV_WEIGHT
            + r3 * self.LOSS_WEIGHT
        )

        self.powerFlow_length -= 1
        done = self.powerFlow_length <= 0

        info = {}

        return self.states, reward, done, info

    def reset(self, rando=True):
        """Recompile the circuit, set the operating point and return the first observation.

        Args:
            rando: when truthy, draw a new irradiance in [0, 1] and load
                multiplier in [0.5, 1] and set the temperature to 25; otherwise
                the current ``self.irad`` / ``self.load`` / ``self.temp`` are reused.

        Returns:
            The initial observation as a ``float64`` NumPy array.
        """
        self.powerFlow_length = self.EPISODE_LENGTH
        self.reward_list = []

        dss.text(f"compile [{dss_file}]")

        if rando:
            self.irad = round(random.uniform(0, 1), 4)
            self.load = round(random.uniform(0.5, 1), 4)
            self.temp = 25

        # The second load shape in the circuit receives the load multiplier and
        # the third receives the irradiance multiplier.
        # NOTE(review): this relies on the load-shape order in the .dss file - confirm.
        dss.loadshapes_first()
        dss.loadshapes_next()
        dss.loadshapes_write_p_mult(f"[{self.load}]")
        dss.loadshapes_next()
        dss.loadshapes_write_p_mult(f"[{self.irad}]")
        dss.text(f"edit Tshape.MyTemp temp={self.temp}")
        dss.text("Solve")
        self.calculator()

        # Baselines for the reward, each queried from the solver once.
        active_powers, _ = self.pv_powers()
        self.po = [active_powers[i] / 1000 for i in range(len(self.PV_NAMES))]

        losses = self.loss()
        self.lo = losses[0] + losses[1]

        return self.states

    def loss(self):
        """Return the circuit losses divided by 1e6 and rounded to 5 decimals.

        The result is a new list; the sequence returned by the DSS interface is
        left untouched.
        """
        # presumably converts W / var to MW / Mvar - TODO confirm units
        return [round(value / 1e6, 5) for value in dss.circuit_losses()]

    def calculator(self):
        """Rebuild the observation from the current circuit solution.

        Returns:
            ``(self.states, self.state)``: the observation as a ``float64``
            array and as a plain list.
        """
        voltages = list(dss.circuit_all_bus_vmag_pu())

        # Entries 3..5 are overwritten so that they round to exactly 1.0.
        # NOTE(review): presumably nodes that should not count as voltage
        # deviation (e.g. the source bus) - confirm.
        for i in range(3, 6):
            voltages[i] = 1.0000001

        self.state = [round(volt, 5) for volt in voltages]

        # Query the solver once instead of once per appended value.
        active_powers, _ = self.pv_powers()
        for i in range(len(self.PV_NAMES)):
            self.state.append(active_powers[i] / 1000)

        losses = self.loss()
        self.state.append(losses[0] + losses[1])

        self.states = np.array(self.state, dtype='float64')
        return self.states, self.state

    def pv_powers(self):
        """Read the total active and reactive power of every PV system.

        Values with magnitude below 0.01 are treated as zero, and all values
        are rounded to whole numbers.

        Returns:
            ``(power_list, repower_list)``: active and reactive totals, one
            entry per PV system, with the sign flipped relative to what the
            solver reports.
        """
        self.pv_list = [f'PVSystem.{pv_name}' for pv_name in self.PV_NAMES]
        self.power_list = []
        self.repower_list = []

        for element in self.pv_list:
            dss.circuit_set_active_element(element)
            # Layout: [P1, Q1, P2, Q2, P3, Q3]; read once per element.
            powers = dss.cktelement_powers()
            self.repower = -(powers[1] + powers[3] + powers[5])
            self.power = -(powers[0] + powers[2] + powers[4])

            if -0.01 < self.power < 0.01:
                self.power = 0
            self.power_list.append(round(self.power, 0))

            if -0.01 < self.repower < 0.01:
                self.repower = 0
            self.repower_list.append(round(self.repower, 0))

        return self.power_list, self.repower_list


In [None]:
# Build the environment and let Stable-Baselines3 validate its spaces and
# reset()/step() outputs before any training starts.
env = OpenD()
check_env(env, warn=True)

In [None]:
# Directory that receives the TensorBoard logs.
logdir = "log"

# exist_ok=True avoids the check-then-create race and matches how the
# checkpoint callback creates its own directory.
os.makedirs(logdir, exist_ok=True)

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model every ``check_freq`` callback calls.

    Args:
        check_freq: a checkpoint is written whenever ``n_calls`` is a multiple
            of this value.
        save_path: directory for the checkpoints; ``None`` disables saving.
        verbose: verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Write a checkpoint on every ``check_freq``-th call.

        Returns:
            Always ``True``, so this callback never stops training.
        """
        # Same None guard as _init_callback; without it os.path.join would
        # raise a TypeError when saving is disabled.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
# Soft Actor-Critic agent with the default MLP policy, trained on the CPU and
# logging to the TensorBoard directory created earlier.
model = SAC(
    policy='MlpPolicy',
    env=env,
    verbose=1,
    tensorboard_log=logdir,
    device="cpu",
)

In [None]:
# Run name, reused for the checkpoint folder and the TensorBoard log name.
name = "sac"
CHECKPOINT_DIR = f'./train2/{name}/'

# Save a checkpoint every 20,000 callback calls.
callback = TrainAndLoggingCallback(
    save_path=CHECKPOINT_DIR,
    check_freq=20_000,
)

In [None]:
# Train for one million environment steps; reset_num_timesteps=False keeps the
# model's existing timestep counter instead of restarting it.
model.learn(
    total_timesteps=1_000_000,
    callback=callback,
    tb_log_name=name,
    reset_num_timesteps=False,
)