all code from https://www.datacamp.com/tutorial/introduction-q-learning-beginner-tutorial

In [None]:
%%capture
!pip install pyglet==1.5.1
!apt install python-opengl
!apt install ffmpeg
!apt install xvfb
!pip3 install pyvirtualdisplay

# Virtual display
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

In [None]:
%%capture
!pip install gym==0.24
!pip install pygame
!pip install numpy

!pip install imageio imageio_ffmpeg

In [2]:
import numpy as np
import gym
import random
import imageio
from tqdm.notebook import trange



In [3]:
env = gym.make("FrozenLake-v1",map_name="4x4",is_slippery=False)

print("Observation Space", env.observation_space)
print("Sample observation", env.observation_space.sample()) # display a random observation
print("Action Space Shape", env.action_space.n)
print("Action Space Sample", env.action_space.sample())

Observation Space Discrete(16)
Sample observation 5
Action Space Shape 4
Action Space Sample 3


In [4]:
state_space = env.observation_space.n
print("There are ", state_space, " possible states")

action_space = env.action_space.n
print("There are ", action_space, " possible actions")


There are  16  possible states
There are  4  possible actions


In [5]:
def initialize_q_table(state_space, action_space):
  Qtable = np.zeros((state_space, action_space))
  return Qtable

Qtable_frozenlake = initialize_q_table(state_space, action_space)

In [6]:
def epsilon_greedy_policy(Qtable, state, epsilon):
  random_int = random.uniform(0,1)
  if random_int > epsilon:
    action = np.argmax(Qtable[state])
  else:
    action = env.action_space.sample()
  return action

In [7]:
def greedy_policy(Qtable, state):
  action = np.argmax(Qtable[state])
  return action

In [8]:
# Training parameters
n_training_episodes = 1000
learning_rate = 0.7        

# Evaluation parameters
n_eval_episodes = 100      

# Environment parameters
env_id = "FrozenLake-v1"   
max_steps = 99             
gamma = 0.95               
eval_seed = []             

# Exploration parameters
max_epsilon = 1.0           
min_epsilon = 0.05           
decay_rate = 0.0005           

In [9]:
def train(n_training_episodes, min_epsilon, max_epsilon, decay_rate, env, max_steps, Qtable):
  for episode in trange(n_training_episodes):
 
    epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*episode)
    # Reset the environment
    state = env.reset()
    step = 0
    done = False

    # repeat
    for step in range(max_steps):
   
      action = epsilon_greedy_policy(Qtable, state, epsilon)

   
      new_state, reward, done, info = env.step(action)

   
      Qtable[state][action] = Qtable[state][action] + learning_rate * (reward + gamma * np.max(Qtable[new_state]) - Qtable[state][action])

      # If done, finish the episode
      if done:
        break
     
      # Our state is the new state
      state = new_state
  return Qtable

In [15]:
Qtable_frozenlake = train(n_training_episodes, min_epsilon, max_epsilon, decay_rate, env, max_steps, Qtable_frozenlake)
Qtable_frozenlake

  0%|          | 0/1000 [00:00<?, ?it/s]

array([[0.73509189, 0.77378094, 0.77378094, 0.73509189],
       [0.73509189, 0.        , 0.81450625, 0.77378094],
       [0.77378094, 0.857375  , 0.77378094, 0.81450625],
       [0.81450625, 0.        , 0.77378094, 0.77378094],
       [0.77378094, 0.81450625, 0.        , 0.73509189],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.9025    , 0.        , 0.81450625],
       [0.        , 0.        , 0.        , 0.        ],
       [0.81450625, 0.        , 0.857375  , 0.77378094],
       [0.81450625, 0.9025    , 0.9025    , 0.        ],
       [0.857375  , 0.95      , 0.        , 0.857375  ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.9025    , 0.95      , 0.857375  ],
       [0.9025    , 0.95      , 1.        , 0.9025    ],
       [0.        , 0.        , 0.        , 0.        ]])

In [27]:
import gym
from gym import envs
import time
import pyautogui
import random
import cv2
import numpy as np
import sys
sys.path.append('C:\\Users\\jim\\git\\cvwork\\src\\gym-maze')
import gym_maze
import matplotlib
matplotlib.use('TkAgg')  # avoid non-GUI warning for matplotlib
from IPython.display import display, HTML
from tqdm.notebook import trange
from gym_maze.envs import MazeEnv
from gym_maze.envs.generators import SimpleMazeGenerator, RandomMazeGenerator, RandomBlockMazeGenerator, \
                                     UMazeGenerator, TMazeGenerator, WaterMazeGenerator
from gym_maze.envs.Astar_solver import AstarSolver
import numpy as np
import gym
from gym import spaces
import random
import imageio
from gym_maze.envs import MazeEnv  # 假设 maze.py 文件在同一目录下
from collections import defaultdict



