In [1]:
import numpy as np
import heapq

agent = 2
goal = 3
dim = 50
p = 0.9
walls = 1
path = 0

class State:
    def __init__(self, i, j):
        self.s = [i, j]
    
    def __add__(self, other):
        i, j = self.s
        m, n = other.s
        return State(i+m, j+n)

    def __hash__(self):
        return hash(tuple(self.s))
    
    def __eq__(self, other):
        if isinstance(other, State):
            return self.s == other.s
        return False
    def __lt__(self, other):
        # Compare the priorities of the states
        return self.priority() < other.priority()
    
    def priority(self):
        # Define the priority of the state based on your desired criteria
        # For example, you can use the Manhattan distance to the goal state
        goal_state = State(dim - 1, dim - 1)
        return abs(self.s[0] - goal_state.s[0]) + abs(self.s[1] - goal_state.s[1])

    def neighbors(self, maze):
        neighbors = []
        neighbors.append(self + State(-1, 0))  # Up
        neighbors.append(self + State(1, 0))   # Down
        neighbors.append(self + State(0, -1))  # Left
        neighbors.append(self + State(0, 1))   # Right
        valid_neighbors = []
        for neighbor in neighbors:
            if neighbor.s[0] >= 0 and neighbor.s[0] < maze.shape[0] and \
               neighbor.s[1] >= 0 and neighbor.s[1] < maze.shape[1] and \
               maze[neighbor] != walls:
                valid_neighbors.append(neighbor)
        return valid_neighbors
    
    
class Maze:
    def __init__(self, dim, prob):
        self.maze = {}
        self.shape = (dim, dim)
        for i in range(dim):
            for j in range(dim):
                self.maze[State(i, j)] = np.random.choice([path, walls], p = [1-prob, prob])
        self.states = self.maze.keys()
        
        self.start_state = State(0,0)
        self.goal_state = State(dim-1, dim-1)

        self.maze[self.goal_state] = goal
        self.maze[self.start_state] = agent
    def __getitem__(self, state: State):
        if isinstance(state, State):
            return self.maze[state]
        else:
            try:
                s = State(state[0], state[1])
                return self.maze[s]
            except:
                print("error when calling the get_item of maze")
                exit(0)
    def update(self, state, new_state):
        if self.maze[new_state] != walls and self.maze[new_state] != goal:
            self.maze[state] = path
            self.maze[new_state] = agent

    def a_star_search(self):
        start_state = self.start_state
        goal_state = self.goal_state
        frontier = [(0, start_state)]  # Priority queue, starting with the start state
        came_from = {}  # Dictionary to track the parent state for each visited state
        cost_so_far = {}  # Dictionary to track the cost to reach each state
        
        came_from[start_state] = None
        cost_so_far[start_state] = 0
        
        while frontier:
            _, current_state = heapq.heappop(frontier)  # Pop the state with the lowest priority from the frontier
            
            if current_state == goal_state:
                break
            
            for next_state in current_state.neighbors(self):
                new_cost = cost_so_far[current_state] + 1  # Assuming a cost of 1 to move to the next state
                if next_state not in cost_so_far or new_cost < cost_so_far[next_state]:
                    cost_so_far[next_state] = new_cost
                    priority = new_cost + heuristic(next_state, goal_state)
                    heapq.heappush(frontier, (priority, next_state))
                    came_from[next_state] = current_state
        
        # Reconstruct the path from the goal state to the start state
        path = []
        current_state = goal_state
        while current_state != start_state:
            path.append(current_state)
            current_state = came_from[current_state]
        path.append(start_state)
        path.reverse()
        
        return path


def heuristic(state, goal_state):
    # Calculate the Manhattan distance between two states
    return abs(state.s[0] - goal_state.s[0]) + abs(state.s[1] - goal_state.s[1])
    
