In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
cd /content/drive/MyDrive/Maze/Maze

/content/drive/MyDrive/Maze/Maze


In [4]:
!pip install pygame

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pygame
  Downloading pygame-2.1.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (21.8 MB)
[K     |████████████████████████████████| 21.8 MB 131.2 MB/s 
[?25hInstalling collected packages: pygame
Successfully installed pygame-2.1.2


In [28]:
import os
os.environ["SDL_VIDEODRIVER"] = "dummy"

In [5]:
import os
import torch
import torch.nn
import torch.optim
import numpy as np
from read_maze import load_maze, get_local_maze_information
import pygame
import sys

pygame 2.1.2 (SDL 2.0.16, Python 3.7.13)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [6]:
#pycharm visualization
SCREENSIZE = W, H = 1200, 800
mazeWH = 800
origin = ((W - mazeWH)/2, (H - mazeWH)/2)
lw = 2 # linewidth of maze-grid

# Colors
GREY = (140,140,140) # (15,15,15)
DARKGREY = (27, 27,0)
RED = (255, 0, 0)
BLUE = (0, 0, 255)
GREEN = (0, 255, 0)
DARKGREEN = (0, 150, 0)
BLACK = (0, 0, 0)
WHITE = (255, 255, 201)


class Canvas:
    def __init__(self):
        self.step_cntr = 0
        self.cntr = 0

        self.maze = load_maze()
        self.shape = 201

        pygame.init()
        self.surface = pygame.display.set_mode(SCREENSIZE)
        self.actor = (1,1)

    def drawSquareCell(self, x, y, dimX, dimY, col=(0, 0, 0)):
        pygame.draw.rect(
            self.surface, col,
            (x, y, dimX, dimY)
        )

    def drawSquareGrid(self, origin, gridWH):
        CONTAINER_WIDTH_HEIGHT = gridWH
        cont_x, cont_y = origin

        # DRAW Grid Border:
        # TOP lEFT TO RIGHT
        pygame.draw.line(
            self.surface, BLACK,
            (cont_x, cont_y),
            (CONTAINER_WIDTH_HEIGHT + cont_x, cont_y), lw)
        # # BOTTOM lEFT TO RIGHT
        pygame.draw.line(
            self.surface, BLACK,
            (cont_x, CONTAINER_WIDTH_HEIGHT + cont_y),
            (CONTAINER_WIDTH_HEIGHT + cont_x,
             CONTAINER_WIDTH_HEIGHT + cont_y), lw)
        # # LEFT TOP TO BOTTOM
        pygame.draw.line(
            self.surface, BLACK,
            (cont_x, cont_y),
            (cont_x, cont_y + CONTAINER_WIDTH_HEIGHT), lw)
        # # RIGHT TOP TO BOTTOM
        pygame.draw.line(
            self.surface, BLACK,
            (CONTAINER_WIDTH_HEIGHT + cont_x, cont_y),
            (CONTAINER_WIDTH_HEIGHT + cont_x,
             CONTAINER_WIDTH_HEIGHT + cont_y), lw)


    def placeCells(self):
        # GET CELL DIMENSIONS...
        cellBorder = 0
        celldimX = celldimY = (mazeWH / self.shape)

        # DOUBLE LOOP
        for rows in range(201):
            for cols in range(201):
                # Is the grid cell tiled ?
                if (self.maze[rows][cols] == 0):
                    self.drawSquareCell(
                        origin[0] + (celldimY * rows)
                        + cellBorder + lw / 2,
                        origin[1] + (celldimX * cols)
                        + cellBorder + lw / 2,
                        celldimX, celldimY, col=BLACK)
                if cols == 199 and rows == 199:
                    self.drawSquareCell(
                        origin[0] + (celldimY * rows)
                        + cellBorder + lw / 2,
                        origin[1] + (celldimX * cols)
                        + cellBorder + lw / 2,
                        celldimX, celldimY, col=BLUE)

    def step(self, visible, idx, path):
        """Run the pygame environment for displaying the maze structure and visible (local) environment of actor
        """
        self.get_event()
        self.set_visible(visible, idx, path)

        self.surface.fill(GREY)
        self.drawSquareGrid(origin, mazeWH)

        self.placeCells()
        self.draw_visible()
        pygame.display.update()
        self.step_cntr += 1

    def set_visible(self, visible, idx, path):
        self.vis = visible
        self.actor = idx
        self.path = path

    def draw_visible(self):
        """Draw the visible environment around the actor

        Notes
        -----
        RED - signifies a fire
        DARKGREY - signifies a visible wall
        WHITE - signifies a path
        GREEN - indicates the actor's position
        """
        celldimX = celldimY = (mazeWH / self.shape)

        #self.visible = self.vis
        for s in self.path:
            self.drawSquareCell(
                origin[0] + (celldimY * s[1])
                + lw / 2,
                origin[1] + (celldimX * s[0])
                + lw / 2,
                celldimX, celldimY, col=DARKGREEN)


        for row in range(3):
            for col in range(3):
                c = self.actor[0] + (col - 1)
                r = self.actor[1] + (row - 1)


                if row == 1 and col == 1:
                    self.drawSquareCell(
                        origin[0] + (celldimY * r)
                        + lw / 2,
                        origin[1] + (celldimX * c)
                        + lw / 2,
                        celldimX, celldimY, col=GREEN)
                else:
                    if self.vis[row][col][1] > 0:

                        self.drawSquareCell(
                            origin[0] + (celldimY * r)
                            + lw / 2,
                            origin[1] + (celldimX * c)
                            + lw / 2,
                            celldimX, celldimY, col=RED)

    def get_event(self):
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                sys.exit()

In [7]:
'''
代码的来源
https://github.com/alvaroprat97/RL-Maze-Solver/blob/master/Complex_Maze/agent.py
https://github.com/luispsantos/EL2805-Reinforcement-Learning/blob/main/lab2/lab2_instructions.pdf
https://zhuanlan.zhihu.com/p/346165057
https://github.com/xiaoyw71/Reinforcement-learning-practice
https://github.com/luispsantos/EL2805-Reinforcement-Learning/blob/4117514ae3cb3003df3785db64a9dc1d3a2b6954/lab1/problem1/maze.py
Source - https://github.com/philtabor/Youtube-Code-Repository/blob/master/ReinforcementLearning/CombinedExperienceReplay/memory_solution.py
https://github.com/lukun199/DQN_maze/blob/main/Main.py
'''


# LIGHT_RED = '#FFC4CC'
# LIGHT_GREEN = '#95FD99'
# BLACK = '#FFFFFF'
# WHITE = '#000000'
# LIGHT_PURPLE = '#E8D0FF'
# LIGHT_ORANGE = '#FAE0C3'


class Qmaze:
    def __init__(self):
        # Reset the total number of steps which the agent has taken
        self.num_steps_taken = 0
        self.rat = (1,1)
        self.goal = (199,199)
        self.rat_path = [self.rat]
        self.around_info = []
        self.observation = self.observe_environment

        
    @property
    def reset(self):
        self.num_steps_taken = 0
        self.rat = (1,1)
        self.goal = (199,199)
        self.rat_path = [self.rat]
        self.around_info = []
        self.observation = self.observe_environment

    @property
    def get_loc_info(self):
        return self.observation

    @property
    def get_rat_pos(self):
        return self.rat

    @property
    def get_rat_path(self):
        return self.rat_path

    @property
    def get_around_info(self):
      col, rol = self.get_rat_pos
      location = get_local_maze_information(rol,col)
      self.around_info = location.copy()
      return self.around_info

    # def get_around_info(self):
    #   return self.around_info
    @property
    def observe_environment(self):
        col, rol = self.get_rat_pos
        location = get_local_maze_information(rol,col)
        temp = []

        for i in range(location.shape[0]):
            for j in range(location.shape[1]):
                if location[i][j][0] == 0:
                    temp.append(0) #wall
                elif location[i][j][0] == 1 and location[i][j][1] ==0:
                    temp.append(1) #no fire and empty
                elif location[i][j][0] == 1 and location[i][j][1] >0:
                    temp.append(location[i][j][1]) #fire
                else:
                    temp.append(1) #自己的位置

        self.rat_path.append(self.rat)
        self.observation = temp
        return self.observation


    def step(self, action, score):
        """Sample environment dependant on action which has occurred

        Action Space
        ------------
        0 - no move
        1 - up
        2 - left
        3 - down
        4 - right

        """
#         global action_dir
        action_dir = {"0": {"id":'stay',
                    "move":(0,0)},
              "1": {"id":'up',
                    "move":(0,-1)},
              "2": {"id":'left',
                    "move":(-1,0)},
              "3": {"id":"down",
                    "move":(0,1)},
              "4": {"id":'right',
                    "move":(1,0)}
              }

#         global rewards_dir
        rewards_dir = {"onwards": -.04,
              "backwards":-0.0,
              "visited":-0.3,
              "blockedin":-0.1,
              "fire":-1.,
              "wall":-.75,
              "stay":-0.3,
              }

        #time.sleep(1) # delay for time animation
        self.num_steps_taken += 1 # increment time


        act_key = str(action)


        x_inc, y_inc = action_dir[act_key]['move'] # fetch movement from position (1,1)

        # If too much time elapsed you die in maze :( (terminate maze at this point)
        if score < -100:
            print('I became an old man and dies in this maze...')
            return self.observe_environment, -1., True # terminate

        obsv_mat = self.get_around_info # get prior position
        x, y = self.rat

        x_loc, y_loc = (1 + x_inc, 1 + y_inc) # Update Local Position

        is_blocked = True
        for o in obsv_mat:
            for s in o:
                if s[0] == 1 and s[1] == 0: # if there is a path and no fire, we are not blocked
                    is_blocked = False

        if action_dir[act_key]['id'] == 'stay' and is_blocked: # if we need to stay then reward
            return self.observe_environment, rewards_dir['blockedin'], False
        elif action_dir[act_key]['id'] == 'stay': # if we stay for no reason then penalise
            return self.observe_environment, rewards_dir['stay'], False

        if obsv_mat[y_loc][x_loc][0] == 0: # check for a wall
            return self.observe_environment, rewards_dir['wall'], False
        if obsv_mat[y_loc][x_loc][1] > 0: # check for a wall
            return self.observe_environment, rewards_dir['fire'], True

        # So if we do successfully move
        self.rat = new_pos = (x + x_inc, y + y_inc) # new global position if we move into a free space
        # Have we reached the end?
        if new_pos == (199, 199):
            return self.observation, rewards_dir['end'], True

        # Have we visited this spot already?
        if self.rat in self.rat_path:
            return self.observe_environment, rewards_dir['visited'], False

        # Are we moving towars the goal?
        if x_inc > 0 or y_inc > 0:
            return self.observe_environment, rewards_dir['onwards'], False

        # finally our only choice is to move away from goal
        return self.observe_environment, rewards_dir['backwards'], False




In [8]:
class ExperienceReplayBuffer():
    def __init__(self, input_shape, max_size, batch_size):
        #initial buffer
        self.batch_size = batch_size
        self.mem_size = max_size 
        self.mem_cntr = 0 
        # initialise state memory,new state memory,action memory and reward_memory
        self.state_memory = np.zeros((self.mem_size, *input_shape), dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, *input_shape), dtype=np.float32)
        self.action_memory = np.zeros(self.mem_size, dtype=np.int64)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)

    def store_buffer(self, state, state_, reward, action):
        # memory position
        index = self.mem_cntr % self.mem_size
        # assign memory
        self.state_memory[index] = state
        self.new_state_memory[index] = state_
        self.reward_memory[index] = reward
        self.action_memory[index] = action
        # increment
        self.mem_cntr += 1

    def sample_buffer(self):
        #sample buffer
        max_mem = min(self.mem_size, self.mem_cntr)
        btch = np.random.choice(max_mem, self.batch_size, replace=False)
        states = self.state_memory[btch]
        states_ = self.new_state_memory[btch]
        actions = self.action_memory[btch]
        rewards = self.reward_memory[btch]
        return states, actions, rewards, states_

    def is_sufficient(self):
        return self.mem_cntr > self.batch_size

