In [1]:
# Model
import torch
import torch.nn as nn
from torch.distributions import MultivariateNormal

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
torch.set_default_dtype(torch.float)


class Memory:
    """Rollout buffer: parallel per-step lists filled while interacting with an environment.

    Attributes (all plain Python lists, index-aligned by time step):
        actions, states, logprobs, rewards, is_terminals
    """

    # Names of the per-step lists, in the order they are created.
    _FIELDS = ("actions", "states", "logprobs", "rewards", "is_terminals")

    def __init__(self):
        for field_name in self._FIELDS:
            setattr(self, field_name, [])

    def clear_memory(self):
        """Empty every list in place (the list objects themselves are kept)."""
        for field_name in self._FIELDS:
            getattr(self, field_name).clear()


class ActorCritic(nn.Module):
    """Actor and critic networks for PPO with a continuous action space.

    The actor maps a state to the mean of a Gaussian policy (its last layer is
    a ``Tanh``); the critic maps a state to a single value estimate. The policy
    covariance is diagonal and fixed (not learned): every action component has
    variance ``action_std ** 2``.

    Args:
        state_dim: Size of the state vector fed to both networks.
        action_dim: Number of action components produced by the actor.
        action_std: Standard deviation applied to every action component.
    """

    def __init__(self, state_dim, action_dim, action_std):
        super().__init__()
        self.actor = nn.Sequential(
            nn.Linear(state_dim, 64), nn.Tanh(), nn.Linear(64, 32), nn.Tanh(), nn.Linear(32, action_dim), nn.Tanh()
        )
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 64), nn.Tanh(), nn.Linear(64, 32), nn.Tanh(), nn.Linear(32, 1)
        )
        # Registered as a NON-persistent buffer so that it follows the module
        # through .to()/.cuda()/.cpu() (no dependency on a global device), while
        # staying out of state_dict() so existing checkpoints keep loading.
        # float() guards against an integer std producing an integer tensor.
        self.register_buffer(
            "action_var", torch.full((action_dim,), float(action_std) ** 2), persistent=False
        )

    def forward(self):
        """Not used; call :meth:`act` or :meth:`evaluate` instead."""
        raise NotImplementedError

    def act(self, state, memory):
        """Sample an action for ``state`` and record the step in ``memory``.

        Args:
            state: Input tensor accepted by the actor network.
            memory: Object exposing ``states``, ``actions`` and ``logprobs``
                lists; the state, sampled action and its log-probability are
                appended to them.

        Returns:
            The sampled action tensor, detached from autograd.
        """
        # Sampling never needs gradients; no_grad avoids keeping an autograd
        # graph alive for every log-probability stored in the rollout buffer.
        with torch.no_grad():
            action_mean = self.actor(state)
            cov_mat = torch.diag(self.action_var)

            dist = MultivariateNormal(action_mean, cov_mat)
            action = dist.sample()
            action_logprob = dist.log_prob(action)

        memory.states.append(state)
        memory.actions.append(action)
        memory.logprobs.append(action_logprob)

        return action.detach()

    def evaluate(self, state, action):
        """Score ``action`` under the current policy and estimate state values.

        Args:
            state: Batch of states.
            action: Batch of actions, aligned with ``state``.

        Returns:
            Tuple ``(action_logprobs, state_values, dist_entropy)``. The
            trailing size-1 dimension of the critic output is removed, so the
            batch dimension is preserved even for a batch of one.
        """
        action_mean = self.actor(state)
        action_var = self.action_var.expand_as(action_mean)
        cov_mat = torch.diag_embed(action_var)
        dist = MultivariateNormal(action_mean, cov_mat)

        action_logprobs = dist.log_prob(action)
        dist_entropy = dist.entropy()
        state_value = self.critic(state)

        # squeeze(-1) rather than squeeze(): only drop the critic's output
        # dimension, never a batch dimension that happens to be 1.
        return action_logprobs, state_value.squeeze(-1), dist_entropy

    def set_action_std(self, new_action_std):
        """Replace the per-component action variance with ``new_action_std ** 2``."""
        # full_like keeps the shape, dtype and device of the current buffer.
        self.action_var = torch.full_like(self.action_var, float(new_action_std) ** 2)


