<a href="https://colab.research.google.com/github/FelipeToroR/Q-learning-DQN/blob/main/DQN_Mountain%20Car.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Required libraries
# System and Python packages used for rendering without a physical screen.
# Every command redirects stdout and stderr to /dev/null, so a failed
# install produces no visible message in this cell.

!apt-get install x11-utils > /dev/null 2>&1 
!pip install pyglet > /dev/null 2>&1 
!apt-get install -y xvfb python-opengl > /dev/null 2>&1
!pip install gym pyvirtualdisplay > /dev/null 2>&1

In [None]:
# Example: one episode in which the agent takes random actions

from pyvirtualdisplay import Display

# Start an off-screen display before gym is imported and the environment is
# created, so that frames can be rendered on a machine without a monitor.
display = Display(visible=0, size=(400, 300))
display.start()

import gym

env = gym.make('MountainCar-v0')
observation = env.reset()

# `images` collects one rendered frame per step; a later cell turns it into a video.
images, total_reward, done = [], 0, False
while not done:
  # Capture the current frame as an RGB array before acting.
  frame = env.render(mode='rgb_array')
  images.append(frame)

  # Random policy: sample an action from the action space (your agent goes here).
  action = env.action_space.sample()
  observation, reward, done, info = env.step(action)  # transition
  total_reward = total_reward + reward
env.close()

print("total reward:", total_reward)

total reward: -200.0


In [None]:
# Crear video

import cv2
from moviepy.editor import *

