In [None]:
%pip install swig
%pip install gymnasium[box2d]
%pip install stable-baselines3[extra]

# PPO

In [None]:
# ppo_gae_minibatch.py
import math
import random
import numpy as np
import torch
import torch.nn as nn
from torch.distributions import MultivariateNormal, Categorical
import gymnasium as gym
from collections import deque
import time # Import time for rendering delay
import imageio # Import imageio for GIF creation

# Ensure imageio is installed
try:
    import imageio
except ImportError:
    print("imageio not found. Installing imageio...")
    %pip install imageio
    import imageio

# Compute device shared by the networks and rollout tensors below:
# the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ---------------------------
# Rollout Buffer (for GAE)
# ---------------------------
class RolloutBuffer:
    """Per-step storage for one on-policy rollout.

    Six parallel Python lists are kept; entry ``i`` of each list belongs to
    the same environment step. ``states``, ``actions``, ``log_probs`` and
    ``state_values`` hold tensors, ``rewards`` and ``dones`` hold numbers.
    """

    def __init__(self):
        self.states, self.actions, self.log_probs = [], [], []
        self.rewards, self.dones = [], []
        self.state_values = []

    def clear(self):
        """Forget every stored step by re-running the initialiser (fresh lists)."""
        self.__init__()

    def to_tensors(self):
        """Return the rollout as tensors on the module-level ``device``.

        Returns:
            Tuple ``(states, actions, log_probs, state_values, rewards, dones)``.
            The first four are ``torch.stack`` of the stored tensors; rewards
            and dones are converted to float32 tensors.
        """
        def _stacked(tensor_list):
            return torch.stack(tensor_list).to(device)

        def _as_float32(numbers):
            return torch.tensor(numbers, dtype=torch.float32).to(device)

        return (
            _stacked(self.states),  # (T, state_dim)
            _stacked(self.actions),
            _stacked(self.log_probs),
            _stacked(self.state_values),
            _as_float32(self.rewards),
            _as_float32(self.dones),
        )

