# Custom Environment
## 1. Import

In [113]:
import gymnasium as gym
from gymnasium import Env
from gymnasium.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete

import os
import numpy as np
import random

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy

## 2. Types of Spaces

In [20]:
Discrete(3).sample() # One integer from {0, 1, 2}: Discrete(n) covers 0..n-1, so 3 itself is never returned

2

In [21]:
Box(0,1,shape=(3,3)).sample() # Continuous values bounded by low=0 and high=1; shape is the output shape, here a 3x3 matrix

array([[0.74903476, 0.7298928 , 0.9465985 ],
       [0.7472668 , 0.8261999 , 0.18240553],
       [0.9845458 , 0.7449783 , 0.41522068]], dtype=float32)

In [22]:
Tuple((Discrete(3),Box(0,1,shape=(3,)))).sample() # Tuple combines different spaces; the sample is a tuple with one entry per sub-space

(1, array([0.37930974, 0.2735864 , 0.77152306], dtype=float32))

In [23]:
Dict({'height':Discrete(2), 'speed':Box(0,100,shape=(1,))}).sample() # Like Tuple(), but the sub-spaces are addressed by key instead of by position

OrderedDict([('height', 0), ('speed', array([87.94416], dtype=float32))])

In [24]:
MultiBinary(4).sample() # Four independent 0/1 flags; NOT one-hot, any number of entries may be 1 at once

array([0, 1, 0, 1], dtype=int8)

In [25]:
MultiDiscrete([5,2,2]).sample() # Three discrete values with 5, 2 and 2 options respectively (ranges 0..4, 0..1, 0..1)

array([0, 0, 1])

## 3. Building an Environment

In [94]:
class ShowerEnv(Env):
    """Toy environment: keep a shower's temperature inside a comfortable band.

    Observation: a float32 array of shape (1,) holding the current temperature.
    Actions: Discrete(3) -- 0 lowers the temperature by one, 1 keeps it,
    2 raises it by one.
    Episode: always lasts ``EPISODE_LENGTH`` steps.

    Reward per step:
      * ``simple=True``:  +1 inside the band, -1 outside.
      * ``simple=False``: +10 inside the band; outside, a penalty of
        -0.1 * (number of steps taken so far), so late mistakes cost more.
    """

    EPISODE_LENGTH = 60      # steps per episode ("seconds of shower")
    START_TEMPERATURE = 38   # centre of the random starting temperature
    START_JITTER = 3         # start is drawn from START_TEMPERATURE +/- START_JITTER
    TARGET_LOW = 37          # inclusive lower bound of the rewarded band
    TARGET_HIGH = 39         # inclusive upper bound of the rewarded band

    def __init__(self,simple=True):
        """Create the spaces and draw an initial temperature.

        Args:
            simple: selects the reward scheme (see the class docstring).
        """
        self.action_space = Discrete(3)
        # dtype is spelled out so the observations built below match the space exactly.
        self.observation_space = Box(low=0,high=100,shape=(1,),dtype=np.float32)
        self.simple = simple
        self.info = {}
        self.shower_length = self.EPISODE_LENGTH
        # Same type and shape as after reset(), so step() behaves identically
        # whether or not reset() has been called yet.
        self.state = self._sample_start_state()

    def _sample_start_state(self):
        """Return a fresh float32 observation with a random starting temperature."""
        # self.np_random is the generator seeded by Env.reset(seed=...); drawing
        # from it (instead of the global `random` module) makes seeded resets
        # reproducible.
        jitter = int(self.np_random.integers(-self.START_JITTER, self.START_JITTER, endpoint=True))
        return np.array([self.START_TEMPERATURE + jitter], dtype=np.float32)

    def step(self,action):
        """Advance the shower by one time step.

        Args:
            action: 0, 1 or 2 (temperature change of -1, 0 or +1).

        Returns:
            (observation, reward, terminated, truncated, info)
        """
        # Apply action (change in temperature). A new array is built each step so
        # observations handed out earlier are never modified afterwards, and the
        # dtype stays float32 regardless of the integer type of `action`.
        temperature = float(self.state[0]) + (int(action) - 1)
        self.state = np.array([temperature], dtype=np.float32)

        # Decrease episode length remaining (shower time)
        self.shower_length -= 1

        # Define reward function (immediate reward)
        steps_taken = self.EPISODE_LENGTH - self.shower_length
        if self.TARGET_LOW <= temperature <= self.TARGET_HIGH:
            reward = 1 if self.simple else 10
        else:
            reward = -1 if self.simple else -1 * 0.1 * steps_taken

        # The episode ends when the shower time is used up. Both flags are raised
        # together, exactly as before, so callers that poll only `terminated`
        # (or only `truncated`) still see the end of the episode.
        episode_over = self.shower_length <= 0
        terminated = episode_over
        truncated = episode_over

        # Hand out a copy: wrappers may add keys to the info dict they receive,
        # and those must not leak into later steps through a shared object.
        return self.state, reward, terminated, truncated, dict(self.info)

    def render(self):
        """Rendering is not implemented for this environment."""
        pass

    def reset(self, seed=None, options=None):
        """Start a new episode.

        Args:
            seed: optional seed for the environment's random generator.
            options: accepted for Gymnasium API compatibility; unused.

        Returns:
            (observation, info) with an empty info dict.
        """
        # Seeds self.np_random when a seed is given.
        super().reset(seed=seed)
        self.state = self._sample_start_state()
        self.shower_length = self.EPISODE_LENGTH
        return self.state, {}

