In [1]:
import sys
import numpy as np
import math
import random
import json
import requests

import gym
import gym_maze
from gym_maze.envs.maze_manager import MazeManager
from riddle_solvers import *

In [2]:
from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback

In [1]:
import numpy as np

import gym
from gym import error, spaces, utils
from gym.utils import seeding
from gym_maze.envs.maze_view_2d import MazeView2D


class MazeEnv(gym.Env):
    metadata = {
        "render.modes": ["human", "rgb_array"],
    }

    ACTION = ["N", "S", "E", "W"]

    def __init__(self, maze_file=None, maze_cells=None, maze_size=None, mode=None, enable_render=True, rescue_item_locations=None, has_loops=False):
        super().__init__()
        self.viewer = None
        self.enable_render = enable_render
        self.rescue_item_locations= rescue_item_locations


        if hasattr(maze_cells, 'shape'):
            self.maze_view = MazeView2D(maze_name="OpenAI Gym - Maze (10 x 10)",
                                        maze_cells=maze_cells,
                                        screen_size=(640, 640), 
                                        enable_render=enable_render, rescue_item_locations=self.rescue_item_locations)
        
        elif maze_file:
            self.maze_view = MazeView2D(maze_name="OpenAI Gym - Maze (%s)" % maze_file,
                                        maze_file_path=maze_file,
                                        screen_size=(640, 640), 
                                        enable_render=enable_render, rescue_item_locations=self.rescue_item_locations)
        elif maze_size:
            self.maze_view = MazeView2D(maze_name="OpenAI Gym - Maze (%d x %d)" % maze_size,
                                        maze_size=maze_size, screen_size=(640, 640),
                                        has_loops=has_loops,
                                        enable_render=enable_render, rescue_item_locations=self.rescue_item_locations)
        else:
            raise AttributeError("One must supply either a maze_file path (str) or the maze_size (tuple of length 2)")

        self.maze_size = self.maze_view.maze_size

        # forward or backward in each dimension
        self.action_space = spaces.Discrete(2*len(self.maze_size))
        self.final_pos = (maze_size[0]-1,maze_size[1]-1)
        # observation is the x, y coordinate of the grid
        low = np.zeros(len(self.maze_size), dtype=int)
        high =  np.array(self.maze_size, dtype=int) - np.ones(len(self.maze_size), dtype=int)
        self.observation_space = spaces.Box(0, 150, dtype=np.uint8, shape = (114,))
        self.reward_range = (-np.inf, np.inf)
        self.action_space = spaces.Discrete(4) 
        self.observation_space = spaces.Dict({
            'my_feature': spaces.Box(low=0, high=120, shape=(114,), dtype=np.float32)
        #     'desired_goal': spaces.Box(low=0, high=120, shape=(114,), dtype=np.float32),
        #     'achieved_goal': spaces.Box(low=0, high=120, shape=(114,), dtype=np.float32)
        })
        # initial condition
        self.state = None
        self.steps_beyond_done = None
        # Simulation related variables.
        self.reset()

        # Just need to initialize the relevant attributes
        self.configure()
        self.seed()

        self.target_locations = self.maze_view.rescue_item_locations
        self.target_reached = 0
        self.tot_reward = 0

        # self.maze_view.maze.save_maze('./hacktrick.npy')

        self.rescued = 0
        self.last_pos_x =self.maze_view.robot[0]
        self.last_pos_y = self.maze_view.robot[1]
        
        self.cur_pos_x = 0
        self.cur_pos_y = 0
        
        self.last_action = 0
        self.steps = None
        self.steps_count = 0
        
        self.temp = []
        self.visited = []
        
        self.done = np.zeros((10,10))
        self.done[0][0] = 1
        self.state = self.add_done_to_state()

        
    def __del__(self):
        if self.enable_render is True:
            self.maze_view.quit_game()

    def configure(self, display=None):
        self.display = display

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        
        info = {}
        self.last_pos_x = self.maze_view.robot[0]
        self.last_pos_y = self.maze_view.robot[1]
        
        
        if isinstance(action, int):
            self.maze_view.move_robot(self.ACTION[action])
        else:
            self.maze_view.move_robot(self.ACTION[action])

        distances = self.maze_view.get_rescue_items_locations()[0]
        directions = self.maze_view.get_rescue_items_locations()[1]

        info['rescued_items'] = self.maze_view.rescued_items


        self.state = [self.maze_view.robot, distances, directions]
        # self.state = self.maze_view.robot
        # print("new state",self.state_new)
        self.state = self.fix_state()
        
        np.append(self.state, self.last_action)
        self.last_action = action
        
        
        self.cur_pos_x = self.state[0]
        self.cur_pos_y = self.state[1]
        
        self.done[self.maze_view.robot[0]][self.maze_view.robot[0]] = 1
        
        truncated = False
        
        if self.steps_count > 5000:
            truncated = True
            
        info['riddle_type'] = None
        info['riddle_question'] = None
        
        reward = self.compute_reward()
        terminated = False
        self.tot_reward += reward

        if self.tot_reward < -500:
            terminated = True
            
        self.steps +=1
        self.steps_count+=1
        self.normalize_state()
        
        self.temp = []
        self.temp.append(self.state[0])
        self.temp.append(self.state[1])
        
        self.state = self.add_done_to_state()
        return self.state, reward, terminated, info

    def add_done_to_state(self):
        goal = []
        for i in self.state:
            goal.append(i)
            
        for arr in self.done:
            for n in arr:
                goal.append(n)
        
        return np.array(goal, dtype=float)
                
    def compute_reward(self):
        
        if self.temp not in self.visited:
            self.visited.append(self.temp)
            return 0.2
        reached = False
        
        
        new_targets = []
        for i in self.target_locations:
            if (self.cur_pos_x == i[0]) and (self.cur_pos_y == i[1]):
                reached = True
                continue
            new_targets.append(i)
        self.target_locations = new_targets
        
        if reached:
            return 0.3
    
        elif (self.state[0]  == self.last_pos_x) and (self.last_pos_y == self.state[1]):
            return -0.1
        
        elif (self.state[0] == 9) and (self.state[1] == 9) :
            self.terminated = True
            return 0.5 * (4 - len(self.target_locations))
        
        else:
            return -0.01
        
        
    def get_current_state(self):
        info = {}
        distances = self.maze_view.get_rescue_items_locations()[0]
        directions = self.maze_view.get_rescue_items_locations()[1]

        info['rescued_items'] = self.maze_view.rescued_items


        self.state = [self.maze_view.robot, distances, directions]
        truncated = False
        
        info['riddle_type'] = None
        info['riddle_question'] = None
        
        reward = None
        terminated = False

        return self.state, reward, terminated, truncated, info

    def reset(self):
        self.done = np.zeros((10,10))
        self.done[0][0] = 1
        self.rescue_item_locations = self.get_rescue_points()
        self.seed()
        self.target_locations = self.maze_view.rescue_item_locations
        self.target_reached = 0
        self.tot_reward = 0
        # self.maze_view.maze.save_maze('./hacktrick.npy')
        self.rescued = 0
        self.last_pos_x =self.maze_view.robot[0]
        self.last_pos_y = self.maze_view.robot[1]
        self.cur_pos_x = 0
        self.cur_pos_y = 0
        self.last_action = 0
        self.steps = None
        self.steps_count = 0
        self.visited = []
        self.maze_view.reset_robot()
        self.state = [self.maze_view.robot, self.maze_view.get_rescue_items_locations()[0], self.maze_view.get_rescue_items_locations()[1]]
        self.steps_beyond_done = None
        self.steps = 0
        self.terminated = False
        self.truncated = False
        self.maze_view.reset_rescue_items()
        self.state = self.fix_state()
        self.tot_reward = 0
        self.temp = []
        self.visited = []
        self.last_pos_x = 0
        self.last_pos_y = 0
        self.last_action =0
        np.append(self.state, self.last_action)
        self.pos = (0,0)
        self.target_locations = self.maze_view.rescue_item_locations
        self.normalize_state()
        self.state = self.add_done_to_state()
        return self.state

    def is_game_over(self):
        return self.maze_view.game_over

    def render(self, mode="human", close=False):
        if close:
            self.maze_view.quit_game()

        return self.maze_view.update(mode)

    def fix_state(self):
        state = self.state
        state0 = state[0]
        state1 = state[1]
        state2 = state[2]
    
        ans = []
        for i in state0:
            ans.append(i)
        for i in state1:
            ans.append(i)
        for i in state2:
            for j in i:
                ans.append(j)
        return np.array(ans, dtype=float)
    
    def normalize_state(self):
        dis_sum = sum(self.state[2:6]) 
        self.state[2] /= dis_sum
        self.state[3] /= dis_sum
        self.state[4] /= dis_sum
        self.state[5] /= dis_sum
        
        self.state[0] /= 9
        self.state[1] /= 9
    
    def get_rescue_points(self):
        random_tuples = set()  # Create an empty set to hold the tuples 

        while len(random_tuples) < 4:  # Continue until we have 4 unique tuples 
            x = random.randint(1, 8) 
            y = random.randint(1, 8) 
            new_tuple = (x, y) 
            if new_tuple not in random_tuples:  # Only add the tuple if it's not already in the set 
                random_tuples.add(new_tuple) 
        return random_tuples

        
