In [108]:
import numpy as np
import random as rnd
from tqdm import tqdm
import sys
from PIL import Image
import imageio

rnd.seed(42)

In [109]:
class Maze:

    matrix = np.array
    connectivity = int
    width = int
    height = int
    def __init__(self, path):
        import numpy as np
        import random as rnd
        rnd.seed(42)

        with open(path, 'r') as maze:
            maze = [int(x) for x in maze.readline().split()]
        width, height = maze[0:2]
        connectivity = maze[2]
        maze = maze[3:]
        matrix = []

        for y in range(height):
            matrix.append(maze[0 + height*y : width + height*y])

        self.matrix = np.array(matrix)
        self.width, self.height, self.connectivity, self.shape = \
            width, height, connectivity, (width, height) #NOTE: np.array.shape is (height, width), don't confuse those, although doesn't matter for square labyrinths

        self.target_loc = np.unravel_index(np.argmax(self.matrix, axis=None), self.matrix.shape)


    def up_frame(self, cell_code: int):
        return cell_code//1%2==1

    def right_frame(self, cell_code: int):
        return cell_code//2%2==1

    def bottom_frame(self, cell_code: int):
        return cell_code//4%2==1

    def left_frame(self, cell_code: int):
        return cell_code//8%2==1


    def up_frame_coords(self, coordinates: tuple[int, int]):
        x, y = coordinates
        cell_code = self.matrix[x][y]
        return cell_code//1%2==1

    def right_frame_coords(self, coordinates: tuple[int, int]):
        x, y = coordinates
        cell_code = self.matrix[x][y]
        return cell_code//2%2==1

    def bottom_frame_coords(self, coordinates: tuple[int, int]):
        x, y = coordinates
        cell_code = self.matrix[x][y]
        return cell_code//4%2==1

    def left_frame_coords(self, coordinates: tuple[int, int]):
        x, y = coordinates
        cell_code = self.matrix[x][y]
        return cell_code//8%2==1

In [110]:
class Ant:
    def __init__(self, x:int, y:int):
        self.position_init = (x, y)
        self.position = (x, y)
        self.route = [self.position_init]

    def reset(self):
        self.position = self.position_init
        self.route = [self.position_init]

