In [4]:
import os
import math
import time
import heapq
import random
import json
import zipfile
import io
from collections import defaultdict
from itertools import product
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from google.colab import files

In [7]:
# Upload the maze archive(s) through the Colab file picker and unpack them.
# `uploaded` is indexed by file name below and each value is wrapped in a
# BytesIO, so every uploaded file is treated as an in-memory zip archive.
# Everything is extracted under the local "imperfect_maze" directory.
# NOTE(review): a later cell reads from "imperfect_maze/imperfect_maze", so the
# archive presumably contains a top-level "imperfect_maze" folder — confirm.
uploaded = files.upload()
for fname in uploaded:
    with zipfile.ZipFile(io.BytesIO(uploaded[fname]), "r") as zip_ref:
        zip_ref.extractall("imperfect_maze")

Saving imperfect_maze.zip to imperfect_maze (2).zip


In [8]:
def split_mazes(maze_files, train_count=100, seed=42):
    """Shuffle the maze identifiers reproducibly and split them in two.

    Args:
        maze_files: Iterable of maze identifiers (file paths in this notebook).
        train_count: How many of the shuffled entries go to the training list.
        seed: Value handed to ``random.seed`` before shuffling. This re-seeds
            the module-level ``random`` generator as a side effect.

    Returns:
        ``(train_mazes, test_mazes)``: the first ``train_count`` shuffled
        entries, and everything after them.
    """
    random.seed(seed)
    shuffled = list(maze_files)  # work on a copy; the caller's sequence is untouched
    random.shuffle(shuffled)
    return shuffled[:train_count], shuffled[train_count:]

In [9]:
def load_maze(path):
    """Load a maze grid from a whitespace-delimited text file.

    Args:
        path: File name or file-like object accepted by ``np.loadtxt``.

    Returns:
        A 2-D integer array. ``ndmin=2`` keeps a single-row or single-column
        file two-dimensional, so callers can always unpack ``maze.shape`` into
        ``(height, width)``.
    """
    return np.loadtxt(path, dtype=int, ndmin=2)

def find_entrance_and_exit(maze):
    """Return the topmost open cell of the first column and of the last column.

    A cell counts as open when its value equals 0.

    Args:
        maze: 2-D array whose ``shape`` is ``(height, width)``.

    Returns:
        ``(entrance, exit)`` as ``(row, col)`` tuples; the entrance lies in
        column 0 and the exit in column ``width - 1``.

    Raises:
        ValueError: If either boundary column has no open cell.
    """
    height, width = maze.shape
    last_col = width - 1

    entrance = None
    exit_cell = None
    for row in range(height):
        if entrance is None and maze[row, 0] == 0:
            entrance = (row, 0)
        if exit_cell is None and maze[row, last_col] == 0:
            exit_cell = (row, last_col)

    if entrance is None or exit_cell is None:
        raise ValueError("No open boundary cells")
    return entrance, exit_cell

def neighbors(pos, maze):
    """Yield the open cells orthogonally adjacent to ``pos``.

    Candidates are visited in the order down, up, right, left (row + 1,
    row - 1, col + 1, col - 1). A candidate is yielded only if it lies inside
    the grid and its value equals 0.

    Args:
        pos: ``(row, col)`` of the current cell.
        maze: 2-D array whose ``shape`` is ``(height, width)``.

    Yields:
        ``(row, col)`` tuples of reachable neighbouring cells.
    """
    rows, cols = maze.shape
    row, col = pos
    candidates = ((row + 1, col), (row - 1, col), (row, col + 1), (row, col - 1))
    for n_row, n_col in candidates:
        # Bounds are checked first so negative indices never wrap around.
        if not (0 <= n_row < rows and 0 <= n_col < cols):
            continue
        if maze[n_row, n_col] == 0:
            yield (n_row, n_col)

