In [27]:
import gym
from gym import spaces
import numpy as np
import torch
from gym.envs.registration import register
import torch.nn as nn
import copy
from collections import deque
import random
import tqdm
import math
import matplotlib.pyplot as plt

In [28]:
class WaterTankEnv(gym.Env):

    @staticmethod
    def gen_params(A = 20.0, a = 2.0, b = 5.0) -> dict:
        """Returns a ``dict`` containing the physical parameters such as gravitational force and torque or speed limits."""

        td = {
                "max_voltage": 120.0,
                "min_voltage": 0.0,
                "max_H": 30.0,
                "min_H": 1e-6,
                "setpoint": 10.0, # setpoint
                "A": A,
                "a": a,
                "b": b,
                "dt": 0.05,
                "P_hat": 0.0,
                "eps": 1e-3,
                "tf": 100
        }
        return td

    def __init__(self):
        
        params = WaterTankEnv.gen_params()
        self.params = params

        # Create observation space. Pump voltage
        self.observation_space = spaces.Box(
            low = np.array([params["min_H"]]),
            high = np.array([params["max_H"]])
        )

        # Action spaces (number of actions)
        self.action_space = spaces.Box(
            low = params["min_voltage"],
            high = params["max_voltage"]
        ) # possible voltage values

        self._instantiate_initial_values()
    
    def _instantiate_initial_values(self):
        # Instantiate all state variables and independent variables
        if self.params is None:
            self.params = WaterTankEnv.gen_params()

        # Instantiate state variable
        self.H = np.random.uniform(
            low = self.params["min_H"],
            high = self.params["max_H"]
        )

        # Instantiate voltage
        self.v = np.random.uniform(
            low = self.params["min_voltage"],
            high = self.params["max_voltage"]
        )

        # Compute input flow
        self.P = self.params["b"]*self.v
        # Compute output flow
        self.Q = self.params["a"]*np.sqrt(self.H)

        self.state = np.array([self.H], dtype = np.float32) # our state

        self.t = 0

        return self.state

    def reset(self):
        return self._instantiate_initial_values() # state, info
    
    def step(self, action: torch.Tensor):
        # Update the new voltage value
        self.V = action
        
        # Clamp value
        self.V = np.clip(self.V, self.params["min_voltage"], self.params["max_voltage"])

        # Compute new input flow
        self.P = self.V * self.params["b"]

        # Compute new output flow
        self.Q = np.sqrt(self.H)*self.params["a"]

        # Get old water level
        H = self.H

        # Compute new H
        new_H = H + self.params["dt"]*(self.P - self.Q)/self.params["A"]

        # Update water level
        self.H = new_H

        # Clamp
        self.H = np.clip(self.H, self.params["min_H"], self.params["max_H"])

        # Compute cost
        cost = np.abs(self.H - self.params["setpoint"]) / self.params["setpoint"]

        # Compute reward
        reward = -cost

        # Increase time
        self.t += self.params["dt"]

        # Check if simulation is terminated
        terminated = np.abs(self.H - self.params["setpoint"]) <= self.params["eps"] or self.t >= self.params["tf"]

        # Return
        return np.array([self.H], dtype = np.float32), reward, terminated, {} # obs, reward, terminated, truncated, info                    

    def render(self):
        pass


In [29]:
env = WaterTankEnv()
env.reset()
action = torch.tensor([20], dtype = torch.float32)
observation, reward, done, info = env.step(action)
observation

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


array([[6.447536]], dtype=float32)

In [30]:
episodes = 10
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0
    episode_len = 0

    while not done:
        episode_len += 1
        env.render()
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score += reward
    print("Episode {} Score: {} Mean: {}".format(episode, score, score / episode_len))

Episode 1 Score: [-3982.5037] Mean: [-1.9902567]
Episode 2 Score: [-3971.5444] Mean: [-1.9847798]
Episode 3 Score: [-3962.839] Mean: [-1.9804293]
Episode 4 Score: [-3970.301] Mean: [-1.9841584]
Episode 5 Score: [-3979.2776] Mean: [-1.9886445]
Episode 6 Score: [-3991.323] Mean: [-1.9946642]
Episode 7 Score: [-3998.108] Mean: [-1.998055]
Episode 8 Score: [-4001.061] Mean: [-1.9995308]
Episode 9 Score: [-4000.6826] Mean: [-1.9993416]
Episode 10 Score: [-3997.4282] Mean: [-1.9977152]


In [31]:
input_dim = env.observation_space.shape[0]
output_dim = env.action_space.shape[0]

net = nn.Sequential(
    nn.Linear(input_dim, 64),
    nn.Tanh(),
    nn.Linear(64, 64),
    nn.Tanh(),
    nn.Linear(64, 64),
    nn.Tanh(),
    nn.Linear(64, output_dim),
)

net(torch.tensor([5], dtype = torch.float32))

tensor([-0.1612], grad_fn=<ViewBackward0>)

In [32]:
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Define simulation time
tf = 5 # in seconds
N = int(math.ceil(tf/env.params["dt"]))

# Define training parameters
num_episodes = 50 
gamma = 0.99
max_iters_per_episode = N # equivalent to tf seconds
n_time_steps = N

rewards = []
water_levels = []
input_flows = []

print(f"Episodes {num_episodes}, steps per episode {max_iters_per_episode}")


Episodes 50, steps per episode 100


In [33]:
# Test model and env
state = env.reset()
state = torch.tensor(state, dtype = torch.float32)
q_values = net(state)
q_values

tensor([-0.1707], grad_fn=<ViewBackward0>)

In [36]:
# Train the model
pbar = tqdm.tqdm(range(num_episodes))
for episode in pbar:
    #state = torch.tensor(env.step(0)[0], dtype=torch.float32)
    state = env.reset()
    state = torch.tensor(state, dtype=torch.float32)
    total_reward = 0

    done = False
    episode_len = 0

    while not done:

        q_values = net(state) # get model values

        action = torch.max(q_values).item()
                
        next_state, reward, done, info = env.step(action) # Compute next state
        
        next_state_tensor = torch.tensor(next_state, dtype=torch.float32)
        
        target_q_value = reward + gamma * torch.max(net(next_state_tensor))
        
        loss =  criterion(q_values[action], target_q_value)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        state = next_state_tensor
        total_reward += reward

    print(f'Episode {episode + 1} - Total Reward: {total_reward}')
    rewards.append(total_reward)
    #water_levels.append(mean_episode_level)
    #input_flows.append(mean_episode_inflow)

#fig, axes = plt.subplots(nrows = 1, ncols = 3)
plt.figure(1)
plt.plot(np.arange(len(rewards))*env.dt, rewards)
#axes[1].plot(np.arange(len(water_levels))*env.dt, water_levels)
#axes[2].plot(np.arange(len(input_flows))*env.dt, input_flows)
plt.show()




  0%|          | 0/50 [00:00<?, ?it/s]


IndexError: only integers, slices (`:`), ellipsis (`...`), None and long or byte Variables are valid indices (got float)