In [20]:
class Network(torch.nn.Module):
    #The class initialisation function. 
    #This takes as arguments the dimension of the network's input(the dimension of the state), and the dimension of the network's output (the dimension of the action).
    def __init__(self, input_dims, n_actions, lr):
        # Call the initialisation function of the parent class.
        super(Network, self).__init__()
        # Define the network layers. This example network has two hidden layers, each with 100 units.
        self.layer_1 = torch.nn.Linear(input_dims, 100)
        self.layer_2 = torch.nn.Linear(100, 100)
        self.output_layer = torch.nn.Linear(100,n_actions)
        # optimization function, Loss function and device setting
        self.optimiser = torch.optim.SGD(self.parameters(), lr=lr)
        self.loss = torch.nn.MSELoss()
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.to(self.device)

    # Function which sends some input data through the network and returns the network's output.
    def forward(self, input):
        layer_1_output = torch.nn.functional.relu(self.layer_1(input))
        layer_2_output = torch.nn.functional.relu(self.layer_2(layer_1_output))
        action = self.output_layer(layer_2_output)
        return action


In [30]:
class Agent():
    # Function to initialise the agent
    def __init__(self, gamma, epsilon, lr, actions,input_dims, batch_size, mem_size, buffer_size,  episode_min=0.01, episode_dec=5e-7, steps_update=30):
        # Hyperparameter Settings
        self.learn_step_counter  = 0
        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = lr
        self.num_actions = actions
        self.input_dims = input_dims
        self.batch_size = batch_size
        self.episode_min = episode_min
        self.episode_dec = episode_dec
        self.steps_update = steps_update # Steps used to updates Q-network
        self.action_space = [i for i in range(self.num_actions)]
        self.buffer_size = buffer_size
        self.memory = ExperienceReplayBuffer(buffer_size, mem_size, batch_size)
        self.q_network = Network(self.input_dims, self.num_actions, self.lr)
        self.target_q_network = Network(self.input_dims, self.num_actions, self.lr)

    # Take an epsilon-greedy action (uses epsilon_pick equation) and prefered action is found from DQN
    def greedy_epsilon(self, observation):
        # FIND ARGMAX A from predicted Q(S,A)
        if np.random.random() > self.epsilon:
            prediction = self.q_network.forward(torch.tensor([observation], dtype=torch.float).to(self.q_network.device))
            action = torch.argmax(prediction).item()
        # otherwise choose random action
        else:
            action = np.random.choice(self.action_space)
        return action

    # store all the elements
    def store_transition(self, state, state_, reward, action):
        self.memory.store_buffer(state, state_, reward, action)

    # Update target network as a copy of the Q network.
    def update_target_q_network(self):
      if self.learn_step_counter % self.steps_update == 0:
        dqn_dict =  torch.nn.Module.state_dict(self.q_network)
        torch.nn.Module.load_state_dict(self.target_q_network,dqn_dict)
      
    # Function to decrease our epsilon depending if we are exploring or if epsilon is above its threshold
    def dec_epsilon(self):
        if self.epsilon > self.episode_min:
            self.epsilon = self.epsilon - self.episode_dec
        else:
            self.episode_min

    def learn(self):
        # fill the memory
        if not self.memory.is_sufficient():
            return

        # Set all the gradients stored in the optimiser to zero.
        self.q_network.optimiser.zero_grad()
        self.update_target_q_network()

        # set memory
        state, actions, reward, state_ = self.memory.sample_buffer()
        index = np.arange(self.batch_size)
        state = torch.tensor(state).to(self.q_network.device)
        action = torch.tensor(actions).to(self.q_network.device)
        rewards = torch.tensor(reward).to(self.q_network.device)
        state_ = torch.tensor(state_).to(self.q_network.device)
        
        # predict the Q Values of all actions
        network_prediction  = self.q_network.forward(state)[index, actions]
        # successor state prediction from target network
        target_network_prediction = self.target_q_network.forward(state_)
        # Q value predicted by network for successor state
        bellman_temp = torch.max(target_network_prediction,dim = 1)[0] 

        # Best action at successor state tensor
        # q_eval = self.q_network.forward(state_)
        # max_actions = T.argmax(q_eval, dim=1)

        # R + gamma*Q
        q_target = rewards + self.gamma * torch.unsqueeze(bellman_temp,1)
        # Calculate the loss for this transition.
        loss = self.q_network.loss(q_target, network_prediction)
        # Compute the gradients based on this loss,the gradients of the loss with respect to the Q-network parameters.
        loss.backward()
        # Take one gradient step to update the Q-network.
        self.q_network.optimiser.step()
        # Iterate the training counter
        self.learn_step_counter += 1
        # epsilon decrease
        self.dec_epsilon()