class MazeEnvSample5x5(MazeEnv):

    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None):
        super(MazeEnvSample5x5, self).__init__(maze_file="maze2d_5x5.npy", enable_render=enable_render, maze_cells=maze_cells, rescue_item_locations= rescue_item_locations)


class MazeEnvRandom5x5(MazeEnv):

    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None):
        super(MazeEnvRandom5x5, self).__init__(maze_size=(5, 5), enable_render=enable_render, maze_cells=maze_cells, mode='plus', rescue_item_locations= rescue_item_locations)


class MazeEnvSample10x10(MazeEnv):

    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None, maze_file=None):
        super(MazeEnvSample10x10, self).__init__(maze_file=maze_file, enable_render=enable_render, maze_cells=maze_cells, rescue_item_locations= rescue_item_locations)


class MazeEnvRandom10x10(MazeEnv):

    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None):
        super(MazeEnvRandom10x10, self).__init__(maze_size=(10, 10), enable_render=enable_render, maze_cells=maze_cells, mode='plus', rescue_item_locations=rescue_item_locations)


class MazeEnvSample3x3(MazeEnv):

    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None):
        super(MazeEnvSample3x3, self).__init__(maze_file="maze2d_3x3.npy", enable_render=enable_render, maze_cells=maze_cells, rescue_item_locations= rescue_item_locations)


