In [1]:
import gymnasium as gym
import numpy as np
import os
import torch
import matplotlib.pyplot as plt
import imageio

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv, VecNormalize, DummyVecEnv
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback, CallbackList
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor

In [2]:
# Number of parallel environments used for training
NUM_ENVS = 4

# Factory for monitored HalfCheetah environments
def make_env(render_mode=None):
    """Return a zero-argument callable that builds a monitored HalfCheetah-v5 env.

    The vectorised wrappers used in this notebook take a list of callables, so
    this returns a thunk rather than an environment instance.

    Args:
        render_mode: Render mode forwarded to ``gym.make``. Defaults to ``None``
            (rendering disabled). The original code passed the string
            ``'none'``, which is not a supported render mode; Gymnasium
            expects ``None`` to turn rendering off.

    Returns:
        A callable that creates the environment wrapped in ``Monitor`` each
        time it is invoked.
    """
    return lambda: Monitor(gym.make("HalfCheetah-v5",
                                    reset_noise_scale=0.16861882648143064,
                                    forward_reward_weight=0.9408203240971191,
                                    ctrl_cost_weight=0.09598052645324526,
                                    render_mode=render_mode))

# Training environments: NUM_ENVS subprocess workers, wrapped so that
# observations and rewards are normalised (observations clipped at 10).
env = VecNormalize(
    SubprocVecEnv([make_env() for _ in range(NUM_ENVS)]),
    norm_obs=True,
    norm_reward=True,
    clip_obs=10.,
)

# Automatic device selection: prefer CUDA, then Apple MPS, otherwise CPU
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

# Alternative reward that penalises a tilted torso
def custom_reward(env):
    """Compute a shaped reward from the simulator state of a HalfCheetah env.

    The reward is ``0.8 * forward_velocity - 0.5 * |torso_angle|``.

    Per the Gymnasium HalfCheetah documentation the generalised positions are
    ordered ``[rootx, rootz, rooty, ...]``, so ``qpos[2]`` is the torso pitch,
    and the forward (x) velocity of the torso is ``qvel[0]``. The original code
    read ``qpos[3]``, which is a joint angle rather than a velocity.

    NOTE(review): nothing in the visible notebook calls this function, so it
    does not affect training unless it is wired into the environment.

    Args:
        env: An environment whose ``unwrapped`` attribute exposes the MuJoCo
            state either as ``data`` (``mujoco`` bindings, used by the v5
            environments) or as ``sim.data`` (legacy ``mujoco-py`` layout).

    Returns:
        The shaped scalar reward.
    """
    unwrapped = env.unwrapped
    # v5 environments expose `data` directly; fall back to the legacy
    # `sim.data` layout so older environments keep working.
    data = getattr(unwrapped, "data", None)
    if data is None:
        data = unwrapped.sim.data

    torso_angle = data.qpos[2]
    forward_velocity = data.qvel[0]
    penalty = -abs(torso_angle) * 0.5  # penalise a tilted torso
    return forward_velocity * 0.8 + penalty

# PPO constructor arguments (unpacked into PPO(**model_params) further down)
model_params = dict(
    policy="MlpPolicy",
    env=env,
    learning_rate=7.642236216979812e-05,
    n_steps=1024,
    batch_size=64,
    n_epochs=10,
    gamma=0.9955582618297791,
    gae_lambda=0.9653759042371923,
    clip_range=0.2742621016643404,
    ent_coef=0.038083013834726225,
    verbose=1,
    tensorboard_log="./ppo_HalfCheetah_tensorboard/",
    device=device,
    policy_kwargs={"net_arch": [256, 256, 128]},
)

# Evaluation environment: a single in-process env (DummyVecEnv, to avoid
# problems with SubprocVecEnv).
eval_env = DummyVecEnv([make_env()])
# training=False freezes the running statistics during evaluation, and
# norm_reward=False keeps evaluation rewards on the environment's own scale
# (reward normalisation is only useful while training).
eval_env = VecNormalize(eval_env, norm_obs=True, norm_reward=False, clip_obs=10., training=False)

# Callbacks for periodic evaluation and checkpointing.
# NOTE: eval_freq and save_freq are counted in calls to env.step() on the
# vectorised training env, so with NUM_ENVS parallel environments each
# interval corresponds to NUM_ENVS times as many timesteps.
eval_callback = EvalCallback(eval_env, best_model_save_path="./logs/best_model",
                             log_path="./logs/", eval_freq=5000, deterministic=True, render=False)
# save_vecnormalize=True stores the VecNormalize statistics next to every
# checkpoint; without them a checkpointed policy cannot be evaluated with the
# observation scaling it was trained on.
checkpoint_callback = CheckpointCallback(save_freq=10000, save_path="./logs/checkpoints/",
                                         name_prefix="ppo_halfcheetah_checkpoint",
                                         save_vecnormalize=True)

# Build the PPO agent from the parameter dictionary and train it for one
# million timesteps with the evaluation and checkpoint callbacks attached.
model = PPO(**model_params)
model.learn(
    callback=CallbackList([eval_callback, checkpoint_callback]),
    total_timesteps=1_000_000,
)

# Persist the trained policy together with the normalisation wrapper state
# of the training environment.
model.save("ppo_HalfCheetah_model")
env.save("vecnormalize_HalfCheetah.pkl")

# Reload the model and the normalisation statistics for evaluation
model = PPO.load("ppo_HalfCheetah_model", device=device)
# eval_env is already a VecNormalize wrapper at this point. Passing it to
# VecNormalize.load() would stack a second normaliser on top of the first and
# normalise every observation twice, so wrap the underlying vec env instead.
eval_env = VecNormalize.load(
    "vecnormalize_HalfCheetah.pkl",
    eval_env.venv if isinstance(eval_env, VecNormalize) else eval_env,
)
eval_env.training = False      # do not update the running statistics while evaluating
eval_env.norm_reward = False   # the pickle restores norm_reward=True from the training env

