# Joint code

In [2]:
from functools import partial
from itertools import product
import gymnasium as gym
from tqdm import tqdm

In [None]:
def learn_and_save(model, name, **kwargs):
    """Train ``model`` and then persist it under the ``models/`` prefix.

    Args:
        model: Object exposing ``learn(**kwargs)`` and ``save(path)``.
        name: String appended to ``"models/"`` to build the save path.
        **kwargs: Passed through unchanged to ``model.learn``.
    """
    model.learn(**kwargs)
    destination = "models/" + name
    model.save(destination)

: 

# Discrete control with classic control

I will use both DQN and PPO for discrete control.

They will be compared across two discrete control environments: CartPole and LunarLander.

In [None]:
# !pip install --upgrade pip  # Upgrade pip (optional)
# !pip install swig --force-reinstall
# !pip install "gymnasium[box2d]" --force-reinstall

In [None]:
from stable_baselines3 import PPO, DQN
# Factories that build each algorithm with the MLP policy; the environment is
# supplied later through the ``env`` keyword.
algorithms = dict(
    ppo=partial(PPO, "MlpPolicy", verbose=1),
    dqn=partial(DQN, "MlpPolicy", verbose=1),
)

envs = ["CartPole-v1", "LunarLander-v3"]

# One freshly constructed model per (algorithm, environment) pair, keyed
# "<algo>_<env id>" and stored together with the environment id.
tests = {
    f"{algo_key}_{env_id}": (make_model(env=gym.make(env_id)), env_id)
    for algo_key, make_model in algorithms.items()
    for env_id in envs
}

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




In [None]:
# Train every discrete-control model and save it under its "<algo>_<env>" key.
# ``learn_and_save`` takes (model, name, **learn_kwargs): the environment id is
# not one of its arguments, and ``learn`` needs an explicit step budget.
for name, (model, env) in tqdm(tests.items()):
    learn_and_save(model, name, total_timesteps=100_000)

In [None]:
# Replay a trained discrete-control policy in a window for visual inspection.
model_name = "dqn"
env_name = "LunarLander-v3"

# NOTE(review): models are loaded from "discrete_models/", while
# ``learn_and_save`` writes to "models/" — confirm the files were moved there.
model_path = f"discrete_models/{model_name}_{env_name}"

# Explicit lookup so an unknown algorithm name fails loudly (KeyError) instead
# of silently falling back to PPO.
model = {"dqn": DQN, "ppo": PPO}[model_name].load(model_path)

env = gym.make(env_name, render_mode="human")
try:
    obs, info = env.reset()
    for _ in range(1000):
        action, _states = model.predict(obs)  # Get action from the trained policy
        # With render_mode="human" the frame is drawn as part of step(), so no
        # separate render() call is made.
        obs, reward, terminated, truncated, info = env.step(action)
        # An episode ends on termination OR on time-limit truncation; stepping
        # past either without a reset is not valid.
        if terminated or truncated:
            obs, info = env.reset()
finally:
    # Release the render window even if prediction or stepping raises.
    env.close()


# Continuous control baseline for the MuJoCo environments

Here I will benchmark PPO, SAC and TD3 on the MuJoCo environments. The goal is to provide a baseline for the continuous control problem. I will be using the stable_baselines3 library for the implementations.


In [None]:
# !pip install glfw
# !pip install mujoco --force-reinstall

In [4]:
from stable_baselines3 import PPO, TD3, SAC

tensorboard_logs = "logs/cont_model_training"

# All three algorithms share the policy type, verbosity and TensorBoard
# directory; only the class differs.
algorithms = {
    algo_key: partial(algo_cls, "MlpPolicy", verbose=0, tensorboard_log=tensorboard_logs)
    for algo_key, algo_cls in (("sac", SAC), ("ppo", PPO), ("td3", TD3))
}

envs = ["HalfCheetah-v5", "Walker2d-v5"]

# One freshly constructed model per (algorithm, environment) pair, keyed
# "<algo>_<env id>" and stored together with the environment id.
continuous_tests = {
    f"{algo_key}_{env_id}": (make_model(env=gym.make(env_id)), env_id)
    for algo_key, make_model in algorithms.items()
    for env_id in envs
}



In [39]:
# Train each continuous-control model for 100k steps and save it under its
# "<algo>_<env>" key; the environment id stored next to the model is not needed here.
for name, (model, env) in tqdm(continuous_tests.items()):
    learn_and_save(
        model,
        name,
        total_timesteps=100_000,
        progress_bar=True,
    )



Output()



Output()



Output()



Output()



Output()



Output()

100%|██████████| 6/6 [1:11:25<00:00, 714.28s/it]


In [5]:
import pandas as pd
import numpy as np
from torch.utils.tensorboard import SummaryWriter

# Evaluate every saved continuous-control model for a fixed number of episodes,
# logging per-step and per-episode rewards to TensorBoard and collecting the
# per-episode totals in ``results_df``.
N_EVAL_EPISODES = 10

# Loader lookup: an algorithm name without an entry raises KeyError instead of
# silently reusing whatever ``model`` was left over from a previous iteration.
model_classes = {"ppo": PPO, "sac": SAC, "td3": TD3}

# Initialize TensorBoard writer
writer = SummaryWriter("logs/model_benchmarks")
results = []