In [10]:
class MazeEnvironment:
    """Minimal episodic grid world over a maze array.

    The agent occupies one ``(row, col)`` cell. Each step costs -1.0 unless the
    agent is on the goal cell after the move, in which case the reward is 0.0
    and the episode is reported as finished.
    """

    def __init__(self, maze, start, goal):
        """Store the grid, the start/goal cells and the grid dimensions."""
        self.maze = maze
        self.h, self.w = maze.shape
        self.start = start
        self.goal = goal
        self.state = start

    def reset(self):
        """Move the agent back to the start cell and return that cell."""
        self.state = self.start
        return self.state

    def step(self, action):
        """Apply a ``(d_row, d_col)`` move and report the outcome.

        Moves that would leave the grid or enter a cell whose value is not 0
        leave the agent where it is.

        Returns:
            ``(state, reward, done)``.
        """
        row, col = self.state
        d_row, d_col = action
        target = (row + d_row, col + d_col)

        inside = 0 <= target[0] < self.h and 0 <= target[1] < self.w
        if inside and self.maze[target] == 0:
            self.state = target

        if self.state == self.goal:
            return self.state, 0.0, True
        return self.state, -1.0, False

In [11]:
class TabularQLearning:
    """Epsilon-greedy tabular Q-learning agent.

    ``Q`` is a ``defaultdict`` keyed by ``(state, action)``; entries that have
    never been written read as ``initialQ`` (and are inserted on first read).
    """

    def __init__(self, actions, discount, explorationProb=0.2, initialQ=0.0, step_size=0.5):
        """Store the hyperparameters and create an empty Q-table."""
        self.actions = actions
        self.discount = discount
        self.explorationProb = explorationProb
        self.step_size = step_size
        self.numIters = 0
        self.Q = defaultdict(lambda: initialQ)

    def getAction(self, state, explore=True):
        """Choose an action for ``state``.

        With ``explore`` set, the call counter is incremented and, with
        probability ``explorationProb``, a uniformly random action is returned.
        Otherwise the action with the highest Q-value is returned; ties go to
        the action listed first.
        """
        if explore:
            self.numIters += 1
            if random.random() < self.explorationProb:
                return random.choice(self.actions)

        def q_value(candidate):
            return self.Q[(state, candidate)]

        return max(self.actions, key=q_value)

    def getStepSize(self):
        """Return the constant learning rate."""
        return self.step_size

    def incorporateFeedback(self, state, action, reward, nextState, terminal):
        """Apply one Q-learning update for the observed transition.

        For a terminal transition the target is just ``reward``; otherwise the
        discounted best Q-value of ``nextState`` is added to it.
        """
        if terminal:
            target = reward
        else:
            best_next = max(self.Q[(nextState, nxt)] for nxt in self.actions)
            target = reward + self.discount * best_next

        key = (state, action)
        current = self.Q[key]
        self.Q[key] = current + self.getStepSize() * (target - current)


In [12]:
def train_q_learning(maze, start, goal, actions,
                     episodes=500,
                     explorationProb=0.2,
                     discount=1.0,
                     step_size=0.5,
                     max_steps=500):
    """Train a tabular Q-learning agent on one maze and return its Q-table.

    Args:
        maze: 2-D grid handed to ``MazeEnvironment``.
        start: State every episode begins from.
        goal: Terminal state of the environment.
        actions: Sequence of ``(d_row, d_col)`` moves available to the agent.
        episodes: Number of training episodes to run.
        explorationProb: Probability of taking a random action at each step.
        discount: Discount factor applied to the next-state value.
        step_size: Learning rate of the Q update.
        max_steps: Per-episode step cap; an episode is cut off after this many
            steps even if the goal was not reached. Defaults to 500, the value
            that was previously hard-coded.

    Returns:
        The agent's ``Q`` table, a ``defaultdict`` keyed by ``(state, action)``.
    """
    env = MazeEnvironment(maze, start, goal)
    agent = TabularQLearning(
        actions=actions,
        discount=discount,
        explorationProb=explorationProb,
        step_size=step_size,
    )

    for _ in range(episodes):
        state = env.reset()
        done = False
        steps = 0
        # Stop on reaching the goal or when the step budget is exhausted.
        while not done and steps < max_steps:
            action = agent.getAction(state)
            next_state, reward, done = env.step(action)
            agent.incorporateFeedback(state, action, reward, next_state, done)
            state = next_state
            steps += 1

    return agent.Q