class myMazeEnv(MazeEnv):
    def __init__(self, maze_generator, action_type='VonNeumann', render_trace=False):
        super(myMazeEnv, self).__init__(maze_generator, action_type=action_type, render_trace=render_trace)
        
        # 定义新的观察空间，前后左右四个方向，每个方向有4种可能：墙、路、终点
        self.observation_space = spaces.MultiDiscrete([4, 4, 4, 4])

    def _get_observation(self):
        # 获取前后左右四个方向的状态
        # 0: 墙, 1: 路, 2: 终点
        observation = []
        for direction in ['up', 'down', 'left', 'right']:
            observation.append(self._check_direction(direction))
        return np.array(observation)

    def _check_direction(self, direction):
        # 检查指定方向的状态
        # 返回0（墙），1（路）或2（终点）
        x, y = self.state
        if direction == 'up':
            new_x, new_y = x - 1, y
        elif direction == 'down':
            new_x, new_y = x + 1, y
        elif direction == 'left':
            new_x, new_y = x, y - 1
        elif direction == 'right':
            new_x, new_y = x, y + 1
        else:
            raise ValueError("Invalid direction")

        if new_x < 0 or new_x >= self.maze.shape[0] or new_y < 0 or new_y >= self.maze.shape[1]:
            return 0  # 墙
        elif (new_x, new_y) in self.goal_states:
            return 2  # 终点
        elif self.maze[new_x, new_y] == 1:
            return 0  # 墙
        else:
            return 1  # 路
        
        # 初始化迷宫环境
maze = RandomMazeGenerator(width=25, height=15, complexity=.5, density=.5)
env = myMazeEnv(maze, action_type='VonNeumann', render_trace=False)
#env = MazeEnv(maze, action_type='VonNeumann', render_trace=False)
env.reset()
env.init_state = (13, 23)
env.state = env.init_state
env.traces = [env.init_state]
env.goal_states = [(1, 1)]

print("Observation Space", env.observation_space)
#print("Sample observation", env.observation_space.sample())  # 显示一个随机的观察
print("Action Space Shape", env.action_space.n)
#print("Action Space Sample", env.action_space.sample())
#state_space = env.observation_space
state_space = env.observation_space
print("There are ", state_space, " possible states")

action_space = env.action_space.n
print("There are ", action_space, " possible actions")

def initialize_q_table(state_space, action_space):
    # For MultiDiscrete observation space, we need to calculate the total number of states
    num_states = np.prod(state_space.nvec)
    Qtable = np.zeros((num_states, action_space))
    return Qtable

Qtable_frozenlake = initialize_q_table(env.observation_space, action_space)

def epsilon_greedy_policy(Qtable, state, epsilon):
  random_int = random.uniform(0,1)
  if random_int > epsilon:
    action = np.argmax(Qtable[state])
  else:
    action = env.action_space.sample()
  return action

def greedy_policy(Qtable, state):
  action = np.argmax(Qtable[state])
  return action



# Training parameters
n_training_episodes = 1000
learning_rate = 0.7        

# Evaluation parameters
n_eval_episodes = 100      

# Environment parameters
env_id = "FrozenLake-v1"   
max_steps = 99             
gamma = 0.95               
eval_seed = []             

# Exploration parameters
max_epsilon = 1.0           
min_epsilon = 0.05           
decay_rate = 0.0005   
     
       
# Q-learning parameters
alpha = 0.1          # Learning rate
gamma = 0.9          # Discount factor
epsilon = 1.0        # Exploration rate
epsilon_decay = 0.995
min_epsilon = 0.01
num_episodes = 1000  # Number of episodes for training
max_steps = 100      # Max steps per episode

# Initialize Q-table as a dictionary of dictionaries (for state-action pairs)
Q_table = defaultdict(lambda: np.zeros(env.action_space.n))

# Function to choose an action based on epsilon-greedy policy
def choose_action(state):
    if random.uniform(0, 1) < epsilon:
        # Explore: choose a random action
        return env.action_space.sample()
    else:
        # Exploit: choose the best action based on current Q-table
        return np.argmax(Q_table[state])

# Q-learning training loop
def train(n_training_episodes, min_epsilon, max_epsilon, decay_rate, env, max_steps, Qtable, learning_rate=0.1, gamma=0.99):
    for episode in trange(n_training_episodes):
        # Decay epsilon
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * episode)

        # Reset the environment
        state = env.reset()
        done = False

        for step in range(max_steps):
            # Choose action according to the epsilon-greedy policy
            action = epsilon_greedy_policy(Qtable, state, epsilon)
            
            # Take the action and observe the new state and reward
            new_state, reward, done, _ = env.step(action)

            # Update the Q-table using the Q-learning formula
            Qtable[state][action] += learning_rate * (reward + gamma * np.max(Qtable[new_state]) - Qtable[state][action])

            # Update the state
            state = new_state

            # If done, finish the episode
            if done:
                break

    return Qtable




Observation Space MultiDiscrete([4 4 4 4])
Action Space Shape 4
There are  MultiDiscrete([4 4 4 4])  possible states
There are  4  possible actions


In [28]:
from tqdm.notebook import trange

trained_Qtable = train(1000, 0.01, 1.0, 0.0005, env, 100, Qtable_frozenlake)
print(trained_Qtable)

  0%|          | 0/1000 [00:00<?, ?it/s]

[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 ...
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]
