In [40]:
import tensorflow as tf
import gymnasium as gym
from stable_baselines3 import TD3
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize, SubprocVecEnv
from stable_baselines3.common.noise import NormalActionNoise
import numpy as np
import matplotlib.pyplot as plt
import os


from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.logger import HParam
from stable_baselines3.common.callbacks import EvalCallback, CallbackList
from stable_baselines3.common.evaluation import evaluate_policy

In [41]:
# Parametri dell'ambiente
hp_reset_noise_scale = 0.04759391555142866  # Scala del rumore quando l'ambiente viene resettato
hp_forward_reward_weight = 1.895790973082321  # Peso della ricompensa per il movimento in avanti
hp_ctrl_cost_weight = 1.5077149854491863  # Peso del costo di controllo del robot
hp_healthy_reward = 2.4052783506228486  # Ricompensa per mantenere lo stato "sano"
hp_contact_cost_weight = 6.97442897111752e-05  # Peso del costo per i contatti
hp_healthy_z_lower = 0.38253131966695203  # Altezza minima considerata "sana"
hp_healthy_z_upper = 1.1546046945435202  # Altezza massima considerata "sana"
hp_contact_force_min = -1.254958844476713  # Forza minima di contatto considerata
hp_contact_force_max = 0.9972723819502675  # Forza massima di contatto considerata
hp_healthy_z_range = (hp_healthy_z_lower, hp_healthy_z_upper)  # Range di altezza "sana"
hp_contact_force_range = (hp_contact_force_min, hp_contact_force_max)  # Range di forza di contatto

# Parametri di ottimizzazione per TD3
hp_learning_rate = 0.0001500747188896999  # Learning rate per la rete di attore-critico
hp_learning_starts = 5000  # Numero di step prima di iniziare gli aggiornamenti della rete
hp_batch_size = 1024  # Dimensione del batch per SGD
hp_gamma = 0.943633865948463  # Fattore di sconto per il futuro reward
hp_tau = 0.00823476649680571  # Fattore di interpolazione per l'aggiornamento della rete target
hp_noise_std = 0.11889226516450727  # Deviazione standard del rumore di esplorazione
hp_noise_clip = 0.3182225104935072  # Clipping del rumore per la policy target smoothing
hp_policy_delay = 3  # Delay tra gli aggiornamenti della politica rispetto ai Q-network updates
hp_train_freq = 10  # Frequenza di aggiornamento della rete (ogni X step di interazione)
hp_gradient_steps = 8  # Numero di passi di aggiornamento eseguiti dopo ogni batch

# Parametri dell'azione (rumore per esplorazione)
hp_action_noise_mean = [0.0] * 8  # Media del rumore gaussiano per l'esplorazione
hp_action_noise_sigma = [0.3426192847562814] * 8  # Deviazione standard del rumore gaussiano

# Parametri globali
hp_num_envs = 6  # Numero di environment paralleli per il training
hp_total_timesteps = 3_000_000  # Numero totale di timesteps per l'addestramento
hp_episodes_evaluation = 200  # Numero di episodi usati per valutare la policy


In [42]:
def make_env():
    """
    Crea e restituisce l'ambiente Ant-v5 dalla libreria Gymnasium.

    Questa funzione istanzia l'ambiente "Ant-v5", uno degli ambienti recenti e ben supportati
    in Gymnasium. I parametri usati sono:
    - reset_noise_scale (0.1): determina la scala del rumore quando l'ambiente viene resettato.
    - render_mode ('None'): indica che non verrà effettuato il rendering durante l'esecuzione.

    Ritorna:
        gym.Env: l'ambiente Ant-v5 inizializzato.
    """
    
    # Crea l'ambiente Ant-v5 con i parametri specificati
    return gym.make("Ant-v5", 
                    reset_noise_scale=hp_reset_noise_scale,  # Scala del rumore quando l'ambiente viene resettato
                    forward_reward_weight=hp_forward_reward_weight,  # Peso della ricompensa per il movimento in avanti
                    ctrl_cost_weight=hp_ctrl_cost_weight,  # Peso del costo di controllo
                    healthy_reward=hp_healthy_reward,  # Ricompensa per mantenere lo stato "sano"
                    contact_cost_weight=hp_contact_cost_weight,  # Peso del costo per i contatti
                    healthy_z_range=hp_healthy_z_range,  # Range di altezza considerata "sana"
                    contact_force_range=hp_contact_force_range,  # Range di forza di contatto considerata
                    render_mode='none')  # Nessun rendering durante l'esecuzione

In [43]:
# 1. Creiamo un ambiente vettorializzato utilizzando SubprocVecEnv per gestire più istanze dell'ambiente in parallelo.
NUM_ENVS = hp_num_envs
env = SubprocVecEnv([make_env for _ in range(NUM_ENVS)])

