In [1]:
from codecs import mbcs_decode
from ctypes.wintypes import WORD
from pickle import TUPLE
from platform import python_branch
import gym
import numpy as np
import pygame
from gym import spaces
from tensorflow import keras
from keras import layers
from keras import models
from keras import optimizers
import tensorflow as tf
from keras.optimizers import Adam
from keras import Sequential
from rl.agents import DQNAgent
from rl.memory import SequentialMemory
from rl.policy import BoltzmannQPolicy
#from tensorflow.keras.layers import Dense, Flatten
#import matplotlib.pyplot as plt



class GridWorldEnv(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 3}

    def __init__(self, render_mode=None, size=7):
        self.size = size  # The size of the square grid
        self.window_size = 512  # The size of the PyGame window
        self.human = 'human'
        self.rgb = 'rgb_array'
        # Observations are dictionaries with the agent's and the target's location.
        # Each location is encoded as an element of {0, ..., `size`}^2, i.e. MultiDiscrete([size, size]).
        self.observation_space1 = spaces.Box(0,size-1, shape=(2,),dtype=int)
        self.observation_space = spaces.Dict(
            {
                "agent": spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "target": spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "obstacle": spaces.Box(0,size-1, shape=(2,), dtype=int),
            }
        )
        
        # We have 4 actions, corresponding to "right", "up", "left", "down",stay
        self.action_space = spaces.Discrete(4)

        """
        The following dictionary maps abstract actions from `self.action_space` to 
        the direction we will walk in if that action is taken.
        I.e. 0 corresponds to "right", 1 to "up" etc.
        """

        self._action_to_direction = {
            0: np.array([1, 0],dtype=int),
            1: np.array([0, 1],dtype=int),
            2: np.array([-1, 0],dtype=int),
            3: np.array([0, -1],dtype=int),
            #4: np.array([0,0])
        }

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        """
        If human-rendering is used, `self.window` will be a reference
        to the window that we draw to. `self.clock` will be a clock that is used
        to ensure that the environment is rendered at the correct framerate in
        human-mode. They will remain `None` until human-mode is used for the
        first time.
        """
        self.window = None
        self.clock = None
   

    def _get_obs(self):
        if(self._target_location[0] and self._target_location[1]  in np.array( [self._agent_location + [1,1],
                                                self._agent_location +  [1,-1],
                                                self._agent_location +  [-1,-1],
                                                self._agent_location +  [-1,1],
                                                self._agent_location +  [2,0],
                                                self._agent_location +  [0,2],
                                                self._agent_location +  [-2,0],
                                                self._agent_location +  [0,-2],
                                                self._agent_location +  [1,0],
                                                self._agent_location +  [-1,0],
                                                self._agent_location +  [0,1],
                                                self._agent_location +  [0,-1]], dtype=int)):
            #pygame.quit()
            
            return {"target":self._target_location,"agent": self._agent_location }

        else:
            return {"agent": self._agent_location}

    def _get_info(self):
        return {"distance": np.linalg.norm(self._agent_location - self._target_location, ord=1)}

    def _get_steps(self):
        return {"steps":self.steps}
    #def _get_obstacle(self):
    #    return {"distance": np.linalg.norm(self._agent_location - self._obstacle_location, ord=1)}

    def reset(self, seed=2, options=None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

        # Choose the agent's location uniformly at random
        self._agent_location= self.np_random.integers(
                 self.size-1, self.size, size=2, dtype=int
            ) 
        self.steps = 0
        self.reward = 0
        self._obstacle_location = np.array([2,2])
        
 
        # We will sample the target's location randomly until it does not coincide with the agent's location
        self._target_location = self._agent_location
        while np.array_equal(self._target_location, self._agent_location) or np.array_equal(self._target_location, self._obstacle_location):
            self._target_location = self.np_random.integers(
                 0, self.size, size=2, dtype=int
            )

        self.observation = self._get_obs()
        self.info = self._get_info()
        self.observation_steps = self._get_steps()

        if self.render_mode == self.human:
            self._render_frame()

        return self.observation, self.info, self.observation_steps

    def step(self, action):
            # Map the action (element of {0,1,2,3}) to the direction we walk in
        self.reward_gain = 1
        self.reward_loss = 0.01
        self.steps += 1



        direction = self._action_to_direction[action]

        if(self._agent_location + direction ==self._obstacle_location).all():
            self._agent_location = self._agent_location -direction
        # We use `np.clip` to make sure we don't leave the grid
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1,
            
        )


        # An episode is done iff the agent has reached the target
        self.terminated = np.array_equal(self._agent_location, self._target_location)

        
        self.reward += self.reward_gain if self.terminated else - self. reward_loss  # Binary sparse rewards
        self.observation = self._get_obs()
        self.info = self._get_info()

        if self.render_mode == self.human:
            self._render_frame()

        return (self.observation, self.reward, self.terminated, False, self.info, self.steps)

    def render(self):
        if self.render_mode == self.rgb:
            return self._render_frame()

    def _render_frame(self):
        if self.window is None and self.render_mode == self.human:
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode((self.window_size, self.window_size))
        if self.clock is None and self.render_mode == self.human:
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((self.window_size, self.window_size))
        canvas.fill((255, 255, 255))
        pix_square_size = (
            self.window_size / self.size
        )  # The size of a single grid square in pixels

        # First we draw the target
        pygame.draw.rect(
            canvas,
            (255, 0, 0),
            pygame.Rect(
                pix_square_size * self._target_location,
                (pix_square_size, pix_square_size),
            ),
        )
        pygame.draw.rect(
            canvas,
            (0, 0, 0),
            pygame.Rect(
                pix_square_size * self._obstacle_location,
                (pix_square_size, pix_square_size),
            ),
        )
        # Now we draw the agent
        pygame.draw.circle(
            canvas,
            (0, 0, 255),
            (self._agent_location + 0.5) * pix_square_size,
            pix_square_size / 3,
        )
        
        
       
        # Finally, add some gridlines
        for x in range(self.size + 1):
            pygame.draw.line(
                canvas,
                0,
                (0, pix_square_size * x),
                (self.window_size, pix_square_size * x),
                width=2,
            )
            pygame.draw.line(
                canvas,
                0,
                (pix_square_size * x, 0),
                (pix_square_size * x, self.window_size),
                width=2,
            )

        if self.render_mode == self.human:
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])
        else:  # rgb_array
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )

    def close(self):
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()

    