# ---------------------------
# ActorCritic
# ---------------------------
class ActorCritic(nn.Module):
    """Separate actor and critic MLPs (two hidden layers of 64 tanh units each).

    For continuous action spaces the actor outputs a tanh-squashed mean and
    actions are drawn from a Gaussian with a fixed diagonal covariance
    (``action_var``). For discrete action spaces the actor outputs softmax
    probabilities for a categorical distribution.

    Args:
        state_dim: Size of the observation vector.
        action_dim: Number of action components (continuous) or number of
            discrete actions.
        has_continuous_action_space: Selects the Gaussian (True) or
            categorical (False) policy head.
        action_std_init: Initial per-component standard deviation; only used
            for continuous action spaces.
    """

    def __init__(self, state_dim, action_dim, has_continuous_action_space, action_std_init=0.6):
        super().__init__()
        self.has_continuous_action_space = has_continuous_action_space
        self.action_dim = action_dim

        # actor (built before the critic so seeded initialisation is unchanged)
        if has_continuous_action_space:
            self.actor = nn.Sequential(
                nn.Linear(state_dim, 64),
                nn.Tanh(),
                nn.Linear(64, 64),
                nn.Tanh(),
                nn.Linear(64, action_dim),
                nn.Tanh()  # outputs in [-1,1], scale externally as needed
            )
            # Per-component action variance. Registered as a buffer so it moves
            # with the module on .to()/.cuda() instead of being pinned to a
            # global device; persistent=False keeps it out of state_dict so
            # existing checkpoints and load_state_dict calls are unaffected.
            self.register_buffer(
                "action_var",
                torch.full((action_dim,), action_std_init * action_std_init),
                persistent=False,
            )
        else:
            self.actor = nn.Sequential(
                nn.Linear(state_dim, 64),
                nn.Tanh(),
                nn.Linear(64, 64),
                nn.Tanh(),
                nn.Linear(64, action_dim),
                nn.Softmax(dim=-1)
            )

        # critic
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 64),
            nn.Tanh(),
            nn.Linear(64, 1)
        )

    def set_action_std(self, new_action_std):
        """Replace the action variance with ``new_action_std ** 2`` (continuous only)."""
        if not self.has_continuous_action_space:
            print("Warning: trying to set action std for discrete action space.")
            return
        # Create the new variance on whatever device the module currently lives on.
        self.action_var = torch.full(
            (self.action_dim,),
            new_action_std * new_action_std,
            device=self.action_var.device,
            dtype=self.action_var.dtype,
        )

    def forward(self):
        """Not used; call ``act`` or ``evaluate`` instead."""
        raise NotImplementedError

    def _distribution(self, states):
        """Build the action distribution for a batch of states ``(B, state_dim)``."""
        if self.has_continuous_action_space:
            action_mean = self.actor(states)  # (B, action_dim)
            action_var = self.action_var.expand_as(action_mean)
            cov_mat = torch.diag_embed(action_var)  # (B, action_dim, action_dim)
            return MultivariateNormal(action_mean, cov_mat)
        return Categorical(self.actor(states))

    def act(self, state):
        """Sample an action for a single state.

        Args:
            state: Tensor of shape ``(state_dim,)`` or ``(1, state_dim)``.

        Returns:
            Tuple ``(action, log_prob, state_value)`` with the leading batch
            dimension of size 1 squeezed away.
        """
        if state.dim() == 1:
            state = state.unsqueeze(0)  # -> (1, state_dim)

        dist = self._distribution(state)
        action = dist.sample()
        action_logprob = dist.log_prob(action)
        state_val = self.critic(state).squeeze(-1)  # (B,)

        # squeeze if batch size 1
        return action.squeeze(0), action_logprob.squeeze(0), state_val.squeeze(0)

    def evaluate(self, states, actions):
        """Score previously taken actions under the current parameters.

        Args:
            states: Tensor ``(N, state_dim)``.
            actions: Tensor ``(N, action_dim)`` for continuous spaces or
                ``(N,)`` of action indices for discrete spaces.

        Returns:
            Tuple ``(log_probs, state_values, entropy)``, each of shape ``(N,)``.
        """
        dist = self._distribution(states)
        action_logprobs = dist.log_prob(actions)
        dist_entropy = dist.entropy()
        state_values = self.critic(states).squeeze(-1)
        return action_logprobs, state_values, dist_entropy

