In [17]:

"""
License
https://www.youtube.com/c/LearningOrbis
Copyright (c) 2021 Muhammad Ahsan Naeem
mahsan.naeem@gmail.com


Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

In [2]:
from pyamaze import maze, agent, textLabel
# here i created and saved mazes for the comparisons
mazeWidth, mazeHeight = 70,70
m = maze(mazeWidth, mazeHeight)
m.CreateMaze(loopPercent=50, saveMaze=True, theme = 'light')
a = agent(m, color='red', filled='True')
textLabel(m, 'Size of Maze', f'{mazeWidth}x{mazeHeight}')
textLabel(m, 'Start Point', 'Red Cell')
textLabel(m, 'End Point', 'Green Cell')
m.run()

Search Algorithms - BFS, DFS and A*

In [1]:
from pyamaze import maze, agent, COLOR, textLabel
from queue import PriorityQueue
import time
import math
import tracemalloc

#this is my base class for search algorithms
class SearchAlgos:
    def __init__(self, mazeSize=(1,1), start=None):
        self.m = maze()
        mazeFileName = f'{mazeSize[0]}x{mazeSize[1]}Maze.csv'
        self.m.CreateMaze(loadMaze=mazeFileName, theme='light')
        #self.m = maze(mazeSize[0], mazeSize[1])
        #self.m.CreateMaze(loopPercent=50, saveMaze=True, theme = 'light')
        textLabel(self.m, 'Size of Maze', f'{mazeSize[0]}x{mazeSize[1]}')
        self.start = start if start else (self.m.rows, self.m.cols)
        self.path = {}
        self.searchPath = [self.start]
        self.totalTime = 0
        self.nodesExplored = 0
        self.initAgents()

    def initAgents(self):

        self.agents = {
            'searchAgent': agent(self.m, footprints=True, color=COLOR.cyan, shape='square', filled=True),
            'pathAgent': agent(self.m, footprints=True, color=COLOR.red, shape='arrow', filled=False),
   
        }

    def vizSearchPath(self):

        _, _, fwdPath = self._vizSearchPath_internal()
        fpathlist = [self.start]
        cel = self.start
        while cel in fwdPath:
            cel = fwdPath[cel]
            fpathlist.append(cel)

        self.m.tracePath({self.agents['searchAgent']: self.searchPath}, delay=10)
        self.m.tracePath({self.agents['pathAgent']: fpathlist}, delay=10)

    def _vizSearchPath_internal(self):
        fwdPath = {}
        cel = self.m._goal
        while cel != self.start:
            fwdPath[self.path[cel]] = cel
            cel = self.path[cel]
        return self.searchPath, self.path, fwdPath

    def algoStats(self, algorithm_name="Search Algorithm", mazeSize=(1,1)):
        _, _, fwdPath = self._vizSearchPath_internal()
        bestPathLen = len(fwdPath) + 1
        print("Statistics for {} on Maze Size {}x{}:".format(algorithm_name, mazeSize[0], mazeSize[1]))
        print(f"Nodes Explored by {algorithm_name}: {self.nodesExplored}")
        print(f"Time Taken: {self.totalTime:.2f} milliseconds")
        print(f"Shortest Path Length: {bestPathLen}")
        print(f"Memory Used: {self.memoryUsed / 1024:.2f} KB")

    def _move(self, cel, dirr):
        if dirr == 'E':
            return (cel[0], cel[1] + 1)
        elif dirr == 'W':
            return (cel[0], cel[1] - 1)
        elif dirr == 'S':
            return (cel[0] + 1, cel[1])
        elif dirr == 'N':
            return (cel[0] - 1, cel[1])

#class for BFS
class BFS(SearchAlgos):
    def __init__(self, mazeSize=(1, 1), start=None):
        super().__init__(mazeSize, start)
        self.queue = [self.start]


    def startSearch(self):
        tracemalloc.start()
        beginT = time.time()
        while self.queue:
            presentcel = self.queue.pop(0)
            self.nodesExplored += 1
            if presentcel == self.m._goal:
                break
            for d in 'NSEW':
                if self.m.maze_map[presentcel][d]:
                    nextcel = self._move(presentcel, d)
                    if nextcel in self.searchPath:
                        continue
                    self.queue.append(nextcel)
                    self.searchPath.append(nextcel)
                    self.path[nextcel] = presentcel
        self.totalTime = (time.time() - beginT) * 1000
        self.memoryUsed = tracemalloc.get_traced_memory()[1]  
        tracemalloc.stop()
        self.vizSearchPath()

 #class for DFS       
class DFS(SearchAlgos):
    def __init__(self, mazeSize=(1, 1), start=None):
        super().__init__(mazeSize, start)
        self.stack = [self.start]

    def startSearch(self):
        tracemalloc.start()
        beginT = time.time()
        while self.stack:
            presentcel = self.stack.pop()
            self.nodesExplored += 1
            if presentcel == self.m._goal:
                break
            for d in 'NSEW':
                if self.m.maze_map[presentcel][d]:
                    nextcel = self._move(presentcel, d)
                    if nextcel in self.searchPath:
                        continue
                    self.stack.append(nextcel)
                    self.searchPath.append(nextcel)
                    self.path[nextcel] = presentcel
        self.totalTime = (time.time() - beginT) * 1000
        self.memoryUsed = tracemalloc.get_traced_memory()[1]  
        tracemalloc.stop()
        self.vizSearchPath()

#class for A star with Manhattan distance
class AStar_Manhat(SearchAlgos):
    def __init__(self, mazeSize=(1, 1), start=None):
        super().__init__(mazeSize, start)
        self.open = PriorityQueue()
        self.gcost = {}
        self.fcost = {}

    def heuristic(self, cel1, cel2):
        return abs(cel1[0] - cel2[0]) + abs(cel1[1] - cel2[1])

    def startSearch(self):
        
        goal = (self.m._goal[0], self.m._goal[1])
        self.gcost[self.start] = 0
        self.fcost[self.start] = self.heuristic(self.start, goal)
        self.open.put((self.fcost[self.start], self.heuristic(self.start, goal), self.start))
        tracemalloc.start()
        beginT = time.time()
        while not self.open.empty():
            _, _, presentcel = self.open.get()
            if presentcel == goal:
                break
            for d in 'NSEW':
                if self.m.maze_map[presentcel][d]:
                    nextcel = self._move(presentcel, d)
                    temp_gcost = self.gcost.get(presentcel, float('inf')) + 1
                    temp_fcost = temp_gcost + self.heuristic(nextcel, goal)


                    if temp_fcost < self.fcost.get(nextcel, float('inf')):
                        self.path[nextcel] = presentcel
                        self.gcost[nextcel] = temp_gcost
                        self.fcost[nextcel] = temp_fcost
                        self.open.put((temp_fcost, self.heuristic(nextcel, goal), nextcel))
            self.nodesExplored += 1
        self.totalTime = (time.time() - beginT) * 1000
        self.memoryUsed = tracemalloc.get_traced_memory()[1]  
        tracemalloc.stop()
        self.vizSearchPath()

#class for A star with Euclidian        
class AStar_Euclid(SearchAlgos):
    def __init__(self, mazeSize=(1, 1), start=None):
        super().__init__(mazeSize, start)
        self.open = PriorityQueue()
        self.gcost = {}
        self.fcost = {}

    def heuristic(self, cel1, cel2):
        return math.sqrt((cel1[0] - cel2[0])**2 + (cel1[1] - cel2[1])**2)

    def startSearch(self):
        
        goal = (self.m._goal[0], self.m._goal[1])
        self.gcost[self.start] = 0
        self.fcost[self.start] = self.heuristic(self.start, goal)
        self.open.put((self.fcost[self.start], self.heuristic(self.start, goal), self.start))
        tracemalloc.start()
        beginT = time.time()
        while not self.open.empty():
            _, _, presentcel = self.open.get()
            if presentcel == goal:
                break
            for d in 'NSEW':
                if self.m.maze_map[presentcel][d]:
                    nextcel = self._move(presentcel, d)
                    temp_gcost = self.gcost.get(presentcel, float('inf')) + 1
                    temp_fcost = temp_gcost + self.heuristic(nextcel, goal)

                    if temp_fcost < self.fcost.get(nextcel, float('inf')):
                        self.path[nextcel] = presentcel
                        self.gcost[nextcel] = temp_gcost
                        self.fcost[nextcel] = temp_fcost
                        self.open.put((temp_fcost, self.heuristic(nextcel, goal), nextcel))
            self.nodesExplored += 1
        self.totalTime = (time.time() - beginT) * 1000
        self.memoryUsed = tracemalloc.get_traced_memory()[1]  
        tracemalloc.stop()
        self.vizSearchPath()
        
        

In [2]:
mazeSize = (30, 30) 
bfs = BFS(mazeSize=mazeSize)

bfs.startSearch()

bfs.algoStats("BFS",mazeSize)
bfs.m.run()


Statistics for BFS on Maze Size 30x30:
Nodes Explored by BFS: 896
Time Taken: 16.00 milliseconds
Shortest Path Length: 61
Memory Used: 60.31 KB


In [2]:
mazeSize = (50, 50) 
bfs = BFS(mazeSize=mazeSize)

bfs.startSearch()

bfs.algoStats("BFS",mazeSize)
bfs.m.run()


Statistics for BFS on Maze Size 50x50:
Nodes Explored by BFS: 2487
Time Taken: 106.51 milliseconds
Shortest Path Length: 105
Memory Used: 120.07 KB


In [2]:
mazeSize = (70, 70) 
bfs = BFS(mazeSize=mazeSize)

bfs.startSearch()

bfs.algoStats("BFS",mazeSize)
bfs.m.run()


Statistics for BFS on Maze Size 70x70:
Nodes Explored by BFS: 4900
Time Taken: 486.45 milliseconds
Shortest Path Length: 149
Memory Used: 344.09 KB


In [2]:
mazeSize = (30, 30) 
dfs = DFS(mazeSize=mazeSize)

dfs.startSearch()

dfs.algoStats("DFS", mazeSize)
dfs.m.run()

Statistics for DFS on Maze Size 30x30:
Nodes Explored by DFS: 580
Time Taken: 9.00 milliseconds
Shortest Path Length: 301
Memory Used: 61.21 KB


In [2]:
mazeSize = (50, 50) 
dfs = DFS(mazeSize=mazeSize)

dfs.startSearch()

dfs.algoStats("DFS", mazeSize)
dfs.m.run()

Statistics for DFS on Maze Size 50x50:
Nodes Explored by DFS: 1638
Time Taken: 67.51 milliseconds
Shortest Path Length: 677
Memory Used: 121.49 KB


In [2]:
mazeSize = (70, 70) 
dfs = DFS(mazeSize=mazeSize)

dfs.startSearch()

dfs.algoStats("DFS", mazeSize)
dfs.m.run()

Statistics for DFS on Maze Size 70x70:
Nodes Explored by DFS: 3430
Time Taken: 276.04 milliseconds
Shortest Path Length: 1265
Memory Used: 287.48 KB


In [2]:
mazeSize = (30, 30) 
astar_manhat = AStar_Manhat(mazeSize=mazeSize)

astar_manhat.startSearch()

astar_manhat.algoStats('AStar_Manhat', mazeSize)
astar_manhat.m.run()

Statistics for AStar_Manhat on Maze Size 30x30:
Nodes Explored by AStar_Manhat: 165
Time Taken: 2.00 milliseconds
Shortest Path Length: 61
Memory Used: 35.13 KB


In [2]:
mazeSize = (50, 50) 
astar_manhat = AStar_Manhat(mazeSize=mazeSize)

astar_manhat.startSearch()

astar_manhat.algoStats('AStar_Manhat', mazeSize)
astar_manhat.m.run()

Statistics for AStar_Manhat on Maze Size 50x50:
Nodes Explored by AStar_Manhat: 1562
Time Taken: 21.00 milliseconds
Shortest Path Length: 105
Memory Used: 273.74 KB


In [2]:
mazeSize = (70, 70) 
astar_manhat = AStar_Manhat(mazeSize=mazeSize)

astar_manhat.startSearch()

astar_manhat.algoStats('AStar_Manhat', mazeSize)
astar_manhat.m.run()

Statistics for AStar_Manhat on Maze Size 70x70:
Nodes Explored by AStar_Manhat: 2431
Time Taken: 32.51 milliseconds
Shortest Path Length: 149
Memory Used: 271.85 KB


In [2]:
mazeSize = (30, 30) 
astar_euc = AStar_Euclid(mazeSize=mazeSize)

astar_euc.startSearch()

astar_euc.algoStats('AStar_Euclid', mazeSize)
astar_euc.m.run()

Statistics for AStar_Euclid on Maze Size 30x30:
Nodes Explored by AStar_Euclid: 652
Time Taken: 12.01 milliseconds
Shortest Path Length: 61
Memory Used: 146.01 KB


In [2]:
mazeSize = (50, 50) 
astar_euc = AStar_Euclid(mazeSize=mazeSize)

astar_euc.startSearch()

astar_euc.algoStats('AStar_Euclid', mazeSize)
astar_euc.m.run()

Statistics for AStar_Euclid on Maze Size 50x50:
Nodes Explored by AStar_Euclid: 2227
Time Taken: 44.52 milliseconds
Shortest Path Length: 105
Memory Used: 291.71 KB


In [3]:
mazeSize = (70, 70) 
astar_euc = AStar_Euclid(mazeSize=mazeSize)

astar_euc.startSearch()

astar_euc.algoStats('AStar_Euclid',mazeSize)
astar_euc.m.run()

Statistics for AStar_Euclid on Maze Size 70x70:
Nodes Explored by AStar_Euclid: 4247
Time Taken: 83.00 milliseconds
Shortest Path Length: 149
Memory Used: 664.68 KB


MDP Policy and Value Iteration

In [1]:
import time
from pyamaze import maze, agent, COLOR, textLabel
import pandas as pd
import tracemalloc

#this is my base class for MDP 
class MarkovDP:
    def __init__(self, mazeSize=(1, 1)):
        self.maze = maze()
        mazeFileName = f'{mazeSize[0]}x{mazeSize[1]}Maze.csv'
        self.maze.CreateMaze(loadMaze=mazeFileName, theme='light')
        #self.maze = maze(mazeSize[0], mazeSize[1])
        #self.maze.CreateMaze(loopPercent=50, saveMaze=True, theme = 'light')
        textLabel(self.maze, 'Size of Maze', f'{mazeSize[0]}x{mazeSize[1]}')
        self.targt = self.maze._goal
        self.start = (self.maze.rows, self.maze.cols)
        self.mdp_val = {node: 0 if node != self.targt else 100 for node in self.maze.grid}
        self.mdp_reward = {node: -1 if node != self.targt else 100 for node in self.maze.grid}
        self.mdp_Policy = {node: 'N' for node in self.maze.grid if node != self.targt}
        self.gamma = 0.9
        self.threshold = 0.0001
        self.agents = {'pathAgent': agent(self.maze, footprints=True, color=COLOR.red, shape='arrow', filled=False)}

    def calValue(self, cel, dirr):
        if self.maze.maze_map[cel][dirr]: 
            if dirr == 'N':
                newCel = (cel[0] - 1, cel[1])
            elif dirr == 'S':
                newCel = (cel[0] + 1, cel[1])
            elif dirr == 'E':
                newCel = (cel[0], cel[1] + 1)
            elif dirr == 'W':
                newCel = (cel[0], cel[1] - 1)
        else:
            newCel = cel  
        return self.mdp_reward.get(newCel, 0) + self.gamma * self.mdp_val.get(newCel, 0)


    def pathFind(self):
        cel = self.start
        path = {}
        while cel != self.targt:
            dirr = self.mdp_Policy.get(cel)
            if dirr == 'N':
                newCel = (cel[0] - 1, cel[1])
            elif dirr == 'S':
                newCel = (cel[0] + 1, cel[1])
            elif dirr == 'E':
                newCel = (cel[0], cel[1] + 1)
            elif dirr == 'W':
                newCel = (cel[0], cel[1] - 1)
            else:
                break  
            path[cel] = newCel
            cel = newCel
        return path
    
    def displayPath(self, path):
        self.maze.tracePath({self.agents['pathAgent']: path}, delay=50)

    def algoStats(self, totalTime, memory_usage):
        path = self.pathFind()
        bestPathLen = len(path) + 1
        print("Statistics:")
        print(f"Time Taken: {totalTime:.2f} milliseconds")
        print(f"Shortest Path Length: {bestPathLen}")
        print(f"Memory Usage: {memory_usage / 1024:.2f} KB")

#class for Policy Iteration
class MDP_PolicyIter(MarkovDP):
    def __init__(self, mazeSize=(1, 1)):
        super().__init__(mazeSize)
        
    def startmdp_PolicyIter(self):
        tracemalloc.start()
        beginT = time.time()
        mdp_PolicyConverged = False
        while not mdp_PolicyConverged:
            while True:
                changeMax = 0
                for cel in self.maze.grid:
                    if cel == self.targt:
                        continue
                    lv = self.mdp_val[cel]
                    self.mdp_val[cel] = self.calValue(cel, self.mdp_Policy[cel])
                    changeMax = max(changeMax, abs(lv - self.mdp_val[cel]))
                if changeMax < self.threshold:
                    break

            mdp_PolicyConverged = True
            for cel in self.maze.grid:
                if cel == self.targt:
                    continue
                oact = self.mdp_Policy[cel]
                self.mdp_Policy[cel] = max(['N', 'S', 'E', 'W'], key=lambda dirr: self.calValue(cel, dirr))
                if oact != self.mdp_Policy[cel]:
                    mdp_PolicyConverged = False

        _, topmem = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        stopT = time.time()
        self.displayPath(self.pathFind())
        self.algoStats((stopT - beginT) * 1000, topmem)

#class for Value Iteration
class MDP_ValueIter(MarkovDP):
    def __init__(self, mazeSize=(1, 1)):
        super().__init__(mazeSize)
        self.agents = {
            'pathAgent': agent(self.maze, footprints=True, color=COLOR.red, shape='arrow', filled=False)
        }

    def startmdp_ValueIter(self):
        tracemalloc.start()
        beginT = time.time()
        while True:
            changeMax = 0
            for cel in self.maze.grid:
                if cel == self.targt:
                    continue
                lv = self.mdp_val[cel]
                self.mdp_val[cel] = max(self.calValue(cel, dirr) for dirr in ['N', 'S', 'E', 'W'])
                changeMax = max(changeMax, abs(lv - self.mdp_val[cel]))
            if changeMax < self.threshold:
                break

        for cel in self.maze.grid:
            if cel == self.targt:
                continue
            self.mdp_Policy[cel] = max(['N', 'S', 'E', 'W'], key=lambda dirr: self.calValue(cel, dirr))

        _, topmem = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        stopT = time.time()
        self.displayPath(self.pathFind())
        self.algoStats((stopT - beginT) * 1000, topmem ) 

In [2]:
mazeSize = (30, 30) 
policyIter = MDP_PolicyIter(mazeSize=mazeSize)
policyIter.startmdp_PolicyIter()
policyIter.maze.run()

Statistics:
Time Taken: 737.63 milliseconds
Shortest Path Length: 61
Memory Usage: 21.47 KB


In [2]:
mazeSize = (50, 50) 
policyIter = MDP_PolicyIter(mazeSize=mazeSize)
policyIter.startmdp_PolicyIter()
policyIter.maze.run()

Statistics:
Time Taken: 2955.13 milliseconds
Shortest Path Length: 105
Memory Usage: 58.80 KB


In [2]:
mazeSize = (70, 70) 
policyIter = MDP_PolicyIter(mazeSize=mazeSize)
policyIter.startmdp_PolicyIter()
policyIter.maze.run()

Statistics:
Time Taken: 8292.71 milliseconds
Shortest Path Length: 149
Memory Usage: 115.05 KB


In [2]:
mazeSize = (30, 30) 
valueItr = MDP_ValueIter(mazeSize=mazeSize)
valueItr.startmdp_ValueIter()
valueItr.maze.run()

Statistics:
Time Taken: 228.55 milliseconds
Shortest Path Length: 61
Memory Usage: 21.48 KB


In [2]:
mazeSize = (50, 50) 
valueItr = MDP_ValueIter(mazeSize=mazeSize)
valueItr.startmdp_ValueIter()
valueItr.maze.run()

Statistics:
Time Taken: 1515.40 milliseconds
Shortest Path Length: 105
Memory Usage: 59.14 KB


In [2]:
mazeSize = (70, 70) 
valueItr = MDP_ValueIter(mazeSize=mazeSize)
valueItr.startmdp_ValueIter()
valueItr.maze.run()

Statistics:
Time Taken: 3296.00 milliseconds
Shortest Path Length: 151
Memory Usage: 115.23 KB