# 2. Normalizziamo osservazioni e ricompense per stabilizzare l'allenamento.
# VecNormalize scala le osservazioni e le ricompense e limita i valori delle osservazioni a un range [-10, 10].
env = VecNormalize(env, norm_obs=True, norm_reward=True, clip_obs=10.)


  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(


In [44]:
# Definisci i parametri del rumore per l'esplorazione
n_actions = env.action_space.shape[-1]  # Numero di azioni nell'ambiente
action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=hp_action_noise_mean * np.ones(n_actions))  # Rumore gaussiano con media e deviazione standard specificate


In [45]:
# Inizializza il modello TD3 con la politica MlpPolicy e l'ambiente creato
model = TD3("MlpPolicy", env,
            learning_rate=hp_learning_rate,  # Imposta il learning rate
            buffer_size=50000,  # Dimensione del buffer di replay
            learning_starts=hp_learning_starts,  # Numero di step prima di iniziare l'aggiornamento della rete
            batch_size=hp_batch_size,  # Dimensione del batch per l'aggiornamento
            gamma=hp_gamma,  # Fattore di sconto per il futuro reward
            tau=hp_tau,  # Fattore di interpolazione per l'aggiornamento della rete target
            action_noise=action_noise,  # Rumore per l'esplorazione
            policy_delay=hp_policy_delay,  # Delay tra gli aggiornamenti della politica rispetto ai Q-network updates
            train_freq=hp_train_freq,  # Frequenza di aggiornamento della rete
            gradient_steps=hp_gradient_steps,  # Numero di passi di aggiornamento eseguiti dopo ogni batch
            seed=42,  # Seed per la riproducibilità
            verbose=0,  # Livello di verbosità
            tensorboard_log="./td3_Ant_tensorboard/",  # Cartella per salvare i log di TensorBoard
            device='mps')  # Dispositivo su cui eseguire il training (es. CPU, GPU)


In [46]:
# Crea un ambiente vettorializzato per la valutazione utilizzando SubprocVecEnv per gestire più istanze dell'ambiente in parallelo
eval_env = SubprocVecEnv([make_env for _ in range(NUM_ENVS)])

# Normalizza osservazioni e ricompense per stabilizzare la valutazione
# VecNormalize scala le osservazioni e le ricompense e limita i valori delle osservazioni a un range [-10, 10]
eval_env = VecNormalize(eval_env, norm_obs=True, norm_reward=True, clip_obs=10.)

  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(


In [47]:
# Crea un callback per la valutazione del modello durante l'allenamento
eval_callback = EvalCallback(
    eval_env,  # Ambiente di valutazione
    best_model_save_path="./logs/best_model",  # Percorso per salvare il miglior modello
    log_path="./logs/",  # Percorso per salvare i log
    eval_freq=50000,  # Frequenza di valutazione (ogni 50000 timesteps)
    deterministic=True,  # Esegui la valutazione in modo deterministico
    render=False  # Non mostrare il rendering durante la valutazione
)

In [48]:
# Alleniamo il modello TD3
# Il parametro total_timesteps indica il numero totale di iterazioni (o passi)
# che il modello eseguirà durante l'allenamento. Ogni timestep rappresenta un'interazione
# con l'ambiente in cui il modello esegue un'azione e riceve un feedback, che viene poi
# usato per aggiornare la sua politica interna.
total_timesteps = hp_total_timesteps  # Numero totale di timesteps per l'addestramento
model.learn(total_timesteps=total_timesteps, callback=eval_callback)  # Avvia l'allenamento del modello con il callback di valutazione

In [None]:
# Salva il modello TD3 addestrato su disco
model.save("td3_Ant_model")

# Salva lo stato dell'ambiente vettorializzato normalizzato su disco
env.save("vecnormalize_Ant.pkl")

In [None]:
def evaluate_random_policy(env, episodes=hp_episodes_evaluation):
    """
    Valuta una policy casuale su un ambiente dato.

    Parametri:
    - env: L'ambiente di simulazione.
    - episodes: Numero di episodi da eseguire per la valutazione.

    Ritorna:
    - La ricompensa media e la deviazione standard delle ricompense ottenute.
    """
    total_rewards = []  # Lista per memorizzare le ricompense totali di ogni episodio
    for _ in range(episodes):
        obs = env.reset()  # Resetta l'ambiente e ottiene l'osservazione iniziale
        done = [False] * env.num_envs  # Stato di completamento per ogni ambiente
        episode_rewards = np.zeros(env.num_envs)  # Ricompense accumulate per ogni ambiente
        while not all(done):
            actions = [env.action_space.sample() for _ in range(env.num_envs)]  # Genera azioni casuali per ogni ambiente
            obs, rewards, done, infos = env.step(actions)  # Esegue le azioni e ottiene le nuove osservazioni e ricompense
            episode_rewards += rewards  # Aggiorna le ricompense accumulate
        total_rewards.extend(episode_rewards)  # Aggiunge le ricompense dell'episodio alla lista totale
    mean_reward_random = np.mean(total_rewards)  # Calcola la ricompensa media
    # std_reward_random = np.std(total_rewards)  # Calcola la deviazione standard delle ricompense (commentato)
    return mean_reward_random  # Ritorna la ricompensa media