Esta notebook contiene bloques de código útiles para realizar Q-learning en el entorno "Pendulum"

In [4]:
import numpy as np
from pendulum_env_extended import PendulumEnvExtended
import random 

In [5]:
# Default hyperparameters for the notebook.
# NOTE: train_policy() and train_finalPolicy() assign their own local
# alpha / gamma / epsilon / epsilon_variability, so the values here are
# not the ones used by those training loops.
alpha = 0.1                 # learning rate
gamma = 0.99                # discount factor
epsilon = 0.1               # exploration probability
n_episodes = 100
epsilon_variability = 0.8   # fraction of epsilon removed at each decay step
cant_Buckets = 15           # number of bin edges per discretised dimension

# Seed both RNGs used by this notebook (random.choice for exploration,
# np.random.binomial for the explore/exploit draw) so that a
# Restart & Run All yields the same action sequence.
# NOTE(review): the environment's own reset() is not seeded here -- confirm
# whether PendulumEnvExtended.reset accepts a seed.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

In [6]:
# Shared environment instance used by the example episode, the sweep training
# and the final visualisation (which passes env.render() to imshow).
# presumably render_mode='rgb_array' makes render() return image arrays -- confirm.
env = PendulumEnvExtended(render_mode='rgb_array')

Discretización de los estados

In [7]:
# Bin edges used to discretise each observation component.
# The two position components share the same range and resolution;
# velocity uses a fixed, finer grid of 100 edges over [-8, 8].
x_space, y_space = (np.linspace(-1, 1, cant_Buckets) for _axis in range(2))
vel_space = np.linspace(-8, 8, 100)
x_space

array([-1.        , -0.85714286, -0.71428571, -0.57142857, -0.42857143,
       -0.28571429, -0.14285714,  0.        ,  0.14285714,  0.28571429,
        0.42857143,  0.57142857,  0.71428571,  0.85714286,  1.        ])

Obtener el estado a partir de la observación

In [8]:
def get_state(obs):
    """Discretise a raw observation into a tuple of bin indices.

    Args:
        obs: A 3-element sequence ``(x, y, vel)``.

    Returns:
        A 3-tuple ``(x_bin, y_bin, vel_bin)`` holding the ``np.digitize``
        index of each component against ``x_space``, ``y_space`` and
        ``vel_space`` respectively.
    """
    x, y, vel = obs
    return (
        np.digitize(x, x_space),
        np.digitize(y, y_space),
        np.digitize(vel, vel_space),
    )

In [9]:
state = get_state(np.array([-0.4, 0.2, 0.3])) # state mapping: tells us which bin each observation component falls into
state

(5, 9, 52)

Discretización de las acciones

In [10]:
# Discrete action set: cant_Buckets evenly spaced values between the
# minimum (-2) and maximum (2).
# The same grid is kept both as an array (for np.digitize) and as a list
# (for random.choice / .index lookups).
actionBuckets = np.linspace(-2, 2, cant_Buckets)
actions = list(actionBuckets)
actions

[-2.0,
 -1.7142857142857144,
 -1.4285714285714286,
 -1.1428571428571428,
 -0.8571428571428572,
 -0.5714285714285716,
 -0.2857142857142858,
 0.0,
 0.2857142857142856,
 0.5714285714285712,
 0.8571428571428568,
 1.1428571428571428,
 1.4285714285714284,
 1.714285714285714,
 2.0]

In [11]:
def getActions(action):
    """Return the ``np.digitize`` index of ``action`` within ``actionBuckets``."""
    bucket_idx = np.digitize(action, actionBuckets)
    return bucket_idx

In [12]:
def get_sample_action():
    """Pick one element of the discrete ``actions`` list at random."""
    sampled_action = random.choice(actions)
    return sampled_action

Inicialización de la tabla Q

In [13]:
# Q-table indexed as Q[x_bin, y_bin, vel_bin, action_idx].
# Each state axis gets one slot more than its number of bin edges because
# np.digitize can return any index from 0 up to len(edges) inclusive.
state_axes = tuple(len(edges) + 1 for edges in (x_space, y_space, vel_space))
Q = np.zeros(state_axes + (len(actions),))
Q

