In [None]:
import numpy as np
import torch

import gym
from gym import Env
from gym.spaces import Discrete, Box, Dict
from gym.utils import seeding

import random

from a import PPO
import os
import matplotlib.pyplot as plt

In [None]:
class Heater(Env):
    """Toy temperature-control environment with a hybrid action space.

    The observation is a single temperature value. Each step the agent picks
    a discrete direction (0 = heat up, 1 = cool down, 2 = hold) together with
    a continuous magnitude in [0, 1], which is scaled linearly to [0, 5]
    degrees. An episode ends when the temperature lands in [37.8, 38.2]
    (success), drifts 20 or more degrees away from 38 (failure), or after
    ``max_time`` steps.
    """

    def __init__(self):
        self.done = False
        self.reward = 0
        self.max_time = 30

        # Action declaration
        self.action_space = Dict({
            "discrete": Discrete(3),  # 0 = heat up, 1 = cool down, 2 = hold
            "continuous": Box(
                low=np.array([0.], dtype=np.float32),
                high=np.array([1.], dtype=np.float32),
                dtype=np.float32,
            ),
        })

        # Temperature bounds; created as float32 so they already match the
        # dtype declared for the observation space.
        self.low = np.array([0.], dtype=np.float32)
        self.high = np.array([100.], dtype=np.float32)
        self.observation_space = Box(low=self.low, high=self.high, dtype=np.float32)

        # Set start temp and start time
        self.reset()

    def seed(self, seed=None):
        """Seed ``self.np_random`` and return the seed in a one-element list.

        NOTE: ``reset`` draws the start temperature from the global ``random``
        module, so reproducible start states also need ``random.seed(...)``.
        """
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        """Apply one action and advance the episode clock by one step.

        Args:
            action: mapping with keys ``"discrete"`` (0, 1 or 2) and
                ``"continuous"`` (indexable, first element is the magnitude).

        Returns:
            ``(state, reward, done, info)``. ``reward`` is the reward of this
            step only: the negative distance to 38.0, replaced by +100 on
            success or -100 on failure.
        """
        temp = self.state[0]

        d_action = action["discrete"]

        c_action = action["continuous"]
        c_action = self.interpolation(c_action)[0]

        if d_action == 0:  # Increase temperature
            temp += c_action
        elif d_action == 1:  # Decrease temperature
            temp -= c_action
        # Any other value (2 = hold) leaves the temperature unchanged.

        # Reward for this step only. It is deliberately NOT accumulated on the
        # instance: callers sum the returned rewards themselves.
        error = abs(38.0 - temp)
        reward = 0.0
        if self.time > 0:
            reward = -error

            if 37.8 <= temp <= 38.2:
                reward = 100.0
                self.done = True

            if error >= 20:
                reward = -100.0
                self.done = True

        self.reward = float(reward)

        # One step of the episode budget is consumed
        self.time -= 1

        self.state = np.array([temp, ], dtype=np.float32)

        if self.time <= 0:
            self.done = True

        # Set placeholder for info
        info = {}

        # Return step information
        return self.state, self.reward, self.done, info

    def interpolation(self, x):
        """Map ``x`` linearly from the range [0, 1] onto [0, 5]."""
        y1 = 0.
        y2 = 5.
        x1 = 0.
        x2 = 1.

        y = y1 + ((y2 - y1)/(x2 - x1))*(x-x1)
        return y

    def render(self):
        # Implement visualization --> in this case is not built
        pass

    def reset(self):
        """Start a new episode and return the initial observation."""
        # Start temperature: 38 plus a random integer offset in [-5, 5]
        temp = 38. + random.randint(-5, 5)
        self.state = np.array([temp, ], dtype=np.float32)

        self.done = False
        self.reward = 0

        # Restore the full step budget
        self.time = self.max_time
        return self.state


In [None]:
# Sanity check: drive the environment with uniformly sampled actions and
# report the mean temperature and total score of each episode.
env = Heater()
episodes = 10

