## Pendulum con Q-learning

In [28]:
import numpy as np
from pendulum_env_extended import PendulumEnvExtended
import random
import wandb
from IPython import display
import matplotlib.pyplot as plt
import sys
import os
import imageio

sys.path.append(os.path.abspath('../Models'))
from model_manager import ModelManager

Inicializar ambiente

In [29]:
# Create the pendulum environment. render_mode='rgb_array' is requested so that
# env.render() can later be used to collect frames (presumably image arrays --
# confirm against PendulumEnvExtended).
env = PendulumEnvExtended(render_mode='rgb_array')

Discretización de los estados

In [30]:
# Bin edges used to discretize each component of the observation:
# 10 evenly spaced edges over [-1, 1] for x and for y, and 50 over [-8, 8]
# for the velocity component.
x_space = np.linspace(start=-1, stop=1, num=10)
y_space = np.linspace(start=-1, stop=1, num=10)
vel_space = np.linspace(start=-8, stop=8, num=50)

Obtener el estado a partir de la observación

In [31]:
def get_state(obs):
    """Convert a continuous observation into a discrete state.

    Args:
        obs: Iterable of exactly three values, unpacked as (x, y, vel).

    Returns:
        A 3-tuple (x_bin, y_bin, vel_bin) with the np.digitize index of each
        component against x_space, y_space and vel_space respectively.
    """
    x, y, vel = obs
    bin_edges = (x_space, y_space, vel_space)
    # Pair every component with its own edge array and digitize it.
    return tuple(
        np.digitize(value, edges)
        for value, edges in zip((x, y, vel), bin_edges)
    )

In [32]:
# Quick manual check: discretize a sample observation (x=-0.4, y=0.2, vel=0.3).
state = get_state(np.array([-0.4, 0.2, 0.3]))

Discretización de las acciones

In [33]:
# Discrete action set: 10 evenly spaced values over [-2, 2], kept as a plain
# Python list so that an action can be mapped back to its index.
actions = [value for value in np.linspace(-2, 2, 10)]

In [34]:
def get_sample_action():
    """Return one element of `actions` picked uniformly at random."""
    sampled = random.choice(actions)
    return sampled

Inicialización de la tabla Q

In [35]:
# Q-table indexed as Q[x_bin, y_bin, vel_bin, action_idx], initialised to zero.
# Each state axis has one more slot than its edge array because np.digitize
# can return any index from 0 up to and including len(edges).
Q = np.zeros(
    (
        x_space.size + 1,
        y_space.size + 1,
        vel_space.size + 1,
        len(actions),
    )
)

Obtención de la acción a partir de la tabla Q

In [None]:
def optimal_policy(state, Q):
    """Return the greedy action for a discrete state.

    Args:
        state: Index (tuple of bin indices) selecting a row of action values in Q.
        Q: Q-table whose last axis runs over the entries of `actions`.

    Returns:
        The element of `actions` with the highest Q-value in `state`
        (the first one on ties, as given by np.argmax).
    """
    best_idx = np.argmax(Q[state])
    return actions[best_idx]

Epsilon-Greedy Policy

In [None]:
def epsilon_greedy_policy(state, Q, epsilon=0.1, verbose=False):
    """Choose an action with an epsilon-greedy rule.

    With probability `epsilon` a random action is sampled (exploration);
    otherwise the greedy action for `state` under `Q` is returned
    (exploitation).

    Args:
        state: Discrete state used to index Q.
        Q: Q-table passed through to optimal_policy.
        epsilon: Probability of exploring.
        verbose: When True, print 'explore' or 'exploit' for each decision.
            Off by default because this function runs once per environment
            step and the output would flood the notebook.

    Returns:
        The selected action (an element of `actions`).
    """
    # Single Bernoulli draw: 1 with probability epsilon, 0 otherwise.
    explore = np.random.binomial(1, epsilon)
    if explore:
        action = get_sample_action()
    else:
        action = optimal_policy(state, Q)

    if verbose:
        print('explore' if explore else 'exploit')

    return action