In [26]:
def run(display_on =True):
    # Default (Fixed) Parameters
    gamma = 0.90
    epsilon = 1000
    lr = 0.0001
    epsilon_min = 0.1
    epsilon_dec = 1e-6
    # size = [9]
    input_dims = 9
    actions = 5
    replace_testnet = 30
    memsize = 1000000
    batch_size = 64
    buffer_size = [9]

    agent = Agent(
        gamma=gamma, epsilon=epsilon, lr=lr,
        input_dims=input_dims,buffer_size= buffer_size, actions=actions, mem_size=memsize, episode_min=epsilon_min,
                  batch_size=batch_size, episode_dec=epsilon_dec, steps_update=replace_testnet)

    if display_on:
        canv = Canvas()
    else:
        maze = load_maze()

    # plt = Plotter()
    env = Qmaze()
    print('...starting...')
    env.reset
    for i in range(epsilon):
        done = False
        observation = env.observe_environment

        if display_on:
            canv.set_visible(env.get_around_info.copy(), env.get_rat_pos, [])

        score = 0
        while not done:
            canv.step(env.around_info.copy(), env.rat, env.rat_path)
            print("position:",env.rat)
            action = agent.greedy_epsilon(observation)
            print("action:",action)
            observation_, reward, done = env.step(action, score)
            score += reward
            print("score:",score)
            agent.store_transition(observation, observation_, reward, action)
            agent.learn()
            observation = observation_

            
            # print(done)
        if env.rat==env.goal:
          print("sucessful")
          break
        else:
          # env.animate_solution()
          print("-----------------Next iteration-------------------")
    
    # env.animate_solution()
 
        # plt.data_in(score, wall_cntr=env.wall_cntr, stay_cntr=env.stay_cntr, visit_cntr=env.visit_cntr)
        # print(f'Ep {i}, score {score}, avg {plt.scores_avg[-1]}, epsilon {agent.epsilon}, lr {lr}')
        # print(f'    Stayed {env.stay_cntr} : Walls {env.wall_cntr}')
        # Save NN every 10 its
        # if i > 10 and i % 10 == 0:
        #     agent.save_models()
            # plt.live_plot()