def learned_heuristic(Q, actions):
    """Wrap a Q-table as a heuristic with the ``(pos, goal)`` call signature.

    The returned function yields the negated minimum Q-value over ``actions``
    at ``pos``. Its ``goal`` argument is accepted only so the function is
    interchangeable with the distance heuristics; it is never read.

    NOTE(review): this negates the *minimum* Q-value (the worst action) rather
    than the maximum — confirm that this is the intended estimate.
    """
    def heuristic(pos, goal):
        lowest_q = min(Q[(pos, move)] for move in actions)
        return -lowest_q

    return heuristic

In [13]:
def manhattan(a, b):
    """Return the L1 (taxicab) distance between two ``(row, col)`` points."""
    d_row = a[0] - b[0]
    d_col = a[1] - b[1]
    return abs(d_row) + abs(d_col)

def euclidean(a, b):
    """Return the straight-line (L2) distance between two ``(row, col)`` points."""
    d_row = a[0] - b[0]
    d_col = a[1] - b[1]
    return math.hypot(d_row, d_col)

In [14]:
def astar(maze, start, goal, heuristic):
    """
    A* search on a 2D grid of 0=open,1=wall.
    Returns the path as a list of (r,c), or [] if no path.

    Args:
        maze: Grid passed through to ``neighbors`` to expand a node.
        start: Node the search begins at.
        goal: Node the search stops at.
        heuristic: Callable ``heuristic(node, goal)`` returning the estimated
            remaining cost; every move costs 1.

    Returns:
        The nodes from ``start`` to ``goal`` inclusive, or ``[]`` when the
        open set is exhausted without reaching ``goal``.
    """
    # Heap entries are (f, g, node, parent). A node is pushed only when its g
    # strictly improves, so (g, node) is unique per entry and ordering never
    # has to fall through to comparing the parent field.
    open_set = [(heuristic(start, goal), 0, start, None)]
    came_from = {}
    gscore = {start: 0}

    while open_set:
        _, g, current, parent = heapq.heappop(open_set)
        if current in came_from:
            # Already expanded via an entry popped earlier; skip the stale one.
            continue
        came_from[current] = parent

        if current == goal:
            # Walk the parent links back to the start, whose parent is None.
            # The sentinel is tested with `is not None` so that a falsy node
            # value cannot end the walk early.
            path = []
            node = current
            while node is not None:
                path.append(node)
                node = came_from[node]
            path.reverse()
            return path

        for nb in neighbors(current, maze):
            tentative_g = g + 1
            if nb in gscore and tentative_g >= gscore[nb]:
                continue
            gscore[nb] = tentative_g
            fscore = tentative_g + heuristic(nb, goal)
            heapq.heappush(open_set, (fscore, tentative_g, nb, current))

    return []  # no path


