In [1]:
# Reinforcement Learning WRRF Optimization
# By: Henry Croll, Kaoru Ikuma, and Soumik Sarkar
# Copyright Â© - 2024
# Iowa State University Research Foundation, Inc.

# This notebook was developed for training and evaluation of reinforcement learning agents applied to water resource recovery facility optimization.
# This notebook requires additional files and software to function: 
# - SUMO from Dynamita with Digital Twin Toolkit
# - scheduler.py from Dynamita
# - tool.py from Dynamita
# - SUMO model .dll file for simulation
# - SUMO state .xml file for simulation starting state
# - Influent.xlsx file with influent data

# Initial Stablebaselines3 agents and test environment based on Deep RL Tutorial by Nicholas Renotte
# Create test custom environment based on Tutorial by Nicholas Renotte

# Agent tuned for open control of BSM1 WWTF simulation

# Updated 12/10/2024

In [2]:
import scheduler as ds
import tool as dtool
import time 

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import shutil
import tensorflow

import os
import random
import gym
from gym import Env, spaces
from stable_baselines3 import PPO, DQN, A2C, TD3
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.utils import set_random_seed

In [None]:
# Create folder for logs
log_path = os.path.join('Test_Training', 'Test_Logs')

# define what model and state to use
SUMOmodel = "MODEL_NAME_HERE.dll"
SUMOstate = "STATE_NAME_HERE"

# Create active xml 
SUMOstate_active = "ACTIVE_STATE_NAME_HERE"
path = 'YOUR_PATH_HERE\\Dynamita\\Sumo22\\PythonAPI\\YOUR_PATH_HERE\\'
src = path + SUMOstate + ".xml"
dst = path + SUMOstate_active + ".xml"

shutil.copyfile(src, dst)

In [4]:
# This callback is called when new data is available. It depends on the Sumo__Datacomm variable. 
# In this example it happens every hour.     
def data_callback(job, data):
    # Get the information regarding the running simulation
    jobData = ds.sumo.getJobData(job)
    
    # Sumo__Time is in milliseconds, convert it to days.
    data["Sumo__Time"] /= dtool.day
    
    # print progress to the console
    # print(f"DC #{job} - {data['Sumo__Time']}")
    
    # Save the data for the job. We could process it right here 
    # (print, save it in a file, database, etc).
    # But in this example we collect it and export it once the 
    # simulation is finished.
    for item in data:     
        jobData["results"][item] = data[item]

In [5]:
# This callback is called when Sumo wants to say something, like a command was successful or not,
# an error occured or the simulation is finished.
def msg_callback(job, msg):
    # Make sure we see the messages
    #print("MSG #" + str(job) + ": '" + msg + "'")
    
    # If the simulation is finished, we tell Sumo to save the state - we'll get another message when the state is saved
    if (ds.sumo.isSimFinishedMsg(msg)):
        jobData = ds.sumo.getJobData(job)   
        ds.sumo.sendCommand(job, f"save {SUMOstate_active}.xml")
        
    # When the state is finished we can clean up the simulation
    elif msg.startswith("530045"):
        jobData = ds.sumo.getJobData(job)        
        ds.sumo.finish(job)
        jobData["finished"] = True

