In [None]:
# Pygame installation
!pip install pygame

In [None]:
import pygame as pg
from pygame import image as img

In [None]:
# Gym Imports
import gym
from gym import Env
from gym.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete # different types of spaces

# Helpers
import numpy as np
import random
import os

# Stable baselines
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy

# Time module to make program halt for presentation purposes
import time

In [None]:
# Function definitions for use
def load_image(file):
    """Load an image file and prepare it for drawing.

    Parameters
    ----------
    file : str or path-like
        Location of the image to load.

    Returns
    -------
    pygame.Surface
        The loaded surface after ``convert_alpha()``, so per-pixel
        transparency from .png files is preserved.

    Raises
    ------
    SystemExit
        If the image cannot be loaded, with a message naming the file.
    """
    try:
        surface = pg.image.load(file)
    except (pg.error, FileNotFoundError) as exc:
        # pygame 2 reports a missing file as FileNotFoundError instead of
        # pg.error, so both are translated into the same readable exit message.
        raise SystemExit('Could not load image "%s" %s' % (file, exc)) from exc
    return surface.convert_alpha() # convert_alpha allows for transparency from .pngs

In [None]:
# Start pygame and open a display before the sprite classes below are defined:
# their class bodies call load_image(), which finishes with convert_alpha().
# NOTE(review): convert_alpha() is understood to require a display mode to be
# set first, and a (0,0) size is understood to mean "use the current screen
# resolution" - confirm both against the pygame docs.
pg.init()
win = pg.display.set_mode((0,0))
pg.display.set_caption("Custom Environment - Guess Path")

class Robot(pg.sprite.Sprite):
    """Sprite for the agent; its picture shows the direction of the last move."""
    # List index doubles as the action id handed to update():
    # 0 = right, 1 = left, 2 = up, 3 = down.
    images = [load_image(spritePath) for spritePath in (
        "assets/sprites/robo_R.png",
        "assets/sprites/robo_L.png",
        "assets/sprites/robo_U.png",
        "assets/sprites/robo_D.png",
    )]

    def __init__(self, pos):
        """Create the robot facing right, centred on pixel position ``pos``."""
        super().__init__(self.containers)
        self.image = self.images[0]
        self.setPosition(pos)

    def update(self, action: int):
        """Switch to the picture for ``action``; negative values are ignored."""
        if action < 0:
            return
        self.image = self.images[action]

    def setPosition(self, newPos):
        """Centre the sprite's rect on pixel position ``newPos`` (x, y)."""
        centreX, centreY = newPos[0], newPos[1]
        box = self.image.get_rect()
        box.x = centreX - (box.width / 2.0)
        box.y = centreY - (box.height / 2.0)
        self.rect = box

class Space(pg.sprite.Sprite):
    """Sprite for one board tile.

    Tile types (index into ``images``):
    0 = normal, 1 = plus (pushes forward), 2 = minus (pushes backward),
    3 = start, 4 = goal.
    """
    images = [load_image("assets/sprites/normal.png"), 
              load_image("assets/sprites/plus.png"), 
              load_image("assets/sprites/minus.png"),
              load_image("assets/sprites/start.png"), 
              load_image("assets/sprites/goal.png")]
    # Class-level default, kept so ``Space.spaceType`` still resolves;
    # every instance sets its own value in __init__.
    spaceType = 0
    # Columns a robot is pushed when landing on a tile of the given type.
    # Any type not listed here does not push.
    PUSH_BY_TYPE = {1: 2, 2: -2}

    def __init__(self, pos, jumpDistance):
        """Create a tile centred on pixel position ``pos``.

        ``jumpDistance`` is the number of columns the tile pushes the robot:
        0 gives a normal tile, a positive value a plus tile, a negative value
        a minus tile.
        """
        pg.sprite.Sprite.__init__(self, self.containers)

        self.jumpDistance = jumpDistance

        # Derive the type from the push so that spaceType always agrees with
        # the image that is shown (previously it stayed at 0 regardless).
        if jumpDistance == 0:    # neutral movement - does not push Robot
            self.spaceType = 0
        elif jumpDistance > 0:   # positive movement - pushes robot forward
            self.spaceType = 1
        else:                    # negative movement - pushes robot backward
            self.spaceType = 2
        self.image = self.images[self.spaceType]

        self.rect = self.image.get_rect()
        self.rect.x = pos[0] - (self.rect.width / 2.0)
        self.rect.y = pos[1] - (self.rect.height / 2.0)

    def setType(self, spaceType):
        """Change the tile to ``spaceType`` and update its image and push.

        Plus and minus tiles push by +2 / -2 columns. Every other type resets
        the push to 0 so a tile that used to push does not keep a stale
        ``jumpDistance`` after being retyped.
        """
        self.image = self.images[spaceType]
        self.spaceType = spaceType
        self.jumpDistance = self.PUSH_BY_TYPE.get(spaceType, 0)

