# Herramientas para creacion de agente DQL con red convolucional

Vamos a crear algunas herramientas para un agente con una red neuronal convolucional que sea capaz de reconocer imagenes como datos de entrada. Entrenaremos al agente con juegos de Atari, por lo que tambien crearemos las herramientas del entorno.

Recordemos primero el decay.

In [10]:
import gym
import atari_py
import numpy as np
from collections import deque
from gym.spaces.box import Box
import cv2
import random
import torch
import json
from collections import namedtuple

In [1]:
class DecayLineal(object):
    def __init__(self, valor_ini, valor_final, pasos_max):
        assert valor_ini>valor_final, 'valor inicial > valor final'
        self.valor_ini=valor_ini
        self.valor_final=valor_final
        self.decay=(valor_ini-valor_final)/pasos_max

    def __call__(self, num_pasos):
        valor_actual=self.valor_ini-self.decay*num_pasos
        if valor_actual<self.valor_final:
            valor_actual=self.valor_final
        return valor_actual

Seguiremos implementando la red convolucional con Torch de 3 capas.

In [3]:
class CNN(torch.nn.Module):
    
    def __init__(self, dim_entrada, dim_salida, device='cpu'): # device=cpu o cuda
        
        super(CNN, self).__init__()
        
        self.device=device
        
        self.capa1=torch.nn.Sequential(
                    torch.nn.Conv2d(dim_entrad[0],
                                    64,
                                    kernel_size=4,
                                    stride=2,
                                    padding=1),
                    torch.nn.ReLU())
        
        
        self.capa2=torch.nn.Sequential(
                    torch.nn.Conv2d(64,
                                    32,
                                    kernel_size=4,
                                    stride=2,
                                    padding=0),
                    torch.nn.ReLU())
        
        
        self.capa3=torch.nn.Sequential(
                    torch.nn.Conv2d(32,
                                    32,
                                    kernel_size=3,
                                    stride=2,
                                    padding=0),
                    torch.nn.ReLU())
        
        self.salida=torch.nn.Linear(18*18*32, dim_salida)
        
       
    
    
    def forward(self, x):
        
        x=torch.from_numpy(x).float().to(self.device)
        x=self.capa1(x)
        x=self.capa2(x)
        x=self.capa3(x)
        x=x.view(x.shape[0], -1)
        x=self.salida(x)
        return x

Podriamos añadir mas capas a la red.

Ahora que hemos creado la red convolucional, vamos a crear un controlador de hiperparametros a traves de un JSON, siendo de esta manera mas facil manejar la red.

In [5]:
class ParamControl(object):
    
    def __init__(self, archivo): # archivo json, path
        
        self.params=json.load(open(archivo, 'r'))
        
        
    def parametros(self):
        return self.params
    
    
    def param_entorno(self):
        return self.params['env']
      
        
    def param_agente(self):
        return self.params['agent']
    
    
    def actualiza_param_agente(self, **kwargs):
        for k,v in kwargs.items():
            if k in self.params.keys():
                self.params['agent'][k]=v
            
            
    def exporta_param_entorno(self, archivo):
        with open(archivo, 'w') as f:
            json.dump(self.params['env'], f, indent=4, separators=(',',': '), sort_keys=True)
            f.write('\n')

            
    def exporta_param_agente(self, archivo):
        with open(archivo, 'w') as f:
            json.dump(self.params['agent'], f, indent=4, separators=(',', ': '), sort_keys=True)
            f.write('\n')

Ahora implementaremos un perceptron de una capa.

In [6]:
class Perceptron(torch.nn.Module):

    def __init__(self, dim_entrada, dim_salida, device=torch.device('cpu')):

        super(Perceptron, self).__init__()
        self.device=device
        self.dim_entrada=dim_entrada[0]
        self.dim_oculta=40
        self.linear=torch.nn.Linear(self.dim_entrada, self.dim_oculta)
        self.salida=torch.nn.Linear(self.dim_oculta, dim_salida)

    def forward(self, x):
        x=torch.from_numpy(x).float().to(self.device)
        x=torch.nn.functional.relu(self.linear(x))
        x=self.salida(x)
        return x

Replay de la experiencia.

In [7]:
Experiencia=namedtuple('Experiencia', ['obs', 'action', 'reward', 'next_obs', 'done'])

In [8]:
class Memoria(object):
    '''
    Implementacion de un buffer ciclico basado en la memoria de la experiencia
    '''
    def __init__(self, capacidad=int(1e6)):
        """
        capacidad: Max numero de experiencias
        """
        self.capacidad=capacidad
        self.mem_idx=0  # Indice de la experiencia actual
        self.memoria=[]

        
    def guarda(self, experiencia):
        '''
        experiencia: el objeto a ser guardado en memoria
        '''
        if self.mem_idx<self.capacidad:
            # Extiende la memoria y crea un espacio
            self.memoria.append(None)
        self.memoria[self.mem_idx%self.capacidad]=experiencia
        self.mem_idx+=1

        
    def muestra(self, batch_size):
        '''
        batch_size:  tamaño de la muestra
        '''
        assert batch_size<=len(self.memoria), 'El tamaño de la muestra esta disponible en memoria.'
        
        # se devuelve una lista de experiencias con muestreo aleatorio
        return random.sample(self.memoria, batch_size)

    
    def tamaño_muestra(self):
        return len(self.memoria) # numero de experiencias guardadas en memoria