class MazeEnvRandom3x3(MazeEnv):

    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None):
        super(MazeEnvRandom3x3, self).__init__(maze_size=(3, 3), enable_render=enable_render, maze_cells=maze_cells, rescue_item_locations= rescue_item_locations)


class MazeEnvSample100x100(MazeEnv):

    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None):
        super(MazeEnvSample100x100, self).__init__(maze_file="maze2d_100x100.npy", enable_render=enable_render, maze_cells=maze_cells, rescue_item_locations= rescue_item_locations)


class MazeEnvRandom100x100(MazeEnv):

    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None):
        super(MazeEnvRandom100x100, self).__init__(maze_size=(100, 100), enable_render=enable_render, maze_cells=maze_cells, rescue_item_locations= rescue_item_locations)


class MazeEnvRandom10x10Plus(MazeEnv):

    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None):
        super(MazeEnvRandom10x10Plus, self).__init__(maze_size=(10, 10), mode="plus", enable_render=enable_render, maze_cells=maze_cells, rescue_item_locations= rescue_item_locations)


class MazeEnvRandom20x20Plus(MazeEnv):

    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None):
        super(MazeEnvRandom20x20Plus, self).__init__(maze_size=(20, 20), mode="plus", enable_render=enable_render, maze_cells=maze_cells, rescue_item_locations= rescue_item_locations)