In [33]:
if __name__ == '__main__':
    run(True)


...starting...
position: (1, 1)
action: 3
score: -0.04
position: (1, 2)
action: 1
score: -1.04
-----------------Next iteration-------------------
position: (1, 2)
action: 3
score: -1.0
-----------------Next iteration-------------------
position: (1, 2)
action: 1
score: -1.0
-----------------Next iteration-------------------
position: (1, 2)
action: 4
score: -0.75
position: (1, 2)
action: 1
score: -1.75
-----------------Next iteration-------------------
position: (1, 2)
action: 4
score: -0.75
position: (1, 2)
action: 3
score: -1.75
-----------------Next iteration-------------------
position: (1, 2)
action: 1
score: -0.3
position: (1, 1)
action: 1
score: -1.05
position: (1, 1)
action: 0
score: -1.35
position: (1, 1)
action: 1
score: -2.1
position: (1, 1)
action: 0
score: -2.4
position: (1, 1)
action: 0
score: -2.6999999999999997
position: (1, 1)
action: 3
score: -2.9999999999999996
position: (1, 2)
action: 0
score: -3.2999999999999994
position: (1, 2)
action: 1
score: -3.599999999999999


  return F.mse_loss(input, target, reduction=self.reduction)


[1;30;43m流式输出内容被截断，只能显示最后 5000 行内容。[0m
action: 1
score: -1.8
position: (19, 4)
action: 3
score: -2.1
position: (19, 5)
action: 4
score: -2.85
position: (19, 5)
action: 1
score: -3.15
position: (19, 4)
action: 3
score: -3.4499999999999997
position: (19, 5)
action: 3
score: -4.199999999999999
position: (19, 5)
action: 2
score: -5.199999999999999
-----------------Next iteration-------------------
position: (19, 5)
action: 2
score: -1.0
-----------------Next iteration-------------------
position: (19, 5)
action: 0
score: -0.3
position: (19, 5)
action: 1
score: -0.6
position: (19, 4)
action: 0
score: -0.8999999999999999
position: (19, 4)
action: 4
score: -1.65
position: (19, 4)
action: 2
score: -2.4
position: (19, 4)
action: 4
score: -3.15
position: (19, 4)
action: 4
score: -3.9
position: (19, 4)
action: 1
score: -4.2
position: (19, 3)
action: 1
score: -4.95
position: (19, 3)
action: 0
score: -5.25
position: (19, 3)
action: 2
score: -6.0
position: (19, 3)
action: 1
score: -6.75
position: 