# ---------------------------
# PPO with GAE + Minibatch
# ---------------------------
class PPO:
    """Clipped-objective PPO with GAE advantages and minibatch updates.

    Transitions are recorded in ``self.buffer``: ``select_action`` stores the
    state, action, log-probability and value estimate, while the caller is
    responsible for appending the matching reward and done flag after each
    environment step. ``update`` then consumes and clears the buffer.

    Args:
        state_dim: Size of the observation vector.
        action_dim: Action components (continuous) or number of discrete actions.
        has_continuous_action_space: Gaussian policy if True, categorical otherwise.
        lr_actor: Adam learning rate for the actor parameters.
        lr_critic: Adam learning rate for the critic parameters.
        gamma: Discount factor.
        K_epochs: Number of passes over the rollout per ``update`` call.
        eps_clip: Clipping range for the probability ratio.
        gae_lambda: GAE lambda.
        action_std_init: Initial action standard deviation (continuous only).
        entropy_coef: Weight of the entropy bonus.
        max_grad_norm: Gradient-norm clipping threshold.
        minibatch_size: Number of transitions per gradient step.
        device: Torch device for the networks and rollout tensors. ``None``
            selects CUDA when available and the CPU otherwise.
    """

    def __init__(
        self,
        state_dim,
        action_dim,
        has_continuous_action_space,
        lr_actor=3e-4,
        lr_critic=1e-3,
        gamma=0.99,
        K_epochs=10,
        eps_clip=0.2,
        gae_lambda=0.95,
        action_std_init=0.6,
        entropy_coef=0.01,
        max_grad_norm=0.5,
        minibatch_size=64,
        device=None
    ):
        # Resolve the default lazily instead of binding a module global at
        # class-definition time.
        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.device = device
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.K_epochs = K_epochs
        self.gae_lambda = gae_lambda
        self.entropy_coef = entropy_coef
        self.max_grad_norm = max_grad_norm
        self.minibatch_size = minibatch_size

        self.has_continuous_action_space = has_continuous_action_space

        # policy is trained; policy_old is the frozen copy used to collect data.
        self.policy = ActorCritic(state_dim, action_dim, has_continuous_action_space, action_std_init).to(self.device)
        self.policy_old = ActorCritic(state_dim, action_dim, has_continuous_action_space, action_std_init).to(self.device)
        self.policy_old.load_state_dict(self.policy.state_dict())

        # One Adam optimizer with separate learning rates for actor and critic.
        self.optimizer = torch.optim.Adam([
            {'params': self.policy.actor.parameters(), 'lr': lr_actor},
            {'params': self.policy.critic.parameters(), 'lr': lr_critic}
        ])

        self.buffer = RolloutBuffer()
        self.mse_loss = nn.MSELoss()

    def set_action_std(self, new_action_std):
        """Set the action standard deviation on both policies (continuous only)."""
        if self.has_continuous_action_space:
            self.policy.set_action_std(new_action_std)
            self.policy_old.set_action_std(new_action_std)
        else:
            print("Warning: discrete action space, ignoring set_action_std.")

    def select_action(self, state):
        """Sample an action from ``policy_old`` and record it in the buffer.

        Args:
            state: Observation from the environment (array-like).

        Returns:
            A numpy array for continuous action spaces, a Python int for
            discrete ones.

        Side effects:
            Appends the state, action, log-probability and value estimate to
            ``self.buffer``. The caller must append the reward and done flag.
        """
        state_t = torch.FloatTensor(state).to(self.device)
        with torch.no_grad():
            action, action_logprob, state_val = self.policy_old.act(state_t)

        self.buffer.states.append(state_t)
        self.buffer.actions.append(action.detach())
        self.buffer.log_probs.append(action_logprob.detach())
        self.buffer.state_values.append(state_val.detach())

        if self.has_continuous_action_space:
            return action.cpu().numpy()
        return action.item()

    def compute_gae(self, rewards, dones, values, next_value):
        """Compute GAE advantages and critic targets for one rollout.

        Args:
            rewards: 1-D tensor of length T.
            dones: 1-D tensor of length T; 1.0 where the episode ended at that step.
            values: 1-D tensor of length T with the value estimates V(s_t).
            next_value: Single-element tensor (or number) with the value used
                to bootstrap after the final step.

        Returns:
            Tuple ``(advantages, returns)``, both of shape ``(T,)``, where
            ``returns = advantages + values``.
        """
        T = len(rewards)
        advantages = torch.zeros(T, dtype=torch.float32, device=self.device)
        # Normalise to a 0-dim tensor so every delta below is a scalar.
        next_value = torch.as_tensor(next_value, dtype=torch.float32, device=self.device).reshape(())
        last_gae = 0.0
        for t in reversed(range(T)):
            mask = 1.0 - dones[t]  # 0 if done, 1 if not done
            delta = rewards[t] + self.gamma * next_value * mask - values[t]
            last_gae = delta + self.gamma * self.gae_lambda * mask * last_gae
            advantages[t] = last_gae
            next_value = values[t]
        returns = advantages + values
        return advantages, returns

    def update(self, next_state=None):
        """Run the PPO optimisation on the buffered rollout, then clear the buffer.

        Args:
            next_state: Optional observation that followed the last buffered
                step. When given, its value estimate bootstraps the return of
                the final transition. When omitted, the value of the last
                buffered state is used instead, which is only an approximation
                of that quantity.

        Raises:
            ValueError: If the buffer holds a different number of rewards/dones
                than states.
        """
        # Nothing collected: return before trying to stack empty lists.
        if not self.buffer.states:
            return

        # Keep every rollout tensor on the same device as the networks.
        states, actions, old_log_probs, old_values, rewards, dones = (
            t.to(self.device) for t in self.buffer.to_tensors()
        )
        old_values = old_values.reshape(-1)  # stays 1-D even when T == 1

        T = states.shape[0]
        if rewards.shape[0] != T or dones.shape[0] != T:
            raise ValueError(
                f"Rollout buffer is inconsistent: {T} states, "
                f"{rewards.shape[0]} rewards, {dones.shape[0]} dones."
            )

        # Bootstrap value for the step after the rollout; compute_gae zeroes it
        # through the done mask when the last step ended an episode.
        with torch.no_grad():
            if next_state is None:
                bootstrap_state = self.buffer.states[-1]
            else:
                bootstrap_state = torch.as_tensor(next_state, dtype=torch.float32, device=self.device)
            next_value = self.policy_old.critic(bootstrap_state.unsqueeze(0)).reshape(())

        advantages, returns = self.compute_gae(rewards, dones, old_values, next_value)

        # Normalise advantages; std() of a single sample is NaN, so skip then.
        if advantages.numel() > 1:
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        returns = returns.detach()
        batch_size = T
        indices = np.arange(batch_size)

        for _ in range(self.K_epochs):
            np.random.shuffle(indices)
            for start in range(0, batch_size, self.minibatch_size):
                mb_idx = indices[start:start + self.minibatch_size]

                mb_states = states[mb_idx]
                mb_actions = actions[mb_idx]
                mb_old_log_probs = old_log_probs[mb_idx]
                mb_returns = returns[mb_idx]
                mb_advantages = advantages[mb_idx]

                # evaluate the current policy on the minibatch
                logprobs, state_values, dist_entropy = self.policy.evaluate(mb_states, mb_actions)
                state_values = state_values.view(-1)

                # probability ratio pi_new / pi_old
                ratios = torch.exp(logprobs - mb_old_log_probs.detach())

                surr1 = ratios * mb_advantages
                surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * mb_advantages
                loss_actor = -torch.min(surr1, surr2).mean()
                loss_critic = self.mse_loss(state_values, mb_returns)
                loss_entropy = -self.entropy_coef * dist_entropy.mean()

                loss = loss_actor + loss_critic + loss_entropy

                self.optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.policy.parameters(), self.max_grad_norm)
                self.optimizer.step()

        # copy new weights into old policy
        self.policy_old.load_state_dict(self.policy.state_dict())
        self.buffer.clear()