array([[[[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
    

Obtención de la acción a partir de la tabla Q

In [14]:
def optimal_policy(state, Q):
    """Return the greedy action for a discretised state.

    Args:
        state: 3-tuple of bin indices ``(x_bin, y_bin, vel_bin)``.
        Q: Q-table indexed by those three bins, with one value per action
            on its last axis.

    Returns:
        The entry of ``actions`` whose Q-value is largest for ``state``
        (first one on ties, as given by ``np.argmax``).
    """
    x_idx, y_idx, vel_idx = state
    q_values = Q[x_idx][y_idx][vel_idx]
    best_idx = np.argmax(q_values)
    return actions[best_idx]

Epsilon-Greedy Policy

In [15]:
def epsilon_greedy_policy(state, Q, epsilon=0.9):
    """Choose an action with an epsilon-greedy rule.

    Args:
        state: Discretised state (tuple of bin indices).
        Q: Q-table used for the greedy choice.
        epsilon: Probability of exploring instead of exploiting.

    Returns:
        A random action from ``get_sample_action()`` when exploring,
        otherwise the greedy action from ``optimal_policy(state, Q)``.
    """
    # Single Bernoulli draw: 1 -> explore, 0 -> exploit.
    if np.random.binomial(1, epsilon):
        return get_sample_action()
    return optimal_policy(state, Q)

Ejemplo de episodio 

In [None]:
# Example episode: roll the environment out with an epsilon-greedy policy
# (epsilon = 0.5) over the current Q-table, printing every transition.
# No learning happens here; Q is only read.
obs, _ = env.reset()
print(obs)
done = False
total_reward = 0
while not done:
    state = get_state(obs)
    print('state', state)
    action = epsilon_greedy_policy(state, Q, 0.5)

    # The environment is stepped with the torque value itself, exactly as the
    # training and visualisation cells do -- not with its index in `actions`.
    obs, reward, terminated, truncated, _ = env.step(np.array([action]))
    # Stop on either end-of-episode flag; otherwise a time-limit truncation
    # would leave this loop running forever.
    done = terminated or truncated

    total_reward += reward
    print('->', state, action, reward, obs, done)
    env.render()
print('total_reward', total_reward)

In [16]:
import wandb
from wandb.sklearn import plot_precision_recall, plot_feature_importances
from wandb.sklearn import plot_class_proportions, plot_learning_curve, plot_roc

def train_policy():
    """Run one W&B sweep trial of tabular Q-learning on the shared ``env``.

    Hyperparameters are read from the W&B run config: ``alpha``, ``gamma``,
    ``epsilon``, ``epsilonVariability`` and ``episodes``. The module-level
    Q-table ``Q`` is updated in place.

    Logged to W&B:
        episode_reward: the reward of every individual environment step
            (key name kept as-is so existing dashboards keep working).
        train_avg_reward: after each episode, the mean of the last 500
            step rewards (accumulated across episodes).
        Q: the Q-table after each episode.

    Any exception is caught and printed rather than propagated, and
    ``wandb.finish()`` is always called.

    NOTE(review): ``Q`` is shared by every trial of the sweep, so later
    trials start from the table learnt by earlier ones -- confirm this is
    intended when comparing hyperparameters.
    """
    try:
        with wandb.init() as run:
            config = run.config
            alpha = config.alpha
            gamma = config.gamma
            epsilon = config.epsilon
            epsilon_variability = config.epsilonVariability
            episodes = config.episodes
            step_rewards = []

            for episode in range(episodes):
                obs, _ = env.reset()
                done = False
                step_count = 0

                while not done:
                    # Multiplicative epsilon decay; step_count restarts every
                    # episode, so this always fires on the first step.
                    if step_count % 250 == 0:
                        epsilon = max(epsilon - epsilon_variability * epsilon, 0)

                    state = get_state(obs)
                    action = epsilon_greedy_policy(state, Q, epsilon)
                    action_idx = actions.index(action)

                    obs, reward, terminated, truncated, _ = env.step([action])
                    # End the episode on either flag; ignoring `truncated`
                    # would never stop an episode that only hits a time limit.
                    done = terminated or truncated
                    next_state = get_state(obs)

                    # Q-learning update:
                    # Q(s,a) += alpha * (r + gamma * max_a' Q(s',a') - Q(s,a))
                    td_target = reward + gamma * np.max(Q[next_state])
                    Q[state][action_idx] += alpha * (td_target - Q[state][action_idx])

                    step_rewards.append(reward)
                    step_count += 1
                    wandb.log({'episode_reward': reward})

                recent_mean_reward = np.mean(step_rewards[-500:])
                wandb.log({'train_avg_reward': recent_mean_reward})
                wandb.log({'Q': Q})
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        wandb.finish()

In [17]:
import wandb

# Sweep configuration: random search maximising 'train_avg_reward' over the
# hyperparameter ranges that train_policy() reads from the run config.
sweep_configuration = dict(
    method="random",
    metric=dict(goal="maximize", name="train_avg_reward"),
    parameters=dict(
        episodes=dict(max=3000, min=1400),
        alpha=dict(max=0.6, min=0.5),
        gamma=dict(max=0.6, min=0.5),
        epsilon=dict(max=1.0, min=0.8),
        epsilonVariability=dict(max=0.001, min=0.0001),
    ),
)

sweep_id = wandb.sweep(sweep=sweep_configuration, project="pendulum-sweep")

# NOTE(review): no run count is passed to the agent -- confirm whether the
# sweep is meant to keep launching trials until interrupted by hand.
wandb.agent(sweep_id, function=train_policy)

Create sweep with ID: wxtl2f2m
Sweep URL: https://wandb.ai/ia-santiago-moron/pendulum-sweep/sweeps/wxtl2f2m


[34m[1mwandb[0m: Agent Starting Run: d0p7s9pr with config:
[34m[1mwandb[0m: 	alpha: 0.5953079196360637
[34m[1mwandb[0m: 	episodes: 1975
[34m[1mwandb[0m: 	epsilon: 0.9283740493280916
[34m[1mwandb[0m: 	epsilonVariability: 0.0008642959711366576
[34m[1mwandb[0m: 	gamma: 0.5139306025376746
Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.




In [None]:


def train_finalPolicy():
    """Train the module-level Q-table with fixed hyperparameters.

    A fresh ``PendulumEnvExtended()`` is created for the training run and
    the global ``Q`` is updated in place with the Q-learning rule.

    Returns:
        The trained ``Q`` array (the same object as the global ``Q``), or
        ``None`` if an exception occurred; in that case the error is
        printed instead of propagated.
    """
    try:
        # Edit alpha, gamma, epsilon, epsilon_variability and episodes here.
        alpha = 0.8
        gamma = 0.5333435999129466
        epsilon = 0.99
        epsilon_variability = 0.001
        episodes = 2500

        env = PendulumEnvExtended()

        for episode in range(episodes):
            obs, _ = env.reset()
            done = False
            step_count = 0
            while not done:
                # Multiplicative epsilon decay every 25 steps of the episode.
                if step_count % 25 == 0:
                    epsilon = max(epsilon - epsilon_variability * epsilon, 0)

                state = get_state(obs)
                action = epsilon_greedy_policy(state, Q, epsilon)
                action_idx = actions.index(action)

                obs, reward, terminated, truncated, _ = env.step([action])
                # End the episode on either flag; ignoring `truncated` would
                # never stop an episode that only hits a time limit.
                done = terminated or truncated
                next_state = get_state(obs)

                # Q(s,a) += alpha * (r + gamma * max_a' Q(s',a') - Q(s,a))
                td_target = reward + gamma * np.max(Q[next_state])
                Q[state][action_idx] += alpha * (td_target - Q[state][action_idx])

                step_count += 1
        return Q

    except Exception as e:
        print(f"An error occurred: {e}")
        # Same outcome as before (implicit None), now spelled out.
        return None

In [None]:
# Train the final Q-table; `policy` is the table later handed to
# optimal_policy() in the visualisation cell. It is None if training failed.
policy = train_finalPolicy()

In [None]:
import matplotlib.pyplot as plt
from IPython import display
import numpy as np
import wandb
import moviepy.editor as mpy
import tempfile
import os

# Initialise the environment, the figure and the wandb run.
obs, _ = env.reset()
done = False
fig, ax = plt.subplots()
img = ax.imshow(env.render(), animated=True)
wandb.init(project="visualize-predictions", name="simulation")

# Roll out the greedy policy for 100 steps, collecting one frame per step.
frames = []
actionList = []
for step in range(1, 101):
    state = obs
    action = optimal_policy(get_state(obs), policy)
    print(action)
    actionList.append(action)
    obs, reward, done, _, _ = env.step(np.array([action]))

    # Refresh the displayed image with the new environment state.
    img.set_data(env.render())
    display.display(plt.gcf())
    display.clear_output(wait=True)  # keeps the transition smooth in Jupyter

    # Capture the rendered figure as an RGB frame.
    # buffer_rgba() replaces the deprecated tostring_rgb(); the alpha channel
    # is dropped, and the slice is copied because the buffer is reused by the
    # next draw.
    fig.canvas.draw()
    frame = np.asarray(fig.canvas.buffer_rgba())[..., :3].copy()
    frames.append(frame)

# Build a video from the collected frames.
clip = mpy.ImageSequenceClip(frames, fps=4)
with tempfile.TemporaryDirectory() as tmpdirname:
    video_path = os.path.join(tmpdirname, "simulation.mp4")
    clip.write_videofile(video_path)

    # Log the video to wandb while the temporary file still exists.
    wandb.log({"simulation": wandb.Video(video_path, fps=4, format="mp4")})

# Close the wandb run.
wandb.finish()
