In [None]:
import numpy as np
import gym
from gym import spaces
from random import sample
import ray

In [None]:
class find_grid_target_env(gym.Env):
    """Square grid world in which an agent walks from the origin to a fixed target.

    Every episode starts with the agent on cell (0, 0). Each step moves the
    agent by one cell using one of four discrete actions; moves that would
    leave the grid keep the agent on the border cell. A step that ends on the
    target cell yields reward 1 and finishes the episode, every other step
    yields reward 0.

    Coordinates: x is the column index and y is the row index, both in
    [0, grid_size - 1]. LEFT/RIGHT change x, UP/DOWN change y, and UP
    decreases y, so row 0 is the top row printed by render().

    The environment follows the classic Gym API used by the rest of this
    file: reset() returns only the observation and step() returns the
    4-tuple (observation, reward, done, info).

    Args:
        config: mapping with the keys 'grid_size' (side length of the grid),
            'target_x' and 'target_y' (target cell coordinates).

    Raises:
        AssertionError: if the target cell lies outside the grid.
    """

    def __init__(self,config):
        super().__init__()
        # Action encoding used by step().
        self.LEFT=0
        self.UP=1
        self.RIGHT=2
        self.DOWN=3

        self.grid_size=config['grid_size']
        self.target_x=config['target_x']
        self.target_y=config['target_y']
        # A target outside [0, grid_size-1] could never be reached, so reject
        # negative coordinates as well as ones that are too large.
        assert (0<=self.target_x<self.grid_size
                and 0<=self.target_y<self.grid_size), "target exceeds grid space"

        self.agent_pos_x=0
        self.agent_pos_y=0

        n_actions=4
        self.action_space=spaces.Discrete(n_actions)
        # Valid cell indices run from 0 to grid_size-1 inclusive; the bounds
        # are given as floats so they agree with the declared float32 dtype.
        self.observation_space=spaces.Box(low=0.0,high=float(self.grid_size-1),
                                          shape=(2,),dtype=np.float32)

    def _get_obs(self):
        """Return the agent position as a float32 array [x, y] matching observation_space."""
        return np.array([self.agent_pos_x,self.agent_pos_y],dtype=np.float32)

    def reset(self):
        """Put the agent back on cell (0, 0) and return the initial observation."""
        self.agent_pos_x=0
        self.agent_pos_y=0
        return self._get_obs()

    def step(self,action):
        """Apply one move and return (observation, reward, done, info).

        Args:
            action: one of self.LEFT, self.UP, self.RIGHT, self.DOWN. Any
                other value leaves the agent where it is.

        Returns:
            A tuple of the float32 observation [x, y], the reward (1 on the
            target cell, else 0), the done flag and an empty info dict.
        """
        if action==self.LEFT:
            self.agent_pos_x-=1
        elif action==self.UP:
            self.agent_pos_y-=1
        elif action==self.RIGHT:
            self.agent_pos_x+=1
        elif action==self.DOWN:
            self.agent_pos_y+=1

        # Keep the agent on the grid. The last valid index is grid_size-1;
        # clipping to grid_size would allow a cell that does not exist.
        max_index=self.grid_size-1
        self.agent_pos_x=int(np.clip(self.agent_pos_x,0,max_index))
        self.agent_pos_y=int(np.clip(self.agent_pos_y,0,max_index))

        done=bool(self.agent_pos_x==self.target_x and self.agent_pos_y==self.target_y)
        reward=1 if done else 0

        info={}

        return self._get_obs(),reward,done,info

    def render(self,mode='console'):
        """Print the grid to stdout, one text line per row.

        'A' marks the agent, 'T' the target and '.' every other cell. The
        agent symbol wins when both share a cell.

        Raises:
            NotImplementedError: for any mode other than 'console'.
        """
        if mode != 'console':
            raise NotImplementedError()
        agent_symbol='A'
        target_symbol='T'
        rest_symbol='.'

        # Rows are y and columns are x so that LEFT/RIGHT move the agent
        # horizontally and UP/DOWN vertically in the printed picture.
        for y in range(self.grid_size):
            row=[]
            for x in range(self.grid_size):
                if x==self.agent_pos_x and y==self.agent_pos_y:
                    row.append(agent_symbol)
                elif x==self.target_x and y==self.target_y:
                    row.append(target_symbol)
                else:
                    row.append(rest_symbol)
            # print() already terminates the line; no extra '\n' is needed.
            print(''.join(row))
        # Blank lines separate successive frames.
        print('\n\n\n')

    def close(self):
        """Nothing to release; present to satisfy the Gym interface."""
        pass

In [None]:
from ray.rllib.agents import ppo
from ray.tune.registry import register_env

# Stop any Ray runtime left over from a previous run of this cell, then start
# a fresh one, so the cell can be re-executed.
ray.shutdown()
ray.init()


def env_creator(env_config):
    """Construct the grid environment from the given env_config mapping."""
    env=find_grid_target_env(env_config)
    return env


# Make the environment available to RLlib under a string id.
register_env("exp_env_1",env_creator)

# Start from a copy of the PPO defaults and override only what this experiment needs.
CONFIG=ppo.DEFAULT_CONFIG.copy()
CONFIG.update({
    'log_level':'WARN',
    'num_workers':1,
    'framework':'torch',
    # Settings applied only during evaluation.
    'evaluation_config':{'render_env':True},
    'evaluation_num_workers':1,
    # Forwarded to env_creator / find_grid_target_env.
    'env_config':{'grid_size':6,'target_x':4,'target_y':4},
    'horizon':200,
})

trainer=ppo.PPOTrainer(env="exp_env_1",config=CONFIG)

In [None]:
# Number of PPO training iterations to run.
n_iter=10
for i in range(n_iter):
    # train() returns a dict of metrics for the iteration. Report the headline
    # figures instead of discarding them so training progress is visible.
    result=trainer.train()
    print(f"iter {i+1}/{n_iter}: "
          f"episode_reward_mean={result.get('episode_reward_mean')} "
          f"episode_len_mean={result.get('episode_len_mean')}")

In [None]:
# Run an evaluation pass with the trained policy. CONFIG sets render_env=True
# in evaluation_config. The call is left as the last expression of the cell so
# the notebook displays its return value.
trainer.evaluate()