# Initialize Game Groups
# NOTE(review): the name `all` shadows Python's built-in all() for every cell
# that runs after this one; consider renaming once all users of it are known.
all = pg.sprite.RenderUpdates()
    
# Space.__init__ and Robot.__init__ pass self.containers to Sprite.__init__,
# so these attributes must be assigned before any Space or Robot is created.
Space.containers = all
Robot.containers = all

# Board does not require rendering
        
class Board():
    """Board that sets up all spaces and tracks the robot and the goal.

    ``spaces`` is indexed ``spaces[row][column]`` and always has five rows.
    Note the two coordinate conventions used by the attributes:
    ``playerPos`` is ``[column, row]`` while ``goal`` is ``[row, column]``.
    """
    TILE_SIZE = 64      # pixel distance between neighbouring tile centres
    ROBOT_OFFSET = 96   # pixel offset applied when placing the robot sprite

    def __init__(self, maxCols, pos):
        """Build a five-row board with ``maxCols`` columns of normal tiles.

        ``pos`` is the pixel position (x, y) of the top-left tile's centre.
        ``maxCols`` values below 1 are raised to 1.
        """
        self.spaces = [[], [], [], [], []]

        # A board needs at least one column to hold the start and the goal.
        self.maxCols = max(maxCols, 1)

        self.middle = (len(self.spaces) // 2)
        self.playerPos = [0, self.middle]              # [column, row]
        self.goal = [self.middle, self.maxCols - 1]    # [row, column]

        # Thanks to Jack Malone for help with this loop
        for rowIndex, row in enumerate(self.spaces):
            for colIndex in range(self.maxCols):
                row.append(Space(((self.TILE_SIZE * colIndex) + pos[0],
                                  (self.TILE_SIZE * rowIndex) + pos[1]), 0))

    def setStartEnd(self):
        """Mark the robot's current tile as start (3) and the goal tile as goal (4)."""
        self.spaces[self.playerPos[1]][self.playerPos[0]].setType(3)
        self.spaces[self.goal[0]][self.goal[1]].setType(4)

    def update(self, robot: "Robot", action: int):
        """Move the robot one step, apply the landing tile's push, reposition the sprite.

        ``action``: 0 = right, 1 = left, 2 = up, 3 = down. A negative action
        does nothing at all. Moves that would leave the board are ignored, but
        the push of the tile the robot ends up on is still applied. Any other
        non-negative action moves nowhere and only applies that push.
        """
        if action < 0:
            return

        lastCol = self.maxCols - 1
        lastRow = len(self.spaces) - 1

        if action == 0:    # right
            if self.playerPos[0] < lastCol:
                self.playerPos[0] += 1
        elif action == 1:  # left
            if self.playerPos[0] > 0:
                self.playerPos[0] -= 1
        elif action == 2:  # up
            if self.playerPos[1] > 0:
                self.playerPos[1] -= 1
        elif action == 3:  # down
            if self.playerPos[1] < lastRow:
                self.playerPos[1] += 1

        # The tile just entered may push the robot along its row; the result
        # is clamped so the robot never leaves the board. playerPos is mutated
        # in place because other code may hold a reference to the list.
        pushValue = self.spaces[self.playerPos[1]][self.playerPos[0]].jumpDistance
        if pushValue != 0:
            self.playerPos[0] = min(max(self.playerPos[0] + pushValue, 0), lastCol)

        # now that the robot has moved, update its position
        robot.setPosition(((self.playerPos[0] * self.TILE_SIZE) + self.ROBOT_OFFSET,
                           (self.playerPos[1] * self.TILE_SIZE) + self.ROBOT_OFFSET))

    def initializeSpaces(self):
        """Place the fixed layout of plus tiles and minus columns.

        Plus tiles sit one column before each full column of minus tiles.
        Entries whose column does not exist on this board are skipped, so
        boards narrower than 13 columns no longer raise IndexError.
        """
        plusTiles = [(self.middle, 3), (0, 7), (3, 11)]   # (row, column)
        minusColumns = [4, 8, 12]

        for rowIndex, colIndex in plusTiles:
            if colIndex < self.maxCols:
                self.spaces[rowIndex][colIndex].setType(1)

        for colIndex in minusColumns:
            if colIndex < self.maxCols:
                for row in self.spaces:
                    row[colIndex].setType(2)

    def winCheck(self):
        """Return True when the robot stands on the goal tile."""
        # playerPos is [column, row]; goal is [row, column].
        return (self.playerPos[1] == self.goal[0]
                and self.playerPos[0] == self.goal[1])

# Shut pygame down again now that the sprite images are loaded and the
# classes in this cell are defined.
pg.quit()



In [None]:
class PathEnv(Env):
    """Gym environment in which a robot walks a Board until it reaches the goal.

    The window, background, board, robot and clock are created once in the
    class body and shared by every instance.

    Observations are dictionaries of int8 numpy arrays:
    ``playerPos`` is ``[column, row]``, ``goalPos`` is ``[row, column]``
    (the orderings come straight from Board), and ``left`` / ``up`` /
    ``right`` / ``down`` hold the type of the neighbouring tile, or -1 when
    there is no tile in that direction.
    """
    WINDOW_SIZE = (1600, 800)
    # Time budget per episode; step() subtracts clock.get_time() from it.
    EPISODE_BUDGET = 600

    win = pg.display.set_mode(WINDOW_SIZE)
    pg.display.set_caption("Custom Environment - Guess Path RL")
    bg = load_image('assets/sprites/background.png')
    board = Board(16, (100,100))
    board.setStartEnd()
    board.initializeSpaces()
    robo = Robot(((board.playerPos[0] * 64) + 96, 
              (board.playerPos[1] * 64) + 96))
    action = -1
    clock = pg.time.Clock()
    
    def __init__(self):
        """Define the action and observation spaces and the initial state."""
        # Actions, as interpreted by Board.update and Robot.update:
        # 0 - Right, 1 - Left, 2 - Up, 3 - Down
        self.action_space = Discrete(4)

        # Positions mix column and row indices, so the upper bound has to
        # cover the larger of the two board dimensions.
        maxCoord = max(self.board.maxCols, len(self.board.spaces)) - 1
        # Neighbour entries hold a tile type (0..4) or -1 for "off the board".
        # int8 covers -128..127, enough for both kinds of value here.
        # The keys must match the ones produced by determineState().
        self.observation_space = Dict({
            "playerPos": Box(low=0, high=maxCoord, shape=(2,), dtype=np.int8),
            "goalPos":   Box(low=0, high=maxCoord, shape=(2,), dtype=np.int8),
            "left":      Box(low=-1, high=4, shape=(1,), dtype=np.int8),
            "up":        Box(low=-1, high=4, shape=(1,), dtype=np.int8),
            "right":     Box(low=-1, high=4, shape=(1,), dtype=np.int8),
            "down":      Box(low=-1, high=4, shape=(1,), dtype=np.int8),
        })
        # Determine starting state upon initialization
        self.state = self.determineState()
        self.alloted_length = self.EPISODE_BUDGET

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, done, info)``.

        The episode ends when the goal is reached (reward 100) or when the
        time budget is used up (reward 0).
        """
        # Caps the step rate at 30 per second so the run can be watched.
        self.clock.tick(30)
        
        # Update Environment elements
        self.robo.update(action)
        self.board.update(self.robo, action)
        
        # With elements updated, determine state, reward etc
        self.state = self.determineState()
        
        # Reduce alloted length to use environment by time moved via pygame clock
        self.alloted_length -= self.clock.get_time()

        # TODO: Calculate reward
        reward = 0
        
        # Check if environment is done
        done = self.alloted_length <= 0
            
        if self.board.winCheck():
            reward = 100
            done = True
        
        # Needed during the return
        info = {}
  
        # Return step information
        return self.state, reward, done, info

    def render(self, mode="human"):
        """Draw the background, every tile and the robot to the display.

        Draws on whatever display surface is currently set instead of relying
        on a notebook-global ``win``; opens a window if none exists.
        ``mode`` is accepted for compatibility with gym's render signature
        and is otherwise unused.
        """
        surface = pg.display.get_surface()
        if surface is None:
            surface = pg.display.set_mode(self.WINDOW_SIZE)

        surface.blit(self.bg, (0,0))
        for row in self.board.spaces:
            for space in row:  # each rect already holds the tile's top-left corner
                surface.blit(space.image, (space.rect.x, space.rect.y))

        surface.blit(self.robo.image, (self.robo.rect.x, self.robo.rect.y))
        pg.display.update()
    
    def reset(self):
        """Put the robot back on the start tile, refill the time budget, return the state."""
        # Reset game elements
        self.board.playerPos = [0,self.board.middle] # reset player pos on board
        
        self.robo.setPosition(((self.board.playerPos[0] * 64) + 96, 
              (self.board.playerPos[1] * 64) + 96)) # place robo onto the correct spot
        
        # Reset starting state
        self.state = self.determineState()
        # Reset alloted time to interact
        self.alloted_length = self.EPISODE_BUDGET
        return self.state
    
    def calculateReward(self):
        """Placeholder for reward shaping; not implemented and not called yet."""
        # calculate reward based for AI
        pass
    
    def determineState(self):
        """Build the observation for the robot's current position.

        Returns a dict whose keys match ``observation_space``. The arrays are
        fresh copies, so an observation handed out earlier is not altered
        when the board's ``playerPos`` list is mutated by later steps.
        """
        col, row = self.board.playerPos
        spaces = self.board.spaces
        lastCol = self.board.maxCols - 1
        lastRow = len(spaces) - 1

        def neighbourType(r, c, exists):
            # -1 marks a direction that would leave the board
            value = spaces[r][c].spaceType if exists else -1
            return np.array([value], dtype=np.int8)

        state = {
            "playerPos": np.array([col, row], dtype=np.int8),
            "goalPos": np.array(self.board.goal, dtype=np.int8),
            "left": neighbourType(row, col - 1, col > 0),
            "up": neighbourType(row - 1, col, row > 0),
            "right": neighbourType(row, col + 1, col < lastCol),
            # Compare against the real last row (previously a hard-coded 2,
            # which hid the tile below rows 2 and 3).
            "down": neighbourType(row + 1, col, row < lastRow),
        }

        return state
# Shut pygame down again after PathEnv's class body opened a window;
# the test cell further down calls pg.init() and opens its own.
pg.quit()

In [None]:
# Create the environment instance used by the cells below.
env = PathEnv()

In [None]:
# Sanity check: draw one random action from the environment's action space.
env.action_space.sample()

In [None]:
# Run this cell to test that the Environment works properly
# This will randomly pick from a Discrete action step, no model is used here.

pg.init()
win = pg.display.set_mode((1600,800))
pg.display.set_caption("Custom Environment - Guess Path RL")

episodes = 10
try:
    for episode in range(1, episodes+1):
        obs = env.reset()
        done = False
        score = 0

        while not done:
            pg.event.get()  # drain the event queue on every frame
            env.render()
            action = env.action_space.sample()
            obs, reward, done, info = env.step(action)
            score += reward
        print('Episode:{} Score:{}'.format(episode, score))
finally:
    # Always release the environment and close the window, even when a step
    # raises, so a failed run does not leave the pygame window open.
    env.close()
    pg.quit()

In [None]:
pg.quit() # optional: run manually to close the pygame window if a cell above failed