class PPO:
    """Proximal Policy Optimization with the clipped surrogate objective.

    Holds two ``ActorCritic`` networks: ``policy`` (optimised in
    :meth:`update`) and ``policy_old`` (used to collect rollouts in
    :meth:`select_action`, re-synchronised after every update).

    Args:
        state_dim: Size of the state vector.
        action_dim: Number of action components.
        action_std: Initial standard deviation of the Gaussian policy.
        lr: Adam learning rate.
        betas: Adam beta coefficients.
        gamma: Discount factor for the Monte Carlo returns.
        K_epochs: Number of optimisation passes over each rollout batch.
        eps_clip: Clipping range of the probability ratio.
    """

    def __init__(self, state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip):
        self.lr = lr
        self.betas = betas
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.K_epochs = K_epochs

        self.policy = ActorCritic(state_dim, action_dim, action_std).to(device)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas)

        self.policy_old = ActorCritic(state_dim, action_dim, action_std).to(device)
        self.policy_old.load_state_dict(self.policy.state_dict())

        self.MseLoss = nn.MSELoss()

    def set_action_std(self, new_action_std):
        """Apply a new action standard deviation to both networks."""
        self.policy.set_action_std(new_action_std)
        self.policy_old.set_action_std(new_action_std)

    def select_action(self, state, memory):
        """Sample an action from ``policy_old`` and record the step in ``memory``.

        Args:
            state: Observation (array-like); it is flattened to one row.
            memory: Rollout buffer passed through to ``ActorCritic.act``.

        Returns:
            The sampled action as a flat NumPy array.
        """
        # torch.tensor copies its input, so the stored state cannot alias a
        # buffer owned by the environment.
        state = torch.tensor(state, dtype=torch.float32, device=device).reshape(1, -1)
        # Rollout collection never back-propagates; skip building a graph.
        with torch.no_grad():
            action = self.policy_old.act(state, memory)
        return action.cpu().numpy().flatten()

    def _discounted_returns(self, rewards, is_terminals):
        """Return the discounted return of every step, resetting at terminal steps."""
        returns = []
        discounted_reward = 0
        for reward, is_terminal in zip(reversed(rewards), reversed(is_terminals)):
            if is_terminal:
                discounted_reward = 0
            discounted_reward = reward + (self.gamma * discounted_reward)
            returns.append(discounted_reward)
        # Built back-to-front with append (O(n)) instead of insert(0, ...) (O(n^2)).
        returns.reverse()
        return returns

    def update(self, memory):
        """Run ``K_epochs`` PPO optimisation passes over the data in ``memory``.

        Does nothing when ``memory`` holds no rewards. After optimisation the
        new weights are copied into ``policy_old``.
        """
        if not memory.rewards:
            return

        # Monte Carlo estimate of rewards:
        rewards = torch.tensor(
            self._discounted_returns(memory.rewards, memory.is_terminals), dtype=torch.float32
        ).to(device)

        # Normalizing the rewards. The standard deviation of a single sample
        # is NaN, which would poison every weight, so skip it in that case.
        if rewards.numel() > 1:
            rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5)

        # Convert list to tensor
        old_states = torch.squeeze(torch.stack(memory.states).to(device), 1).detach()
        old_actions = torch.squeeze(torch.stack(memory.actions).to(device), 1).detach()
        old_logprobs = torch.squeeze(torch.stack(memory.logprobs), 1).to(device).detach()

        # Optimize policy for K epochs
        for _ in range(self.K_epochs):
            # Evaluating old actions and values
            logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)

            # Finding ratios (pi_theta / pi_theta__old)
            ratios = torch.exp(logprobs - old_logprobs)

            # Finding Surrogate Loss
            advantages = rewards - state_values.detach()
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            # Clipped policy loss + 0.5 * value loss - 0.01 * entropy bonus
            loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy

            # Take gradient step
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()

        # Copy new weights into old policy
        self.policy_old.load_state_dict(self.policy.state_dict())


  return torch._C._cuda_getDeviceCount() > 0


In [4]:
# Test
import gymnasium as gym
import torch
import matplotlib.pyplot as plt
import numpy as np