In [6]:
def SUMO_step(self, set_air1, set_air2, set_air3, set_air4, set_air5, set_recycle, set_ras):
    
    # start a Sumo simulation
    job = ds.sumo.schedule(
                        SUMOmodel, # model dll path extracted from a .sumo file
                        
                        # Commands to the SumoCore to setup the simulation
        commands      = [
                        # Load a base state that includes parameters set from the Sumo GUI. 
                        # If this is omitted every state and parameter will be to it's factory default
                         f"load {SUMOstate_active}.xml",
                         "maptoic",
            
                        # Send new influent parameters
            
                        f"set Sumo__Plant__Influent2__param__Q {self.influentdf['Q'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__SU {self.influentdf['SU'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__SB {self.influentdf['SB'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__XU {self.influentdf['XU'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__XB {self.influentdf['XB'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__XOHO {self.influentdf['XOHO'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__XNITO {self.influentdf['XNITO'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__XE {self.influentdf['XE'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__SO2 {self.influentdf['SO2'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__SNOx {self.influentdf['SNOx'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__SNHx {self.influentdf['SNHx'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__SN_B {self.influentdf['SN,B'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__XN_B {self.influentdf['XN,B'][self.influent]};",
                        f"set Sumo__Plant__Influent2__param__SALK {self.influentdf['SALK'][self.influent]};",

                        # Send new control parameters
                         f"set Sumo__Plant__Sideflowdivider1__param__Qpumped_target {set_recycle};",
                         f"set Sumo__Plant__CSTR1__param__DOSP {set_air1};",
                         f"set Sumo__Plant__CSTR2__param__DOSP {set_air2};",
                         f"set Sumo__Plant__CSTR3__param__DOSP {set_air3};",
                         f"set Sumo__Plant__CSTR4__param__DOSP {set_air4};",
                         f"set Sumo__Plant__CSTR5__param__DOSP {set_air5};",
                         f"set Sumo__Plant__Clarifier1__param__Qsludge_target {set_ras};",
            
                        # Set simulation length and how often we want data
                         f"set Sumo__StopTime {15*dtool.minute};",
                         f"set Sumo__DataComm {15*dtool.minute};", 
                                       
                        # Choose simulation mode from dynamic/steady
                         "mode dynamic;",
#                         "loadtsv inf_data.tsv;"
            
                        # Start the simulation
                        # If you omit this, you can still start the simulation with sumo.sendCommand(job1, "start")
                        # but we've not seen a usecase where it is needed.
                         "start;"],
                                                 
                        # list the variables we're interested in
        variables     = self.variables,
                        
           
                        # when a jobData is persistent it doesn't get cleared when the simulation is finished
                        # so we can store simulation results in it.
        jobData       = {
                            ds.sumo.persistent: True,
                            "results" : { },
                            "finished": False,
                            "index" : self.i
                        },
        );
    
    # wait until the simulation is finished
    jobData = ds.sumo.getJobData(job)
    while (jobData["finished"] != True):
        time.sleep(0.0001)
    
        
    simdata = ds.sumo.jobData.copy()
    
    # clean up the job data from memory
    ds.sumo.deleteJobData
    #ds.sumo.cleanup()
    
    return simdata

