In [1]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import pygame
import cv2
from stable_baselines3 import PPO


2023-10-11 16:47:15.680667: I tensorflow/core/util/port.cc:111] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-10-11 16:47:15.704411: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-10-11 16:47:15.704445: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-10-11 16:47:15.704463: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-10-11 16:47:15.709674: I tensorflow/core/platform/cpu_feature_g

In [2]:

class GridworldEnv(gym.Env):
    def __init__(self, grid_size):
        self.grid_size = grid_size
        self.grid = np.zeros((grid_size, grid_size), dtype=int)
        self.agent_pos = None
        self.apple_pos = None
        self.plate_pos = None
        self.apple_agent = False    
        self.agent_plate = False
        self.done = False
        self.reward = 0

        self.action_space = gym.spaces.Discrete(4)  # 4 possible actions: Up, Down, Left, Right
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(grid_size, grid_size), dtype=int)

        pygame.init()
        self.screen_size = (400, 400) 
        self.screen = pygame.display.set_mode(self.screen_size)
        self.apple_img = pygame.image.load("apple.png")
        self.plate_img = pygame.image.load("plate.png")
        self.robot_img = pygame.image.load("robot.png")

        # Scale apple and plate images to fit grid cells
        cell_size = self.screen_size[0] // grid_size
        self.apple_img = pygame.transform.scale(self.apple_img, (cell_size, cell_size))
        self.plate_img = pygame.transform.scale(self.plate_img, (cell_size, cell_size))
        self.robot_img = pygame.transform.scale(self.robot_img, (cell_size, cell_size))
        self.clock = pygame.time.Clock()
        self.target_fps = 3 



    def reset(self):

        # Randomly place the agent, apple, and plate in the grid
        self.agent_pos = np.random.randint(0, self.grid_size, size=2)
        self.apple_pos = np.random.randint(0, self.grid_size, size=2)
        while np.array_equal(self.apple_pos, self.agent_pos):
            self.apple_pos = np.random.randint(0, self.grid_size, size=2)
        self.plate_pos = np.random.randint(0, self.grid_size, size=2)
        while np.array_equal(self.plate_pos, self.agent_pos) or np.array_equal(self.plate_pos, self.apple_pos):
            self.plate_pos = np.random.randint(0, self.grid_size, size=2)

        self.grid = np.zeros((self.grid_size, self.grid_size), dtype=int)
        self.grid[tuple(self.agent_pos)] = 1
        self.grid[tuple(self.apple_pos)] = 2
        self.grid[tuple(self.plate_pos)] = 3

        self.done = False
        self.apple_agent = False
        self.agent_plate = False
        
        self.reward = 0
        return self.grid.copy()

    def step(self, action):
        if self.done:
            raise ValueError("Episode is already done. Please reset the environment.")

        movements = {
            0: (-1, 0),  # Up
            1: (1, 0),   # Down
            2: (0, -1),  # Left
            3: (0, 1)    # Right
        }
        agent_new_pos = tuple(self.agent_pos + np.array(movements[int(action)]))

        # Check for boundary constraints
        if 0 <= agent_new_pos[0] < self.grid_size and 0 <= agent_new_pos[1] < self.grid_size:
            if self.apple_agent:
                self.grid[tuple(self.apple_pos)] = 0 
            if self.agent_plate:
                self.grid[tuple(self.plate_pos)] = 3
                
            self.grid[tuple(self.agent_pos)] = 0
            self.agent_pos = np.array(agent_new_pos)
            self.grid[tuple(self.agent_pos)] = 1



        if np.array_equal(self.agent_pos, self.plate_pos):
            self.agent_plate = True
        if np.array_equal(self.agent_pos, self.apple_pos):
            self.apple_agent = True

        if self.apple_agent:

        # Check if the agent has successfully completed the task
            if np.array_equal(self.agent_pos, self.plate_pos):

                self.done = True
                self.reward = 100  

        return self.grid.copy(), self.reward, self.done, {}


    def render(self):
        self.screen.fill((255, 255, 255)) 
        cell_size = self.screen_size[0] // self.grid_size

        for row in range(self.grid_size):
            for col in range(self.grid_size):
                cell_value = self.grid[row, col]
                x, y = col * cell_size, row * cell_size

                if cell_value == 1:
                    self.screen.blit(self.robot_img, (x, y))  
                elif cell_value == 2:
                    self.screen.blit(self.apple_img, (x, y))  
                elif cell_value == 3:
                    self.screen.blit(self.plate_img, (x, y))  
                else:
                    color = (255, 255, 255)  # White for empty

                pygame.draw.rect(self.screen, (0, 0, 0), (x, y, cell_size, cell_size), 1)

        pygame.display.flip()
        self.clock.tick(self.target_fps)

    def close(self):
        pass