class Test:
    """Evaluate a saved PPO policy over repeated simulations and plot the outcome.

    Args:
        model_path: Path of the saved state dict loaded into ``policy_old``.
        env_name: Gymnasium environment id.
        max_timesteps: Step cap applied to every simulation.
        action_std: Standard deviation of the policy during evaluation.
        K_epochs, eps_clip, gamma, lr, betas: Forwarded to the ``PPO``
            constructor.
        n_simulations: Number of episodes to run; the last one is rendered.
        env_kwargs: Extra keyword arguments for ``gym.make``; defaults to
            ``{"continuous": True}``.
    """

    def __init__(
        self,
        model_path="trained_model.pth",
        env_name="LunarLander-v3",
        max_timesteps=350,
        action_std=0.01,
        K_epochs=40,
        eps_clip=0.2,
        gamma=0.99,
        lr=0.0003,
        betas=(0.9, 0.999),
        n_simulations=100,
        env_kwargs=None,
    ):
        self.model_path = model_path
        self.n_simulations = n_simulations
        self.env_name = env_name
        self.max_timesteps = max_timesteps
        self.action_std = action_std
        self.K_epochs = K_epochs
        self.eps_clip = eps_clip
        self.gamma = gamma
        self.lr = lr
        self.betas = betas
        self.env_kwargs = env_kwargs or {"continuous": True}

    def run_simulation(self, env, ppo, memory):
        """Play one episode and return ``(total_reward, number_of_steps)``.

        The episode stops when the environment reports termination or
        truncation, or after ``max_timesteps`` steps, whichever comes first.
        """
        state, _ = env.reset()
        total_reward = 0
        timesteps = 0

        for _step in range(self.max_timesteps):
            action = ppo.select_action(state, memory)
            # Gymnasium reports episode end through two flags; either one
            # means the environment must be reset before stepping again.
            state, reward, terminated, truncated, _ = env.step(action)
            total_reward += reward
            timesteps += 1
            if terminated or truncated:
                break

        return total_reward, timesteps

    def plot_results(self, rewards, timesteps):
        """Save histograms of rewards and episode lengths to ``test_results.png``."""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
        mean_reward = np.mean(rewards)
        mean_timesteps = np.mean(timesteps)

        ax1.hist(rewards, bins=20, edgecolor="black")
        ax1.set_title("Distribution of Rewards")
        ax1.set_xlabel("Reward")
        ax1.set_ylabel("Frequency")
        # NOTE(review): the fixed lower bound hides rewards below -50 and
        # flips the axis when every reward is below -50 - confirm intended.
        ax1.set_xlim(-50, max(rewards))
        ax1.axvline(mean_reward, color="r", linestyle="dashed", linewidth=1, label=f"Mean: {mean_reward:.1f}")
        ax1.legend()

        ax2.hist(timesteps, bins=20, edgecolor="black")
        ax2.set_title("Distribution of Episode Lengths")
        ax2.set_xlabel("Timesteps")
        ax2.set_ylabel("Frequency")
        ax2.axvline(
            mean_timesteps, color="r", linestyle="dashed", linewidth=1, label=f"Mean: {mean_timesteps:.1f}"
        )
        ax2.legend()

        plt.tight_layout()
        plt.savefig("test_results.png")
        plt.close(fig)

    def load_and_test_model(self):
        """Load the checkpoint, run all simulations, plot and print a summary.

        Returns:
            Tuple ``(rewards, timesteps)``: per-simulation total rewards and
            episode lengths.

        Raises:
            ValueError: If ``n_simulations`` is smaller than 1.
        """
        if self.n_simulations < 1:
            raise ValueError("n_simulations must be at least 1")

        env_no_render = gym.make(self.env_name, **self.env_kwargs)
        env_render = None
        rewards = []
        timesteps = []
        # try/finally so both environments are released even when loading the
        # model or a simulation raises.
        try:
            render_kwargs = {**self.env_kwargs, "render_mode": "human"}
            env_render = gym.make(self.env_name, **render_kwargs)

            state_dim = env_no_render.observation_space.shape[0]
            action_dim = env_no_render.action_space.shape[0]

            ppo = PPO(
                state_dim, action_dim, self.action_std, self.lr, self.betas, self.gamma, self.K_epochs, self.eps_clip
            )
            state_dict = torch.load(self.model_path, weights_only=True, map_location=torch.device("cpu"))
            # Only policy_old is used by select_action, so only it is loaded.
            ppo.policy_old.load_state_dict(state_dict)
            ppo.policy_old.eval()

            memory = Memory()
            print(f"Running {self.n_simulations} simulations...")

            for i in range(self.n_simulations):
                if i % 10 == 0:
                    print(f"Simulation {i}/{self.n_simulations}")

                # Every simulation is headless except the final one.
                if i < self.n_simulations - 1:
                    reward, t = self.run_simulation(env_no_render, ppo, memory)
                else:
                    reward, t = self.run_simulation(env_render, ppo, memory)

                rewards.append(reward)
                timesteps.append(t)
                # The buffer is only filled as a side effect of select_action.
                memory.clear_memory()
        finally:
            env_no_render.close()
            if env_render is not None:
                env_render.close()

        avg_reward = np.mean(rewards)
        std_reward = np.std(rewards)
        avg_timesteps = np.mean(timesteps)
        std_timesteps = np.std(timesteps)

        # Plot results
        self.plot_results(rewards, timesteps)

        # Print summary statistics
        print("\nTest Results Summary:")
        print(f"Average Reward: {avg_reward:.2f} ± {std_reward:.2f}")
        print(f"Average Episode Length: {avg_timesteps:.2f} ± {std_timesteps:.2f}")
        print("Results visualization saved as 'test_results.png'")

        return rewards, timesteps