# ---------------------------
# Training loop (defaults to MountainCar-v0; supports Box and Discrete action spaces)
# ---------------------------
def train_cartpole(
    env_name="MountainCar-v0",
    max_updates=1000,
    max_steps_per_update=2048,
    update_epochs=10,
    minibatch_size=64,
    render=False,
    seed=42,
    solved_score=50
):
    """Train a PPO agent on a Gymnasium environment.

    Despite the name, the environment is chosen by ``env_name``; both ``Box``
    (continuous) and ``Discrete`` action spaces are supported.

    Args:
        env_name: Gymnasium environment id.
        max_updates: Maximum number of collect-then-update iterations.
        max_steps_per_update: Environment steps collected before each update.
        update_epochs: Optimisation epochs per update (PPO ``K_epochs``).
        minibatch_size: Transitions per gradient step.
        render: If True, the environment is created with
            ``render_mode="human"`` so it is displayed while training.
        seed: Seed for torch, numpy, ``random`` and the first env reset.
        solved_score: Training stops once the mean return of the most recent
            episodes (at most 100) reaches this value.

    Returns:
        The trained ``PPO`` agent.

    Raises:
        NotImplementedError: If the action space is neither Box nor Discrete.
    """
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)

    # render() only shows something when the env was created with a render mode.
    env = gym.make(env_name, render_mode="human") if render else gym.make(env_name)
    try:
        state_dim = env.observation_space.shape[0]

        # Determine if action space is continuous or discrete
        if isinstance(env.action_space, gym.spaces.Box):
            action_dim = env.action_space.shape[0]
            is_continuous = True
        elif isinstance(env.action_space, gym.spaces.Discrete):
            action_dim = env.action_space.n
            is_continuous = False
        else:
            raise NotImplementedError("Unsupported action space type")

        ppo = PPO(
            state_dim=state_dim,
            action_dim=action_dim,
            has_continuous_action_space=is_continuous,
            lr_actor=3e-4,
            lr_critic=1e-3,
            gamma=0.99,
            K_epochs=update_epochs,
            eps_clip=0.2,
            gae_lambda=0.95,
            action_std_init=0.6,
            entropy_coef=0.01,
            max_grad_norm=0.5,
            minibatch_size=minibatch_size,
            device=device
        )

        scores_deque = deque(maxlen=100)  # returns of the most recent episodes
        episode_return = 0.0  # carried across updates: episodes may span batches

        state, info = env.reset(seed=seed)
        for update in range(1, max_updates + 1):
            batch_reward = 0.0
            # collect max_steps_per_update transitions
            for _ in range(max_steps_per_update):
                action = ppo.select_action(state)

                next_state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated

                # select_action stored state/action/log-prob/value; add the rest
                ppo.buffer.rewards.append(reward)
                ppo.buffer.dones.append(float(done))

                state = next_state
                episode_return += reward
                batch_reward += reward

                if done:
                    if terminated:
                        # This banner describes MountainCar, where termination
                        # means the goal was reached.
                        print("=========================================")
                        print("         Reached To The Top!!!!!!!       ")
                        print("=========================================")

                    # Record this episode's own return, then start a new one.
                    scores_deque.append(episode_return)
                    episode_return = 0.0
                    state, info = env.reset()

            ppo.update()

            # logging and stopping condition
            if scores_deque:
                avg_score = float(np.mean(scores_deque))
                print(
                    f"Update {update}\tAverage return (last {len(scores_deque)} episodes) "
                    f"{avg_score:.2f}\tBatch reward sum {batch_reward:.2f}"
                )
                if avg_score >= solved_score:
                    print("Environment solved!")
                    break
            else:
                print(f"Update {update}\tNo episode finished yet\tBatch reward sum {batch_reward:.2f}")
    finally:
        env.close()

    return ppo

