In [1]:
import numpy as np

import gym
from gym import spaces

import pygame
from pygame import gfxdraw

pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


In [40]:
class Point:
    """A valued position on the grid.

    Attributes:
        x, y: the position exactly as it was passed in.
        value: payload stored with the point (MazeEnv writes it into its
            numpy map).
        coordinates: integer numpy array ``[x, y]``, rebuilt on every move.
    """

    def __init__(self, x, y, value):
        self.value = value
        # Position handling lives in update() so it is written only once.
        self.update(x, y)

    def update(self, x, y):
        """Move the point to ``(x, y)`` and rebuild ``coordinates``."""
        self.x, self.y = x, y
        self.coordinates = np.array((x, y), dtype=int)

In [46]:
# Scratch data: three random target points (value 2) and one player
# (value 1) placed on a width x height grid.
width = 10
height = 20
# np.random.randint's upper bound is exclusive, so the full extent is passed;
# `width - 1` / `height - 1` would never place anything in the last column/row.
points = [
    Point(x=np.random.randint(width), y=np.random.randint(height), value=2)
    for _ in range(3)
]
player = Point(x=np.random.randint(width), y=np.random.randint(height), value=1)

In [51]:
# Inspect the position the player was given.
player.x, player.y

(6, 3)

In [59]:
# Drop the first point. Note: not idempotent - every run of this cell
# removes one more element from `points`.
del points[0]

In [61]:
# Display the points currently held in the list.
points

[<__main__.Point at 0x2603355a8c8>, <__main__.Point at 0x2603355a988>]

In [53]:
# Index of the point closest to the player, measured as Manhattan distance
# (|dx| + |dy|).
np.argmin([
    abs(target.x - player.x) + abs(target.y - player.y)
    for target in points
])

2

In [57]:
# Tie-breaking check: with two equal minima np.argmin reports the first index.
np.argmin([1, 1, 2])

0

In [30]:
class MazeEnv(gym.Env):
    """Open grid world in which a player has to walk to a randomly placed exit.

    Cells are addressed by integer coordinates with ``0 <= x < width`` and
    ``0 <= y < height``; ``y`` grows upwards.

    Actions: ``0`` = up, ``1`` = right, ``2`` = down, ``3`` = left. A move
    that would leave the grid is ignored.

    Observation: the offset ``(exit.x - player.x, exit.y - player.y)`` as an
    ``int16`` array of shape ``(2,)``.

    Rewards: ``+200`` for reaching the exit, ``-200`` once ``max_step`` steps
    have been taken without reaching it, ``-1`` for every other step.

    Args:
        width: number of columns.
        height: number of rows.
        max_step: step budget per episode before the episode is lost.
    """

    metadata = {"render.modes": ["human"]}

    def __init__(self, width=10, height=12, max_step=30):
        self.width = width
        self.height = height
        self.max_step = max_step
        self.action_space = spaces.Discrete(4)
        # The declared shape must match what reset()/step() return: a flat
        # (dx, dy) vector, not a (1, 2) matrix.
        bound = max(height, width)
        self.observation_space = spaces.Box(low=-bound,
                                            high=bound,
                                            shape=(2,),
                                            dtype=np.int16)
        self.current_episode = 0
        self.scores = []
        # Rendering resources are created lazily on the first render() call.
        self.screen = None
        self._clock = None

    def add_point(self, x, y, value):
        """ Create a point, register it in ``self.coordinates`` and return it """
        point = Point(x, y, value)
        self.coordinates.append(point)
        return point

    def create_numpy_map(self):
        """ Convert all coordinates to a numpy representation

        Returns a ``(height, width)`` array of zeros with each point's value
        written at its cell. Row 0 is the top of the map, so ``y`` is flipped.
        Points added later overwrite earlier ones on the same cell.
        """
        world = np.zeros((self.height, self.width))

        for point in self.coordinates:
            world[(world.shape[0] - point.y - 1, point.x)] = point.value

        return world

    def _random_cell(self):
        """ Draw a uniformly random (x, y) cell inside the grid """
        # randint's upper bound is exclusive, so passing the full extent
        # covers every column/row including the last one.
        return np.random.randint(self.width), np.random.randint(self.height)

    def _get_obs(self):
        """ Offset from the player to the exit, typed like observation_space """
        return np.array([self.exit.x - self.player.x,
                         self.exit.y - self.player.y],
                        dtype=np.int16)

    def reset(self):
        """ Reset the environment to the beginning and return the first observation """

        # Init variables
        self.total_reward = 0
        self.coordinates = []
        self.current_step = 0
        self.state = "P"  # "P" = playing, "W" = won, "L" = lost

        # Initialize exit and player
        exit_x, exit_y = self._random_cell()
        self.exit = self.add_point(x=exit_x, y=exit_y, value=2)

        player_x, player_y = self._random_cell()
        # Do not start on top of the exit (impossible to avoid on a 1x1 grid).
        while (player_x, player_y) == (exit_x, exit_y) and self.width * self.height > 1:
            player_x, player_y = self._random_cell()
        self.player = self.add_point(x=player_x, y=player_y, value=1)

        return self._get_obs()

    def move(self, action):
        """ Move the player to one space adjacent (up, right, down, left)

        Moves that would cross the grid border, and unknown actions, leave the
        player where it is.
        """
        if action == 0 and self.player.y != (self.height - 1):  # up
            self.player.update(self.player.x, self.player.y + 1)
        elif action == 1 and self.player.x != (self.width - 1):  # right
            self.player.update(self.player.x + 1, self.player.y)
        elif action == 2 and self.player.y != 0:  # down
            self.player.update(self.player.x, self.player.y - 1)
        elif action == 3 and self.player.x != 0:  # left
            self.player.update(self.player.x - 1, self.player.y)

    def get_reward(self):
        """ Extract reward and whether the game has finished

        Returns:
            ``(reward, done)`` for the current state; the reward is also
            added to ``self.total_reward``.
        """
        if np.array_equal(self.player.coordinates, self.exit.coordinates):
            self.state = "W"
            reward = 200
            done = True
        elif self.current_step >= self.max_step:
            # ``>=`` keeps reporting the loss if step() is called past the budget.
            self.state = "L"
            reward = -200
            done = True
        else:
            # Plain ``else`` so reward/done are always bound, even when step()
            # is called again after an episode has already ended.
            reward = -1
            done = False

        self.total_reward += reward
        return reward, done

    def step(self, action):
        """ Move a single step

        Returns the usual Gym 4-tuple ``(obs, reward, done, info)``. When the
        episode ends, the episode counter is advanced and the episode's total
        reward is appended to ``self.scores``.
        """
        self.current_step += 1
        self.move(action)
        reward, done = self.get_reward()

        if done:
            self.current_episode += 1
            self.scores.append(self.total_reward)

        return self._get_obs(), reward, done, {}

    def render(self, mode="human"):
        """ Draw the player (red) and the exit (green) in a pygame window

        Args:
            mode: accepted for Gym API compatibility; only on-screen
                rendering is implemented.

        Returns:
            ``False`` once the user has closed the window, ``True`` otherwise.
        """
        multiplier = 20  # pixels per grid cell

        # Create the window and clock once instead of on every frame.
        if self.screen is None:
            pygame.init()
            self.screen = pygame.display.set_mode((round(self.width) * multiplier,
                                                   round(self.height) * multiplier))
            self._clock = pygame.time.Clock()

        self._clock.tick(5)  # cap at 5 frames per second
        self.screen.fill((255, 255, 255))

        radius = multiplier // 2
        for point, colour in ((self.player, (255, 0, 0)), (self.exit, (0, 255, 0))):
            # Centre the circle in its cell and flip y: screen y grows
            # downwards while "up" increases y in the environment.
            centre_x = round(point.x * multiplier) + radius
            centre_y = round((self.height - 1 - point.y) * multiplier) + radius
            pygame.gfxdraw.filled_circle(self.screen, centre_x, centre_y, radius, colour)

        pygame.display.update()

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.close()
                return False
        return True

    def close(self):
        """ Shut pygame down and forget the window so render() can reopen it """
        if self.screen is not None:
            pygame.quit()
            self.screen = None
            self._clock = None