def create_video(images, filename, fps=20.0, res=None):
  """Encode a sequence of RGB frames into an MJPG video and load it as a clip.

  Args:
    images: sequence of frames as produced by ``env.render(mode='rgb_array')``
      (assumed to be H x W x 3 uint8 arrays in RGB order -- TODO confirm for
      other frame sources).
    filename: path of the video file to write (e.g. ``"tmp.avi"``).
    fps: frames per second of the encoded video.
    res: ``(width, height)`` of the video. When ``None`` it is taken from the
      first frame, falling back to ``(600, 400)`` if ``images`` is empty.

  Returns:
    A ``VideoFileClip`` opened on ``filename``.
  """
  if res is None:
    if len(images) > 0:
      # OpenCV wants (width, height); array shape is (height, width, channels).
      height, width = images[0].shape[:2]
      res = (width, height)
    else:
      res = (600, 400)

  out = cv2.VideoWriter(filename, cv2.VideoWriter_fourcc('M','J','P','G'), fps, res)
  try:
    for image in images:
      # VideoWriter expects BGR channel order; the frames are RGB, so convert
      # to avoid swapped red/blue channels in the encoded video.
      out.write(cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
  finally:
    # Always finalize the file, even if a frame fails to convert or write.
    out.release()
  return VideoFileClip(filename)

# Encode the frames collected in `images` into tmp.avi and display the
# resulting clip inline in the notebook.
clip=create_video(images, "tmp.avi")
clip.ipython_display(width=400)

100%|█████████▉| 200/201 [00:01<00:00, 192.25it/s]


In [None]:
import numpy as np
import tensorflow as tf
import gym
import os
import datetime
from statistics import mean
from gym import wrappers

import math

In [None]:
import os
# Hide all CUDA devices so the networks below run on CPU.
# NOTE(review): tensorflow is already imported in the previous cell; confirm
# the variable is still read late enough to take effect there.
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import random
import gym
import numpy as np
from collections import deque
from keras.models import Model, load_model
from keras.layers import Input, Dense


def Red(input_shape, action_space):
    """Build and compile the fully connected Q-network.

    The network maps a state vector to one Q-value per action:
    ``input -> Dense(512) -> Dense(256) -> Dense(64) -> Dense(action_space)``,
    with ReLU hidden activations and a linear output layer.

    Args:
        input_shape: shape tuple of a single state, e.g. ``(state_size,)``.
        action_space: number of discrete actions (size of the output layer).

    Returns:
        A compiled Keras ``Model`` trained with MSE loss and RMSprop.
    """
    X_input = Input(input_shape)

    # Hidden stack. The input shape comes from `X_input`, so the Dense layers
    # do not need (and should not be given) their own `input_shape`.
    X = X_input
    for units in (512, 256, 64):
        X = Dense(units, activation="relu", kernel_initializer='he_uniform')(X)

    # Output layer: one linear unit per action (unbounded Q-value estimates).
    X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    # The model name is kept as-is for compatibility, although this notebook
    # trains on MountainCar rather than CartPole.
    model = Model(inputs=X_input, outputs=X, name='CartPoleDQNmodel')

    # No "accuracy" metric: the network regresses Q-values, so classification
    # accuracy is meaningless here and only adds work to every fit() call.
    model.compile(loss="mse", optimizer='rmsprop')

    return model

In [None]:
class DQNAgent:
    """DQN agent for gym's ``MountainCar-v0``.

    Uses an epsilon-greedy policy, a replay memory sampled in minibatches, and
    a separate target network that is periodically synchronized with the
    online network.
    """

    def __init__(self):
        self.env = gym.make('MountainCar-v0')
        # Dimensionality of one observation, read from the environment.
        self.state_size = self.env.observation_space.shape[0]

        # Number of discrete actions, read from the environment instead of
        # being hard-coded (it was fixed to 3 for MountainCar-v0).
        self.action_size = int(self.env.action_space.n)
        self.EPISODES = 10

        # Replay memory for experience replay (oldest entries are dropped).
        self.memory = deque(maxlen=20000)

        self.gamma = 0.95
        # Starts above 1, so actions are purely random until the decay has
        # brought epsilon below 1.
        self.epsilon = 2.0
        self.epsilon_min = 0.001

        # Once decay is active, epsilon is multiplied by this factor on every
        # stored transition (a 0.1% reduction) until it reaches epsilon_min.
        self.epsilon_decay = 0.999
        self.batch_size = 64

        # Training (and epsilon decay) starts once this many experiences exist.
        self.train_start = 1000

        # The target network is synchronized once every this many episodes.
        self.target_update_every = 2

        # Cumulative (shaped) reward of each episode of the latest run().
        self.episode_rewards = []

        # Online network
        self.model = Red(input_shape=(self.state_size,), action_space = self.action_size)

        # Target network, initialized with the online network's weights
        self.target_model = Red(input_shape=(self.state_size,), action_space = self.action_size)
        self.target_model.set_weights(self.model.get_weights())

    def save_experience(self, state, action, reward, next_state, done):
        """Store one transition and decay epsilon once training has started."""
        self.memory.append((state, action, reward, next_state, done))
        # Epsilon-greedy schedule: decay only after the warm-up phase.
        if len(self.memory) > self.train_start and self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def get_action(self, state):
        """Return an action index chosen with the epsilon-greedy policy.

        Args:
            state: array of shape ``(1, state_size)``.
        """
        if np.random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        # verbose=0 keeps Keras from printing a progress bar on every step.
        return int(np.argmax(self.model.predict(state, verbose=0)))

    def transfer_weights(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def experience_replay(self):
        """Train the online network on a random minibatch of stored transitions.

        Does nothing until ``train_start`` experiences have been collected.
        """
        if len(self.memory) < self.train_start:
            return
        # Subsample the replay memory.
        minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))
        if not minibatch:
            return
        # Size everything from the actual sample so a memory smaller than
        # batch_size cannot cause an out-of-range index.
        n = len(minibatch)

        # Stored states have shape (1, state_size); stack them into (n, state_size).
        state = np.vstack([exp[0] for exp in minibatch])
        action = np.array([exp[1] for exp in minibatch], dtype=int)
        reward = np.array([exp[2] for exp in minibatch], dtype=float)
        next_state = np.vstack([exp[3] for exp in minibatch])
        done = np.array([exp[4] for exp in minibatch], dtype=bool)

        target = self.model.predict(state, verbose=0)
        target_next = self.target_model.predict(next_state, verbose=0)

        # Q-learning target: r for terminal transitions, otherwise
        # r + gamma * max_a' Q_target(s', a'). Only the taken action's entry
        # is overwritten, so the other actions contribute zero loss.
        bootstrapped = reward + self.gamma * np.amax(target_next, axis=1)
        target[np.arange(n), action] = np.where(done, reward, bootstrapped)

        # Fit the online network towards the corrected Q-values.
        self.model.fit(state, target, batch_size=n, verbose=0)

    def run(self):
        """Run the training loop for ``EPISODES`` episodes.

        Prints a summary line per episode and records each episode's
        cumulative shaped reward in ``self.episode_rewards``.
        """
        self.episode_rewards = []
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0

            # Highest position reached in the episode; -1.2 is presumably the
            # track's left bound, used as the starting lower value.
            maxp = -1.2
            # Must be initialized once per episode (not per step) so that it
            # accumulates the whole episode's reward.
            cumulated_reward = 0

            while not done:
                action = self.get_action(state)

                next_state, reward, done, _ = self.env.step(action)
                next_state = np.reshape(next_state, [1, self.state_size])

                # Reward shaping: +1 whenever this energy-like quantity of the
                # next state (height term plus kinetic term) is positive.
                energy = math.sin(3 * next_state[0, 0]) * 0.0025 + 0.5 * next_state[0, 1] * next_state[0, 1]
                if energy > 0:
                    reward += 1

                maxp = max(maxp, next_state[0, 0])

                cumulated_reward += reward
                self.save_experience(state, action, reward, next_state, done)
                state = next_state
                i += 1

                if done:
                    # Sync the target network once per period, at the end of
                    # the episode, so it stays fixed while the online network
                    # trains in between.
                    if e % self.target_update_every == 0:
                        self.transfer_weights()
                    self.episode_rewards.append(cumulated_reward)
                    print("episode: {}/{}, score: {}, epsilon: {:.2}, episode reward: {}, max position: {}".format(e, self.EPISODES, i, self.epsilon, cumulated_reward, maxp))
                self.experience_replay()