In [15]:
def tune_qlearning_hyperparams(
    maze_files,
    episode_values=[200, 500],
    exploration_probs=[0.1, 0.2],
    discounts=[0.9, 1.0],
    step_sizes=[0.1, 0.5],
    actions=[(1,0), (-1,0), (0,1), (0,-1)],
    num_starts=25,
    score_weights=(1.0, 1.0, 0.1)
):
    """Grid-search Q-learning hyperparameters for the learned A* heuristic.

    For every combination of ``episode_values`` x ``exploration_probs`` x
    ``discounts`` x ``step_sizes``, a Q-table is trained on each sampled maze
    (with the maze exit as both start and goal of training) and A* is then run
    with the resulting heuristic from randomly sampled open cells to the exit.

    Args:
        maze_files: Paths of the maze files to tune on; at most 100 of them
            are sampled.
        episode_values: Candidate numbers of training episodes.
        exploration_probs: Candidate exploration probabilities.
        discounts: Candidate discount factors.
        step_sizes: Candidate learning rates.
        actions: ``(d_row, d_col)`` moves available to the agent.
        num_starts: Number of start cells sampled per maze; clamped to the
            number of open cells in the maze.
        score_weights: ``(w_train, w_solve, w_path)`` weights applied to the
            mean training time, mean solve time and mean path length.

    Returns:
        A list with one dict per configuration, sorted by ascending ``score``
        (lower is better).
    """
    random.seed(42)
    # Use at most 100 mazes. The sample size is clamped so that a smaller
    # collection is used in full instead of making random.sample raise.
    maze_files = list(maze_files)
    maze_files = random.sample(maze_files, min(100, len(maze_files)))

    w_train, w_solve, w_path = score_weights
    results = []

    combos = list(product(episode_values, exploration_probs, discounts, step_sizes))

    for i, (ep, eps, gamma, alpha) in enumerate(combos, 1):
        print(f"🔍 Config {i}/{len(combos)} — episodes={ep}, ε={eps}, γ={gamma}, α={alpha}")

        total_lengths = []
        total_times = []
        total_paths = 0
        total_success = 0
        train_durations = []

        for maze_file in maze_files:
            maze = load_maze(maze_file)
            goal = find_entrance_and_exit(maze)[1]
            # Clamp like the evaluation cell does, so mazes with fewer than
            # num_starts open cells do not raise.
            open_cells = all_open_cells(maze)
            starts = random.sample(open_cells, k=min(num_starts, len(open_cells)))

            t_train_start = time.perf_counter()
            Q = train_q_learning(
                maze, goal, goal, actions,
                episodes=ep,
                explorationProb=eps,
                discount=gamma,
                step_size=alpha
            )
            t_train_end = time.perf_counter()
            train_durations.append(t_train_end - t_train_start)

            hfunc = learned_heuristic(Q, actions)

            for start in starts:
                if start == goal:
                    continue

                t0 = time.perf_counter()
                path = astar(maze, start, goal, hfunc)
                t1 = time.perf_counter()

                total_paths += 1
                if path:
                    # Only successful searches contribute to the averages.
                    total_success += 1
                    total_lengths.append(len(path))
                    total_times.append(t1 - t0)

        # An empty sample would make np.mean return nan (with a warning);
        # report it as the worst possible value instead.
        avg_train_time = np.mean(train_durations) if train_durations else float('inf')
        avg_solve_time = np.mean(total_times) if total_success else float('inf')
        avg_path_length = np.mean(total_lengths) if total_success else float('inf')

        score = (
            w_train * avg_train_time +
            w_solve * avg_solve_time +
            w_path  * avg_path_length
        )

        results.append({
            "episodes": ep,
            "exploration_prob": eps,
            "discount": gamma,
            "step_size": alpha,
            "avg_path_length": avg_path_length,
            "avg_solve_time": avg_solve_time,
            "avg_train_time": avg_train_time,
            "success_rate": total_success / total_paths if total_paths > 0 else 0.0,
            "total_paths": total_paths,
            "score": score
        })

    results.sort(key=lambda r: r["score"])
    return results

In [16]:
# Collect every .txt maze in the extracted folder (sorted by file name so the
# listing is stable) and split the paths into training and test sets.
input_dir = "imperfect_maze/imperfect_maze"
maze_files = [
    os.path.join(input_dir, entry)
    for entry in sorted(os.listdir(input_dir))
    if entry.endswith(".txt")
]
train_mazes, test_mazes = split_mazes(maze_files)

In [17]:
def all_open_cells(maze):
    """List every cell whose value equals 0, in row-major order.

    Args:
        maze: 2-D array whose ``shape`` is ``(height, width)``.

    Returns:
        A list of ``(row, col)`` tuples built from plain Python ints.
    """
    rows, cols = maze.shape
    open_cells = []
    for row in range(rows):
        for col in range(cols):
            if maze[row, col] == 0:
                open_cells.append((row, col))
    return open_cells

In [26]:
# Run the hyperparameter sweep on the training mazes. The path-length weight is
# raised to 0.2 here (the function's default score_weights use 0.1).
tuned_results = tune_qlearning_hyperparams(train_mazes, score_weights=(1.0, 1.0, 0.2))

# Persist the ranked configurations as CSV (one row per configuration) ...
df_tuning = pd.DataFrame(tuned_results)
df_tuning.to_csv("tuning_results.csv", index=False)

# ... and as JSON, dumping the same list of dicts.
with open("tuning_results.json", "w") as f:
    json.dump(tuned_results, f, indent=2)

In [18]:
# Hyperparameters used for the final evaluation. They are hard-coded so this
# cell can run without re-executing the tuning sweep; uncomment the next line
# to take the top-ranked configuration from `tuned_results` instead.
#best = tuned_results[0]
best = {
    "episodes": 500,
    "exploration_prob": 0.2,
    "discount": 0.9,
    "step_size": 0.5,
}
actions = [(1,0), (-1,0), (0,1), (0,-1)]
results = []
maze_count = 0
total_mazes = len(test_mazes)

