# Reinforcement Learning

## Understanding the environment
We are using a library that has everything you need to create a simulation
https://www.gymlibrary.dev/

In [3]:
%pip install gym
%pip install gym[classic_control]


Note: you may need to restart the kernel to use updated packages.
zsh:1: no matches found: gym[classic_control]
Note: you may need to restart the kernel to use updated packages.


We create a simulation

<img src="cart_pole.gif">

In [2]:
import gym

env = gym.make("CartPole-v1", render_mode="human")
env.action_space.seed(42)

observation, info = env.reset(seed=42)

for _ in range(1000):
    observation, reward, terminated, _, info = env.step(env.action_space.sample())

    if terminated:
        observation, info = env.reset()

env.close()

DependencyNotInstalled: pygame is not installed, run `pip install gym[classic_control]`

### How does it work?
Let's comprehend the actions and states

In [None]:
env.action_space
# 0 - Left
# 1 - Right

Discrete(2)

In [None]:
env = gym.make("CartPole-v1")
obs = env.reset()
obs #Vertical Position, Velocity, Angle, Angular Velocity

array([ 0.00384444, -0.02675204, -0.01892   , -0.01974045], dtype=float32)

### Progression

Each step we do in the simulation is acting according to the action we pass to the simulation. It changes the actual state.

In [None]:
print( env.reset() )
print( env.step(0) )

[-0.0311069   0.01699071 -0.02974914 -0.02360288]
(array([-0.03076709, -0.17769226, -0.03022119,  0.2595474 ], dtype=float32), 1.0, False, {})


Adtional variables that returns:

*   Reward - Positive if doing the right action, negative if not
*   Done - If we end the simulation
*   Truncate - If the simulation overpasses a limit steps 
*   Aditional Information - Extra info the environment can use.

### Basic Simulation

We can try to keep the cartpole facing up.

1. If the angle is less than 0, it should move to the left
2. If the angle is more than 0, it should move to the rifht

In [None]:
def relgas_basicas(obs):
    angle = obs[2]
    return 0 if angle < 0 else 1 # 0 left, 1 right

In [None]:
env = gym.make("CartPole-v1", render_mode="human")

totals = []

for episode in range(20):
    recompenzas_episodio = 0
    obs = env.reset()[0]
    for step in range(1000): # 1000 steps max, we don't want to run forever
        action = relgas_basicas(obs)
        obs, reward, done, _, info = env.step(action)
        recompenzas_episodio += reward
        if done:
            break
    totals.append(recompenzas_episodio)
    
env.close()

error: ignored

### Model Evaluation

In [None]:
import numpy as np
np.mean(totals), np.std(totals), np.min(totals), np.max(totals)

(45.95, 9.40996811896831, 34.0, 64.0)

## Using Q-Learning

In [None]:
env = gym.make("CartPole-v1")
env.reset()

(array([-0.0232647 , -0.00226463, -0.04544467, -0.03048344], dtype=float32),
 {})

### Preparing Q-Table
<img src="q-table.png">

We have numerical and continuous values.
We can discretize them to reduce the shape of the table.

In [None]:
bin = 50 # grupos

In [None]:
q_table = np.zeros(shape=(bin,bin,bin,bin,2)) # 4 dimensiones de estados y las acciones.
q_table.shape

### Discretizing

Let's do this in an intuitive way

<img src="https://datascientest.com/es/wp-content/uploads/sites/7/2020/12/illu_normali_blog-49-1024x562.png" width=300>

In [None]:
valores = np.arange(10,100)
grupos = 5

separación = [ round( ( valor - valores.min() )  / ( valores.max() - valores.min() ) * grupos )  for valor in valores]
print( separación )

[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5]


In [None]:
def discretize(state):
    h=env.observation_space.high 
    l=env.observation_space.low
    
    if type(state) is tuple:
        state = state[0]
    
    aux = ( (state - l) / (h-l) ) * bin
    return tuple( aux.astype('int32') )

In [None]:
discretize( env.reset() )

  aux = ( (state - l) / (h-l) ) * bin


(25, 0, 23, 0)

El obtener una tupla con los valores del indice de la tabla, nos permite usarlos como indices para el arreglo

In [None]:
q_table[ discretize( env.reset() ) ] # nos traerá las posibles acciones

  aux = ( (state - l) / (h-l) ) * bin


array([0., 0.])

### Trainning the model

<img src="q-learning.png">

In [None]:
import random
from IPython.display import clear_output

#inicialización de simulación
env = gym.make("CartPole-v1")

# Hyperparametros
aprendizaje = 0.1 # Taza aprendizaje
descuento = 0.9 # Taza descuento