try:
    for model_name, env_name in product(algorithms.keys(), envs):

        print(f"Testing {model_name} on {env_name}")

        output_path = f"models/{model_name}_{env_name}"
        model = model_classes[model_name].load(output_path)

        env = gym.make(env_name)

        model_rewards = []
        # Step counter runs across all episodes of this (model, env) pair so
        # that per-step scalars from different episodes do not share a
        # global_step and overwrite each other under the same tag.
        step = 0
        try:
            for episode in range(N_EVAL_EPISODES):
                obs, info = env.reset()
                episode_rewards = []

                while True:
                    action, _ = model.predict(obs, deterministic=True)
                    obs, reward, terminated, truncated, info = env.step(action)
                    episode_rewards.append(reward)

                    writer.add_scalar(f"{model_name}/{env_name}/reward", reward, global_step=step)
                    step += 1

                    # The next episode resets the env itself, so no reset here.
                    if terminated or truncated:
                        break

                episode_reward_sum = np.sum(episode_rewards)
                writer.add_scalar(
                    f"{model_name}/{env_name}/episode_reward",
                    episode_reward_sum,
                    global_step=episode,
                )
                model_rewards.append(episode_reward_sum)
        finally:
            # Release the environment even if evaluation raises.
            env.close()

        results.append((model_name, env_name, model_rewards))
finally:
    # Close the event file even on failure.
    writer.close()

# Convert results to DataFrame: one row per (model, env) with its list of
# per-episode total rewards.
results_df = pd.DataFrame(results, columns=["model", "env", "rewards"])
results_df

Testing sac on HalfCheetah-v5
Testing sac on Walker2d-v5
Testing ppo on HalfCheetah-v5
Testing ppo on Walker2d-v5
Testing td3 on HalfCheetah-v5
Testing td3 on Walker2d-v5


Unnamed: 0,model,env,rewards
0,sac,HalfCheetah-v5,"[3268.011316335609, 3229.958860041289, 3200.65..."
1,sac,Walker2d-v5,"[678.860195512473, 865.5477026981571, 973.5563..."
2,ppo,HalfCheetah-v5,"[820.4886555781516, 888.4022718585487, 945.185..."
3,ppo,Walker2d-v5,"[264.6140951780384, 272.8905004904235, 258.291..."
4,td3,HalfCheetah-v5,"[2449.923212954687, 2416.5050584828905, 2391.6..."
5,td3,Walker2d-v5,"[452.67596064325136, 432.21821284272283, 453.6..."


In [24]:
# The evaluation cell stores one total reward (a scalar) per episode in
# "rewards"; iterating over a scalar with the builtin ``sum`` raises TypeError.
# ``np.sum`` accepts both a scalar and a sequence of per-step rewards, so this
# works whichever shape the column holds.
results_df["episode_rewards"] = results_df["rewards"].apply(
    lambda episodes: [float(np.sum(episode)) for episode in episodes]
)
# Mean episode return per (model, env) row.
results_df["mean_rewards"] = results_df["episode_rewards"].apply(np.mean)
results_df

Unnamed: 0,model,env,rewards,episode_rewards,mean_rewards
0,sac,HalfCheetah-v5,"[[0.17729030124474038, 0.18531359240967804, 0....","[0.23795232037798686, -0.602733062121019, -0.8...",-0.808175
1,sac,Walker2d-v5,"[[1.0537013664859018, 1.0459833816374642, 1.02...","[272.6357501695, 265.81133252289095, 277.19786...",273.794685
2,ppo,HalfCheetah-v5,"[[0.021592438917991896, 0.014950960621128107, ...","[-0.4884005741504921, -0.42129596176091955, -0...",-0.231901
3,ppo,Walker2d-v5,"[[1.00358606554381, 1.0111977170558628, 1.0148...","[282.4551820165615, 278.55592123321344, 283.06...",282.488005
4,td3,HalfCheetah-v5,"[[0.6199017438119019, 0.3289844823446879, -0.4...","[-396.3309078673116, -396.47682597980105, -396...",-396.17821
5,td3,Walker2d-v5,"[[1.095917954011297, 1.0567903156864185, 0.945...","[-13.158023583736677, -13.53891233738134, -13....",-12.630525


In [None]:
from gymnasium.wrappers import RecordVideo

import os
# Rendering backend used by MuJoCo. "glfw" is selected here; "egl" is the
# other value this cell was evidently switched between.
# NOTE(review): presumably this must be set before MuJoCo rendering is first
# initialised in the kernel to take effect — confirm.
os.environ["MUJOCO_GL"] = "glfw"


algo_name = "ppo"
env_name = "HalfCheetah-v5"

# Explicit lookup so every trained algorithm can be replayed and an unknown
# name fails loudly (KeyError) instead of leaving ``model`` undefined or stale.
model = {"ppo": PPO, "sac": SAC, "td3": TD3}[algo_name].load(f"models/{algo_name}_{env_name}")

# RecordVideo captures frames returned by render(), so the base environment is
# created with render_mode="rgb_array". Ensure you have MuJoCo installed.
env = RecordVideo(
    gym.make(env_name, render_mode="rgb_array"),
    video_folder=f"videos/{algo_name}_{env_name}",
    episode_trigger=lambda episode_id: True,  # record every episode
)

# Test the trained model and record the video
try:
    obs, info = env.reset()
    for _ in range(1000):
        action, _states = model.predict(obs)  # Get action from the trained policy
        obs, reward, terminated, truncated, info = env.step(action)
        # An episode ends on termination OR on time-limit truncation.
        if terminated or truncated:
            obs, info = env.reset()
finally:
    # Closing the wrapper also closes the wrapped environment, even on error.
    env.close()