In [None]:
# Build the agent (environment plus online and target networks) and train it.
# run() prints one summary line per episode.
agent = DQNAgent()
agent.run() 

episode: 0/10, score: 200, epsilon: 2.0, episode reward: -1.0, max position: -0.42640820608403596
episode: 1/10, score: 200, epsilon: 2.0, episode reward: -1.0, max position: -0.40720178283662134
episode: 2/10, score: 200, epsilon: 2.0, episode reward: -1.0, max position: -0.479664349064859
episode: 3/10, score: 200, epsilon: 2.0, episode reward: -1.0, max position: -0.33012603023866305
episode: 4/10, score: 200, epsilon: 2.0, episode reward: -1.0, max position: -0.3078662800064746
episode: 5/10, score: 200, epsilon: 1.6, episode reward: -1.0, max position: -0.4043783128536717
episode: 6/10, score: 200, epsilon: 1.3, episode reward: -1.0, max position: -0.30745443540877415
episode: 7/10, score: 200, epsilon: 1.1, episode reward: -1.0, max position: -0.2420607536007776
episode: 8/10, score: 200, epsilon: 0.9, episode reward: -1.0, max position: -0.401887294226157
episode: 9/10, score: 200, epsilon: 0.74, episode reward: -1.0, max position: -0.429446610003845


In [None]:
# Libraries for rendering the gym environment
# (output, including any install error, is discarded)
!pip install colabgymrender > /dev/null 2>&1 

In [None]:
import gym
from colabgymrender.recorder import Recorder

# Wrap a fresh environment in a Recorder that is given the ./video directory.
directory = './video'
env = Recorder(gym.make('MountainCar-v0'), directory)

# A negative epsilon can never be >= a random draw from [0, 1), so get_action
# always returns the greedy (highest Q-value) action during testing.
agent.epsilon = -1

observation, terminal = env.reset(), False
while not terminal:
  # The agent expects a batch of one state: shape (1, state_size).
  state = np.reshape(observation, [1, agent.state_size])
  action = agent.get_action(state)
  observation, reward, terminal, info = env.step(action)

env.play()