In [33]:
from stable_baselines.common.policies import MlpPolicy
# from stable_baselines.deepq.policies import MlpPolicy
from stable_baselines.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines import PPO2, DQN, SAC

# DummyVecEnv takes a list of zero-argument environment factories; the class
# itself is such a factory.
env = DummyVecEnv([MazeEnv])
model = PPO2(MlpPolicy, env, learning_rate=0.001, verbose=1)
# DQN alternative (needs the deepq MlpPolicy import instead of the common one):
# model = DQN(MlpPolicy, env, learning_rate=0.001, verbose=1)

# Train for 100k environment steps, printing training stats every 100 updates.
model.learn(total_timesteps=100_000, log_interval=100)

--------------------------------------
| approxkl           | 0.00077575486 |
| clipfrac           | 0.0           |
| explained_variance | 8.27e-05      |
| fps                | 376           |
| n_updates          | 1             |
| policy_entropy     | 1.3854253     |
| policy_loss        | -0.0034411512 |
| serial_timesteps   | 128           |
| time_elapsed       | 0             |
| total_timesteps    | 128           |
| value_loss         | 5901.9834     |
--------------------------------------
-------------------------------------
| approxkl           | 0.0006597665 |
| clipfrac           | 0.001953125  |
| explained_variance | 0.0039       |
| fps                | 1580         |
| n_updates          | 100          |
| policy_entropy     | 0.3426526    |
| policy_loss        | 0.0041638543 |
| serial_timesteps   | 12800        |
| time_elapsed       | 8.39         |
| total_timesteps    | 12800        |
| value_loss         | 8508.478     |
-------------------------------------

KeyboardInterrupt: 

In [34]:
# Roll the trained policy out on screen until render() reports a falsy value
# (MazeEnv.render returns False once the pygame window has been closed).
obs = env.reset()
keep_running = True
while keep_running:
    action, _states = model.predict(obs)
    obs, rewards, dones, info = env.step(action)
    keep_running = env.render()