def create_maze(dim = 10, p = 0.1):
    #we create a maze with dimension dim x dim that have walls with probability p
    maze = np.random.choice([0,1], size = (dim,dim), p = [1-p,p])
    maze[0,0] = agent
    maze[dim-1,dim-1] = goal
    #the agent will start in position 0,0 and the goal is to reach position dim-1,dim-1
    return maze.astype("int")

maze = Maze(dim, p)
print(maze)

<__main__.Maze object at 0x7f5edf6039d0>


In [2]:
actions = [State(-1,0),State(1,0),State(0,-1),State(0,1), State(0, 0)] #up, down, left, right
states = []
for i in range(maze.shape[0]):
    for j in range(maze.shape[1]):
        states.append(State(i,j))

def eval_state(maze, state, dim, action):
    if action.s == [0, 0]: return -10
    x, y = state.s
    if x < 0 or x >= dim or y < 0 or y >= dim: return -10*dim
    cell = maze[state]
    if cell == 0: return -1
    if cell == 1: return -10*dim
    if cell == goal: return 10*dim
    if cell == agent: return -dim

print(eval_state(maze, State(dim-1, dim-1), dim, State(0,1)))

500


In [3]:
def available_actions(state, actions, dim):
    available = []
    for action in actions:
        new_state = state + action
        x, y = new_state.s
        if x >= 0 and x < dim and y >= 0 and y < dim: available.append(action)
    
    return available

class ProposedActions:
    def __init__(self, dim):
        self.maze = maze
        self.dict = {}
        for i in range(dim):
            for j in range(dim):
                self.dict[State(i,j)] = available_actions(State(i, j), actions, dim)[0]
    
    def __getitem__(self, state):
        if isinstance(state, State):
            return self.dict[state]
        else:
            try:
                s = State(state[0], state[1])
                return self.dict[s]
            except:
                print("error when calling the get_item of maze")
                exit(0)
    
    
proposed_actions = ProposedActions(dim)
proposed_actions[0, 0].s

[1, 0]

In [4]:
#check if all the proposed states are in available actions
for state in states:
    for proposed in available_actions(state, actions, dim):
        temp = state + proposed
        temp = temp.s
        if temp[0] < 0 or temp[0] >= dim or temp[1] < 0 or temp[1] >= dim:
            print(state, temp, proposed)

for state in states:
    try:
        proposed = proposed_actions[state]
    except IndexError:
        print(state)
        break
    new_state = state + proposed
    temp = new_state.s
    if temp[0] < 0 or temp[0] >= dim or temp[1] < 0 or temp[1] >= dim:
        print(state, temp, proposed)

In [5]:
class QTable:
    def __init__(self, maze, actions, dim):
        self.q_table = {}
        for state in maze.states:
            self.q_table[state] = {}
            for available in available_actions(state, actions, dim):
                self.q_table[state][available] = 0
        
    
    def __getitem__(self, index):
        state, action = index
        if action in available_actions(state, actions, dim):
            return self.q_table[state][action]
        else:
            return None

    def update(self, state, action, new_state, reward, alpha, gamma):
        current_q_value = self.q_table[state][action]
        max_future_q_value = max(self.q_table[new_state].values())

        new_q_value = (1 - alpha) * current_q_value + alpha * (reward + gamma * max_future_q_value)
        self.q_table[state][action] = new_q_value

    def get_action(self, state, epsilon, actions, dim):
        availables = available_actions(state, actions, dim)
        if np.random.random() < epsilon:
            action = np.random.choice(availables)
        else:
            action = max(self.q_table[state], key = self.q_table[state].get)

        return action

Q = QTable(maze, actions, dim)


state = State(0,0)
action = State(1, 0)
Q[state, action]

0

In [6]:
#defining parameters
lr = 0.05
gamma = 0.9
eps = 0.1 #exploration-exploration trade off, the higher the more exploration

In [7]:
from tqdm import tqdm