def _select_action_without_recording(agent, state):
    """Call ``agent.select_action`` and undo what it appended to the rollout buffer.

    ``select_action`` stores the state, action, log-probability and value in
    ``agent.buffer`` without a matching reward/done, which would leave the
    buffer inconsistent for a later ``update()``.
    """
    buffer = getattr(agent, "buffer", None)
    sizes = {}
    for name in ("states", "actions", "log_probs", "state_values"):
        entries = getattr(buffer, name, None)
        if isinstance(entries, list):
            sizes[name] = len(entries)

    action = agent.select_action(state)

    for name, size in sizes.items():
        del getattr(buffer, name)[size:]
    return action


def _continuous_force_to_discrete_action(action):
    """Map a continuous push force to MountainCar's 0 (left) / 1 (none) / 2 (right)."""
    force = float(np.asarray(action).reshape(-1)[0])
    if abs(force) < 0.33:
        return 1
    return 2 if force >= 0.33 else 0


def demonstrate_policy(ppo_agent, env_name="CartPole-v1", num_episodes=5, seed=42, save_gif=False, gif_filename="policy_demonstration.gif", gif_fps=30):
    """Run the trained PPO agent in an environment, optionally saving a GIF.

    Args:
        ppo_agent: Agent exposing ``select_action(state)``.
        env_name: Gymnasium environment id used for the demonstration.
        num_episodes: Episodes to run in human-render mode.
        seed: Episode ``i`` is reset with ``seed + i``.
        save_gif: If True, render to RGB arrays, run a single episode and
            write the frames to ``gif_filename``.
        gif_filename: Output path for the GIF.
        gif_fps: Frame rate passed to ``imageio.mimsave``.

    Returns:
        The list of captured frames (empty unless ``save_gif`` is True).
    """
    print(f"\nDemonstrating policy for {env_name}...")

    render_mode = "rgb_array" if save_gif else "human"
    eval_env = gym.make(env_name, render_mode=render_mode)

    # Only translate actions when a continuous-action agent drives a
    # discrete-action environment; otherwise pass the action through untouched.
    discretize = bool(getattr(ppo_agent, "has_continuous_action_space", False)) and isinstance(
        eval_env.action_space, gym.spaces.Discrete
    )

    all_frames = []

    # If saving GIF, only run for 1 episode to keep file size manageable
    episodes_to_run = 1 if save_gif else num_episodes

    try:
        for episode in range(episodes_to_run):
            state, info = eval_env.reset(seed=seed + episode)
            done = False
            total_reward = 0
            step_count = 0

            print(f"--- Episode {episode + 1} ---")
            while not done:
                action = _select_action_without_recording(ppo_agent, state)
                if discretize:
                    action = _continuous_force_to_discrete_action(action)

                next_state, reward, terminated, truncated, info = eval_env.step(action)
                done = terminated or truncated

                if save_gif:
                    all_frames.append(eval_env.render())
                else:  # Human render mode
                    eval_env.render()
                    time.sleep(0.01)  # Small delay for better visualization

                state = next_state
                total_reward += reward
                step_count += 1
            print(f"Episode finished after {step_count} steps with total reward: {total_reward}")
    finally:
        # Release the window/renderer even if an episode raises.
        eval_env.close()
    print("Demonstration complete.")

    if save_gif and len(all_frames) > 0:
        print(f"Saving GIF to {gif_filename}...")
        imageio.mimsave(gif_filename, all_frames, fps=gif_fps)
        print("GIF saved.")

    return all_frames