In [2]:
human = 'human'
rgb = 'rgb_array'
env = GridWorldEnv(render_mode=rgb)
env.observation_space1.sample()

array([5, 2])

In [16]:
env.observation_space.sample()
env.reset()
steps = 0
episodes = 20
for episode in range(1, episodes+1):
    env.observation = env.reset()
    steps = 0
    done = False
    score = 0
    while not done:
        action = env.action_space.sample()
        env.observation = env.step(action)
        env.render()
        steps +=1
        if(np.array_equal(env._agent_location, env._target_location)):
            done = True
            score += env.reward
            
    print('Episode:{} Score:{}'.format(episode,score), "steps:{} ".format(steps))
env.close()

Episode:1 Score:-0.43000000000000105 steps:144 
Episode:2 Score:-0.33000000000000096 steps:134 
Episode:3 Score:-4.189999999999934 steps:520 
Episode:4 Score:-0.5400000000000011 steps:155 
Episode:5 Score:0.04999999999999938 steps:96 
Episode:6 Score:-0.6800000000000013 steps:169 
Episode:7 Score:-0.030000000000000693 steps:104 
Episode:8 Score:0.3199999999999996 steps:69 
Episode:9 Score:-3.2299999999999542 steps:424 
Episode:10 Score:-0.9300000000000015 steps:194 
Episode:11 Score:0.7599999999999999 steps:25 
Episode:12 Score:0.74 steps:27 
Episode:13 Score:0.85 steps:16 
Episode:14 Score:0.94 steps:7 
Episode:15 Score:0.05999999999999939 steps:95 
Episode:16 Score:0.86 steps:15 
Episode:17 Score:0.24999999999999956 steps:76 
Episode:18 Score:-1.9599999999999809 steps:297 
Episode:19 Score:0.3899999999999997 steps:62 
Episode:20 Score:-1.9199999999999817 steps:293 


In [17]:
states  = env.observation_space.shape
actions = env.action_space.n

In [28]:

def build_model(states, actions):
        model = Sequential() 
        model.add(layers.Dense(24, activation='relu', input_shape=(2,)))
        #model.add(layers.Dense(24, activation='relu', input_shape=(1,3)))
        model.add(layers.Dense(24, activation='relu'))
        model.add(layers.Dense(actions, activation='linear'))
        #model.add(layers.Flatten()) 
        return model



In [6]:
#del model

In [29]:
model = build_model(states, actions)
print(model.output_shape)

(None, 4)


In [30]:
model.summary()

Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_12 (Dense)            (None, 24)                72        
                                                                 
 dense_13 (Dense)            (None, 24)                600       
                                                                 
 dense_14 (Dense)            (None, 4)                 100       
                                                                 
Total params: 772
Trainable params: 772
Non-trainable params: 0
_________________________________________________________________


In [31]:
def build_agent(model,actions):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=50000, window_length=1)
    dqn = DQNAgent(model=model, memory=memory, policy=policy, 
        nb_actions=actions, nb_steps_warmup=10, target_model_update=1e-2)
    return dqn

In [32]:
dqn = build_agent(model, actions)
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])
dqn.fit(env, nb_steps=60000, visualize=False, verbose=1)

Training for 60000 steps ...
Interval 1 (0 steps performed)


ValueError: Error when checking input: expected dense_12_input to have 2 dimensions, but got array with shape (1, 1, 3)