Esta notebook contiene bloques de código útiles para realizar Q-learning en el entorno "Pendulum"

In [1]:
import numpy as np
from pendulum_env_extended import PendulumEnvExtended
import random 
from tqdm import tqdm

In [2]:
# Create the single environment instance shared by every later cell
# (the example episode and train() both read this global `env`).
# NOTE(review): 'rgb_array' presumably makes env.render() return frames
# instead of opening a window — confirm against PendulumEnvExtended.
env = PendulumEnvExtended(render_mode='rgb_array')

Discretización de los estados

In [3]:
# Bin edges used by np.digitize to discretize each observation component:
# 10 edges over [-1, 1] for x and for y, 100 edges over [-8, 8] for velocity.
x_space = np.linspace(-1, 1, num=10)
y_space = np.linspace(-1, 1, num=10)
vel_space = np.linspace(-8, 8, num=100)
x_space

array([-1.        , -0.77777778, -0.55555556, -0.33333333, -0.11111111,
        0.11111111,  0.33333333,  0.55555556,  0.77777778,  1.        ])

Obtener el estado a partir de la observación

In [4]:
def get_state(obs):
    """Map a continuous observation to a discrete state.

    Args:
        obs: Sequence of exactly three values ``(x, y, vel)``.

    Returns:
        tuple: ``(x_bin, y_bin, vel_bin)``, the ``np.digitize`` index of each
        component against ``x_space``, ``y_space`` and ``vel_space``.
    """
    x, y, vel = obs
    # Pair every component with the bin edges it is discretized against.
    pairs = ((x, x_space), (y, y_space), (vel, vel_space))
    return tuple(np.digitize(value, edges) for value, edges in pairs)

In [5]:
# Sanity check: discretize a hand-written observation (x=-0.4, y=0.2, vel=0.3)
# and display the resulting tuple of bin indices.
state = get_state(np.array([-0.4, 0.2, 0.3]))
state

(3, 6, 52)

Discretización de las acciones

In [6]:
# Discrete action set: 10 evenly spaced values covering [-2, 2].
# Kept as a plain list so that actions.index(value) can recover the Q column.
actions = [value for value in np.linspace(-2, 2, num=10)]
actions

[-2.0,
 -1.5555555555555556,
 -1.1111111111111112,
 -0.6666666666666667,
 -0.22222222222222232,
 0.22222222222222232,
 0.6666666666666665,
 1.1111111111111107,
 1.5555555555555554,
 2.0]

In [7]:
def get_sample_action():
    """Return one element of the global ``actions`` list, chosen uniformly at random."""
    sampled = random.choice(actions)
    return sampled

Inicialización de la tabla Q

In [8]:
# Zero-initialized Q table. np.digitize can return any index from 0 to
# len(edges), so each state axis needs len(edges) + 1 slots; the last axis
# has one entry per discrete action.
Q = np.zeros(
    tuple(len(edges) + 1 for edges in (x_space, y_space, vel_space))
    + (len(actions),)
)
Q