# The agent is trained on the continuous-action MountainCar variant and then
# replayed in the discrete-action variant.
env_train = "MountainCarContinuous-v0"
env_simul = 'MountainCar-v0'

if __name__ == "__main__":
    # Example usage (from a terminal): python ppo_gae_minibatch.py
    trained_ppo = train_cartpole(
        env_name=env_train,
        max_updates=200,
        max_steps_per_update=2048,
        update_epochs=10,
        minibatch_size=64,
        render=False,
        seed=123
    )

    # Demonstrate the trained policy and save GIF
    demonstrate_policy(trained_ppo, env_name=env_simul, num_episodes=1, seed=123, save_gif=True, gif_filename="Cart_ppo_demo.gif")

    # If you still want to see the human rendering for multiple episodes, you can call it again
    # demonstrate_policy(trained_ppo, env_name="CartPole-v1", num_episodes=3, seed=123, save_gif=False)


# DQN (SB3)

In [None]:
%pip install shimmy

In [None]:
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
import gymnasium as gym

# Discrete-action MountainCar environment used to train the DQN agent.
env = gym.make("MountainCar-v0")

# Stable-Baselines3 DQN agent using the library's "MlpPolicy".
# NOTE(review): the exploration_* arguments presumably configure the
# epsilon-greedy schedule (start value, final value, fraction of training
# over which it is annealed) - confirm against the SB3 DQN documentation.
dqn = DQN(
    "MlpPolicy",
    env=env,
    verbose=1,
    batch_size=256,
    gamma=0.99,
    learning_rate=0.0004,
    exploration_initial_eps=1,
    exploration_final_eps=0.2,
    exploration_fraction=0.3
)

# Train for 3,000,000 timesteps.
dqn.learn(total_timesteps=3000000, log_interval=10)

In [None]:
import gymnasium as gym
import time
import imageio
from stable_baselines3.common.base_class import BaseAlgorithm # Import to check SB3 agent type

def _select_action_without_recording(agent, state):
    """Call ``agent.select_action`` and undo what it appended to the rollout buffer.

    A custom agent's ``select_action`` may store the state, action,
    log-probability and value in ``agent.buffer`` without a matching
    reward/done, which would leave the buffer inconsistent for later training.
    Agents without such a buffer are simply called.
    """
    buffer = getattr(agent, "buffer", None)
    sizes = {}
    for name in ("states", "actions", "log_probs", "state_values"):
        entries = getattr(buffer, name, None)
        if isinstance(entries, list):
            sizes[name] = len(entries)

    action = agent.select_action(state)

    for name, size in sizes.items():
        del getattr(buffer, name)[size:]
    return action