# Seed the global RNG so the sampled start cells and the Q-learning exploration
# are the same on every run, regardless of which cells were executed before.
random.seed(42)

for fname in sorted(test_mazes):
    maze_count += 1
    print(f"Testing maze {maze_count}/{total_mazes}: {fname}")
    maze = load_maze(fname)

    _, goal = find_entrance_and_exit(maze)
    # Scan the grid once and reuse the list for both the sample and its size.
    open_cells = all_open_cells(maze)
    starts = random.sample(open_cells, k=min(25, len(open_cells)))

    heuristics = [("manhattan", manhattan), ("euclidean", euclidean)]

    # train Q-learning once per maze using best hyperparameters
    # (the exit is used as both the start and the goal of every episode)
    Q = train_q_learning(
        maze, goal, goal, actions,
        episodes=best["episodes"],
        explorationProb=best["exploration_prob"],
        discount=best["discount"],
        step_size=best["step_size"]
    )
    heuristics.append(("qlearned", learned_heuristic(Q, actions)))

    # evaluate each heuristic from every valid start
    for name, hfunc in heuristics:
        for start in starts:
            if start == goal:
                continue
            t0 = time.perf_counter()
            path_cells = astar(maze, start, goal, hfunc)
            t1 = time.perf_counter()

            results.append({
                "file":        fname,
                "heuristic":   name,
                "start":       start,
                "goal":        goal,
                "solve_time":  t1 - t0,
                # -1 marks a failed search (astar returned an empty path)
                "path_length": len(path_cells) if path_cells else -1,
                "success":     bool(path_cells)
            })

    print(f"✔ {fname} ({len(starts)} starts × {len(heuristics)} heuristics)")

# Persist the per-run records as JSON and CSV.
output_file = "final_evaluation.json"
with open(output_file, "w") as f:
    json.dump(results, f, indent=2)

df = pd.DataFrame(results)
df.to_csv("final_evaluation.csv", index=False)

print("Done")

Testing maze 1/1400: imperfect_maze/imperfect_maze/maze0_dim52.txt
✔ imperfect_maze/imperfect_maze/maze0_dim52.txt (25 starts × 3 heuristics)
Testing maze 2/1400: imperfect_maze/imperfect_maze/maze1000_dim83.txt
✔ imperfect_maze/imperfect_maze/maze1000_dim83.txt (25 starts × 3 heuristics)
Testing maze 3/1400: imperfect_maze/imperfect_maze/maze1001_dim28.txt
✔ imperfect_maze/imperfect_maze/maze1001_dim28.txt (25 starts × 3 heuristics)
Testing maze 4/1400: imperfect_maze/imperfect_maze/maze1002_dim141.txt
✔ imperfect_maze/imperfect_maze/maze1002_dim141.txt (25 starts × 3 heuristics)
Testing maze 5/1400: imperfect_maze/imperfect_maze/maze1003_dim57.txt
✔ imperfect_maze/imperfect_maze/maze1003_dim57.txt (25 starts × 3 heuristics)
Testing maze 6/1400: imperfect_maze/imperfect_maze/maze1004_dim143.txt
✔ imperfect_maze/imperfect_maze/maze1004_dim143.txt (25 starts × 3 heuristics)
Testing maze 7/1400: imperfect_maze/imperfect_maze/maze1005_dim43.txt
✔ imperfect_maze/imperfect_maze/maze1005_dim

In [20]:
# Download the result artifacts. The evaluation cell of this notebook writes
# "final_evaluation.*"; the "final_no_training_evaluation.*" names are kept for
# files left over from other runs. Missing files are reported and skipped
# instead of aborting the cell.
for artifact in (
    "tuning_results.csv",
    "tuning_results.json",
    "final_evaluation.csv",
    "final_evaluation.json",
    "final_no_training_evaluation.csv",
    "final_no_training_evaluation.json",
):
    if os.path.exists(artifact):
        files.download(artifact)
    else:
        print(f"Skipping {artifact}: file not found")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>