In [7]:
# Define reward function
def getreward(self, data):     

    ME = 24*0.005*(1000*sum(map(lambda x : x<20, (self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR1__kLaGO2'], 
                                      self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR2__kLaGO2'])))
         + 1333*sum(map(lambda x : x<20, (self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR3__kLaGO2'],
                                    self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR4__kLaGO2'],
                                    self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR5__kLaGO2']))))

    AE = (9.458/(1.8*1000)*(1000*(self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR1__kLaGO2'] 
                              + self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR2__kLaGO2']) + 
                            1333*(self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR3__kLaGO2']
                              + self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR4__kLaGO2']
                              + self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR5__kLaGO2'])))
    
    PE = (0.004 * self.resultdf.iloc[self.total-1]['Sumo__Plant__Pipe11__Q'] + 
            0.05 * self.resultdf.iloc[self.total-1]['Sumo__Plant__Sludge1__Q'] + 
            0.008 * self.resultdf.iloc[self.total-1]['Sumo__Plant__Pipe14__Q'])
    
    SP = 0 #0.75/1000 * self.resultdf.iloc[self.total-1]['Sumo__Plant__Sludge1__XTSS'] * self.resultdf.iloc[self.total-1]['Sumo__Plant__Sludge1__Q']
    
    EQ = (2*self.resultdf.iloc[self.total-1]['Sumo__Plant__Effluent1__XTSS']
          + self.resultdf.iloc[self.total-1]['Sumo__Plant__Effluent1__TCOD']
          + 10*self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR5__SNOx']
          + 30*self.resultdf.iloc[self.total-1]['Sumo__Plant__CSTR5__SKN']
          + 2*self.resultdf.iloc[self.total-1]['Sumo__Plant__Effluent1__TBOD_5'])*self.resultdf.iloc[self.total-1]['Sumo__Plant__Effluent1__Q']/1000
        
    reward = 0 - ME - AE - PE - EQ - 5*SP
    
    return reward

In [8]:
# Define Environment
# Write custom environment
class SUMOEnv(Env):
    def __init__(self):
        
        ### Define SUMO Parameters ###
        
        # free all resources of Sumo before we continue
        ds.sumo.cleanup()
        
        # set the number of parallel simulations
        ds.sumo.setParallelJobs(1)
        
        # for debugging:
        # ds.sumo.setLogDetails(6)

        # Define influent dataframe
        self.influentdf = pd.read_excel('Influent.xlsx')

        # setup callbacks for Sumo to call
        ds.sumo.message_callback = msg_callback
        ds.sumo.datacomm_callback = data_callback
        
        # list the variables we're interested in for Evaluation
        self.sumo_time = ["Sumo__Time"]      
        self.influent = ["Sumo__Plant__Influent2__Q",
                    "Sumo__Plant__Influent2__TCOD",
                    "Sumo__Plant__Influent2__TKN",
                    "Sumo__Plant__Influent2__TP"]
        self.operation = ["Sumo__Plant__Pipe11__Q",
                    "Sumo__Plant__Pipe14__Q",
                    "Sumo__Plant__Pipe15__Q",
                    "Sumo__Plant__CSTR1__SO2",
                    "Sumo__Plant__CSTR2__SO2",
                    "Sumo__Plant__CSTR3__SO2",
                    "Sumo__Plant__CSTR4__SO2",
                    "Sumo__Plant__CSTR5__SO2",
                    "Sumo__Plant__CSTR1__kLaGO2",
                    "Sumo__Plant__CSTR2__kLaGO2",
                    "Sumo__Plant__CSTR3__kLaGO2",
                    "Sumo__Plant__CSTR4__kLaGO2",
                    "Sumo__Plant__CSTR5__kLaGO2",
                    "Sumo__Plant__CSTR1__SNHx",
                    "Sumo__Plant__CSTR2__SNHx",
                    "Sumo__Plant__CSTR3__SNHx",
                    "Sumo__Plant__CSTR4__SNHx",
                    "Sumo__Plant__CSTR5__SNHx",
                    "Sumo__Plant__CSTR1__SNOx",
                    "Sumo__Plant__CSTR2__SNOx",
                    "Sumo__Plant__CSTR3__SNOx",
                    "Sumo__Plant__CSTR4__SNOx",
                    "Sumo__Plant__CSTR5__SNOx",
                    "Sumo__Plant__CSTR1__XTSS", 
                    "Sumo__Plant__CSTR5__ORP",
                    "Sumo__Plant__CSTR5__SNOx",
                    "Sumo__Plant__CSTR5__TKN",
                    "Sumo__Plant__CSTR5__TCOD",
                    "Sumo__Plant__CSTR5__TBOD_5",
                    "Sumo__Plant__CSTR5__SKN",
                    "Sumo__Plant__CSTR5__TBOD_5",
                    "Sumo__Plant__SRT1"]
        self.effluent = ["Sumo__Plant__Effluent1__Q",
                    "Sumo__Plant__Effluent1__SNOx",
                    "Sumo__Plant__Effluent1__SNHx",
                    "Sumo__Plant__Effluent1__SN_B",
                    "Sumo__Plant__Effluent1__XN_B",
                    "Sumo__Plant__Effluent1__SALK",
                    "Sumo__Plant__Effluent1__XTSS",
                    "Sumo__Plant__Effluent1__TKN",
                    "Sumo__Plant__Effluent1__SKN",
                    "Sumo__Plant__Effluent1__TN",
                    "Sumo__Plant__Effluent1__TCOD",
                    "Sumo__Plant__Effluent1__TBOD_5",
                    "Sumo__Plant__Sludge1__Q",
                    "Sumo__Plant__Sludge1__XTSS"]

        self.variables = self.sumo_time + self.influent + self.operation + self.effluent

        self.resultdf = pd.DataFrame()
        self.rewarddf = pd.DataFrame(columns=["Reward"])
        
        ### Define RL Env Parameters ###
        
        # Define observation space
        # define low observation bounds
        lowobs = np.array([
            # Inf Flow
            #0,
            # Inf NHx Load
            #0,
            # CSTR1 SNHx
            0,
            # CSTR2 SNHx
            0,
            # CSTR3 SNHx
            0,
            # CSTR4 SNHx
            0,
            # CSTR5 SNHx
            0,
            # CSTR1 SNOx
            0,
            # CSTR2 SNOx
            0,
            # CSTR3 SNOx
            0,
            # CSTR4 SNOx
            0,
            # CSTR5 SNOx
            0,
            # CSTR1 XTSS
            0,
        ]).astype(np.float32)
        
        # define high observation bounds
        highobs = np.array([
            # Inf Flow
            #1,
            # Inf NHx Load
            #0,
            # CSTR1 SNHx
            1,
            # CSTR2 SNHx
            1,
            # CSTR3 SNHx
            1,
            # CSTR4 SNHx
            1,
            # CSTR5 SNHx
            1,
            # CSTR1 SNOx
            1,
            # CSTR2 SNOx
            1,
            # CSTR3 SNOx
            1,
            # CSTR4 SNOx
            1,
            # CSTR5 SNOx
            1,
            # CSTR1 XTSS
            1,
        ]).astype(np.float32)
            
        self.observation_space = gym.spaces.Box(lowobs,highobs)
        
        # Define action space
       
        # define low action bounds
        lowact = np.array([
            # CSTR1 DO
            0,
            # CSTR2 DO
            0,
            # CSTR3 DO
            0,
            # CSTR4 DO
            0,
            # CSTR5 DO
            0,
            # recycle IMLR
            0,
            # RAS
            0,
        ]).astype(np.float32)
        
        # define high observation bounds
        highact = np.array([
            # CSTR1 DO
            1,
            # CSTR2 DO
            1,
            # CSTR3 DO
            1,
            # CSTR4 DO
            1,
            # CSTR5 DO
            1,
            # recycle IMLR
            1,
            # RAS
            1,
        ]).astype(np.float32)
            
        self.action_space = gym.spaces.Box(lowact,highact)
        
        # Define starting environment
        self.state = np.zeros(11)
        
        # Start sim counter
        self.i = 1344
        self.total = 0
        self.influent = 1344
    
    def step(self, action):
        # Apply action
        # This will be done by the RL agent which passes the new action into the SUMO_step function
        ### This is where SUMO is called ###
                                      
        set_air1 = action[0]*3
        set_air2 = action[1]*3
        set_air3 = action[2]*3
        set_air4 = action[3]*3
        set_air5 = action[4]*3
        set_recycle = action[5]*92230
        set_ras = action[6]*36892
        
        data = SUMO_step(self, set_air1, set_air2, set_air3, set_air4, set_air5, set_recycle, set_ras)

        # Increase simulation tracker by 1 timestep
        self.i -= 1
        self.total += 1
        self.influent += 1

        self.state = [
            data[self.total]['results']['Sumo__Plant__CSTR1__SNHx']/50,
            data[self.total]['results']['Sumo__Plant__CSTR2__SNHx']/50,
            data[self.total]['results']['Sumo__Plant__CSTR3__SNHx']/50,
            data[self.total]['results']['Sumo__Plant__CSTR4__SNHx']/50,
            data[self.total]['results']['Sumo__Plant__CSTR5__SNHx']/50,
            data[self.total]['results']['Sumo__Plant__CSTR1__SNOx']/50,
            data[self.total]['results']['Sumo__Plant__CSTR2__SNOx']/50,
            data[self.total]['results']['Sumo__Plant__CSTR3__SNOx']/50,
            data[self.total]['results']['Sumo__Plant__CSTR4__SNOx']/50,
            data[self.total]['results']['Sumo__Plant__CSTR5__SNOx']/50,
            data[self.total]['results']['Sumo__Plant__CSTR1__XTSS']/10000
        ]

        # concatonate result to dataframe
        self.resultdf = pd.concat([self.resultdf,(pd.DataFrame(data[self.total]['results'], index = [0]))])
        # update time in first column
        self.resultdf.iloc[-1][0] = self.resultdf.iloc[-1][0]*(self.total)

        ### Calculate reward ###
        reward = getreward(self, data)

        # add reward
        self.rewarddf.loc[self.total] = reward
        
        # Check if simulation is complete
        # Will need to update run-time
        if self.i <= 0: 
            done = True
            # Export results if desired
            self.resultdf.to_csv("RESULTS_FILE_NAME.csv", index = True)
            print(self.total)
        else:
            done = False

        # Set placeholder for info
        info = {}
        
        # Return step information
        return self.state, reward, done, info
    
    def render(self):
        # No renders as part of this environment
        pass
    
    def reset(self):
        # Reset starting environment
        # Create active xml 
        # UNCOMMENT THIS SECTION DURING TRAINING/COMMENT DURING TESTING
#         SUMOstate_active = "ACTIVE_STATE_NAME"
#         path = 'YOUR_PATH\\Dynamita\\Sumo22\\PythonAPI\\YOUR_PATH'
#         src = path + SUMOstate + ".xml"
#         dst = path + SUMOstate_active + ".xml"

#         shutil.copyfile(src, dst)
        
#         Path = os.path.join('Test_Training', 'Saved_Models', 
#                         'MODEL_NAME_HERE')
#         model.save(Path)
        
#         # Reset state
#         self.state = np.zeros(11)
        
        # Reset simulation length for tracking episodes (LEAVE UNCOMMENTED)
        self.i = 1344
        return self.state
        
    
env = SUMOEnv()

In [9]:
# Make Monitor Env
env_monitor = Monitor(env)

# Wrap environment for Stable Baselines
env_wrap = DummyVecEnv([lambda: env_monitor])

hyperparams = {
        "gamma": 0.99,
        "learning_rate": 0.001,
        "batch_size": 200,
        "tau": 0.005,
        "train_freq": (4, "episode"),
        "target_policy_noise": 0.05,
        "learning_starts": 100000,
    }


In [10]:
seed = 0

# Define RL model (UNCOMMENT DURING TRAINING)
#model = TD3('MlpPolicy', env_wrap, **hyperparams, verbose=1, tensorboard_log=log_path, seed = seed)

np.random.seed(seed)
set_random_seed(seed)

# Train Model (UNCOMMENT FOR TRAINING)
#model.learn(total_timesteps = 150000, log_interval = 1) 

In [11]:
# I do not think this section works, it was for the purpose of continuing training of an existing model

# Path = os.path.join('Test_Training', 'Saved_Models', 
#                        'MODEL_NAME_HERE')

# log_path = os.path.join('Test_Training', 'Test_Logs')

# model = TD3.load(Path, env_wrap, **hyperparams, verbose=1, tensorboard_log=log_path, seed = seed)

# model.load_replay_buffer("REPLAY_BUFFER_FILE")

# # Train Model
# model.learn(total_timesteps = 100000, log_interval = 1) 

In [12]:
# UNCOMMENT DURING TRAINING
# Save final model
# Path = os.path.join('Test_Training', 'Saved_Models', 
#                        'MODEL_NAME_HERE')
# model.save(Path)
# model.save_replay_buffer("REPLAY_BUFFER_FILE")

In [None]:
# UNCOMMENT FOR TESTING
# Test the Trained Model

Path = os.path.join('Test_Training', 'Saved_Models', 
                       'MODEL_NAME_HERE')
model = TD3.load(Path)

episodes = 26
total = 0
for episode in range (1, episodes+1):
    obs = env_wrap.reset()
    done = False
    score = 0
    
    while not done:
        action, _ = model.predict(obs)
        print(action)
        obs, reward, done, info = env_wrap.step(action)
        print(obs)
        score += reward
    #print('Episode:{} Score:{}'.format(episode, score))
    total += score

print('Average score:{}'.format(total/episodes))