def _continuous_force_to_discrete_action(action):
    """Map a continuous push force to MountainCar's 0 (left) / 1 (none) / 2 (right)."""
    import numpy as np  # local import: this cell does not import numpy itself

    force = float(np.asarray(action).reshape(-1)[0])
    if abs(force) < 0.33:
        return 1
    return 2 if force >= 0.33 else 0


def demonstrate_policy(agent, env_name="CartPole-v1", num_episodes=5, seed=42, save_gif=False, gif_filename="policy_demonstration.gif", gif_fps=30):
    """Run a trained agent in an environment, optionally saving a GIF.

    Works with Stable-Baselines3 agents (via ``predict``) and with custom
    agents that expose ``select_action``.

    Args:
        agent: A Stable-Baselines3 ``BaseAlgorithm`` or an object with
            ``select_action(state)``.
        env_name: Gymnasium environment id used for the demonstration.
        num_episodes: Episodes to run in human-render mode.
        seed: Episode ``i`` is reset with ``seed + i``.
        save_gif: If True, render to RGB arrays, run a single episode and
            write the frames to ``gif_filename``.
        gif_filename: Output path for the GIF.
        gif_fps: Frame rate passed to ``imageio.mimsave``.

    Returns:
        The list of captured frames (empty unless ``save_gif`` is True).

    Raises:
        AttributeError: If the agent is neither an SB3 agent nor provides
            ``select_action``.
    """
    print(f"\nDemonstrating policy for {env_name}...")

    render_mode = "rgb_array" if save_gif else "human"
    eval_env = gym.make(env_name, render_mode=render_mode)

    all_frames = []

    # If saving GIF, only run for 1 episode to keep file size manageable
    episodes_to_run = 1 if save_gif else num_episodes

    # Check if the agent is a Stable-Baselines3 agent
    is_sb3_agent = isinstance(agent, BaseAlgorithm)

    # A custom agent trained on continuous MountainCar emits a force that must
    # be converted to one of MountainCar-v0's three discrete actions.
    discretize = env_name == "MountainCar-v0" and bool(getattr(agent, "has_continuous_action_space", False))

    try:
        for episode in range(episodes_to_run):
            state, info = eval_env.reset(seed=seed + episode)
            done = False
            total_reward = 0
            step_count = 0

            print(f"--- Episode {episode + 1} ---")
            while not done:
                if is_sb3_agent:
                    # Stable-Baselines3 agents use .predict(); for discrete
                    # action spaces the result is used as the action directly.
                    action, _states = agent.predict(state, deterministic=True)
                elif hasattr(agent, 'select_action'):
                    action = _select_action_without_recording(agent, state)
                    if discretize:
                        # argmax over a single-component action is always 0,
                        # so threshold the force value instead.
                        action = _continuous_force_to_discrete_action(action)
                else:
                    raise AttributeError("Agent must have 'select_action' or be a Stable-Baselines3 agent with 'predict' method.")

                next_state, reward, terminated, truncated, info = eval_env.step(action)
                done = terminated or truncated

                if save_gif:
                    all_frames.append(eval_env.render())
                else:  # Human render mode
                    eval_env.render()
                    time.sleep(0.01)  # Small delay for better visualization

                state = next_state
                total_reward += reward
                step_count += 1
            print(f"Episode finished after {step_count} steps with total reward: {total_reward}")
    finally:
        # Release the window/renderer even if an episode raises.
        eval_env.close()
    print("Demonstration complete.")

    if save_gif and len(all_frames) > 0:
        print(f"Saving GIF to {gif_filename}...")
        imageio.mimsave(gif_filename, all_frames, fps=gif_fps)
        print("GIF saved.")

    return all_frames

In [None]:
# Replay the trained DQN agent on MountainCar-v0; with save_gif=True a single
# episode is recorded and written to the function's default gif_filename.
demonstrate_policy(env_name="MountainCar-v0", agent = dqn, save_gif = True)
print()