Registro con Wandb

In [None]:
# Authenticate with Weights & Biases before any run is started.
wandb.login()

Función de entrenamiento

In [26]:
def train(env, Q, alpha=0.1, gamma=0.9, epsilon=0.1, epsilon_min=0.01, epsilon_decay=0.995, episodes=100):
    """Train a tabular Q-learning agent, updating `Q` in place.

    Each episode resets the environment, acts epsilon-greedily until the
    episode ends, applies the one-step Q-learning update after every
    transition, decays epsilon, and logs episode metrics to wandb.

    Args:
        env: Environment exposing reset() -> (obs, info) and
            step(action) -> (obs, reward, terminated, truncated, info).
        Q: Q-table indexed by the discrete state and then the action index.
        alpha: Learning rate.
        gamma: Discount factor.
        epsilon: Initial exploration probability.
        epsilon_min: Lower bound for epsilon after decay.
        epsilon_decay: Multiplicative decay applied to epsilon once per episode.
        episodes: Number of episodes to run.

    Returns:
        None. `Q` is modified in place.
    """
    for i in range(episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        state = get_state(obs)

        while not done:
            action = epsilon_greedy_policy(state, Q, epsilon)

            # The policy returns the action value; recover its column in Q.
            action_idx = actions.index(action)

            # The environment expects the action wrapped in a 1-element array.
            real_action = np.array([action])

            obs, reward, terminated, truncated, _ = env.step(real_action)
            # An episode ends on termination OR truncation (e.g. a time limit).
            # Looking only at `terminated` never leaves this loop when the
            # environment ends episodes purely by truncation.
            done = terminated or truncated
            next_state = get_state(obs)

            # One-step Q-learning update towards the bootstrapped target.
            Q[state][action_idx] += alpha * (reward + gamma * np.max(Q[next_state]) - Q[state][action_idx])

            state = next_state
            total_reward += reward

        # Decay epsilon, never going below epsilon_min.
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        # "reward" is the reward of the last step of the episode.
        wandb.log({'episode': i, 'total_reward': total_reward, 'epsilon': epsilon, "avg_q_value": np.mean(Q), "reward": reward})

Probar el agente

In [None]:
# Run one episode with the greedy policy derived from the current Q-table.
obs, _ = env.reset()
done = False

# Optional live rendering: set up the figure used to visualize the environment.
# fig = plt.figure()
# img = plt.imshow(env.render(mode='rgb_array'))

while not done:
    action = optimal_policy(get_state(obs), Q)
    obs, reward, terminated, truncated, _ = env.step(np.array([action]))
    # Stop on termination or truncation; checking only `terminated` would loop
    # forever if the environment ends episodes solely through a time limit.
    done = terminated or truncated

    # img.set_data(env.render())
    # display.display(plt.gcf())
    # display.clear_output(wait=True)

Ejecución individual

In [None]:
# Single training run: register the hyperparameters with wandb, train using
# the values read back from wandb.config, then close the run.
wandb.init(
    project="pendulum",
    config={
        "alpha": 0.3,
        "gamma": 0.9,
        "epsilon": 0.3,
        "epochs": 1000,
        "epsilon_min": 0.01,
        "epsilon_decay": 0.995,
    },
)

train(
    env,
    Q,
    alpha=wandb.config.alpha,
    gamma=wandb.config.gamma,
    epsilon=wandb.config.epsilon,
    epsilon_min=wandb.config.epsilon_min,
    epsilon_decay=wandb.config.epsilon_decay,
    episodes=wandb.config.epochs,  # the config key is "epochs"; train() calls it episodes
)

wandb.finish()

Ejecución múltiple con sweep

In [None]:
# Grid sweep definition: 3 alphas x 3 gammas x 3 epsilons x 2 epoch counts
# (54 combinations); epsilon_min and epsilon_decay are held fixed.
# NOTE(review): the target metric 'reward' matches the "reward" key logged by
# train(), which is the reward of the last step of an episode -- confirm that
# this, rather than 'total_reward', is the metric intended.
sweep_config = {
    'method': 'grid',
    'metric': {'name': 'reward', 'goal': 'maximize'},
    'parameters': {
        'alpha': {'values': [0.1, 0.2, 0.3]},
        'gamma': {'values': [0.8, 0.9, 1.0]},
        'epsilon': {'values': [0.1, 0.3, 0.5]},
        'epochs': {'values': [500, 1000]},
        'epsilon_min': {'values': [0.01]},
        'epsilon_decay': {'values': [0.995]},
    },
}

# Register the sweep in the same wandb project as the other runs of this
# notebook ("pendulum"); it was previously created under "taxi".
sweep_id = wandb.sweep(sweep_config, project="pendulum")

def train_sweep():
    """Run one sweep trial: start a wandb run, train with its config, finish.

    Hyperparameters are read from wandb.config after wandb.init(). The
    module-level `env` and `Q` are passed to train(), which updates Q in
    place, so successive trials keep training the same Q-table rather than
    starting from zeros.
    """
    wandb.init()
    cfg = wandb.config

    train(
        env,
        Q,
        alpha=cfg.alpha,
        gamma=cfg.gamma,
        epsilon=cfg.epsilon,
        epsilon_min=cfg.epsilon_min,
        epsilon_decay=cfg.epsilon_decay,
        episodes=cfg.epochs,
    )

    wandb.finish()

# Launch a sweep agent that calls train_sweep for the sweep registered above.
wandb.agent(sweep_id, function=train_sweep)

Guardar modelo entrenado

In [36]:
# Persist the current Q-table under the name 'pendulum' via the project's
# ModelManager (storage location/format is defined by ModelManager).
manager = ModelManager()
manager.save_model('pendulum', Q)

Cargar modelo entrenado

In [None]:
# Load whatever ModelManager has stored under the name 'pendulum'
# (presumably the Q-table saved earlier -- confirm).
# NOTE(review): the result is bound to `model`, but the other cells of this
# notebook read `Q`; assign to `Q` if the loaded table should drive the agent.
manager = ModelManager()
model = manager.load_model('pendulum')

Grabar video

In [None]:
def save_video(env, Q, video_name, num_episodes=1, fps=10):
    """Record the greedy policy acting in `env` and write the frames to a video.

    Args:
        env: Environment exposing reset(), step() and render(); render() is
            expected to return one frame per call.
        Q: Q-table used to pick the greedy action in each state.
        video_name: Output path passed to imageio.mimsave.
        num_episodes: Number of episodes to record back to back.
        fps: Frame rate passed to imageio.mimsave.

    Returns:
        None. The video file is written as a side effect.
    """
    frames = []

    for _ in range(num_episodes):
        obs, _ = env.reset()
        done = False
        while not done:
            state = get_state(obs)
            action = optimal_policy(state, Q)
            obs, reward, terminated, truncated, _ = env.step(np.array([action]))
            # End the episode on termination or truncation; checking only
            # `terminated` never stops when episodes end through a time limit,
            # and `frames` would grow without bound.
            done = terminated or truncated
            frames.append(env.render())

    # Write the collected frames out as a video.
    imageio.mimsave(video_name, frames, fps=fps)

In [None]:
# Re-create the environment in 'rgb_array' render mode and record one episode
# of the greedy policy to 'pendulum.mp4' at 40 fps.
env = PendulumEnvExtended(render_mode='rgb_array')

save_video(env, Q, 'pendulum.mp4', num_episodes=1, fps=40)

Cargar video a Wandb

In [None]:
import wandb

# Upload the recorded episode to the 'pendulum' wandb project as a video.
video_name = 'pendulum.mp4'

wandb.init(project='pendulum')
wandb.log(
    {
        "Pendulum execution": wandb.Video(video_name, fps=40, format="mp4"),
    }
)
wandb.finish()