epsilon = 0.1 # Umbral de aleatoriedad

for i in range(1, 50000):
    state = env.reset() # Reseteamos los valores

    final = False
    
    while not final:
        if random.uniform(0, 1) < epsilon:
            action = env.action_space.sample() # Explora espacio de acciones
        else:
            action = np.argmax(q_table[discretize(state)]) # Usa valores aprendidos
        
        valor_siguiente, recompenza, final, _, info = env.step(action)
        
        valor_actual = q_table[discretize(state)][action]
        valor_futuro = np.max(q_table[discretize(valor_siguiente)])
        
        nuevo_valor = valor_actual + ( aprendizaje * ( recompenza + ( descuento * valor_futuro ) - valor_actual ) )
        q_table[discretize(state)][action] = nuevo_valor

        state = valor_futuro
        
    if i % 100 == 0:
        clear_output(wait=True)
        print(f"Episode: {i}")

print("Training finished.\n")

Episode: 49900
Training finished.



### Model Evaluation

In [None]:
env = gym.make("CartPole-v1")
env.reset()

total_rewards = []
episodes = 1000

for _ in range(episodes):
    state = env.reset()
    rewards = 0
    done = False
    
    while not done:
        action = np.argmax(q_table[discretize(state)])
        state, reward, done, _, info = env.step(action)

        if reward == 1:
            rewards += 1

    total_rewards.append( rewards )
env.close()

print(f"Resultados despues de {episodes} episodios:")
print( np.mean(total_rewards), np.std(total_rewards), np.min(total_rewards), np.max(total_rewards) )

  aux = ( (state - l) / (h-l) ) * bin


Resultados despues de 1000 episodios:
30.974 13.258254937962235 12 94


## Creating our own Simulations

In [None]:
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random

## Example of a simulation

In [None]:
class ReguladorSonido(Env):
  def __init__(self):
    # Accciones: 
    #Subir bocina, bajar bocina, subir youtube, bajar youtube, mantener volumen
    self.action_space = Discrete(5)
    # Valores posibles
    # Sonido Bocina  [ 1 - 100 ]
    # Sonido Youtube [ 1 - 100 ]
    self.observation_space = Box(low=np.array([0,0]), high=np.array([100,100]))
    # Set inicio
    self.state = tuple( [ random.randint(0,100), random.randint(0,100) ] )
    # Set Límite ( Canción de 2 minutos)
    self.duracion_cancion = 120
    self.epoch = 0 # segundo actual

  def step(self, action):
    done = False

    # Aplicar accion
    # 0 - Bajar Volumen Bocina
    # 1 - Subir Volumen Bocina
    # 2 - Bajar Volumen Youtube
    # 3 - Subir Volumen Youtube
    # 4 - Mantener Volumen
    if action == 0:
      self.state = tuple( [ max( self.state[0] -1, 0 ), self.state[1] ] )
    if action == 1:
      self.state = tuple( [ min(self.state[0] +1, 99 ), self.state[1] ] )
    if action == 2:
      self.state = tuple( [ self.state[0], max( self.state[1] -1, 0 ) ] )
    if action == 3:
      self.state = tuple( [ self.state[0], min( self.state[1] +1, 99) ] )
    if action == 4:
      self.state = self.state 

    # Cada segundo que pasa, la canción va acabando
    self.epoch = self.epoch + 1

    # Estimando el premio
    volumen_total = np.array( self.state ).mean()
    if volumen_total > 60 and volumen_total < 65:
      reward = 10
      done = True
    else:
      reward = -1


    # Verificar si acabó la canción o se llego al valor esperado
    if self.duracion_cancion == self.epoch and not done: 
      done = True

    # Set placeholder for info
    info = {}

    # Return step information
    return self.state, reward, done, self.epoch, info

  def render(self):
    # Implement viz
    pass

  def reset(self):
    # Reseteo de volumen
    self.state = tuple( [ random.randint(30,90), random.randint(30,90) ] )
    # Reseteo de canción
    self.duracion_cancion = 120
    # Reseteo de epoch
    self.epoch = 0

    return self.state

### Creating the environment

In [None]:
env = ReguladorSonido()
env.reset()

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


(58, 66)

## Q - Table

In [None]:
env.observation_space

Box(0.0, 100.0, (2,), float32)

In [None]:
env.action_space.sample()

2

In [None]:
acciones = env.action_space.n
observaciones = env.observation_space.high[0].astype('int32')
q_table = np.zeros( [observaciones,observaciones, acciones] )

In [None]:
q_table.shape