In [6]:
# Evaluate an already-trained checkpoint using the default `Test` settings:
# loads "trained_model.pth" and runs 100 LunarLander-v3 simulations, rendering
# only the last one. Histograms are written to "test_results.png".

test = Test()
# The per-episode rewards and lengths are returned but discarded here; the
# summary statistics are printed by the call itself.
_, _ = test.load_and_test_model()

Running 100 simulations...
Simulation 0/100
Simulation 10/100
Simulation 20/100
Simulation 30/100
Simulation 40/100
Simulation 50/100
Simulation 60/100
Simulation 70/100
Simulation 80/100
Simulation 90/100

Test Results Summary:
Average Reward: 224.69 ± 89.67
Average Episode Length: 228.55 ± 43.67
Results visualization saved as 'test_results.png'


In [2]:
# Train
import gymnasium as gym
import torch


class Train:
    """Train a PPO agent on a continuous-action Gymnasium environment.

    Args:
        env_name: Gymnasium environment id.
        render: Whether some episodes are played in a human-rendered env.
        render_interval: Every ``render_interval``-th episode is rendered.
        log_interval: Number of episodes averaged per log line.
        max_episodes: Total number of training episodes.
        max_timesteps: Step cap applied to every episode.
        update_timestep: Number of collected steps between PPO updates.
        action_std: Initial standard deviation of the policy.
        K_epochs, eps_clip, gamma, lr, betas: Forwarded to ``PPO``.
        random_seed: Seed for torch and the environment; ``None`` disables
            seeding.
        decay_threshold: Average reward above which ``action_std`` decays.
        decay_speed: Multiplicative decay applied to ``action_std``.
        env_kwargs: Extra keyword arguments for ``gym.make``; defaults to
            ``{"continuous": True}``.
        save_interval: Save the policy weights every this many episodes
            (a falsy value disables periodic saving).
    """

    def __init__(
        self,
        env_name="LunarLander-v3",
        render=True,
        render_interval=500,
        log_interval=20,
        max_episodes=1500,
        max_timesteps=350,
        update_timestep=2000,
        action_std=0.5,
        K_epochs=40,
        eps_clip=0.2,
        gamma=0.99,
        lr=0.0003,
        betas=(0.9, 0.999),
        random_seed=None,
        decay_threshold=100,
        decay_speed=0.93,
        env_kwargs=None,
        save_interval=500,
    ):
        self.env_name = env_name
        self.render = render
        self.render_interval = render_interval
        self.log_interval = log_interval
        self.save_interval = save_interval
        self.time_step = 0
        self.env_kwargs = env_kwargs or {"continuous": True}

        # Hyperparameters
        self.max_episodes = max_episodes
        self.max_timesteps = max_timesteps
        self.update_timestep = update_timestep
        self.action_std = action_std
        self.K_epochs = K_epochs
        self.eps_clip = eps_clip
        self.gamma = gamma
        self.lr = lr
        self.betas = betas
        self.decay_threshold = decay_threshold
        self.decay_speed = decay_speed
        self.random_seed = random_seed

        # Environment setup
        self.env = gym.make(env_name, **self.env_kwargs)
        state_dim = self.env.observation_space.shape[0]
        action_dim = self.env.action_space.shape[0]

        # "is not None" so that a seed of 0 is honoured as well.
        if random_seed is not None:
            print("Random Seed: {}".format(random_seed))
            torch.manual_seed(random_seed)
            # Gymnasium has no Env.seed(); the environment RNG is seeded by
            # passing the seed to reset() once, right after construction.
            self.env.reset(seed=random_seed)
            self.env.action_space.seed(random_seed)

        self.memory = Memory()
        # NOTE(review): PPO comes from a `models` module while Memory is taken
        # from the surrounding namespace - confirm both refer to the same code.
        from models import PPO

        self.ppo = PPO(state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip)

    def play_episode(self, env):
        """Play one episode in ``env``, filling the rollout buffer.

        A PPO update is triggered (and the buffer cleared) whenever the
        running step counter reaches a multiple of ``update_timestep``.

        Returns:
            Tuple ``(episode_reward, number_of_steps_taken)``.
        """
        curr_reward = 0
        steps = 0
        state, _ = env.reset()
        for t in range(self.max_timesteps):
            self.time_step += 1
            steps += 1
            action = self.ppo.select_action(state, self.memory)
            state, reward, terminated, truncated, _ = env.step(action)
            done = bool(terminated or truncated)

            self.memory.rewards.append(reward)
            # An episode cut off by the step cap must also end the return
            # computation; otherwise the discounted return of these steps
            # would absorb rewards from the next episode in the buffer.
            self.memory.is_terminals.append(done or t == self.max_timesteps - 1)

            if self.time_step % self.update_timestep == 0:
                self.ppo.update(self.memory)
                self.memory.clear_memory()
                self.time_step = 0

            curr_reward += reward
            if done:
                break
        return curr_reward, steps

    def train(self):
        """Run the full training loop.

        Periodically saves the policy weights, logs averaged statistics,
        decays ``action_std`` once the average reward exceeds
        ``decay_threshold``, and finally writes the logged history to
        ``history_avg_reward.txt``.

        Returns:
            List of ``(avg_reward, avg_length)`` tuples, one per log interval.
        """
        running_reward = 0
        avg_length = 0
        history_avg_reward = []

        for i_episode in range(1, self.max_episodes + 1):
            if self.render and i_episode % self.render_interval == 0:
                # Merge instead of passing render_mode as a separate keyword so
                # a render_mode inside env_kwargs cannot cause a duplicate.
                render_env = gym.make(self.env_name, **{**self.env_kwargs, "render_mode": "human"})
                try:
                    r, length = self.play_episode(render_env)
                finally:
                    # Release the window/resources of the one-off render env.
                    render_env.close()
                print(f"Render human : length: {length} \t reward: {r}")
            else:
                r, length = self.play_episode(self.env)
            running_reward += r
            avg_length += length

            if self.save_interval and i_episode % self.save_interval == 0:
                torch.save(self.ppo.policy.state_dict(), f"./PPO_continuous_{self.env_name}.pth")
                print(f"Saved at episode {i_episode}")

            if i_episode % self.log_interval == 0:
                avg_length = int(avg_length / self.log_interval)
                running_reward = int((running_reward / self.log_interval))

                print(f"Episode {i_episode} \t Avg length: {avg_length} \t Avg reward: {running_reward}")
                history_avg_reward.append((running_reward, avg_length))

                # Reduce exploration noise once the agent performs well enough.
                if running_reward > self.decay_threshold and self.action_std > 0.1:
                    self.action_std = self.action_std * self.decay_speed
                    self.ppo.set_action_std(self.action_std)
                    print("action_std : ", self.action_std)

                running_reward = 0
                avg_length = 0

        with open("history_avg_reward.txt", "w") as file:
            file.write("\n".join(str(reward) for reward in history_avg_reward))

        return history_avg_reward