Inicializa los pesos con el metodo Xavier. (Xavier Glorot, Yoshua Bengio, "Understanding the
difficulty of training deep feedforward neural networks").                      

In [9]:
def xavier(m):
    if isinstance(m, torch.nn.Conv2d) or isinstance(m, torch.nn.Linear):
        torch.nn.init.xavier_normal_(m.weight)

Frames de observacion.

In [11]:
class Frames(gym.ObservationWrapper):
    
    def __init__(self, entorno):
        
        super(ResizeReshapeFrames, self).__init__(entorno)
        if len(self.observation_space.shape)==3: 
            self.ancho=84
            self.alto=84
            self.canales=self.observation_space.shape[2]
            # canales x alto x ancho 
            self.espacio_obs=gym.spaces.Box(0, 255, (self.canales, 
                                                     self.alto,
                                                     self.ancho), 
                                            dtype=np.uint8)


    def observacion(self, obs):
        if len(obs.shape)==3:
            obs=cv2.resize(obs, (self.ancho, self.alto))
            if obs.shape[2]<obs.shape[0]:
                obs=np.reshape(obs, (obs.shape[2], obs.shape[1], obs.shape[0]))
        return obs


**Entorno (Atari)**

Utiles para definir y manejar el entorno.

In [None]:
# funcion para definir el entorno

def entorno(_id, conf):
    ent=gym.make(_id)
    if 'NoFrameskip' in _id:
        assert 'NoFrameskip' in ent.spec.id
        ent=NResetEnt(ent, noop_max=30)
        ent=MaxSkipEnt(ent, skip=conf['skip_rate'])

    if conf['episodic_life']:
        ent=VidaEpisodioEnt(ent)

    try:
        if 'FIRE' in ent.unwrapped.get_action_meanings():
            ent=FResetEnv(ent)
    except AttributeError:
        pass

    ent=AtariReescala(ent, conf['useful_region'])

    if conf['normalize_observation']:
        ent=NormalizaEnt(ent)

    ent=FrameStack(ent, conf['num_frames_to_stack'])

    #if conf['clip_reward']:  # La recompensa del clip se hace por el agente usando sus parametros
    #    ent=RecompensaClip(ent)
    
    
    return ent

In [12]:
# lista de juegos

def lista_juegos():
    return atari_py.list_games()

In [14]:
# recompensa dada por clip

class RecompensaClip(gym.RewardWrapper):
    
    def __init__(self, ent):
        gym.RewardWrapper.__init__(self, ent)

        
    def recompensa(self, recom):
        '''La recompensa del clip puede ser -1, 0 o +1'''  
        return np.sign(recom)


In [15]:
# para frames

def frame_84(frame, conf):
    frame=frame[conf['crop1']:conf['crop2']+160, :160]    # selecciona
    frame=frame.mean(2)                                   # media  
    #frame=frame.astype(np.float32)                       # a float
    #frame*=(1./255.)                                     # normalizacion
    frame=cv2.resize(frame, (84, conf['dimension2']))     # resize
    frame=cv2.resize(frame, (84, 84))
    frame=np.reshape(frame, [1, 84, 84])                  # reshape
    return frame


In [16]:
# reescalado de los frames

class AtariReescala(gym.ObservationWrapper):
    
    def __init__(self, ent, conf):
        gym.ObservationWrapper.__init__(self, ent)
        
        self.observation_space=Box(0, 255, [1, 84, 84], dtype=np.uint8)
        self.conf=onf

    def observacion(self, observacion):
        return frame_84(observacion, self.conf)


In [17]:
# normalizacion entorno

class NormalizaEnt(gym.ObservationWrapper):
    
    def __init__(self, ent=None):
        gym.ObservationWrapper.__init__(self, ent)
        
        self.media=0
        self.std=0
        self.alfa=0.9999
        self.num_pasos=0

    def observacion(self, observacion):
        self.num_pasos+=1
        self.media=self.media*self.alfa+observacion.mean()*(1-self.alfa)
        self.std=self.std*self.alfa+observacion.std()*(1-self.alfa)

        sin_bias_media=self.media/(1-pow(self.alfa, self.num_pasos))
        sin_bias_std=self.std/(1-pow(self.alfa, self.num_pasos))

        return (observacion-sin_bias_media)/(sin_bias_std+1e-8)

In [None]:
# noop reset del entorno

class NResetEnt(gym.Wrapper):
    
    def __init__(self, ent, noop_max=30):
        ' No-op se asume que es accion 0.'
        gym.Wrapper.__init__(self, ent)
        self.n_max=noop_max
        self.n_accion=0
        assert ent.unwrapped.get_action_meanings()[0]=='NOOP'

    def reset(self):
        ' Accion no-op para un numero de pasos en [1, noop_max].'
        self.ent.reset()
        noops=random.randrange(1, self.n_max+1)  
        assert noops>0
        obs=None
        
        for _ in range(noops):
            obs, _, done, _ = self.ent.step(self.n_action)
        return obs

    def paso(self, ac):
        return self.ent.step(ac)


In [None]:
# fire reset del entorno

class FResetEnt(gym.Wrapper):
    def __init__(self, ent):
        'Toma la accion en el reset para entornos que estan fijos hasta el fire.'
        gym.Wrapper.__init__(self, ent)
        assert ent.unwrapped.get_action_meanings()[1]=='FIRE'
        assert len(ent.unwrapped.get_action_meanings())>=3

    def reset(self):
        self.ent.reset()
        obs, _, done, _=self.ent.step(1)
        if done:
            self.ent.reset()
        obs, _, done, _=self.ent.step(2)
        if done:
            self.ent.reset()
        return obs

    def paso(self, ac):
        return self.ent.step(ac)


In [None]:
#

class VidaEpisodioEnt(gym.Wrapper):
    def __init__(self, ent):
        'Se hace fin-de-vida==fin-de-episodio, pero solo se resetea en el verdadero fin de juego.'
        
        gym.Wrapper.__init__(self, ent)
        self.vidas=0
        self.verdadero_fin=True

    def paso(self, accion):
        obs, recompensa, done, info=self.ent.step(accion)
        self.verdadero_fin=True
        # check current lives, make loss of life terminal,
        # then update lives to handle bonus lives
        vidas=info['ale.lives']
        if vidas<self.vidas and vidas>0:
            # para el Qbert a veces es vidas==0 la condicion para unos pocos frames
            # asi que es importante mantener las vidas>0, se resetea cuendo el entorno devuelve done.
            done=True
            self.verdadero_fin=False
        self.vidas=vidas
        return obs, recompensa, done, info

    def reset(self):
        """Reset only when lives are exhausted.
        This way all states are still reachable even though lives are episodic,
        and the learner need not know about any of this behind-the-scenes.
        """
        if self.verdadero_fin:
            obs=self.ent.reset()
            self.vidas=0
        else:
            # no-op step to advance from terminal/lost life state
            obs, _, _, info=self.ent.step(0)
            self.vidas=info['ale.lives']
        return obs



In [None]:
#

class MaxSkipEnt(gym.Wrapper):
    def __init__(self, env=None, skip=4):
        """Return only every `skip`-th frame"""
        gym.Wrapper.__init__(self, env)
        # most recent raw observations (for max pooling across time steps)
        self._obs_buffer = deque(maxlen=2)
        self._skip = skip

    def step(self, action):
        total_reward = 0.0
        done = None
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                break

        max_frame = np.max(np.stack(self._obs_buffer), axis=0)
        return max_frame, total_reward, done, info

    def reset(self):
        """Clear past frame buffer and init. to first obs. from inner env."""
        self._obs_buffer.clear()
        obs = self.env.reset()
        self._obs_buffer.append(obs)
        return obs


In [None]:
#

class FrameStack(gym.Wrapper):
    def __init__(self, env, k):
        """Stack k last frames.

        Returns lazy array, which is much more memory efficient.
        From baselines atari_wrapper
        """
        gym.Wrapper.__init__(self, env)
        self.k = k
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape
        self.observation_space = Box(low=0, high=255, shape=(shp[0] * k , shp[1], shp[2]), dtype=np.uint8)

    def reset(self):
        ob = self.env.reset()
        for _ in range(self.k):
            self.frames.append(ob)
        return self._get_ob()

    def step(self, action):
        ob, reward, done, info = self.env.step(action)
        self.frames.append(ob)
        return self._get_ob(), reward, done, info

    def _get_ob(self):
        assert len(self.frames) == self.k
        return LazyFrames(list(self.frames))



In [None]:
#

class LazyFrames(object):
    def __init__(self, frames):
        """This object ensures that common frames between the observations are only stored once.
        It exists purely to optimize memory usage which can be huge for DQN's 1M frames replay
        buffers.
        This object should only be converted to numpy array before being passed to the model.
        """
        self._frames = frames
        self._out = None

    def _force(self):
        if self._out is None:
            self._out = np.concatenate(self._frames, axis=0)
            self._frames = None
        return self._out

    def __array__(self, dtype=None):
        out = self._force()
        if dtype is not None:
            out = out.astype(dtype)
        return out

    def __len__(self):
        return len(self._force())

    def __getitem__(self, i):
        return self._force()[i]