(100, 100, 5)

### Trainning the environment

In [None]:
from IPython.display import clear_output

# Hyperparameters
aprendizaje = 0.1
descuento = 0.9
epsilon = 0.1 # variabe control

for i in range(1, 200001):
    estado = env.reset()
    done = False # bandera de finalización
    
    while not done:
        if random.uniform(0, 1) < epsilon:
            action = env.action_space.sample() # Explore action space
        else:
            action = np.argmax(q_table[estado]) # Exploit learned values

        siguiente_estado, recompenza, done, _, info = env.step(action)
        
        valor_actual = q_table[estado][action]
        valor_futuro = np.max(q_table[siguiente_estado])
        
        nuevo_valor = valor_actual + ( aprendizaje * ( recompenza + ( descuento * valor_futuro ) - valor_actual ) )
        q_table[estado][action] = nuevo_valor

        estado = siguiente_estado
        
    if i % 100 == 0:
        clear_output(wait=True)
        print(f"Episode: {i}")

print("Training finished.\n")

Episode: 81600


KeyboardInterrupt: ignored

### Testing the model

In [None]:
total_epochs, total_reward = 0, 0
episodes = 100

env = ReguladorSonido()

for _ in range(episodes):
    state = env.reset()
    env.render()
    epochs, reward = 0, 0
    
    done = False
    
    while not done:
      action = np.argmax(q_table[state])
      state, reward, done, _, info = env.step(action)
      epochs += 1

      total_reward += reward
    total_epochs += epochs

print(f"Results after {episodes} episodes:")
print(f"Average timesteps per episode: {total_epochs / episodes}")
print(f"Average rewards per episode: {total_reward / episodes}")

Results after 100 episodes:
Average timesteps per episode: 30.51
Average rewards per episode: -21.6


In [None]:
q_table

## Q-Learning Ejercicio

In [None]:
!pip install gym[atari]

Collecting ale-py~=0.8.0
  Downloading ale_py-0.8.0-cp310-cp310-win_amd64.whl (950 kB)
     -------------------------------------- 950.8/950.8 kB 1.1 MB/s eta 0:00:00
Collecting importlib-resources
  Downloading importlib_resources-5.10.2-py3-none-any.whl (34 kB)
Installing collected packages: importlib-resources, ale-py
Successfully installed ale-py-0.8.0 importlib-resources-5.10.2




In [None]:
import gym
import numpy as np

env = gym.make("Taxi-v3", render_mode='human')

env.reset() # reset environment to a new, random state
env.render()


print("Acciones {}".format(env.action_space))
print("Estados {}".format(env.observation_space))

In [None]:
env.close()

In [None]:
env = gym.make("Taxi-v3")
env.reset()

In [None]:
env.observation_space

Discrete(500)

Create the Q table

In [None]:
import random
from IPython.display import clear_output

env = gym.make("Taxi-v3")

# Hyperparameters
aprendizaje = 
descuento = 
epsilon = 

for i in range(1, 100001):
    state = env.reset()[0]
    done = False
    
    while not done:
        if random.uniform(0, 1) < epsilon:
            action = env.action_space.sample() # Explore action space
        else:
            action = np.argmax(q_table[state]) # Exploit learned values

        siguiente_estado, recompenza, done, _, info = env.step(action) 
        
        valor_actual = q_table[state, action]
        valor_futuro = np.max(q_table[siguiente_estado])
        
        nuevo_valor = valor_actual + ( aprendizaje * ( recompenza + ( descuento * valor_futuro ) - valor_actual ) )
        q_table[state, action] = nuevo_valor

        state = siguiente_estado
        
    if i % 100 == 0:
        clear_output(wait=True)
        print(f"Episode: {i}")

print("Training finished.\n")

Episode: 100000
Training finished.



In [None]:
total_epochs, total_penalties = 0, 0
episodes = 100


for _ in range(episodes):
    env = gym.make("Taxi-v3", render_mode='human')
    state = env.reset()[0]
    env.render()
    epochs, penalties, reward = 0, 0, 0
    
    done = False
    
    while not done:
        action = np.argmax(q_table[state])
        state, reward, done, _, info = env.step(action)

        if reward == -10:
            penalties += 1

        epochs += 1

    total_penalties += penalties
    total_epochs += epochs
    env.close()

print(f"Results after {episodes} episodes:")
print(f"Average timesteps per episode: {total_epochs / episodes}")
print(f"Average penalties per episode: {total_penalties / episodes}")

Results after 100 episodes:
Average timesteps per episode: 12.84
Average penalties per episode: 0.0


In [None]:
env.close()