In [111]:
class ACO:

    def __init__(self, maze:Maze, ant_num:int = None, evaporation_rate:float = 0.1,
                 criterion:str = "max_iter", route_param_limit:int = 15, max_iter:int = 1000,
                 exploitation_const:float = 1.0, trace_log_base:float = 1, shuffle:bool = False):
        self.shuffle = shuffle          #if fit() shuffles the ant_positions every iteration.
                                        # Improves time and its theoretical limit for ant_num less than total surface of a maze
        self.color_map = None                           #will be updated in visualisation part, check self.get_color_map()
        self.maze = maze                                #stores the object of the maze fo ACO to work on
        self.evaporation_rate = evaporation_rate        #how fast the pheromones evaporate
        self.criterion = criterion                      #stopping criteria. Those that work reliably are: "connectivity",
        # "max_iter", "max_route" and "avg_route". The latter two require route_param_limit
        self.route_param_limit = route_param_limit      #the limit for both "max_route" and "avg_route" depending on criterion chosen
        self.max_iter = max_iter
        self.exploitation_const = exploitation_const    #used to adjust how more likely the explored positions are to be chosen by an ant
        self.trace_log_base = trace_log_base

        if ant_num is None:
            ant_num = maze.width*maze.height
        self.ant_num = ant_num
        ant_positions = rnd.sample([(x, y) for x in range(maze.width) for y in range(maze.height)], k=self.ant_num)
        self.ants = [Ant(pos[0], pos[1]) for pos in ant_positions]

        self.temp_trace_map = np.array([[0.0]*maze.width]*maze.height)      #an incremental unit for trace_map, is temporal for every iteration
        self.trace_map = np.array([[1.0]*maze.width]*maze.height)           #pheromone map, i.e. heuristics matrix

        self.depth = 0                  #the longest path in the maze
        self.came2target = 0

        self.images = []                #log container for .gif generation

    def clear_traces(self):         #empties temporal trace map
        self.temp_trace_map = np.array([[0.0]*self.maze.width]*self.maze.height)

    def move(self, ant:Ant, exploiting:bool = False, draw:bool = False, heat_map:bool = False):
        self.exploitation_const = 1
        x, y = ant.position
        if ant.position == self.maze.target_loc:
            return ant.position

        ways = [(x+1, y),
                (x-1, y),
                (x, y+1),
                (x, y-1)]
        checks = [self.maze.bottom_frame_coords,
                  self.maze.up_frame_coords,
                  self.maze.right_frame_coords,
                  self.maze.left_frame_coords]

        ways = [ways[i] for i in range(len(ways)) if not (checks[i](ant.position) or ways[i] in ant.route)]

        if len(ways) == 0:
            return None

        if self.maze.target_loc in ways:
            final_way = self.maze.target_loc
            self.came2target += 1
        else:
            weights = [self.trace_map[way]**self.exploitation_const for way in ways]
            final_way = rnd.choices(population=ways, weights=weights, k=1)[0]
            if exploiting:
                final_way = ways[np.argmax(weights)]

        ant.position = final_way
        ant.route.append(final_way)
        if draw:
            self.draw_all(ant=ant, heat_map=heat_map)
        return ant.position

    def fit(self, draw:bool = False, heat_map:bool = False):
        if draw:
            self.images = []
        for _ in tqdm(range(self.max_iter)):
            if self.shuffle:
                ant_positions = rnd.sample([(x, y) for x in range(self.maze.width) for y in range(self.maze.height)], k=self.ant_num)
                self.ants = [Ant(pos[0], pos[1]) for pos in ant_positions]
            connectivity = sum([int(aco.get_route((x, y))[2])
                 for x in range(aco.maze.width)
                 for y in range(aco.maze.height)])/(self.maze.height * aco.maze.width)

            if connectivity == 1 and self.criterion == "connectivity":
                break
            if self.criterion == "avg_route" and sum([int(aco.get_route((x, y))[1])
                                                      for x in range(aco.maze.width)
                                                      for y in range(aco.maze.height)])\
                    /(self.maze.height * aco.maze.width) == 1:
                break
            if self.criterion == "max_route" and self.depth < self.route_param_limit:
                break
            if min([int(aco.get_route((x, y))[1])
                    for x in range(aco.maze.width)
                    for y in range(aco.maze.height)]) == 1 < self.route_param_limit and self.criterion == "min_route":
                break

            for i, ant in enumerate(self.ants):
                self.trace_log_base = 0.1
                while self.move(ant) not in [None, self.maze.target_loc]:
                    continue
                route = ant.route
                n = len(route)
                if draw and connectivity <= 0.8:
                    self.draw_all(ant=ant, heat_map=heat_map)

                if self.depth<n:
                    self.depth = n

                if self.maze.target_loc in route:
                    self.trace_log_base *= 15.0

                for point in route:
                    self.temp_trace_map[point] += (self.trace_log_base**1.5)/n
                ant.reset()

                self.trace_map = self.trace_map + self.temp_trace_map
                self.clear_traces()
            self.trace_map = self.trace_map * (1-self.evaporation_rate)
        if draw:
            imageio.mimsave('fit.gif', self.images, fps=20)
        self.trace_map = (self.trace_map - np.min(self.trace_map))/np.max(self.trace_map)
        self.trace_map[self.trace_map == 0] = sys.float_info.min
        return self.trace_map

    def get_route(self, coordinates: tuple[int, int], draw:bool = False, heat_map:bool = False):
        x, y = coordinates
        explorer_ant = Ant(x, y)
        if draw:
            self.images = []

        while self.move(explorer_ant, exploiting=True, draw=draw, heat_map=heat_map) not in [None, self.maze.target_loc]:
            continue
        if draw:
            imageio.mimsave('check.gif', self.images, duration=0.5)
        return explorer_ant.route, len(explorer_ant.route), explorer_ant.position==self.maze.target_loc, self.depth

    def draw_up(self, img: Image, thickness:int=10, color:tuple[int, int, int] = (0,0,0)):
        pixels = img.load()
        for y in range(img.size[0]):
            for x in range(thickness):
                pixels[y, x] = color
        return img

    def draw_bottom(self, img: Image, thickness:int=10, color:tuple[int, int, int] = (0,0,0)):
        pixels = img.load()
        for y in range(img.size[0]):
            for x in range(img.size[0]-thickness, img.size[0]):
                pixels[y, x] = color
        return img

    def draw_right(self, img: Image, thickness:int=10, color:tuple[int, int, int] = (0,0,0)):
        pixels = img.load()
        for y in range(thickness):
            for x in range(img.size[0]):
                pixels[y, x] = color
        return img

    def draw_left(self, img: Image, thickness:int=10, color:tuple[int, int, int] = (0,0,0)):
        pixels = img.load()
        for y in range(img.size[0]-thickness, img.size[0]):
            for x in range(img.size[0]):
                pixels[y, x] = color
        return img

    def draw_cross(self, img: Image, thickness:int=10, color:tuple[int, int, int] = (0,0,0)):
        pixels = img.load()
        for y in range(0, img.size[0]-thickness+1):
            pixels[y, y] = color
            pixels[y+thickness-1, y] = color

            pixels[-y, y] = color
            pixels[-y-thickness+1, y] = color

    def get_concat_h(self, im1, im2):
        dst = Image.new('RGB', (im1.width + im2.width, im1.height))
        dst.paste(im1, (0, 0))
        dst.paste(im2, (im1.width, 0))
        return dst

    def get_concat_v(self, im1, im2):
        dst = Image.new('RGB', (im1.width, im1.height + im2.height))
        dst.paste(im1, (0, 0))
        dst.paste(im2, (0, im1.height))
        return dst

    def draw_dot(self, img: Image, thickness:int=4, color:tuple[int, int, int] = (255,0,0)):
        pixels = img.load()
        center = img.size[0]//2
        for y in range(center-thickness, center+thickness-1):
            for x in range(center-thickness, center+thickness-1):
                pixels[x, y] = color
        pixels[center, center] = color
        pixels[center-1, center] = color
        pixels[center-1, center-1] = color
        pixels[center, center-1] = color
        return img

    def get_color_map(self):
        color_map = (self.trace_map - np.min(self.trace_map))/np.max(self.trace_map)
        return color_map



    def draw_all(self, ant:Ant=Ant(-1, -1), heat_map:bool = True):
        h_img = None
        v_img = None
        tiles = np.ndarray(shape=self.maze.shape, dtype=object)
        if heat_map:
            self.color_map = self.get_color_map()
        for row in range(self.maze.shape[0]):
            for col in range(self.maze.shape[1]):
                code = self.maze.matrix[row][col]
                thickness = 2
                if heat_map:
                    brightness = 255 - int(self.color_map[row][col]*255)
                    img = Image.new( 'RGB', (64, 64), color=(brightness, 255, brightness))
                else:
                    img = Image.new( 'RGB', (64, 64), "white")


                if (row, col) in ant.route:
                    self.draw_dot(img)

                self.draw_up(img, thickness//2, color=(200, 200, 200))
                self.draw_left(img, thickness//2, color=(200, 200, 200))
                self.draw_bottom(img, thickness//2, color=(200, 200, 200))
                self.draw_right(img, thickness//2, color=(200, 200, 200))

                if M.up_frame(cell_code=code):
                    self.draw_up(img, thickness)
                if M.right_frame(cell_code=code):
                    self.draw_left(img, thickness)
                if M.bottom_frame(cell_code=code):
                    self.draw_bottom(img, thickness)
                if M.left_frame(cell_code=code):
                    self.draw_right(img, thickness)
                if M.target_loc == (row, col):
                    self.draw_cross(img, thickness)

                tiles[row][col] = img
                if col==0:
                    h_img = img
                else:
                    h_img = self.get_concat_h(h_img, img)
            if row==0:
                v_img = h_img
            else:
                v_img = self.get_concat_v(v_img, h_img)
        self.images.append(v_img)

In [112]:
M = Maze("mazes/M25_1.mz")
aco = ACO(M, criterion="connectivity", shuffle=True, ant_num=130)

trace_map = aco.fit(draw=True, heat_map=True)

  8%|▊         | 83/1000 [00:44<08:16,  1.85it/s] 


In [113]:
aco.get_route((13, 14), draw=True, heat_map=True)

([(13, 14),
  (13, 13),
  (12, 13),
  (12, 12),
  (12, 11),
  (11, 11),
  (11, 10),
  (11, 9),
  (11, 8),
  (10, 8),
  (10, 7),
  (10, 6),
  (9, 6)],
 13,
 True,
 74)