array([[[[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
    

Obtención de la acción a partir de la tabla Q

In [9]:
def optimal_policy(state, Q):
    """Return the greedy action for ``state`` according to ``Q``.

    Args:
        state: Index into ``Q`` selecting the action values of one state
            (the tuple produced by ``get_state``).
        Q: Q table whose last axis is aligned with the global ``actions`` list.

    Returns:
        The element of ``actions`` at the position of the largest value in
        ``Q[state]``; ``np.argmax`` picks the first position on ties.
    """
    best_idx = np.argmax(Q[state])
    return actions[best_idx]

Epsilon-Greedy Policy

In [10]:
def epsilon_greedy_policy(state, Q, epsilon=0.1):
    """Choose an action with an epsilon-greedy rule.

    Args:
        state: Discrete state used to index ``Q``.
        Q: Q table consulted when exploiting.
        epsilon: Probability of exploring (taking a random action).

    Returns:
        A random action from ``get_sample_action()`` with probability
        ``epsilon``, otherwise the greedy action from ``optimal_policy``.
        Prints 'explore' or 'exploit' to report which branch was taken.
    """
    # A single Bernoulli(epsilon) draw decides between the two branches.
    should_explore = np.random.binomial(1, epsilon) == 1

    if not should_explore:
        chosen = optimal_policy(state, Q)
        print('exploit')
        return chosen

    chosen = get_sample_action()
    print('explore')
    return chosen

Ejemplo de episodio 

In [51]:
# Roll out one episode with an epsilon-greedy policy (epsilon = 0.5),
# printing every transition. Q is never written in this cell, so the table
# displayed at the end is unchanged.
obs,_ = env.reset()
print(obs)
done = False
total_reward = 0
state = get_state(obs)
while not done:
    
    # Action chosen by the policy
    action = epsilon_greedy_policy(state, Q, 0.5)
    
    # Index of the action along the last axis of Q
    action_idx = actions.index(action)
    
    # Action wrapped in a 1-element array, the form passed to env.step
    real_action = np.array([action])
     
    # Only the third return value drives the loop; the fourth and fifth are discarded.
    obs, reward, done, _, _ = env.step(real_action)
    
    next_state = get_state(obs)
    
    state = next_state
    # TODO: use action_idx to update Q
    total_reward += reward
    # Note: `state` printed here is already the state reached after the step.
    print('->', state, action, reward, obs, done)
    env.render()
print('total_reward', total_reward)

Q

[0.8319928  0.55478644 0.7093836 ]
exploit
-> (9, 8, 55) -2.0 -0.4001915168964935 [0.80839264 0.5886436  0.8254734 ] False
explore
-> (8, 8, 56) -1.5555555555555556 -0.46667953111384164 [0.77690506 0.62961775 1.0336229 ] False
exploit
-> (8, 8, 57) -2.0 -0.5746818459165168 [0.7375556  0.67528635 1.2058362 ] False
explore
-> (8, 8, 62) 1.5555555555555554 -0.6974279781239685 [0.6684789 0.7437311 1.9456342] False
explore
-> (8, 9, 66) 0.6666666666666665 -1.0823017545136078 [0.5662838 0.8242103 2.6034327] False
exploit
-> (7, 9, 68) -2.0 -1.6203716887065367 [0.44028005 0.8978605  2.9215903 ] False
explore
-> (6, 9, 71) -1.1111111111111112 -2.0977738529586416 [0.2806724  0.95980364 3.428319  ] False
exploit
-> (5, 9, 74) -2.0 -2.833909261456331 [0.09195592 0.99576306 3.8481717 ] False
exploit
-> (4, 9, 77) -2.0 -3.671426765864771 [-0.12235631  0.99248624  4.294994  ] False
exploit
-> (3, 9, 79) -2.0 -4.716504215531383 [-0.35192946  0.9360265   4.739359  ] False
explore
-> (2, 9, 83) -0.6666

array([[[[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
    

: 

In [11]:
def train(num_k_episodes, epsilon, Q, alpha=0.1, gamma=0.99, verbose=False):
    """Train ``Q`` with tabular Q-learning on the notebook's global ``env``.

    Each step applies the update
    ``Q[s, a] += alpha * (r + gamma * max(Q[s']) - Q[s, a])``.

    Args:
        num_k_episodes: Number of episodes to run (an int), or an iterable
            yielding one item per episode.
        epsilon: Exploration probability passed to ``epsilon_greedy_policy``.
        Q: Q table indexed by the state tuple from ``get_state`` followed by
            the action index; it is updated in place.
        alpha: Learning rate of the update.
        gamma: Discount factor applied to the best value of the next state.
        verbose: When True, print every transition and call ``env.render()``
            after each step. Off by default so the progress bar stays readable.

    Returns:
        list: Total (undiscounted) reward collected in each episode, in order.
    """
    # tqdm needs an iterable; passing a bare int would raise TypeError.
    if isinstance(num_k_episodes, (int, np.integer)):
        episodes = range(num_k_episodes)
    else:
        episodes = num_k_episodes

    all_rewards = []
    for _episode in tqdm(episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        state = get_state(obs)
        while not done:

            # Action chosen by the policy, using the caller's epsilon
            action = epsilon_greedy_policy(state, Q, epsilon)

            # Index of the action along the last axis of Q
            action_idx = actions.index(action)

            # Action wrapped in a 1-element array, the form passed to env.step
            real_action = np.array([action])

            obs, reward, terminated, truncated, _ = env.step(real_action)
            # Stop on either flag so a time-limit truncation also ends the episode.
            done = terminated or truncated

            # The next state must be known before the update can bootstrap from it.
            next_state = get_state(obs)

            # Q-learning update
            q_index = tuple(state) + (action_idx,)
            td_target = reward + gamma * np.max(Q[next_state])
            Q[q_index] += alpha * (td_target - Q[q_index])

            state = next_state
            total_reward += reward
            if verbose:
                print('->', state, action, reward, obs, done)
                env.render()

        all_rewards.append(total_reward)

    return all_rewards