def evaluate_agent(model, env, episodes=100):
    """Evaluate ``model`` deterministically on ``env`` and print the result.

    Args:
        model: Policy passed to ``evaluate_policy``.
        env: Environment passed to ``evaluate_policy``.
        episodes: Number of evaluation episodes (``n_eval_episodes``).

    Returns:
        A ``(mean_reward, std_reward)`` tuple as produced by ``evaluate_policy``.
    """
    reward_stats = evaluate_policy(model, env, deterministic=True, n_eval_episodes=episodes)
    mean_reward, std_reward = reward_stats
    print(f"Mean Reward: {mean_reward:.2f} ± {std_reward:.2f}")
    return mean_reward, std_reward

# Evaluate the trained model on the evaluation environment over 100 episodes
mean_reward_trained, std_reward_trained = evaluate_agent(model, eval_env, episodes=100)


  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(


Using mps device
Logging to ./ppo_HalfCheetah_tensorboard/PPO_6




---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1e+03    |
|    ep_rew_mean     | -341     |
| time/              |          |
|    fps             | 1443     |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 4096     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 1e+03       |
|    ep_rew_mean          | -331        |
| time/                   |             |
|    fps                  | 597         |
|    iterations           | 2           |
|    time_elapsed         | 13          |
|    total_timesteps      | 8192        |
| train/                  |             |
|    approx_kl            | 0.018291216 |
|    clip_fraction        | 0.0615      |
|    clip_range           | 0.274       |
|    entropy_loss         | -8.53       |
|    explained_variance   | -0.342      |
|    learning_rate        | 7.

In [3]:
# 5. Save the model
# NOTE(review): these are the same two calls, writing to the same paths, as the
# save step at the end of the training cell above.
model.save("ppo_HalfCheetah_model")
env.save("vecnormalize_HalfCheetah.pkl")    # also save the normalization parameters


In [8]:
import gymnasium as gym
import numpy as np
import os
import torch
import matplotlib.pyplot as plt
import imageio

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv, VecNormalize, DummyVecEnv
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback, CallbackList
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
# Number of parallel environments for the SubprocVecEnv built later in this
# cell; this rebinds the NUM_ENVS value set in the training cell.
NUM_ENVS=2
def make_envv():
    """Build one monitored HalfCheetah-v5 environment that renders RGB frames.

    Unlike a factory-of-factories, this returns the environment instance
    itself, so the function object can be handed directly to a vectorised
    environment as a constructor.

    Returns:
        The ``gym.make`` environment wrapped in ``Monitor``.
    """
    cheetah_kwargs = dict(
        reset_noise_scale=0.16861882648143064,
        forward_reward_weight=0.9408203240971191,
        ctrl_cost_weight=0.09598052645324526,
        render_mode='rgb_array',
    )
    return Monitor(gym.make("HalfCheetah-v5", **cheetah_kwargs))

# Create the parallel environments with observation/reward normalisation.
# NOTE(review): render_and_record_policy below builds its own DummyVecEnv and
# never reads this module-level `env`; confirm whether these worker processes
# are still needed.
env = VecNormalize(
    SubprocVecEnv([make_envv for _ in range(NUM_ENVS)]),
    norm_obs=True,
    norm_reward=True,
    clip_obs=10.,
)
# Roll out a saved policy and record the rendered frames to a video file
def render_and_record_policy(model_path, output_filename="videos/halfcheetah_best_policy.mp4", episodes=1,
                             vecnormalize_path="vecnormalize_HalfCheetah.pkl"):
    """Run a saved PPO policy deterministically and write the frames to a video.

    Args:
        model_path: Path given to ``PPO.load``.
        output_filename: Destination video file. Its parent directory is
            created if it does not exist.
        episodes: Number of episodes to record; ``episodes * 1000`` steps are
            executed (1000 matches the episode length reported in the
            training log).
        vecnormalize_path: Pickle with the saved ``VecNormalize`` statistics.
            Defaults to the file written by the training cell.

    Returns:
        None. The video is written to ``output_filename`` and a confirmation
        line is printed.
    """
    # os.makedirs("") raises FileNotFoundError, so create the directory that
    # will actually hold the video; "." covers bare filenames.
    os.makedirs(os.path.dirname(output_filename) or ".", exist_ok=True)

    # Hand the factory to DummyVecEnv directly instead of a lambda that closes
    # over a local name which is rebound on the very same line.
    env = DummyVecEnv([make_envv])
    frames = []
    try:
        env = VecNormalize.load(vecnormalize_path, env)
        env.training = False     # freeze the running statistics during playback
        env.norm_reward = False  # reward normalisation is not needed here

        model = PPO.load(model_path, env=env)
        obs = env.reset()

        # Vectorised envs reset automatically when an episode ends and return
        # the first observation of the next one, so no manual reset is needed.
        for _ in range(episodes * 1000):
            action, _state = model.predict(obs, deterministic=True)
            obs, _reward, _done, _info = env.step(action)
            frames.append(env.render(mode='rgb_array'))
    finally:
        # Release the simulator even if loading or the rollout fails.
        env.close()

    imageio.mimsave(output_filename, frames, fps=30)
    print(f"Video salvato in {output_filename}")

# Record a video of the policy stored in "ppo_HalfCheetah_model".
# NOTE(review): this is the model saved at the end of training, not the
# EvalCallback best model under ./logs/best_model, although the default
# output filename says "best_policy" — confirm which one is intended.
render_and_record_policy("ppo_HalfCheetah_model")

Video salvato in videos/halfcheetah_best_policy.mp4
