In [6]:
#import libs

import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
import torch.multiprocessing as mp

import gymnasium as gym

device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)
plt.rcParams["figure.figsize"] = (10, 5)

cuda


In [7]:
#Khởi tạo môi trường

env_name = "InvertedPendulum-v4"
# Create and wrap the environment
env = gym.make(env_name)
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, 50)  # Records episode-reward

# Observation-space of InvertedPendulum-v4 (4)
obs_space_dims = env.observation_space.shape[0]
# Action-space of InvertedPendulum-v4 (1)
action_space_dims = env.action_space.shape[0]
rewards_over_seeds = []


In [8]:
#Tạo folder kết quả

import os
def mkdir(path): 
    if not os.path.exists(path):
        os.mkdir(path)

save_path = "./save"
env_path = f'{save_path}/{env_name}'
model_path = f'{save_path}/{env_name}/model'
demo_path = f'{save_path}/{env_name}/demo'

mkdir(save_path)
mkdir(env_path)
mkdir(model_path)
mkdir(demo_path)

In [9]:
#Khai báo thuật toán
from stable_baselines3 import PPO
from sb3_contrib import TRPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy

policy_kwargs = dict(
    activation_fn=torch.nn.Tanh,
    net_arch=[dict(pi=[128, 128], vf=[128, 128])]
)

In [10]:
#train model

seed_set = [1]
total_num_episodes = 5000  # Total number of episodes
evaluation_interval = 1000
learning_rate = 3e-4
for seed in seed_set:
    np.random.seed(seed)
    torch.manual_seed(seed)

    # Initialize the PPO agent
    model = PPO("MlpPolicy", env, policy_kwargs=policy_kwargs, verbose=0, seed=seed, learning_rate=learning_rate)
    # model = TRPO("MlpPolicy", env, policy_kwargs=policy_kwargs, verbose=0, seed=seed)

    rewards = []

    for episode in range(0, total_num_episodes + 1, evaluation_interval):
        if episode > 0:
            # Continue training the agent
            model.learn(total_timesteps=evaluation_interval)

        # Evaluate the agent
        mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
        print(f"Seed: {seed}, Episode: {episode}, Mean Reward: {mean_reward}, Std Reward: {std_reward}")
        rewards.append((episode, mean_reward))

        # Save the model
        model.save(f"{model_path}/{env_name}_ppo_seed_{seed}_episode_{episode}")

    rewards_over_seeds.append(rewards)
    

TypeError: empty() received an invalid combination of arguments - got (tuple, dtype=NoneType, device=NoneType), but expected one of:
 * (tuple of ints size, *, tuple of names names, torch.memory_format memory_format, torch.dtype dtype, torch.layout layout, torch.device device, bool pin_memory, bool requires_grad)
 * (tuple of ints size, *, torch.memory_format memory_format, Tensor out, torch.dtype dtype, torch.layout layout, torch.device device, bool pin_memory, bool requires_grad)


In [None]:
# ploting
rewards_to_plot = []
for seed_index, seed_rewards in enumerate(rewards_over_seeds):
    for episode, reward in seed_rewards:
        rewards_to_plot.append([seed_set[seed_index], episode, reward])
 
df1 = pd.DataFrame(rewards_to_plot, columns=["seed", "episodes", "reward"])
 
# Visualize the rewards
sns.set(style="darkgrid", context="talk", palette="rainbow")
sns.lineplot(x="episodes", y="reward", hue="seed", data=df1).set(
    title="PPO for InvertedPendulum-v4"
)
plt.show()

In [None]:
#Visualize

from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from PIL import Image
%matplotlib inline

# Function to visualize the trained model
def visualize_trained_model(agent, env_name="InvertedPendulum-v4", num_episodes=1, seed=1):
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    
    # Set seed for reproducibility
    torch.manual_seed(seed)
    random.seed(seed)
    np.random.seed(seed)

    for episode in range(num_episodes):
        obs, info = env.reset(seed=seed)
        done = False
        while not done:
            frame = env.render()
            frames.append(frame)
            action = agent.sample_action(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
    
    env.close()

    # Create animation
    fig = plt.figure()
    plt.axis('off')
    im = plt.imshow(frames[0])

    def update(frame):
        im.set_array(frame)
        return [im]

    ani = FuncAnimation(fig, update, frames=frames, interval=50)
    plt.close()
    display(HTML(ani.to_jshtml()))

    frames = [Image.fromarray(frame) for frame in frames]
    frames[0].save(f'{demo_path}\{env_name}_reinforce_seed_{seed}.gif', save_all=True, append_images=frames[1:], loop=0, duration=50)
    
    return ani

visualize_trained_model(model, env_name="InvertedPendulum-v4")