In [3]:
# Train on 500 episodes, to see that it runs and improves in less than 2 minutes.
import time


# Wall-clock timing covers both building the trainer and the training loop.
start_time = time.time()
train = Train(
    render=True,
    render_interval=250,  # every 250th episode is played in a human-rendered environment
    log_interval=20,  # one averaged log line per 20 episodes
    max_episodes=500,
    max_timesteps=300,  # per-episode step cap
    update_timestep=2000,  # a PPO update is run every 2000 collected steps
)
# One (avg reward, avg length) tuple per log interval.
history_avg_reward = train.train()
execution_time = time.time() - start_time
print("Training completed in %s seconds" % (execution_time))

Episode 20 	 Avg length: 109 	 Avg reward: -247
Episode 40 	 Avg length: 105 	 Avg reward: -304
Episode 60 	 Avg length: 101 	 Avg reward: -198
Episode 80 	 Avg length: 106 	 Avg reward: -225
Episode 100 	 Avg length: 116 	 Avg reward: -173
Episode 120 	 Avg length: 105 	 Avg reward: -169
Episode 140 	 Avg length: 114 	 Avg reward: -232
Episode 160 	 Avg length: 118 	 Avg reward: -142
Episode 180 	 Avg length: 113 	 Avg reward: -85
Episode 200 	 Avg length: 114 	 Avg reward: -103
Episode 220 	 Avg length: 115 	 Avg reward: -129
Episode 240 	 Avg length: 114 	 Avg reward: -122
Render human : length: 127 	 reward: -88.37317039447605
Episode 260 	 Avg length: 113 	 Avg reward: -92
Episode 280 	 Avg length: 109 	 Avg reward: -86
Episode 300 	 Avg length: 109 	 Avg reward: -111
Episode 320 	 Avg length: 101 	 Avg reward: -105
Episode 340 	 Avg length: 100 	 Avg reward: -66
Episode 360 	 Avg length: 117 	 Avg reward: -56
Episode 380 	 Avg length: 109 	 Avg reward: -72
Episode 400 	 Avg lengt