def trip(maze, dim, actions, Q, lr = 0.2, gamma = 0.9, eps = 0.1):
    player = State(0, 0)
    iter = 100000
    tot_reward = 0
    path = []
    path.append(player)
    while maze[player] != goal and iter != 0:
        iter -= 1
        action = Q.get_action(player, eps, actions, dim)
        new_state = player + action            
        reward = eval_state(maze, new_state, dim, action)
        Q.update(player, action, new_state, reward, lr, gamma)
        player = new_state
        path.append(player)
        tot_reward += reward
    
    return player, tot_reward, path

def train(maze, actions, dim, epochs = 3000):
    Q = QTable(maze, actions, dim)
    finished = 0
    best_reward = -1000000
    for epoch in tqdm(range(epochs)):
        state, reward, _ = trip(maze, dim, actions, Q)
        if reward == -100:
            continue
        
        if state.s == [dim-1, dim-1]: finished += 1
        if best_reward < reward: best_reward = reward
    
    return finished / epochs, best_reward, Q

In [12]:
import sys
try:
    maze.a_star_search()
    finishing_rate, best, Q = train(maze, actions, dim)
    print(finishing_rate, best)
except KeyError:
    print("The maze is impossible to resolve")
    sys.exit(0)

The maze is impossible to resolve


AttributeError: 'tuple' object has no attribute 'tb_frame'

In [10]:
def color(maze, i, j, rl_player, a_player, history, history_a, A_star = False):
    # Define colors
    PATH_COLOR = (255, 255, 255)
    WALL_COLOR = (0, 0, 0)
    GOAL_COLOR = (255, 0, 0)
    RL_AGENT_COLOR = (0, 0, 255)
    A_AGENT_COLOR = (0, 255, 255)
    TRACK_COLOR = (0, 255, 0)
    TRACK_COLOR_A = (128, 128, 0)

    val = maze[State(i, j)]
    if [i, j] in history and [i, j] != rl_player.s and [i, j] != a_player.s:
        return TRACK_COLOR   
    if [i, j] in history_a and [i, j] != rl_player.s and [i, j] != a_player.s:
        return TRACK_COLOR_A   
    if val == agent or [i, j] == rl_player.s:
        return RL_AGENT_COLOR
    if val == agent or [i, j] == a_player.s:
        return A_AGENT_COLOR
    elif val == walls:
        return WALL_COLOR
    elif val == goal:
        return GOAL_COLOR
    else: return PATH_COLOR

In [51]:
import pygame
from time import sleep

# Initialize Pygame and create a window
pygame.init()
wait = 100
screen_size = 1000
screen = pygame.display.set_mode((screen_size, screen_size))
WHITE = (255, 255, 255)

# Define grid parameters
grid_size = dim
cell_size = screen_size / grid_size

# Main visualization loop
running = True
history = []
history_a = []
player = State(0, 0)
eps = 0.0 #no exploration
total_reward = 0
player, reward, rl_path = trip(maze, dim, actions, Q, lr = 0.1, gamma = 0.9, eps = 0.0)
a_path = maze.a_star_search()

while running:
    #pygame.time.wait(3000)
    rl_player = rl_path[0]
    a_player = a_path[0]
    for state1, state2 in zip(rl_path[1:], a_path[1:]):
        # Handle events
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False

        history.append(rl_player.s)
        history_a.append(a_player.s)
        rl_player = state1
        a_player = state2
        # Clear the screen
        screen.fill(WHITE)

        # Draw the grid
        for row in range(grid_size):
            for col in range(grid_size):
                # Calculate the cell position and dimensions
                x = col * cell_size
                y = row * cell_size
                points = [
                    (x, y),
                    (x + cell_size, y),
                    (x + cell_size, y + cell_size),
                    (x, y + cell_size),
                ]
                # Draw the cell
                pygame.draw.polygon(screen, color(maze, row, col, rl_player, a_player, history, history_a), points)

        # Update the display
        pygame.display.flip()
        pygame.time.wait(wait)
    
    running = False

# Clean up Pygame
pygame.quit()

print(f"A* path len: {len(a_path)}, RL path len: {len(rl_path)}")

KeyError: <__main__.State object at 0x7efea68d9ed0>

: 