for episode in range(1, episodes + 1):
    state = env.reset()
    done, score, temps = False, 0, []

    # Roll the episode forward until the environment reports termination.
    while True:
        action = env.action_space.sample()
        temp, reward, done, info = env.step(action)
        temps.append(temp)
        score = score + reward
        if done:
            break

    mean_temp = np.asarray(temps).mean()
    print(f'Episode: {episode}, Mean temperature: {mean_temp:.2f} Score: {score}')

### Helper functions

In [None]:
def evaluate_policy(env, model, render, steps_per_epoch, turns=3):
    """Return the mean episode return of ``model`` over ``turns`` episodes.

    Args:
        env: environment exposing ``reset()``, ``step(action)`` and
            ``render()``; ``step`` must return ``(obs, reward, done, info)``.
        model: agent exposing ``evaluate(obs)`` that returns a
            ``(discrete_action, continuous_action)`` pair.
        render: when truthy, ``env.render()`` is called after every step.
        steps_per_epoch: hard cap on the number of steps per episode.
        turns: number of evaluation episodes to average over (default 3,
            the value that used to be hard-coded).

    Returns:
        Sum of the episode returns divided by ``turns``.

    Raises:
        ValueError: if ``turns`` is smaller than 1.
    """
    if turns < 1:
        raise ValueError("turns must be a positive integer")

    total_return = 0
    for _ in range(turns):
        s, done, ep_r, steps = env.reset(), False, 0, 0
        while not done and steps < steps_per_epoch:
            # Uses the model's evaluate() path rather than select_action()
            action_d, action_c = model.evaluate(s)
            action = {
                "discrete": action_d,
                "continuous": action_c,
            }
            s, r, done, info = env.step(action)

            ep_r += r
            steps += 1
            if render:
                env.render()
        total_return += ep_r
    return total_return / turns


def plot_learning_curve(x, scores):
    """Draw the raw scores and their trailing running average on one plot.

    Args:
        x: x-axis values, one per score.
        scores: sequence of scores to plot.

    The running average at position ``i`` is the mean of ``scores[i]`` and
    up to 100 scores preceding it.
    """
    lookback = 100
    n_points = len(scores)
    smoothed = np.array(
        [np.mean(scores[max(0, idx - lookback):idx + 1]) for idx in range(n_points)],
        dtype=float,
    )

    # Smoothed curve first, then the faint raw scores behind it.
    plt.plot(x, smoothed, label="Running average")
    plt.plot(x, scores, alpha=0.4)

    plt.title('Learning plot')
    plt.xlabel("Runs")
    plt.ylabel("Scores")
    plt.legend(loc="best")

### Main training