We visualize the environment in the cell below:

In [None]:
grid_size = 5
env = GridworldEnv(grid_size)
observation = env.reset()
done = False


fourcc = cv2.VideoWriter_fourcc(*'XVID')
video_writer = cv2.VideoWriter('gridworld_video.avi', fourcc, 3, env.screen_size)


for i in range(10000):    
    action = env.action_space.sample()
    observation, reward, done, _ = env.step(action)
    env.render()
    pygame.event.get()  
    frame = pygame.surfarray.array3d(env.screen)
    frame = np.transpose(frame, (1, 0, 2)) 
    video_writer.write(frame) 

    if done:
        break

video_writer.release()


pygame.quit()


Trainning PPO with the sparse reward:

In [3]:

# Create a gym environment
gridworld_env = GridworldEnv(grid_size=5)
pygame.quit()
# Create and train the PPO agent
model = PPO("MlpPolicy", gridworld_env, verbose=1,learning_rate=0.0001,gamma= 0.99,tensorboard_log=
            "ppo_gridworld_tensorboard/")

model.learn(total_timesteps=100000,tb_log_name='sparse')  # Adjust the number of timesteps as needed

# Save the trained model
model.save("ppo_gridworld")
del model  # Delete trained model to demonstrate loading

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ppo_gridworld_tensorboard/sparse_1




---------------------------------
| rollout/           |          |
|    ep_len_mean     | 158      |
|    ep_rew_mean     | 100      |
| time/              |          |
|    fps             | 3578     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 135          |
|    ep_rew_mean          | 100          |
| time/                   |              |
|    fps                  | 2231         |
|    iterations           | 2            |
|    time_elapsed         | 1            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0030164951 |
|    clip_fraction        | 0.00444      |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.39        |
|    explained_variance   | 0.000291     |
|    learning_r

You may test the trained agent using sparse reward using the code below. It will visulaize the agent and the trajectories using pygame: 

In [15]:
gridworld_env = GridworldEnv(grid_size=5)

model = PPO.load("ppo_gridworld")
# Test the trained model
obs = gridworld_env.reset()

while True:
    action, _states = model.predict(obs)
    obs, reward, done, _ = gridworld_env.step(action)
    gridworld_env.render()
    if done:
        break

pygame.quit()
print(reward)

gridworld_env.close()




100


__Reward Shaping__: In this part, we shaped the reward using the l1 norm distance between the agent and the apple until it finds the apple, and then we added the negative of the l1 norm distance between the agent and the plate as the new reward for the second phase of the episode. Finally, if the agent gets to the plate, it will get a rewrd of 100. 

In [16]:
def l1distance(point1, point2):

    return np.linalg.norm(np.array(point1)-np.array(point2), ord=1)

In [None]:

class GridworldEnv(gym.Env):
    def __init__(self, grid_size):
        self.grid_size = grid_size
        self.grid = np.zeros((grid_size, grid_size), dtype=int)
        self.agent_pos = None
        self.apple_pos = None
        self.plate_pos = None
        self.apple_agent_dist = 5
        self.plate_agent_dist = 5
        self.apple_agent = False    
        self.agent_plate = False
        self.done = False
        self.reward = 0

        self.action_space = gym.spaces.Discrete(4)  # 4 possible actions: Up, Down, Left, Right
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(grid_size, grid_size), dtype=int)

        pygame.init()
        self.screen_size = (400, 400)  
        self.screen = pygame.display.set_mode(self.screen_size)
        self.apple_img = pygame.image.load("images/apple.png")
        self.plate_img = pygame.image.load("images/plate.png")
        self.robot_img = pygame.image.load("images/robot.png")

        cell_size = self.screen_size[0] // grid_size
        self.apple_img = pygame.transform.scale(self.apple_img, (cell_size, cell_size))
        self.plate_img = pygame.transform.scale(self.plate_img, (cell_size, cell_size))
        self.robot_img = pygame.transform.scale(self.robot_img, (cell_size, cell_size))
        self.clock = pygame.time.Clock()
        self.target_fps = 3 



    def reset(self):

        self.agent_pos = np.random.randint(0, self.grid_size, size=2)
        self.apple_pos = np.random.randint(0, self.grid_size, size=2)
        while np.array_equal(self.apple_pos, self.agent_pos):
            self.apple_pos = np.random.randint(0, self.grid_size, size=2)
        self.plate_pos = np.random.randint(0, self.grid_size, size=2)
        while np.array_equal(self.plate_pos, self.agent_pos) or np.array_equal(self.plate_pos, self.apple_pos):
            self.plate_pos = np.random.randint(0, self.grid_size, size=2)

        self.grid = np.zeros((self.grid_size, self.grid_size), dtype=int)
        self.grid[tuple(self.agent_pos)] = 1
        self.grid[tuple(self.apple_pos)] = 2
        self.grid[tuple(self.plate_pos)] = 3

        self.done = False
        self.apple_agent = False
        self.agent_plate = False
        
        self.reward = 0
        return self.grid.copy()

    def step(self, action):
        if self.done:
            raise ValueError("Episode is already done. Please reset the environment.")

        # Define movement actions
        movements = {
            0: (-1, 0),  # Up
            1: (1, 0),   # Down
            2: (0, -1),  # Left
            3: (0, 1)    # Right
        }

        agent_new_pos = tuple(self.agent_pos + np.array(movements[int(action)]))

        # Check for boundary constraints
        if 0 <= agent_new_pos[0] < self.grid_size and 0 <= agent_new_pos[1] < self.grid_size:
            if self.apple_agent:
                self.grid[tuple(self.apple_pos)] = 0 
            if self.agent_plate:
                self.grid[tuple(self.plate_pos)] = 3
                
            self.grid[tuple(self.agent_pos)] = 0
            self.agent_pos = np.array(agent_new_pos)
            self.grid[tuple(self.agent_pos)] = 1





        if np.array_equal(self.agent_pos, self.plate_pos):
            self.agent_plate = True
        if np.array_equal(self.agent_pos, self.apple_pos):
            self.apple_agent = True


        if not self.apple_agent:
            self.reward = -l1distance(self.agent_pos, self.apple_pos)/100
        else:
            self.reward += -l1distance(self.agent_pos, self.plate_pos)/100

            if np.array_equal(self.agent_pos, self.plate_pos):

                self.done = True
                self.reward += 100  
        


        return self.grid.copy(), self.reward, self.done, {}


    def render(self):
        self.screen.fill((255, 255, 255))  # Fill the screen with white
        cell_size = self.screen_size[0] // self.grid_size

        for row in range(self.grid_size):
            for col in range(self.grid_size):
                cell_value = self.grid[row, col]
                x, y = col * cell_size, row * cell_size

                if cell_value == 1:
                    self.screen.blit(self.robot_img, (x, y))  # Draw robot image
                elif cell_value == 2:
                    self.screen.blit(self.apple_img, (x, y))  # Draw apple image
                elif cell_value == 3:
                    self.screen.blit(self.plate_img, (x, y))  # Draw plate image
                else:
                    color = (255, 255, 255)  # White for empty

                pygame.draw.rect(self.screen, (0, 0, 0), (x, y, cell_size, cell_size), 1)

        pygame.display.flip()
        self.clock.tick(self.target_fps)

    def close(self):
        pass


Trainning the agent using PPO and the shaped reward: 

In [18]:
gridworld_env = GridworldEnv(grid_size=5)
pygame.quit()

model = PPO("MlpPolicy", gridworld_env, verbose=1,learning_rate=0.0001,gamma= 0.99,tensorboard_log=
            "ppo_gridworld_tensorboard/")
model.learn(total_timesteps=100000,tb_log_name='Shaped')  

model.save("ppo_gridworld")
del model 

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ppo_gridworld_tensorboard/Shaped_1




---------------------------------
| rollout/           |          |
|    ep_len_mean     | 126      |
|    ep_rew_mean     | 33.5     |
| time/              |          |
|    fps             | 3706     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 103          |
|    ep_rew_mean          | 60.8         |
| time/                   |              |
|    fps                  | 2443         |
|    iterations           | 2            |
|    time_elapsed         | 1            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0023710337 |
|    clip_fraction        | 0.000537     |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.39        |
|    explained_variance   | 0.00496      |
|    learning_r

Test the network:

In [19]:
model = PPO.load("ppo_gridworld")

gridworld_env = GridworldEnv(grid_size=5)


# Test the trained model
obs = gridworld_env.reset()

while True:
    action, _states = model.predict(obs)
    obs, reward, done, _ = gridworld_env.step(action)
    gridworld_env.render()
    if done:
        break

pygame.quit()