class MazeEnvRandom30x30Plus(MazeEnv):
    def __init__(self, enable_render=True, maze_cells=None, rescue_item_locations=None):
        super(MazeEnvRandom30x30Plus, self).__init__(maze_size=(30, 30), mode="plus", enable_render=enable_render, maze_cells=maze_cells, rescue_item_locations= rescue_item_locations)

In [2]:
rescue_items_location = [(1, 1), (8, 8), (4, 8), (3, 5)]

In [3]:
sample_maze = np.load("hackathon_sample.npy")

In [4]:
env = MazeEnv(maze_file='hackathon_sample.npy',maze_size=(10,10), rescue_item_locations=rescue_items_location, mode='human', maze_cells=sample_maze)

NameError: name 'random' is not defined

In [None]:
env.reset()

array([0.        , 0.        , 0.05263158, 0.42105263, 0.31578947,
       0.21052632, 1.        , 1.        , 1.        , 1.        ,
       1.        , 1.        , 1.        , 1.        , 1.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.     

In [5]:
import pygame
pygame.quit()

In [6]:
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
import tqdm
from stable_baselines3 import PPO


In [7]:
model= DQN("MlpPolicy", env, learning_rate=0.0001, buffer_size=1000000, learning_starts=50000, batch_size=32, tau=1.0, gamma=0.99, train_freq=4, gradient_steps=1, replay_buffer_class=None, replay_buffer_kwargs=None, optimize_memory_usage=False, target_update_interval=10000, exploration_fraction=0.5, exploration_initial_eps=1.0, exploration_final_eps=0.3, max_grad_norm=3, tensorboard_log=None, policy_kwargs=None, verbose=1, seed=15, device='auto', _init_setup_model=True)

NameError: name 'env' is not defined

In [None]:
model.learn(total_timesteps=1000000 , log_interval=5)  

----------------------------------
| rollout/            |          |
|    ep_len_mean      | 1.55e+04 |
|    ep_rew_mean      | -500     |
|    exploration_rate | 0.892    |
| time/               |          |
|    episodes         | 5        |
|    fps              | 1826     |
|    time_elapsed     | 42       |
|    total_timesteps  | 77388    |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.011    |
|    n_updates        | 6846     |
----------------------------------


<stable_baselines3.dqn.dqn.DQN at 0x24248593e80>

In [443]:
model.save("PPO.zip")

In [21]:
sample_maze = np.load("hackathon_sample.npy")

In [22]:
env = MazeEnv(maze_file='hackathon_sample.npy',maze_size=(10,10), rescue_item_locations=rescue_items_location, mode='human', maze_cells=sample_maze)

In [505]:
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=3)
print("Mean reward:", mean_reward, "±", std_reward)



KeyboardInterrupt: 

In [24]:
import cv2
import time

In [70]:
state, r, done, info = env.step(2)
print(state)

[ 0.44444444  0.          0.13333333  0.4         0.26666667  0.2
 -1.          1.          1.          1.          0.          1.
 -1.          1.          1.          0.          0.          0.
  0.          0.          0.          0.          0.          0.
  0.          1.          0.          0.          0.          0.
  0.          0.          0.          0.          0.          0.
  1.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.          1.
  0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          1.          0.
  0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.          0.
  0.          0.        

In [47]:
import keyboard
state =env.reset()
for i in range(5000):
    action = model.predict(state)
    state,reward,done,info = env.step(action[0])
    img = env.render('rgb_array')
    img = cv2.resize(img,(720,720))
    img = cv2.putText(img,org=(50,20),text=f"reward:{reward:.3f},{done}",
                        fontFace=cv2.FONT_HERSHEY_SIMPLEX,fontScale=0.5,color=(0,255,0))
    cv2.imshow('sss',img)
    time.sleep(.01)
cv2.destroyAllWindows()

error: OpenCV(4.7.0) D:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\resize.cpp:4062: error: (-215:Assertion failed) !ssize.empty() in function 'cv::resize'