In [95]:
# One environment per reward scheme, so each scheme gets its own PPO model below.
env_simple = ShowerEnv(simple=True)
env_advanced = ShowerEnv(simple=False)

## 4. Test Environment

In [96]:
# Sanity check: run a few episodes with random actions and report the score.
episodes = 5
for episode in range(1, episodes + 1):
    # reset() returns (observation, info); unpack it so `obs` is the observation.
    obs, info = env_simple.reset()
    terminated = False
    truncated = False
    score = 0

    # An episode is over when it is either terminated or truncated.
    while not (terminated or truncated):
        env_simple.render()
        action = env_simple.action_space.sample()
        obs, reward, terminated, truncated, info = env_simple.step(action)
        score += reward
    print('Episode:{} Score:{}'.format(episode,score))
# Close the environment that was actually used; the name `env` is never defined
# in this notebook and raised a NameError on a fresh kernel.
env_simple.close()

Episode:1 Score:-46
Episode:2 Score:-26
Episode:3 Score:-44
Episode:4 Score:22
Episode:5 Score:-24


## 5. Train Model

In [104]:
# TensorBoard logs of both models are written below Training/Logs.
log_path = os.path.join('Training','Logs')
# One PPO agent with the MLP policy per reward scheme.
model_simple = PPO('MlpPolicy',env_simple,verbose=1,tensorboard_log=log_path)
model_advanced = PPO('MlpPolicy',env_advanced,verbose=1,tensorboard_log=log_path)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [105]:
# Only the advanced-reward model is trained in this cell; the simple-reward
# training call is commented out.
# NOTE(review): the later PPO.load(shower_path_simple, ...) cell therefore relies on a
# checkpoint produced by an earlier run -- confirm it exists before Restart & Run All.
# model_simple.learn(total_timesteps=80000)
model_advanced.learn(total_timesteps=80000)

Logging to Training/Logs/PPO_22
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 60       |
|    ep_rew_mean     | 108      |
| time/              |          |
|    fps             | 850      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 60           |
|    ep_rew_mean          | -14.6        |
| time/                   |              |
|    fps                  | 706          |
|    iterations           | 2            |
|    time_elapsed         | 5            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0072826417 |
|    clip_fraction        | 0.0305       |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.09        |
|    explained_variance   

<stable_baselines3.ppo.ppo.PPO at 0x7ff86979a2d0>

## 6. Save Model

In [106]:
# Checkpoint locations for the two reward schemes.
shower_path_simple = os.path.join('Training','Saved_Models','Shower_Model_PPO_simpleReward')
shower_path_advanced = os.path.join('Training','Saved_Models','Shower_Model_PPO')
# Only the advanced model is written here.
# NOTE(review): nothing in the visible notebook saves to shower_path_simple -- that file
# presumably comes from a previous run; verify before loading it.
model_advanced.save(shower_path_advanced)

In [107]:
# Drop the in-memory models so the evaluation that follows provably uses the
# checkpoints on disk. Only NameError (the name is not defined) is expected here;
# a bare `except:` would also hide unrelated errors and KeyboardInterrupt.
try:
    del model_simple
except NameError:
    print("model simple does not exist")
try:
    del model_advanced
except NameError:
    print("model advanced does not exist")

# Re-create both models from their checkpoints and attach the matching environment.
# NOTE(review): the simple-reward checkpoint is not written by this notebook as shown;
# PPO.load will fail here unless that file already exists from an earlier run.
model_simple = PPO.load(shower_path_simple,env=env_simple)
model_advanced = PPO.load(shower_path_advanced,env=env_advanced)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [110]:
# Evaluate each reloaded model on its own environment.
# evaluate_policy returns (mean reward, standard deviation of the reward) per
# episode -- the second value is a standard deviation, not a variance, so it is
# named and labelled accordingly.
for label, model, env in (
    ('simple', model_simple, env_simple),
    ('advanced', model_advanced, env_advanced),
):
    avg_reward, std_reward = evaluate_policy(model,env,n_eval_episodes=10,render=False)
    print('{} model: avg_reward:{}, std:{}'.format(label,avg_reward,std_reward))

simple model: avg_reward:59.4, variance:0.9165151389911681
advanced model: avg_reward:286.7999999910593, variance:383.5900937307958


Much higher variance in 