In [146]:
import numpy as np
import matplotlib.pyplot as plt
import random
from matplotlib.colors import ListedColormap, BoundaryNorm
import heapq
from heapq import heapify, heappush, heappop

In [147]:
class ship:
    def __init__(self, data, open = set(), leak = [()], bot = (), closed = set()):
        self.data = data # this should be numpy matrix
        self.open = open # set of tuples, each tuple has indices of open cell
        self.leak = leak #tuple
        self.bot = bot # tuple
        self.closed = closed # list of tuples

    
    @classmethod
    def CreateShip(cls,n):
        ship = cls(np.ones((n,n), dtype = float))
        #Pick random cell in interior to open
        x_rand, y_rand = random.randint(1, ship.data.shape[0]-2) , random.randint(1, ship.data.shape[1]-2)
        ship.data[x_rand][y_rand] = 0
        ship.open = {(x_rand, y_rand)} # We use {} to make a set, now we don't have to worry about any duplicates
        return ship  
    
    def ShowShip(self): 
        # white = open cells, black = closed cells, red = fire, blue = bot, green = leak
        colors = ['white', 'black', 'red', 'blue', 'green']
        custom_map = ListedColormap(colors)
        bounds = [-.1, .5, 1.5, 2.5, 3.5, 4.5]
        norm = BoundaryNorm(bounds, custom_map.N)
        plt.imshow(self.data, cmap = custom_map, norm=norm)
        
        #Ensure shows plot with increments of 1
        plt.xticks(np.arange(-0.5, self.data.shape[1], 1), labels=[])
        plt.yticks(np.arange(-0.5, self.data.shape[0], 1), labels=[])

        #Show gridlines 
        plt.grid(True)

        #Show colorbar
        plt.colorbar()
        plt.show()

    #Helper method that takes in indices and returns list of tuples that is 4 cardinal neighbors
    @staticmethod
    def getNeighbors(ind):
        up = (ind[0]-1, ind[1])
        down = (ind[0]+1, ind[1])
        left = (ind[0], ind[1]-1)
        right = (ind[0], ind[1]+1)
        return [up, down, left, right]
    
     # Helper to see if indices are within upper and lower bounds
    @staticmethod
    def checkBounds(upper, lower, ind):
        for i in ind:
            if i > upper or i < lower:
                return False
        return True
    
    #Helper method that takes a list of directions and ensures they're w/in bounds
    def getInbounds(self, dir):
        k = 0
        while k in range(len(dir)):
            if not ship.checkBounds(self.data.shape[0]-1, 0, dir[k]):
                del dir[k]
            else:
                k+=1
        return dir
    
    # Helper method, takes in an open cell and returns list of closed neighbors
    def checkNeighborsOpen(self, openInd):
        neighbors = ship.getNeighbors(openInd)
        valid,k = self.getInbounds(neighbors),0
        while k in range(len(valid)):
            r, c = valid[k] #unpack tuple
            if self.data[r][c] != 1:
                del valid[k]
            else:
                k+=1
        return valid
        
    #Takes in index which is a tuple that represents a cell. Checks to see if it has exactly one open neighbor
    def checkNeighborsClosed(self, Ind):
        neighbors = ship.getNeighbors(Ind)
        valid = self.getInbounds(neighbors)
        numOpen = 0
        for direction in valid:
            i,j = direction
            if self.data[i][j] == 0:
                numOpen+=1
        if numOpen == 1:return True
        return False
    
    def getDeadEnds(self):
        dead_ends = []
        for openInd in self.open:
            if self.checkNeighborsClosed(openInd):
                dead_ends.append(openInd)
        return dead_ends

    def openDeadEnds(self):
        dead_ends = self.getDeadEnds()
        for i in range(len(dead_ends)//2): # Want to open ~half the dead ends
            valid_neighbors = self.getInbounds(ship.getNeighbors(dead_ends[i]))
            closed_neighbors = []
            for neighbor in valid_neighbors:
                i,j = neighbor
                if self.data[i][j] == 1:
                    closed_neighbors.append(neighbor)

            #Now we have closed neighbors of dead end, choose one at random and open
            ri = None
            if len(closed_neighbors)-1 > 0:
                ri = random.randint(0, len(closed_neighbors)-1)
                rx, ry = closed_neighbors[ri]
                self.data[rx][ry] = 0

    # In order to more efficiently open the ship we only check the blocked cells that are next to open cells
    # Then we can see which of those blocked cells has exactly 1 open neighbor.
    # While this set is not empty, we choose a random cell in the set to open
    def OpenShip(self):
        while True:         
            possibly_valid_blocked, valid_blocked = [],[]
            for open in self.open:
                possibly_valid_blocked += self.checkNeighborsOpen(open)
            for possibly_valid in possibly_valid_blocked:
                if self.checkNeighborsClosed(possibly_valid): valid_blocked.append(possibly_valid)
            if len(valid_blocked) != 0:
                #choose random valid blocked and open it
                ri = random.randint(0, len(valid_blocked)-1)
                i, j = valid_blocked[ri]
                self.data[i][j] = 0 # Open randomly chosen cell
                self.open.add((i,j)) # Add this to set of open cells
            else:
                break
        self.openDeadEnds()

    def populateClosedCells(self):
        for i in range(len(self.data)):
            for j in range(len(self.data[0])):
                if self.data[i][j]==1:
                    self.closed.add((i,j))
    

    # Get random spawn points for bot, leak and fire
    def chooseInitSpawns(self):
        openlist = list(self.open)
        obj = random.sample(openlist, 1)
        for i in range(len(obj)):
            self.open.discard(obj[i])
            x,y = obj[i]
            self.data[x][y] = i+2
        # self.leak = obj[1]
        self.bot = obj[0]

    def shortest_path(self, matrix, start_cell, destination_cell):
        # Initialize a dictionary to store distances from the start cell
        distances = {}
        distances[start_cell] = 0

        # Initialize a set to store visited cells
        visited = set()

        # Create a queue to store cells to be explored
        queue = [(0, start_cell)]  # (distance, cell)

        while queue:
            # Get the cell with the smallest distance from the queue
            current_distance, current_cell = heapq.heappop(queue)

            # If the current cell is the destination, we've found the shortest path
            if current_cell == destination_cell:
                return current_distance

            # Mark the current cell as visited
            visited.add(current_cell)

            # Explore all unvisited neighbors of the current cell
            for neighbor_cell in self.get_neighbors(matrix, current_cell):
                if neighbor_cell not in visited:
                    new_distance = current_distance + 1
                    distances[neighbor_cell] = new_distance
                    heapq.heappush(queue, (new_distance, neighbor_cell))

        # If we reach this point, there is no path from the start cell to the destination cell
        return None

    def get_neighbors(self, matrix, current_cell):
        neighbors = []
        x, y = current_cell[0], current_cell[1]

        if x > 0:
            neighbors.append((x - 1, y))
        if x < len(matrix) - 1:
            neighbors.append((x + 1, y))
        if y > 0:
            neighbors.append((x, y - 1))
        if y < len(matrix[0]) - 1:
            neighbors.append((x, y + 1))

        return neighbors


    def create_detection_square(self, matrix, pivot, k):
        """Creates a detection square of size 2k+1 x 2k+1 around the pivot point.

        Args:
            matrix: A 2D numpy array.
            pivot: A tuple of (row, col) coordinates.
            k: An integer greater than 0 and less than 8.

        Returns:
            A 2D numpy array containing the detection square.
        """
        # Check if the pivot point is within the bounds of the matrix.
        if pivot[0] < 0 or pivot[0] >= matrix.shape[0] or pivot[1] < 0 or pivot[1] >= matrix.shape[1]:
            raise ValueError("Pivot point is out of bounds.")

        # Calculate the start and end indices of the detection square.
        start_row = pivot[0] - k
        end_row = pivot[0] + k + 1
        start_col = pivot[1] - k
        end_col = pivot[1] + k + 1

        # Check if the detection square goes out of bounds of the matrix.
        if start_row < 0:
            start_row = 0
        if end_row >= matrix.shape[0]:
            end_row = matrix.shape[0]
        if start_col < 0:
            start_col = 0
        if end_col >= matrix.shape[1]:
            end_col = matrix.shape[1]

        # Create the detection square.
        detection_square = []
        for i in range(start_row, end_row):
            for j in range(start_col, end_col):
                detection_square.append((i,j))
        # print(matrix[start_row:end_row, start_col:end_col])

        all_coordinates = []
        for row in range(matrix.shape[0]):
            for col in range(matrix.shape[1]):
                all_coordinates.append((row, col))

        outside_coordinates = []
        for coordinate in all_coordinates:
            if coordinate not in detection_square:
                outside_coordinates.append(coordinate)

        for coors in detection_square:
            x,y = coors
            if self.data[x][y] == 1 or self.data[x][y] == 3:
                detection_square.remove(coors)

        for coors in detection_square:
            if coors in self.closed or coors in self.bot:
                detection_square.remove(coors)
        
        for coors in outside_coordinates:
            x,y = coors
            if self.data[x][y] == 1:
                outside_coordinates.remove(coors)

        # for coors in outside_coordinates:
        #     if coors in self.closed or coors in self.bot:
        #         outside_coordinates.remove(coors)

        return (detection_square, outside_coordinates)  
    
    def manhattan(self, bot, pos):
        botx, boty = bot[0], bot[1]
        posx, posy = pos[0], pos[1]
        sqx = (botx-posx)**2
        sqy = (boty - posy)**2
        sqrt = (sqx + sqy)**(1/2)
        return int(sqrt)

    def moveBotToPos(self, start, dest, vis):

        def findPath(start, dest):
            q = []
            parent = {}
            q.append(start)
            vis.add(start)
            parent[start] = None
            while len(q) > 0:
                curr = q.pop(0)
                if curr == dest:
                    path = []
                    while curr is not None:
                        path.append(curr)
                        curr = parent[curr]
                        if curr == self.leak:
                            actions = len(path)
                            path[0] = 's'
                            return actions, path[0]
                    path.reverse()
                    return path
                neighbors = self.findBFSNeighbors(curr)
                for neighbor in neighbors:
                    if neighbor not in vis:
                        q.append(neighbor)
                        vis.add(neighbor)
                        parent[neighbor] = curr
            return []
        
        path = findPath(start, dest)
        actions = len(path)
        return actions, path

    def moveBot(self, oldPos, newPos):
        self.open.add(oldPos)
        self.bot = newPos
        self.data[newPos[0]][newPos[1]] = 3
        self.data[oldPos[0]][oldPos[1]] = 0

    def findBFSNeighbors(self, cell):
        closed = self.closed
        up = (cell[0] - 1, cell[1])
        if up in self.closed:
            up = None 
        down = (cell[0] + 1, cell[1])
        if down in self.closed:
            down = None
        right = (cell[0], cell[1]+ 1)
        if right in self.closed:
            right = None
        left = (cell[0], cell[1]-1)
        if left in self.closed:
            left = None
        pot = [up, down, left, right]
        valid = []
        for n in pot:
            if n != None:
                if self.checkBounds(self.data.shape[0]-1, 0, n):
                    valid.append(n)
        return valid

    def SSSP(self, matrix, start_cell):
        distances = {}
        visited = set()
        queue = [(0, start_cell)]

        while queue:
            current_distance, current_cell = heapq.heappop(queue)
            if current_cell in visited:
                continue
            distances[current_cell] = current_distance
            visited.add(current_cell)

            for neighbor_cell in self.get_neighbors(matrix, current_cell):
                if neighbor_cell not in visited:
                    new_distance = current_distance + 1
                    heapq.heappush(queue, (new_distance, neighbor_cell))

        return distances
    
    def get_neighbors(self, matrix, current_cell):
        neighbors = []
        x, y = current_cell[0], current_cell[1]

        if x > 0:
            neighbors.append((x - 1, y))
        if x < len(matrix) - 1:
            neighbors.append((x + 1, y))
        if y > 0:
            neighbors.append((x, y - 1))
        if y < len(matrix) - 1:
            neighbors.append((x, y + 1))

        return neighbors

    def sort_by_second_value(self, tuple):
                return tuple[1]

    def bot1(self, k):
        openlist = list(self.open)
        obj = random.sample(openlist, 1)[0]
        self.open.discard(obj)
        self.data[obj[0]][obj[1]] = 3
        self.bot = obj
        detectionSquare, outSideCoors = self.create_detection_square(self.data, self.bot, k)
        leakSpots = random.sample(outSideCoors,1)[0]
        self.open.discard(leakSpots)
        self.data[leakSpots[0]][leakSpots[1]] = 4
        self.leak = leakSpots
        actions = 0
        visited = set()
        visited.add(self.bot)
        mayContainLeak = list(self.open)
        mayContainLeak.append(self.leak)
        # self.ShowShip()
        while self.leak != self.bot:
            detectionSquare, outSideCoors = self.create_detection_square(self.data, self.bot, k); actions +=1
            oldBot = self.bot
            visited.add(oldBot)
            if self.leak not in detectionSquare:
                for cells in detectionSquare:
                    if cells in mayContainLeak:
                        mayContainLeak.remove(cells)
            else:
                newMayContainLeak = []
                for cell in detectionSquare:
                    if cell in mayContainLeak: 
                        newMayContainLeak.append(cell)
                mayContainLeak = newMayContainLeak
            path = []
            path.append(0)
            sdist = []
            dist = self.SSSP(self.data, self.bot)
            for key in dist.keys():
                if key in mayContainLeak:
                    sdist.append((key, dist[key]))
            sdist = sorted(sdist, key=self.sort_by_second_value)
            smallest_dist = sdist[0][1]
            smallest_dist_list = []
            for c in sdist:
                if c[1] == smallest_dist and smallest_dist not in visited:
                    smallest_dist_list.append(c[0])
            if len(smallest_dist_list) <= 0:
                path[0] = random.sample(sdist,1)[0]
            else:
                path[0] = random.sample(smallest_dist_list,1)
            act, travel =  self.moveBotToPos(self.bot, path[0][0], visited) #return action steps
            actions += act
            for cells in travel:
                visited.add(cells)
            self.moveBot(self.bot, path[0][0])
            if self.bot in mayContainLeak:
                mayContainLeak.remove(self.bot)
        return actions
    
    def bot5(self, k):
        openlist = list(self.open)
        obj = random.sample(openlist, 1)[0]
        self.open.discard(obj)
        self.data[obj[0]][obj[1]] = 3
        self.bot = obj
        detectionSquare, outSideCoors = self.create_detection_square(self.data, self.bot, k)
        leakSpots = random.sample(outSideCoors,2)
        self.open.discard(leakSpots[0])
        self.open.discard(leakSpots[1])
        self.data[leakSpots[0][0]][leakSpots[0][1]] = 4
        self.data[leakSpots[1][0]][leakSpots[1][1]] = 4
        self.leak = leakSpots
        mayContainLeak = list(self.open)
        mayContainLeak.append(self.leak[0])
        mayContainLeak.append(self.leak[1])
        visited = set()
        visited.add(self.bot)
        actions = 0
        while len(self.leak) > 0:
            detectionSquare, outSideCoors = self.create_detection_square(self.data, self.bot, k); actions +=1
            oldBot = self.bot
            visited.add(oldBot)
            if self.leak not in detectionSquare:
                for cells in detectionSquare:
                    if cells in mayContainLeak:
                        mayContainLeak.remove(cells)
            else:
                newMayContainLeak = []
                for cell in detectionSquare:
                    if cell in mayContainLeak: 
                        newMayContainLeak.append(cell)
                mayContainLeak = newMayContainLeak
            path = []
            path.append(0)
            sdist = []
            dist = self.SSSP(self.data, self.bot)
            for key in dist.keys():
                if key in mayContainLeak:
                    sdist.append((key, dist[key]))
            sdist = sorted(sdist, key=self.sort_by_second_value)
            if len(sdist) == 0:
                for cells in self.leak:
                    val = self.shortest_path(self.data, self.bot, cells)
                    sdist.append((cells, val))
            smallest_dist = sdist[0]
            smallest_dist = smallest_dist[1]
            smallest_dist_list = []
            for c in sdist:
                if c[1] == smallest_dist and smallest_dist not in visited:
                    smallest_dist_list.append(c[0])
            if len(smallest_dist_list) <= 0:
                path[0] = random.sample(sdist,1)[0]
            else:
                path[0] = random.sample(smallest_dist_list,1)
            act, travel =  self.moveBotToPos(self.bot, path[0][0], visited) #return action steps
            actions += act
            for cells in travel:
                visited.add(cells)
            self.moveBot(self.bot, path[0][0])
            for cells in self.leak:
                if cells == self.bot:
                    x,y = cells
                    self.data[x][y] = 0
                    self.leak.remove(cells)
                    self.open.add(cells)
            path = None
            if self.bot in mayContainLeak:
                mayContainLeak.remove(self.bot)
        return actions
    

    def probablisticDjikstra(self, probs, start, goal):
        prob_dist = {}
        prev_cells = {}
        pq = [(1/probs[start], start)]
        prob_dist[start] = 0
        while pq:
            cur_prob, cur_cell = heapq.heappop(pq)
            if cur_cell == goal:
                break
            neighbors = self.get_neighbors(self.data, cur_cell)
            for n in neighbors:
                new_prob = cur_prob + (1/probs[n])
                if n not in prob_dist or new_prob > prob_dist[n]:
                    prob_dist[n] = new_prob
                    prev_cells[n] = cur_cell
                    heapq.heappush(pq, (new_prob, n))
        max_prob_paths = self.find_all_paths(prev_cells, start, goal)
        return max_prob_paths
    
    def find_all_paths(self, prev_cells, start, goal):
        # Initialize the list of paths
        paths = []

        # Find all paths from the goal cell to the start cell
        cur_cell = goal

        while cur_cell != start:
            paths.append(cur_cell)
            cur_cell = prev_cells[cur_cell]

        # Reverse the paths to get them in the correct order
        paths.reverse()

        return paths


           


In [148]:
            # for pos in mayContainLeak:
            #     dist[pos] = self.manhattan(self.bot, pos)
            # min_key = min(dist, key=lambda k: dist[k])
            # if min_key in visited:
            #     del dist[min_key]
            #     min_key = min(dist, key=lambda k: dist[k])  
            # path[0] = (min_key)
'''space_ship = ship.CreateShip(50)
totalActions = 0
totalRuns = 100
for i in range(totalRuns):
    space_ship.OpenShip()
    space_ship.openDeadEnds()
    space_ship.populateClosedCells()
    val = space_ship.bot1(15) 
    if val == 'failure':
        continue
    else:
        totalActions += val
        totalRuns-=1
print(totalActions/100)'''

"space_ship = ship.CreateShip(50)\ntotalActions = 0\ntotalRuns = 100\nfor i in range(totalRuns):\n    space_ship.OpenShip()\n    space_ship.openDeadEnds()\n    space_ship.populateClosedCells()\n    val = space_ship.bot1(15) \n    if val == 'failure':\n        continue\n    else:\n        totalActions += val\n        totalRuns-=1\nprint(totalActions/100)"

In [149]:
# totalActions = 0
# totalSims = 100
# k = 1
# while k != 16:
#     totalActions = 0
#     for i in range(1):
#         space_ship = ship.CreateShip(30)
#         space_ship.OpenShip()
#         space_ship.openDeadEnds()
#         space_ship.populateClosedCells()
#         totalActions += space_ship.bot5(k)
#     print("For k value " + str(k) + " and average actions: " + str(totalActions/totalSims))
#     k+=1

# space_ship = ship.CreateShip(30)
# space_ship.OpenShip()
# space_ship.openDeadEnds()
# space_ship.populateClosedCells()  
# space_ship.bot5(10)

Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "c:\Users\godha\AppData\Local\Programs\Python\Python311\Lib\site-packages\IPython\core\interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\godha\AppData\Local\Temp\ipykernel_10312\1211433638.py", line 19, in <module>
    print(space_ship.bot5(2))
          ^^^^^^^^^^^^^^^^^^
  File "C:\Users\godha\AppData\Local\Temp\ipykernel_10312\456828968.py", line 473, in bot5
    val = self.shortest_path(self.data, self.bot, cells)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\godha\AppData\Local\Temp\ipykernel_10312\456828968.py", line -1, in shortest_path
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\godha\AppData\Local\Programs\Python\Python311\Lib\site-packages\IPython\core\interactiveshell.py", line 2052, in showtraceback
    stb = self.InteractiveTB.structured_traceba

In [None]:
# def Bot3(ship, alpha, threshold_value):
#     opens = list(ship.open)
#     while True:
#         botloc, leakloc = random.sample(opens, 2)
#         if botloc != leakloc: break
#     print(f'bot started at: {botloc}, leak is at: {leakloc}')
#     start_prob = 1/(len(opens)-1) #Probability leak is in any open cell is 1/# of possible cells
#     probs = {cell: start_prob for cell in opens if cell != botloc}
#     probs[botloc] = 0
#     game_over,t, moves = False,0,[botloc]
#     while not game_over:
#         senseRes,_ = sense(botloc, leakloc, alpha, ship)
#         t+=1 # Sense action takes one time step
#         distances = SSSP(ship, botloc)
#         probs = updateProbs(probs, senseRes, ship, distances,alpha)
#         max_prob = max(probabilities.values())
#         c = 1/alpha #create a failsafe in case there seems to be no higher probability value
#         while max_prob < threshold_value: #till we have found a max_probability cell that is better than what we have set as our threshold we keep sesning 
#             senseRes,_ = sense(botloc, leakloc, alpha, ship)
#             t+=1
#             distances = SSSP(ship, botloc)
#             probs = updateProbs(probs, senseRes, ship, distances,alpha)
#             max_prob = max(probabilities.values())
#             c-=1
#             if c <= 0: #failsafe to stop infinite loops
#                 break
#         for loc in distances[next_cell][1]:
#             moves.append(loc)
#             if loc == leakloc:
#                 game_over = True
#                 return game_over, t, moves
#             # Set the p(leak in this cell) = 0, and update other probs thru normalization. 
#             # Note that this won't change cell with max prob b/c ratio of probabilities will remain the same
#             probs = newCell(loc, probs)
#             t+=1
#         if set(moves)==ship.open: break # This means something went wrong and we fucked up
#     return game_over, t, ['Bozo'] bot8(ship, alpha)   