In [None]:
def main():
    """Train a PPO agent on ``Heater`` and plot the evaluation scores.

    The loop collects transitions episode by episode, calls ``model.train()``
    every ``N`` collected steps, evaluates on a separate environment every
    ``eval_interval`` steps, checkpoints every ``save_interval`` steps and,
    once ``best_interval`` steps have elapsed, stores the model whenever the
    latest evaluation score beats the best one seen so far.

    Side effects: creates the ``model`` and ``best_model`` directories,
    prints progress, and draws the learning curve.
    """
    import os
    import warnings
    import numpy as np
    import torch
    import random

    # Seed every random source used by the notebook.
    random_seed = 0
    torch.manual_seed(random_seed)
    np.random.seed(random_seed)
    random.seed(random_seed)

    env = Heater()
    env.seed(random_seed)

    # Evaluation environment
    eval_env = Heater()
    eval_env.seed(random_seed)

    # Hyperparameters
    kwargs = {
        "state_dim": env.observation_space.shape[0],
        "actions": env.action_space,
        "env_with_Dead": True,
        "gamma": 0.99,
        "gae_lambda": 0.95,
        "policy_clip": 0.2,
        "n_epochs": 10,
        "net_width": 128,
        "lr": 3e-4,
        "l2_reg": 1e-3,
        "batch_size": 256,
        "adv_normalization": True,
        "entropy_coef": 0.01,        # reduced for stability
        "entropy_coef_decay": 0.9998
    }

    N = 2048                     # length of long trajectory
    max_steps = env.max_time     # max steps per episode
    Max_train_steps = int(1e3)
    save_interval = int(10e3)
    eval_interval = int(5e3)
    best_interval = int(50e3)
    best_score = -1e9

    # Make a mis-sized training budget visible instead of silently doing
    # nothing: below N no update runs, below the intervals nothing is
    # evaluated or saved.
    if Max_train_steps < max(N, eval_interval, save_interval, best_interval):
        warnings.warn(
            "Max_train_steps is smaller than the rollout length N or one of "
            "the eval/save/best intervals; the corresponding training, "
            "evaluation or checkpoint steps will never run.",
            RuntimeWarning,
        )

    os.makedirs('model', exist_ok=True)
    os.makedirs('best_model', exist_ok=True)

    model = PPO(**kwargs)

    traj_length = 0
    total_steps = 0
    score_history = []
    update = 0
    # At least 1 so the LR-decay division below can never divide by zero.
    num_updates = max(1, Max_train_steps // N)

    while total_steps < Max_train_steps:
        obs = env.reset()
        done, steps, score = False, 0, 0

        # ---------- Interact with environment ----------
        while not done and steps < max_steps:
            steps += 1
            traj_length += 1
            total_steps += 1

            action_d, probs_d, action_c, probs_c = model.select_action(obs)
            action = {"discrete": action_d, "continuous": action_c}
            obs_, reward, done, info = env.step(action)

            # True only when the episode ended before the step limit
            dw = bool(done and steps != max_steps)
            model.put_data((obs, action_d, action_c, reward, obs_, probs_d, probs_c, done, dw))

            obs = obs_
            score += reward

            # ---------- Train ----------
            if traj_length % N == 0:
                a_losses, c_loss, entropies = model.train()
                traj_length = 0
                update += 1

                # Linear LR decay, clamped so the rate never goes negative
                frac = max(0.0, 1.0 - (update - 1.0) / num_updates)
                lrnow = frac * kwargs["lr"]
                model.optimizer_d.param_groups[0]["lr"] = lrnow
                model.optimizer_c.param_groups[0]["lr"] = lrnow
                model.critic_optimizer.param_groups[0]["lr"] = lrnow

            # ---------- Evaluate ----------
            if total_steps % eval_interval == 0:
                eval_score = evaluate_policy(eval_env, model, False, max_steps)
                score_history.append(eval_score)
                print(
                    f"Env: Heater | Steps: {int(total_steps/1000)}k "
                    f"| Eval score: {eval_score:.2f}"
                )

            # ---------- Save ----------
            if total_steps % save_interval == 0:
                model.save(total_steps)

            # ---------- Best model ----------
            if total_steps >= best_interval:
                if score_history and score_history[-1] > best_score:
                    best_score = score_history[-1]
                    model.best_save()

        print(f"Episode done | Steps: {steps} | Score: {score:.2f}")

    env.close()
    eval_env.close()

    # ---------- Plot learning curve ----------
    x = [i+1 for i in range(len(score_history))]
    plot_learning_curve(x, score_history)


# Start training only when this code runs as the entry module, not on import.
if __name__ == "__main__":
    main()


### Model testing

In [None]:
# Rebuild the agent and load the best saved checkpoint, then roll out
# two test episodes and report their scores.
env = Heater()

# Hyperparameters
kwargs = dict(
    state_dim=env.observation_space.shape[0],
    actions=env.action_space,
    env_with_Dead=True,
    gamma=0.99,
    gae_lambda=0.95,
    policy_clip=0.2,
    n_epochs=10,
    net_width=128,
    lr=3e-4,
    l2_reg=1e-3,
    batch_size=1,
    adv_normalization=True,
    entropy_coef=0,
    entropy_coef_decay=0.9998,
)

model = PPO(**kwargs)
model.load_best()

scores = []
for i in range(2):
    obs = env.reset()
    actions, score, done = [], 0, False

    # Step until the environment signals the end of the episode.
    while not done:
        action_d, _, action_c, _ = model.select_action(obs)
        action = {"discrete": action_d, "continuous": action_c}
        print(action)
        obs, reward, done, info = env.step(action)
        score += reward
        actions.append(action)

    print(f"Done, points: {score}")
    scores.append(score)

print(f"Mean score: {np.mean(scores)}")

