## Dependencies

In [1]:
from enum import Enum
import random
import math
from time import sleep
from matplotlib import pyplot as plt
import numpy as np
import copy
import json
import pandas as pd
import csv
from abc import ABC, abstractmethod
from collections import deque
from queue import PriorityQueue

In [2]:
# Directional Vectors
DY = [-1, 1, 0, 0]
DX = [0, 0, 1, -1]   

# Parent class of all Enums
class Flags(Enum):
    def __eq__(self, other):
        if isinstance(other, Flags):
            return self.value == other.value
        return self.value == other

# Represent Cell Types
class Cell(Flags):
    BLOCKED = 0
    OPEN = 1
    LEAK = 3

# Represent Belief Updates
class Belief(Flags):
    NO_LEAK = 0
    NO_BEEP = 1
    BEEP = 2


In [4]:
# Helper Methods
def manhattan(curr, goal):
    """Compute manhattan distance between two cells"""
    r, c = curr
    goal_r, goal_c = goal
    return abs(goal_r - r) + abs(goal_c - c)


In [5]:
# Load Seeds
SEEDS_FILE_PATH = 'seeds.txt'
number_of_seeds = 100 # gets first [number_of_seeds] from seeds.txt (has 1k seeds)
SEEDS = []
with open(SEEDS_FILE_PATH, 'r') as seeds_file:
    reader = csv.reader(seeds_file)
    for row in reader:
        if number_of_seeds == 0:
            break
        number_of_seeds -= 1
        SEEDS.append(int(row[0]))   

In [6]:
# Generate Boards and Distances
n = 30
DIR_PATH = "../boards/"

if False:
    for seed in SEEDS:
        print(f"Generating Ship for seed {seed}")
        
        ship = Ship(D = n, seed = seed, testing = False)
        board = [[0 for _ in range(n)] for _ in range(n)]
        for i in range(n):
            for j in range(n):
                board[i][j] = str(ship.board[i][j].value)
                
        distances = ship.distances
        
        board_path = DIR_PATH + f"board_{seed}.csv"
        distance_path = DIR_PATH + f"distances_{seed}.npy"
        with open(board_path, "w", newline = '') as f:
            writer = csv.writer(f)
            writer.writerows(board)
    
        np.save(distance_path, distances)

In [7]:
# Testing Board Loading
seed = 169806895
ship_a = load_ship(seed, 2)

NameError: name 'load_ship' is not defined

In [None]:
# Seed Generation
if False:
    file1 = open('seeds.txt', 'w')
    for seed in np.random.randint(low=0, high=1000000000, size=1000):
        file1.write(str(seed) + "\n")
        print(seed)
    file1.close()

## The Ship
The layout of the ship (walls, hallways, etc) is on a square grid, generated in the following way:

• Start with a square grid, D × D, of ‘blocked’ cells. Define the neighbors of cell as the adjacent cells in the
up/down/left/right direction. Diagonal cells are not considered neighbors.

• Choose a square in the interior to ‘open’ at random.

• Iteratively do the following:

– Identify all currently blocked cells that have exactly one open neighbor.

– Of these currently blocked cells with exactly one open neighbor, pick one at random.

– Open the selected cell.

– Repeat until you can no longer do so.

• Identify all cells that are ‘dead ends’ - open cells with one open neighbor.

• For approximately half these cells, pick one of their closed neighbors at random and open it.


In [8]:
# Represents the Ship
class Ship:
    def __init__(self, 
                 seed = None, 
                 leaks = 1, 
                 D = 50, 
                 testing = True,
                 load_board = None,
                 load_distances = None):
        
        self.actions = {}
        self.D = D
        self.leaks = leaks
        self.board = []
        self.open_cells = []
        self.initial_leaks = []
        random.seed(seed)
        

        if load_board is None:
            self.init_board()
        else:
            self.board = load_board
            for i in range(D):
                for j in range(D):
                    if self.board[i][j] != Cell.BLOCKED:
                        self.board[i][j] = Cell.OPEN
                        self.open_cells.append((i, j))
            
        self.init_leaks()
        self.distances = self.init_distances(testing) if load_distances is None else load_distances

    
    def init_board(self):
        """Initialize the 50x50 board following Project 1 Algorithm"""
        n = self.D
        self.board = [[Cell.BLOCKED for _ in range(n)] for _ in range(n)]
        rand_row = random.randint(0, n - 1)
        rand_col = random.randint(0, n - 1)
        
        self.board[rand_row][rand_col] = Cell.OPEN
        valid_cells = set()
        open_cells = [(rand_row, rand_col)]
        valid_cells.add((rand_row, rand_col))

        while True:
            candidates = self.get_candidates(valid_cells)
            if not candidates:
                break

            rand_cell = random.choice(candidates)
            r, c = rand_cell
            self.board[r][c] = Cell.OPEN
            valid_cells.add((r, c))
            open_cells.append((r, c))

        # Collect all open cells with exactly 1 open neighbor
        dead_ends = []
        for oc in open_cells:
            r, c = oc
            open_neighbors = self.get_neighbor_count(r, c, Cell.OPEN)
            if open_neighbors == 1:
                dead_ends.append(oc)

        # Open approximately half of the dead ends
        num_dead_ends = len(dead_ends)
        while len(dead_ends) > num_dead_ends // 2:

            # Pick a random dead end and open one of it's neighbors
            rand_idx = random.randint(0, len(dead_ends) - 1)
            r, c = dead_ends[rand_idx]
            closed_neighbors = self.get_neighbors(r, c, Cell.BLOCKED)
            if closed_neighbors:
                rand_cell = random.choice(closed_neighbors)
                nr, nc = rand_cell
                self.board[nr][nc] = Cell.OPEN
                open_cells.append((nr, nc))
                dead_ends.pop(rand_idx)

                # Check for newly opened dead ends
                open_neighbors = self.get_neighbors(nr, nc, Cell.OPEN)
                for neighbor in open_neighbors:
                    r, c = neighbor

                    # If this pair is also a dead end, remove it
                    try:
                        idx = dead_ends.index((r, c))
                        dead_ends.pop(idx)
                    except ValueError: # was not a dead end
                        continue
                        
        self.open_cells = open_cells

    def init_leaks(self):
        """Randomly selects a cell to be the leak"""
        for i in range(self.leaks):
            open_cell = random.choice(self.open_cells) # Choose a random cell from the open cells
            r, c = open_cell
    
            self.board[r][c] = Cell.LEAK
            self.initial_leaks.append((r, c))
        
    # Helper Methods
    def is_valid(self, r, c):
        """Determine if a row x col is within the bounds of the board"""
        return r >= 0 and c >= 0 and r < self.D and c < self.D

    def get_neighbors(self, r, c, target):
        """Get all of the valid neighbors of the desired type"""
        res = []
        for i in range(4):
            nr = DY[i] + r
            nc = DX[i] + c
            if self.is_valid(nr, nc) and self.board[nr][nc] == target:
                res.append((nr, nc))
        
        return res

    def get_neighbor_count(self, r, c, target):
        """Get the number of valid neighbors of the desired type"""
        return len(self.get_neighbors(r, c, target))

    def get_candidates(self, open_cells):
        n = self.D
        candidates = []
        to_remove = []
        for pair in open_cells:
            r, c = pair
            closed_neighbors = self.get_neighbors(r, c,  Cell.BLOCKED)
            valid_found = 0
            for neighbor in closed_neighbors:
                n_r, n_c = neighbor
                open_neighbors = self.get_neighbor_count(n_r, n_c, Cell.OPEN)
                if open_neighbors == 1:
                    valid_found += 1
                    candidates.append((n_r, n_c))

            if valid_found == 0:
                to_remove.append(pair) 

        for pair in to_remove:
            open_cells.remove(pair)
            
        return candidates

    def init_distances(self, testing = True):
        """Calculate distance from the leak to every other cell in the board"""
        # param: `testing` defaults to True
        # This is a flag that, when True, will compute manhattan distances instead of literal distances
        # Do not change this until we run actual tests, as BFS from every cell takes 3min+
        
        n = len(self.board)
        distances = np.zeros((n, n, n, n))

        # Get the distance between any cell and every other cell
        def distances_from_cell(start):

            # Testing flag to use MANHATTAN DISTANCE instead of LITERAL DISTANCE
            x, y = start
            if testing:
                for i in range(n):
                    for j in range(n):
                        dist = manhattan(start, (i, j))
                        distances[i, j, x, y] = dist
                        distances[x, y, i, j] = dist
            else:

                queue = deque()
                vis = set()
                queue.append([start, 0])
                while queue:
                    curr, dist = queue.popleft()
                    r, c = curr
                    vis.add(curr)
                    distances[r, c, x, y] = dist
                    distances[x, y, r, c] = dist
                    for i in range(4):
                        nr = DY[i] + r
                        nc = DX[i] + c
                        if self.is_valid(nr, nc) and (nr, nc) not in vis and self.board[nr][nc] != Cell.BLOCKED:
                            queue.append([(nr, nc), dist + 1])
                            
        for cell in self.open_cells:
            distances_from_cell(cell)

        return distances
    
    def display(self, path = None):
        """Display a grid image of the current board (FOR USE IN NOTEBOOK)"""
        if isinstance(path, tuple):
            loc = path
            path = [loc]
        
        n = self.D
        copy_board = [row[:] for row in self.board]
        leaks = []
        if not isinstance(copy_board[0][0], (float, int)):
            for i in range(n):
                for j in range(n):
                    copy_board[i][j] = copy_board[i][j].value
                    if copy_board[i][j] == Cell.LEAK:
                        leaks.append((i, j))

        if path:
            for pair in path:
                r, c = pair
                copy_board[r][c] = 2

        for leak in leaks:
            r, c = leak
            copy_board[r][c] = 3
                
        image_data = np.array(copy_board)
        plt.imshow(image_data, "Blues")
        # plt.axis("off")
        plt.show()
        

## The Bots
The bot occupies an open cell somewhere in the ship (to be determined shortly). The bot can move to one adjacent cell
every time step (up/down/left/right). Each bot will have access to a different kind of sensor data to help determine
where the atmosphere leak in the ship is - the purpose of the bot is to utilize the available sensor information to
locate and reach the atmosphere leak as quickly as possible. The bot will start at a randomly selected open square
in the ship. The bot knows / can sense when it enters the cell where the leak is located.

##### At each timestep - the bot must choose whether to move or to sense.

In [9]:
# Putting this here because I think it could be an efficient architecture, basically skeleton code
class Bot(ABC):
    def __init__(self, ship):
        self.ship = ship
        self.timestep = 0
        self.path = []
        self.init_bot()
    
    @abstractmethod
    def simulate(self):
        raise NotImplementedError()

    @abstractmethod
    def move(self):
        raise NotImplementedError()

    @abstractmethod
    def sense(self):
        raise NotImplementedError()

    def init_bot(self):
        """Place bot in random location on the board"""
        r, c = random.choice(self.ship.open_cells)
        self.position = (r, c)


    def bfs(self, src, goal):
        """Shortest Path from SRC to GOAL -> Returns GOAL, PATH"""

        # Add support for multiple goal states
        if isinstance(goal, tuple):
            state = goal
            goal = set()
            goal.add(state)
        
        queue = deque()
        queue.append((src, deque()))
        vis = set()
        while queue: 
            curr, path = queue.popleft()
            new_path = path.copy()
            new_path.append(curr)
            vis.add(curr)

            if curr in goal:
                return (curr, new_path)
              
            r, c = curr
            for i in range(4):
                nr = DY[i] + r
                nc = DX[i] + c
                if self.ship.is_valid(nr, nc) and (nr, nc) not in vis and self.ship.board[nr][nc] != Cell.BLOCKED:
                    queue.append(((nr, nc), new_path))   

        return (None, deque())        

In [10]:
# Super class for all Probabilistic Bots
class ProbabilisticBot(Bot):
    def __init__(self, ship):
        super().__init__(ship)
        self.prob_matrix = self.init_prob_matrix()
        
    def init_prob_matrix(self):
        """Initialize Probability Matrix, originally giving all OPEN CELLS equal chance of having the leak"""
        board = self.ship.board
        n = len(board)
        num_open_cells = len(self.ship.open_cells)
        default_prob = (1 / num_open_cells)
        prob_matrix = [[0 for _ in range(n)] for _ in range(n)]
        for i in range(n):
            for j in range(n):
                if board[i][j] != Cell.BLOCKED:
                    prob_matrix[i][j] = default_prob
        return prob_matrix

    def get_beep_prob(self, cell, leak):
        """Calculate probability of hearing a beep given distance from leak"""
        cell_r, cell_c = cell
        leak_r, leak_c = leak
        dist_from_leak = self.ship.distances[cell_r, cell_c, leak_r, leak_c]
        return np.e ** ( (-self.alpha) * (dist_from_leak - 1) )

    def find_highest_prob(self): # Maybe problematic for multiple leaks
        """Get the cell with the current highest chance of containing the leak"""
        n = len(self.ship.board)
        max_prob = -np.inf
        min_dist = np.inf
        best_cell = None
        total = 0

        # Iterate over every potentially leaky cell, find cell(s) with best chance
        for r in range(n):
            for c in range(n):
                if self.ship.board[r][c] == Cell.BLOCKED:
                    continue
                    
                curr_prob = self.probability_at_pos((r, c))
                curr_dist = self.ship.distances[self.position][r][c]
                total += curr_prob
    
                # New Best Found
                if curr_prob > max_prob or (curr_prob == max_prob and curr_dist < min_dist):
                    max_prob = curr_prob
                    min_dist = curr_dist
                    best_cell = (r, c)

        return best_cell

    def probability_at_pos(self, pos):
        r, c = pos
        return self.prob_matrix[r][c]
        
    def dijkstra(self, src, goal):
        """Calculate the path of maximum probability to the goal node"""
        res, bestProb = deque(), 1
        maxHeap = PriorityQueue()
        maxHeap.put((-1*self.probability_at_pos(src), src, deque(), self.probability_at_pos(src), 1))
        vis = set()
        
        while maxHeap.qsize() != 0:
            prob, curr, path, sumProb, pathLen = maxHeap.get()
            new_path = path.copy()
            new_path.append(curr)
            vis.add(curr)

            if curr == goal and prob < bestProb:
                res = new_path
                bestProb = prob
                continue
            
            r, c = curr
            for i in range(4):
                nr = DY[i] + r
                nc = DX[i] + c
                if self.ship.is_valid(nr, nc) and (nr, nc) not in vis and self.ship.board[nr][nc] != Cell.BLOCKED:
                    newSumProb = sumProb + self.probability_at_pos((nr, nc))
                    len = pathLen + 1
                    newProb = -1*(newSumProb / len)
                    if self.probability_at_pos((nr, nc)) == 0.0:
                        newProb = 0
                    maxHeap.put((newProb, (nr, nc), new_path, newSumProb, len))
        return (abs(bestProb), res)

### Bot 1: 

All cells outside the initial detection square start with the possibility of containing the leak
(essentially, the bot starts having taken a sense action, and detected nothing). When the bot enters
a cell (or starts in a cell), however, if it is not the leak cell, it is marked as not containing the leak. If the
bot detects no leak in proximity - all cells in the detection square are marked as not containing the leak. If
the bot detects a leak in proximity - all cells in the detection square not already marked as not containing the
leak are marked as possibly containing the leak, and all cells outside the detection square are marked as not
containing the leak. Note that if a single square remains that is marked as containing the leak and all others
do not contain the leak - the leak must be in that one marked cell. Bot 1 acts in the following way:

– At any time that has not detected a leak, it will proceed to the nearest cell that might contain the leak
(breaking ties at random), enter it, and take the sense action, updating what it knows based on the results.

– At any time that a leak has been detected, it will proceed to the nearest cell that might contain the leak,
enter it, and in doing so either find the leak or rule that cell out.

This proceeds until the leak is discovered.

In [None]:
class BotOne(Bot):
    NAME = "bot_1"
    def __init__(self, ship, k = 1):
        super().__init__(ship)

        self.k = k
        self.unseen = set(ship.open_cells)
        self.not_leaky = set() 
        self.leaky = set() 
        self.future_path = deque()       
                
    def action(self):
        """Choose to move or sense"""
        pos = self.position
        
        if pos in self.ship.initial_leaks:
            self.ship.initial_leaks.remove(pos)
            self.sense(self.k)
            return True
        elif pos in self.leaky:
            self.not_leaky.add(pos)
            self.leaky.remove(pos)

        if self.leaky:
            self.move()
        else:
            if pos in self.not_leaky:
                self.move()
            else:
                self.sense(self.k)               
                
        return False

    def move(self):
        """Update self.position and cell enum to mark path"""
        curr = self.position
        r, c = curr
        self.path.append(curr)

        # If you've reached the destination, path plan
        if not self.future_path:        
            self.future_path = self.bfs(curr, self.leaky if self.leaky else self.unseen)[1]

        # Update path to next point in path
        next_position = self.future_path.popleft()
        nr, nc = next_position
        self.position = next_position
        
    def sense(self, k = 1):
        """Sense if leak is within (2k + 1)^2 square"""
        detection_square = []
        r, c = self.position

        leakExists = False

        # Run the Detection square
        for row in range(r - k, r + k + 1):
            for col in range (c - k, c + k + 1):
                if self.ship.is_valid(row, col) and self.ship.board[row][col] != Cell.BLOCKED:
                    loc = (row, col)
                    detection_square.append(loc)

                    if loc in self.ship.initial_leaks:
                        leakExists = True 

        if not leakExists:
            # If detection square isn't leaky, add all cells to not_leaky set and remove any that may exist in leaky set
            for cell in detection_square:
                self.not_leaky.add(cell)
                if cell in self.unseen:
                    self.unseen.remove(cell)
                if cell in self.leaky:
                    self.leaky.remove(cell)
        else:
            # If detection square is leaky, add all squares that aren't already checked into leaky set
            for cell in detection_square:
                if cell not in self.not_leaky:
                    self.leaky.add(cell)
                if cell in self.unseen:
                    self.unseen.remove(cell)

        self.temp = detection_square
            
        return leakExists

    def simulate(self):
        success = False

        while not success:
            res = self.action()
            if res:
                success = True

            self.timestep += 1

        return success, self.path, self.timestep

In [None]:
ship = Ship(seed=SEEDS[0], D = 30)
bot = BotOne(ship, 3)

ship.display(bot.position)
print("Bot position: %s" % (bot.position,))
print("Leak position: %s" % (ship.initial_leaks,))

In [None]:
res = bot.simulate()
print(str(res[0]) + " " + str(res[2]) + " steps")
ship.display(bot.path)

In [None]:
if False:
    print("100 tests:")
    sum = 0

    for seed in SEEDS: #np.random.randint(low=0, high=1000000000, size=100):
        seed = int(seed)
        ship = Ship(seed)
        bot = BotOne(ship, 2)
        res = bot.simulate()
        sum += res[2]
        print(str(seed) + ": " + str(res[0]) + " " + str(res[2]) + " steps")

    avg = sum / 100
    print("\naverage: " + str(avg) + " steps")

### Bot 2:

A bot of your own design that uses the data from the detection square.

Same as bot 1. However, instead of travelling to the nearest unseen cell, run a full BFS throughout the ship and find the optimal cell to scan at, which is given by the following formula: $score(cell) = \textrm{unseen in } detection_{cell}*.99^{\textrm{distance to cell}}$

In [None]:
class BotTwo(BotOne):
    NAME = "bot_2"
    def __init__(self, ship, k=1, ratio=.99):
        super().__init__(ship, k=k)
        self.ratio = ratio

    def compute_score(self, curr, path):
        """Compute the score for every cell in the grid"""
        new_seen = self.hypothetical_sense(self.k, coords=curr)[1]
        dist_traveled = max(1, len(path) - 1)

        #return float(new_seen) / dist_traveled
        return new_seen * (self.ratio**dist_traveled)

    def bfs_optimal(self, src, goal):
        """Shortest Path from SRC to GOAL -> Returns GOAL, PATH"""

        optimal_score = -1
        optimal_cell = None
        optimal_path = None

        # Add support for multiple goal states
        if isinstance(goal, tuple):
            state = goal
            goal = set()
            goal.add(state)
        
        queue = deque()
        queue.append((src, deque()))
        vis = set()
        while queue: 
            curr, path = queue.popleft()
            new_path = path.copy()
            new_path.append(curr)
            vis.add(curr)

            if curr in goal:
                temp_score = self.compute_score(curr, new_path)
                if temp_score > optimal_score:
                    #print(f"CHECKPOINT: {temp_score} {curr} {len(new_path)}")

                    optimal_score = temp_score
                    optimal_cell = copy.deepcopy(curr)
                    optimal_path = new_path
                # return (curr, new_path)
              
            r, c = curr
            for i in range(4):
                nr = DY[i] + r
                nc = DX[i] + c
                if self.ship.is_valid(nr, nc) and (nr, nc) not in vis and self.ship.board[nr][nc] != Cell.BLOCKED:
                    queue.append(((nr, nc), new_path))   

        return (optimal_cell, optimal_path, optimal_score)   

    def action(self):
        """Choose to move or sense"""
        pos = self.position
        if pos in self.ship.initial_leaks:
            self.ship.initial_leaks.remove(pos)
            self.sense(self.k)
            return True
        elif pos in self.leaky:
            self.not_leaky.add(pos)
            self.leaky.remove(pos)

        if self.leaky:
            self.move()
        else:
            if pos in self.not_leaky:
                if not self.future_path:
                    self.sense(self.k)
                    # print(f"ooga booga {self.position}")
                self.move()
            else:
                self.sense(self.k)           
                
        return False

    def move(self):
        # print("MOVE")
        self.timestep += 1

        """Update self.position and cell enum to mark path"""

        curr = self.position
        r, c = curr
        self.path.append(curr)
        
        if not self.future_path:
            if self.leaky:     
                self.future_path = self.bfs(curr, self.leaky)[1]
            else:
                self.future_path = self.bfs_optimal(curr, self.unseen)[1]

        # Update path to next point in path
        next_position = self.future_path.popleft()
        nr, nc = next_position
        self.position = next_position

    def sense(self, k = 1):
        # print("SENSE")
        self.timestep += 1

        """Sense if leak is within (2k + 1)^2 square"""
        detection_square = []
        r, c = self.position

        leakExists = False
        
        for row in range(r - k, r + k + 1):
            for col in range (c - k, c + k + 1):
                if self.ship.is_valid(row, col) and self.ship.board[row][col] != Cell.BLOCKED:
                    loc = (row, col)
                    detection_square.append(loc)

                    if loc in self.ship.initial_leaks:
                        leakExists = True 

        if not leakExists:
            # If detection square isn't leaky, add all cells to not_leaky set and remove any that may exist in leaky set
            for cell in detection_square:
                self.not_leaky.add(cell)
                if cell in self.unseen:
                    self.unseen.remove(cell)
                if cell in self.leaky:
                    self.leaky.remove(cell)
        else:
            # If detection square is leaky, add all squares that aren't already checked into leaky set
            for cell in detection_square:
                if cell not in self.not_leaky:
                    self.leaky.add(cell)
                if cell in self.unseen:
                    self.unseen.remove(cell)

        self.temp = detection_square
            
        return leakExists

    def simulate(self):
        success = False
        while not success:
            ## print(f"sim {self.timestep} {bot.position} sensed: {len(self.not_leaky)}; traveled, to travel: {len(self.path)}, {len(self.future_path)};")
            res = self.action()
            if res:
                success = True

            #self.timestep += 1

        return success, self.path, self.timestep

    def hypothetical_sense(self, k = 1, coords=None):
        """Sense if leak is within (2k + 1)^2 square"""
        detection_square = []
        if coords == None:
            r, c = self.position
        else: r, c = coords
        
        unseen = 0
        for row in range(r - k, r + k + 1):
            for col in range (c - k, c + k + 1):
                if self.ship.is_valid(row, col) and self.ship.board[row][col] != Cell.BLOCKED:
                    loc = (row, col)
                    detection_square.append(loc)
                    if loc in self.unseen:
                        unseen += 1

        return detection_square, unseen

    

In [None]:
seed = 42
k = 4
ship = Ship(seed=seed, D=30)
bot = BotTwo(ship, k)

ship.display(bot.position)
print("Bot position: %s" % (bot.position,))
print("Leak position: %s" % (ship.initial_leaks,))

In [None]:
# print(bot.traversal_order)

res = bot.simulate()
# print(bot.path)
for i, j in zip(bot.path[:-1], bot.path[1:]):
    if manhattan(i, j) > 1:
        pass
        # print(f"weird {i} {j}")
print(str(res[0]) + " " + str(res[2]) + " steps")

# print(len(bot.path))

ship.display(bot.path)

In [None]:
ship.display(bot.path)
print(bot.hypothetical_sense(bot.k, bot.position)[1])
print(bot.compute_score(bot.position, []))

In [None]:
if False:
    print("100 tests:")
    sum = 0

    counter = 0
    for seed in SEEDS: #np.random.randint(low=0, high=1000000000, size=100):
        counter += 1
        seed = int(seed)
        ship = Ship(seed, D=30)
        bot = BotTwo(ship, k=4)
        res = bot.simulate()
        sum += res[2]
        print(str(seed) + ": " + str(res[0]) + " " + str(res[2]) + " steps" + "\tavg: " + str(sum/counter))

    avg = sum / 100
    print("\naverage: " + str(avg) + " steps")

### Bot 3: 

All cells (other than the bot’s initial cell) start with equal probability of containing the leak. Bot 3
proceeds in the following well:

– At any time, the bot is going to move to the cell that has the highest probability of containing the leak
(breaking ties first by distance from the bot, then at random).

– After entering any cell, if the cell does not contain a leak, the bot will take the sense action. Based on
the results, it updates the probability of containing the leak for each cell∗.

– After the beliefs are updated, this repeats, the bot proceeding to the cell that as the highest probability
of containing the leak (breaking ties first by distance from the bot, then at random)

In [11]:
class BotThree(ProbabilisticBot):
    NAME = "bot_3"
    def __init__(self, ship, alpha):
        super().__init__(ship)
        self.alpha = alpha
        self.path = set()
        
    def move(self):
        """Move to the cell with the highest probability (of least distance)"""
        best_cell = self.find_highest_prob()
        r, c = best_cell
        #print("Moving from {} to {} with probability {}".format(self.position, best_cell, self.probability_at_pos((r, c))))
        path = self.plan_path(best_cell)
        for cell in path:
            self.position = cell
            r, c = cell
            if cell in self.ship.initial_leaks:
                return True

            if self.probability_at_pos((r, c)) != 0:
                self.update_no_leak()
                
            self.path.add(cell)      
            self.timestep += 1
        
        return False
        
    def sense(self):
        """Determine whether or not beep was heard from current position (also return the probability)"""
        r, c = self.position
        beep_prob = self.get_beep_prob(self.position, self.ship.initial_leaks[0])
        return random.uniform(0, 1) <= beep_prob

    def simulate(self):
        n = len(self.ship.board)
        success = False
        while not success:
            
            reached_end = self.move()
            #prob_sum = self.verify_probability()
            #print(prob_sum)
            
            if reached_end: 
                success = True
            else:
                # Sense for beep, update probabilities
                heard_beep = self.sense()
                self.update_based_on_beep(heard_beep)      
                self.timestep +=  1

        return success, self.path, self.timestep   

    def plan_path(self, best_cell):
        """Made into own function for easier child (Bot 4) implementation"""
        return self.bfs(self.position, best_cell)[1]
    
    def update_no_leak(self):
        """Update prior beliefs based on entering a cell with no leak"""
        n = len(self.ship.board)
        current_matrix = self.prob_matrix
        result_matrix = [[0 for _ in range(n)] for _ in range(n)]
        r, c = self.position

        not_curr = 1 - current_matrix[r][c] 
        for i in range(n):
            for j in range(n):
                result_matrix[i][j] = current_matrix[i][j] / not_curr
                
        result_matrix[r][c] = 0
        self.prob_matrix = result_matrix
        

    def update_based_on_beep(self, heard_beep):
        """Update prior beliefs based on hearing or not hearing a beep at a given cell"""
        n = len(self.ship.board)
        current_matrix = self.prob_matrix
        result_matrix = [[0 for _ in range(n)] for _ in range(n)]
        denominator = 0
        for i in range(n):
            for j in range(n):
                if self.ship.board[i][j] == Cell.BLOCKED:
                    continue
                    
                beep_given_here = self.get_beep_prob((i, j), self.position)
                numerator = (beep_given_here if heard_beep 
                             else 1 - beep_given_here) * current_matrix[i][j]
                
                denominator += numerator
                result_matrix[i][j] = numerator

        for i in range(n):
            for j in range(n):
                result_matrix[i][j] /= denominator
                    
        self.prob_matrix = result_matrix

    def verify_probability(self):
        """Verify that proabilities sum up to 1.00 after updates"""
        n = len(self.ship.board)
        total = 0
        for i in range(n):
            for j in range(n):
                total += self.probability_at_pos((i, j))

        return total

In [None]:
import time
start = time.time()
seed = random.randint(0, 1000000000)
ship = Ship(D = 30, seed = seed, testing = True)
print(seed)
bot = BotThree(ship, .03)
end = time.time()
ship.display(bot.position)
print("Time Elapsed: {}".format(end - start))
success, path, steps = bot.simulate()
ship.display(list(path))
steps

### Bot 4: 

A bot of your own design that uses the data from the observed beeps (or lack thereof). Note that Bot 4 may choose to say in place for a given timestep, if that is useful to do

In [12]:
class BotFour(BotThree):
    NAME = "bot_4"
    def __init__(self, ship, alpha):
        super().__init__(ship, alpha)

    def plan_path(self, best_cell):
        """Plan bath as shortest path of highest probability, avoiding previously visited cells"""
        prob, dijkstra = self.dijkstra(self.position, best_cell)
        bfs = self.bfs(self.position, best_cell)[1]
        sum, count = 0, 0
        for cell in bfs:
            sum += self.probability_at_pos(cell)
            count += 1
        bfs_prob = sum / count
        #print(str(prob) + " " + str(bfs_prob))
        if bfs_prob >= prob:
            return bfs
        return dijkstra

    def sense(self, multisense = False):
        """Determine whether or not beep was heard from current position (also return the probability)"""
        r, c = self.position
        beep_prob = self.get_beep_prob(self.position, self.ship.initial_leaks[0])
        beep_action = random.uniform(0, 1) <= beep_prob
        if beep_action and multisense:
            self.timestep += 1
            return (True, random.uniform(0, 1) <= beep_prob)
        return (beep_action, None)       

    def move(self):
        """Move to the cell with the highest probability (of least distance)"""
        best_cell = self.find_highest_prob()
        r, c = best_cell
        #print("Moving from {} to {} with probability {}".format(self.position, best_cell, self.probability_at_pos((r, c))))
        path = self.plan_path(best_cell)
        moves = 0
        for cell in path:
            self.position = cell
            r, c = cell
            if cell in self.ship.initial_leaks:
                return True

            if self.probability_at_pos((r, c)) != 0:
                self.update_no_leak()
                
            self.path.add(cell)      
            self.timestep += 1
            moves += 1

            if moves == 5: 
                heard, temp = self.sense()
                self.update_based_on_beep(heard)
                new_best = self.find_highest_prob()
                if self.probability_at_pos(new_best) > self.probability_at_pos(best_cell):
                    best_cell = new_best
                    path = self.plan_path(best_cell)
                moves = 0
                self.timestep += 1                    
        
        return False
    
    def simulate(self):
        n = len(self.ship.board)
        success = False
        while not success:
            
            reached_end = self.move()
            #prob_sum = self.verify_probability()
            #print(prob_sum)
            
            if reached_end: 
                success = True
            else:
                # Sense for beep, update probabilities
                heard_beep = self.sense(multisense = True)
                if heard_beep[0]:
                    self.update_based_on_beep(heard_beep[1]) 
                else:
                    self.update_based_on_beep(heard_beep[0])      
                self.timestep +=  1

        return success, self.path, self.timestep  

In [None]:
#seed = random.randint(0, 1000000000)
ship = Ship(D = 30, seed = seed)
print(seed)
bot = BotFour(ship, .03)
ship.display(bot.position)
success, path, steps = bot.simulate()
ship.display(list(path))
steps

### Bot 5: 

In the above sections, we considered a single leak. But suppose there are two leaks simultaneously that need plugging.
Assume that once one leak is located (and the square entered) it is plugged, leaving one remaining leak to locate
and plug.

#### Deterministic Leak Detection
– Bot 5: Bot 5 is exactly Bot 1, but removes the first leak once its cell is entered, and continues until the
second leak is also identified and plugged.

In [None]:
class BotFive(BotOne):
    NAME = "bot_5"
    def __init__(self, ship, k = 1):
        super().__init__(ship, k)
    
    def simulate(self):
        """Run exactly bot 1, but until 2 leaks are found"""
        success = [False, False]

        while not success[1]:
            res = self.action()
            if res:
                #print("Plugged Leak at {}".format(self.position))
                if not success[0]:
                    success[0] = True
                else:
                    success[1] = True

            self.timestep += 1

        return success, self.path, self.timestep

In [None]:
ship = Ship(seed=SEEDS[0], leaks=2, D = 30)
bot = BotFive(ship, k = 2)

ship.display(bot.position)
print("Bot position: %s" % (bot.position,))
print("Leak position: %s" % (ship.initial_leaks,))

In [None]:
res = bot.simulate()
print(str(res[0]) + " " + str(res[2]) + " steps")
ship.display(bot.path)

In [None]:
if False:
    print("100 tests:")
    sum = 0

    for seed in SEEDS: #np.random.randint(low=0, high=1000000000, size=100):
        seed = int(seed)
        ship = Ship(seed, 2)
        bot = BotFive(ship, 2)
        res = bot.simulate()
        sum += res[2]
        print(str(seed) + ": " + str(res[0]) + " " + str(res[2]) + " steps")

    avg = sum / 100
    print("\naverage: " + str(avg) + " steps")

### Bot 6: 

Modify your Bot 2 to better handle this two leak situation. Is there anything significant that
needs changing?

#### Deterministic Leak Detection
– Bot 6: Bot 6 is exactly Bot 2, but removes the first leak once its cell is entered, and continues until the second leak is also identified and plugged.

In [None]:
class BotSix(BotTwo):
    NAME = "bot_6"
    def __init__(self, ship, k = 1, ratio=.97):
        super().__init__(ship, k=k, ratio=ratio)
    
    def action(self):
        """Choose to move or sense"""
        pos = self.position
        if pos in self.ship.initial_leaks:
            self.ship.initial_leaks.remove(pos)
            self.sense(self.k)
            # print("wtf")
            return True
        elif pos in self.leaky:
            self.not_leaky.add(pos)
            self.leaky.remove(pos)

        if self.leaky:
            self.move()
        else:
            if pos in self.not_leaky:
                if not self.future_path:
                    self.sense(self.k)
                    # print(f"ooga booga {self.position}")
                self.move()
            else:
                self.sense(self.k)           
                
        return False

    def move(self):
        # print("MOVE")
        self.timestep += 1

        """Update self.position and cell enum to mark path"""

        curr = self.position
        r, c = curr
        self.path.append(curr)
        
        if not self.future_path:
            if self.leaky:     
                self.future_path = self.bfs(curr, self.leaky)[1]
            else:
                self.future_path = self.bfs_optimal(curr, self.unseen)[1]

        # Update path to next point in path
        next_position = self.future_path.popleft()
        nr, nc = next_position
        self.position = next_position

    def sense(self, k = 1):
        # print("SENSE")
        self.timestep += 1

        """Sense if leak is within (2k + 1)^2 square"""
        detection_square = []
        r, c = self.position

        leakExists = False
        
        for row in range(r - k, r + k + 1):
            for col in range (c - k, c + k + 1):
                if self.ship.is_valid(row, col) and self.ship.board[row][col] != Cell.BLOCKED:
                    loc = (row, col)
                    detection_square.append(loc)

                    if loc in self.ship.initial_leaks:
                        leakExists = True 

        if not leakExists:
            # If detection square isn't leaky, add all cells to not_leaky set and remove any that may exist in leaky set
            for cell in detection_square:
                self.not_leaky.add(cell)
                if cell in self.unseen:
                    self.unseen.remove(cell)
                if cell in self.leaky:
                    self.leaky.remove(cell)
        else:
            # If detection square is leaky, add all squares that aren't already checked into leaky set
            for cell in detection_square:
                if cell not in self.not_leaky:
                    self.leaky.add(cell)
                if cell in self.unseen:
                    self.unseen.remove(cell)

        self.temp = detection_square
            
        return leakExists

    def simulate(self):
        success = [False, False]

        while not success[1]:
            res = self.action()
            if res:
                #print("Plugged Leak at {}".format(self.position))
                if not success[0]:
                    success[0] = True
                else:
                    success[1] = True

            #self.timestep += 1

        return success, self.path, self.timestep

In [None]:
ship = Ship(seed=SEEDS[0], leaks=2, D = 30)
bot = BotSix(ship, k = 2)

ship.display(bot.position)
print("Bot position: %s" % (bot.position,))
print("Leak position: %s" % (ship.initial_leaks,))

In [None]:
res = bot.simulate()
print(str(res[0]) + " " + str(res[2]) + " steps")
ship.display(bot.path)

In [None]:
if False:
    print("100 tests:")
    sum = 0
    k=4
    ratio=.97

    counter = 0
    for seed in SEEDS: #np.random.randint(low=0, high=1000000000, size=100):
        counter += 1
        seed = int(seed)
        ship = Ship(seed, leaks=2, D=30)
        bot = BotSix(ship, k=k,ratio=ratio)
        res = bot.simulate()
        sum += res[2]
        print(str(seed) + ": " + str(res[0]) + " " + str(res[2]) + " steps" + "\tavg: " + str(sum/counter))

    avg = sum / 100
    print("\naverage: " + str(avg) + " steps")

### Bot 7: 
Bot 7 is exactly Bot 3, but removes the first leak once its cell is entered, and then continues
searching and updating until the second leak is identified and plugged.

In [13]:
class BotSeven(BotThree):
    NAME = "bot_7"
    def __init__(self, ship, alpha):
        super().__init__(ship, alpha)

    def simulate(self):
        n = len(self.ship.board)
        success = False
        leaks_found = [False, False]
        current_leak = 0
        leak_locations = self.ship.initial_leaks[:]
        while True:
            
            reached_end = self.move()      
            if reached_end:
                self.ship.initial_leaks.remove(self.position)                    
                self.update_no_leak()
                leaks_found[current_leak] = True
                current_leak += 1         
                if current_leak == 2:
                    break
                    
            # Sense for beep, update probabilities
            heard_beep = self.sense()
            self.update_based_on_beep(heard_beep)      
            self.timestep +=  1
            #prob_sum = self.verify_probability()
            #print(prob_sum)
            
        self.ship.initial_leaks = leak_locations
        return success, self.path, self.timestep  

In [None]:
seed = random.randint(0, 1000000000)
ship = Ship(seed = seed, leaks = 2)
print(seed)
print(ship.initial_leaks)
bot = BotSeven(ship, .5)
ship.display(bot.position)
success, path, steps = bot.simulate()
ship.display(list(path))
steps, ship.initial_leaks

### Bot 8:
Bot 8 is exactly Bot 3, except that the probability updates must be corrected to account for the
fact that there are two leaks - how?

In [14]:
class BotEight(BotThree):
    NAME = "bot_8"
    def __init__(self, ship, alpha):
        super().__init__(ship, alpha)

    def simulate(self):
        n = len(self.ship.board)
        success = False
        leaks_found = [False, False]
        current_leak = 0
        leak_locations = self.ship.initial_leaks[:]
        while True:
            
            reached_end = self.move()     
            # If you find the first leak, convert to bot 3
            if reached_end:
                self.ship.initial_leaks.remove(self.position)
                #print("Found leak, converting to bot 3")
                r, c = self.position
                
                new_probs = self.prob_matrix[r, c, :, :]  # All probabilities that factor Leak 1 in
                total_prob = np.sum(new_probs)            # Sum of all of these
                new_probs /= total_prob                   # Normalize       
                print("converting to bot four")
                new_bot = (BotThree(self.ship, self.alpha) if isinstance(self, BotEight)
                           else BotFour(self.ship, self.alpha )) # Spawn Bot 3/4 Instance
                new_bot.prob_matrix = new_probs           # Start Bot 3/4's probs as this slice
                
                new_bot.position = self.position
                new_bot.update_no_leak()       
                
                _, _, steps = new_bot.simulate()
                self.timestep += steps
                break
                    
            # Sense for beep, update probabilities
            if isinstance(self, BotNine):
                heard_beep = self.sense(multisense = True)
                if heard_beep[0]:
                    self.update_based_on_beep(heard_beep[1])
                else:
                    self.update_based_on_beep(heard_beep[0])
            else:
                heard_beep = self.sense()         
                self.update_based_on_beep(heard_beep)   
                
            self.timestep +=  1
            #prob_sum = self.verify_probability()
            #print(prob_sum)



            """
            
            # Sense for beep, update probabilities
                heard_beep = self.sense(multisense = True)
                if heard_beep[0]:
                    self.update_based_on_beep(heard_beep[1]) 
                else:
                    self.update_based_on_beep(heard_beep[0])      
            """
            
        self.ship.initial_leaks = leak_locations
        return success, self.path, self.timestep 

    def init_prob_matrix(self):
        """Initialize Probability Matrix, originally giving all OPEN CELLS equal chance of having the leak"""
        board = self.ship.board
        n = len(board)
        num_open_cells = len(self.ship.open_cells)
        default_prob = (1 / math.comb(num_open_cells, 2))
        prob_matrix = np.zeros((n, n, n, n))
        for i, j in self.ship.open_cells:
            for k, l in self.ship.open_cells:
                if (i, j) != (k, l):
                    prob_matrix[i, j, k, l] = default_prob
                    prob_matrix[k, l, i, j] = default_prob
                    
        return prob_matrix

    def joint_beep_prob(self, pos, leak_one, leak_two):
        """Compute probability of hearing a beep in `pos` given leak in both `leak_one` and `leak_two`"""
        heard_ij = self.get_beep_prob(pos, leak_one)
        heard_kl = self.get_beep_prob(pos, leak_two)
        return (1 - ((1 - heard_ij) * (1 - heard_kl)))

    def sense(self):
        """Determine whether or not beep was heard from current position (also return the probability)"""
        r, c = self.position
        leaks = self.ship.initial_leaks
        beep_prob = (self.get_beep_prob(self.position, leaks[0]) if len(leaks) == 1
                     else self.joint_beep_prob(self.position, leaks[0], leaks[1]))
        
        
        return (random.uniform(0, 1) <= beep_prob)

    def update_no_leak(self):
        """Update prior beliefs based on entering a cell with no leak"""
        board = self.ship.board
        n = len(board)
        current_kb = self.prob_matrix
        result_kb = np.zeros((n, n, n, n))
        r, c = self.position

        not_curr = 1 - self.probability_at_pos(self.position)
        for i in range(n):
            for j in range(n):
                if self.ship.board[i][j] == Cell.BLOCKED:
                    continue
                
                for k in range(i, n):
                    for l in range(j + 1, n) if i == k else range(n):
                        
                        if self.ship.board[k][l] == Cell.BLOCKED:
                            continue
                            
                        a, b = (i, j), (k, l)
                        if a == self.position or b == self.position:
                            result_kb[i, j, k, l] = 0
                            result_kb[k, l, i, j] = 0
                        else:
                            curr_prob = current_kb[i, j, k, l]
                            new_prob = curr_prob / not_curr
                            result_kb[i, j, k, l] = new_prob
                            result_kb[k, l, i, j] = new_prob
        
        self.prob_matrix = result_kb

    def update_based_on_beep(self, heard_beep):
        """Update prior beliefs based on hearing or not hearing a beep at a given cell"""
        board = self.ship.board
        n = len(board)
        current_kb = self.prob_matrix
        result_kb = np.zeros((n, n, n, n))
        denominator = 0

        # For every pair of open cells, update that pairs belief based on beep or no eep
        for i in range(n):
            for j in range(n):
                if board[i][j] == Cell.BLOCKED:
                    continue
                    
                for k in range(i, n):
                    for l in range(j + 1, n) if i == k else range(n):
                        if board[k][l] == Cell.BLOCKED:
                            continue
                            
                        a, b = (i, j), (k, l)
                        curr_prob = current_kb[i, j, k, l]
                        if curr_prob == 0:
                            continue

                        p_beep = self.joint_beep_prob(self.position, a, b)
                        numerator = (p_beep if heard_beep else 1 - p_beep) * curr_prob
                        result_kb[i, j, k, l] = numerator
                        result_kb[k, l, i, j] = numerator
                        denominator += numerator
                        
        result_kb /= denominator
        self.prob_matrix = result_kb   

    def probability_at_pos(self, pos):
        """Marginalize over every potential pair for cell `pos` to find it's individual prob"""
        prob = 0
        n = len(self.ship.board)
        curr_kb = self.prob_matrix
        r, c = pos
        for i in range(n):
            for j in range(n):
                prob += curr_kb[r, c, i, j]
            
        return prob

    def verify_probability(self):
        """Verify that Pair Probabilities sum up to 1.0 and marginal probs sum up to 2.0"""
        n = len(self.ship.board)
        total = 0
        for i in range(n):
            for j in range(n):
                if self.ship.board[i][j] == Cell.BLOCKED:
                    continue
                    
                for k in range(i, n):
                    for l in range(j + 1, n) if i == k else range(n):
                        if self.ship.board[k][l] == Cell.BLOCKED:
                            continue
                        total += self.prob_matrix[i, j, k, l]
        second = 0
        for i in range(n):
            for j in range(n):
                second += self.probability_at_pos((i, j))

        return total, second

    def leak_probabilities(self):
        """Print out the marginal probability of each leak as it collects data"""
        for i in self.ship.initial_leaks:
            print(self.probability_at_pos(i))

In [None]:
seed = random.randint(0, 1000000000)
ship = Ship(D = 30, seed = 148141254, leaks = 2)
print(seed)
print(ship.initial_leaks)
bot = BotEight(ship, 0.007142857)
ship.display(bot.position)
#len(bot.prob_matrix.probs.keys())

#bot.prob_matrix
success, path, steps = bot.simulate()
steps
#ship.display(list(path))
#steps, ship.initial_leaks

In [None]:
seed = random.randint(0, 1000000000)
ship = Ship(seed = seed, leaks = 2)
bot = BotEight(ship, .3)
ship.display()


## Bot 9:
A bot of your own design, that uses the corrected probability updates for there being two leaks.

In [15]:
class BotNine(BotEight):
    NAME = "bot_9"
    def __init__(self, ship, alpha):
        super().__init__(ship, alpha)

    def plan_path(self, best_cell):
        prob, dijkstra = self.dijkstra(self.position, best_cell)
        bfs = self.bfs(self.position, best_cell)[1]
        sum, count = 0, 0
        for cell in bfs:
            sum += self.probability_at_pos(cell)
            count += 1
        bfs_prob = sum / count
        #print(str(prob) + " " + str(bfs_prob))
        if bfs_prob >= prob:
            return bfs
        return dijkstra

    def move(self):
        """Move to the cell with the highest probability (of least distance)"""
        best_cell = self.find_highest_prob()
        r, c = best_cell
        #print("Moving from {} to {} with probability {}".format(self.position, best_cell, self.probability_at_pos((r, c))))
        path = self.plan_path(best_cell)
        moves = 0
        for cell in path:
            self.position = cell
            r, c = cell
            if cell in self.ship.initial_leaks:
                return True

            if self.probability_at_pos((r, c)) != 0:
                self.update_no_leak()
                
            self.path.add(cell)      
            self.timestep += 1
            moves += 1

            if moves == 5: 
                heard, temp = self.sense()
                self.update_based_on_beep(heard)
                new_best = self.find_highest_prob()
                if self.probability_at_pos(new_best) > self.probability_at_pos(best_cell):
                    best_cell = new_best
                    path = self.plan_path(best_cell)
                moves = 0
                self.timestep += 1                    
        
        return False
        
    def sense(self, multisense = False):
        """Determine whether or not beep was heard from current position (also return the probability)"""
        r, c = self.position
        leaks = self.ship.initial_leaks
        beep_prob = (self.get_beep_prob(self.position, leaks[0]) if len(leaks) == 1
                     else self.joint_beep_prob(self.position, leaks[0], leaks[1]))
                     
        beep_action = random.uniform(0, 1) <= beep_prob
        if beep_action and multisense:
            self.timestep += 1
            return (True, random.uniform(0, 1) <= beep_prob)
            
        return (beep_action, None)   


In [None]:
seed = random.randint(0, 1000000000)
ship = Ship(D = 20, seed = 148141254, leaks = 2)
print(seed)
print(ship.initial_leaks)
bot = BotNine(ship, 0.007142857)
ship.display(bot.position)
#len(bot.prob_matrix.probs.keys())

#bot.prob_matrix
success, path, steps = bot.simulate()
steps
#ship.display(list(path))
#steps, ship.initial_leaks

## Data Collection
- 100 Seeds
- Deterministic Bots 1/2/5/6, each must be tested with all 1 <= k <= 8
- Probabalistic Bots 3/4/7/8/9, each must be tested with all alpha .1 <= alpha <= .5

In [17]:
# Load Ship
def load_ship(seed, num_leaks):
    DIR_PATH = "../boards/"
    board_path = DIR_PATH + f"board_{seed}.csv"
    distance_path = DIR_PATH + f"distances_{seed}.npy"
    distances = np.load(distance_path)

    with open(board_path, "r") as f:
        reader = csv.reader(f)
        board = list(reader)
        board = [[Cell(int(cell)) for cell in row] for row in board]

    return Ship(seed = seed, 
                leaks = num_leaks, 
                D = 30,
                load_board = board, 
                load_distances = distances)

In [18]:
import time
import os

alpha_values = np.linspace(0, .1, 15)
k_values = range(1, 9)

# Store the classes of each bot so they can be differentiated when running simulations
# deterministic_bots = [BotOne, BotTwo, BotFive, BotSix]
# probabalistic_bots = [BotThree, BotFour, BotSeven, BotEight, BotNine]

probabalistic_bots = [BotNine]
#deterministic_bots = [BotTwo]

DATA_PATH = "../data/"

def simulate_bots(bots, values, seed):
    for bot_class in bots:
        bot_number = int(bot_class.NAME.split("_")[1])
        num_leaks = 1 if bot_number < 5 else 2
        bot_file = DATA_PATH + bot_class.NAME + "(1).csv"
        for param in values:
            if param == 0: 
                continue
                
            start = time.time()

            ship = load_ship(seed, num_leaks)
            bot = bot_class(ship, param)
            success, path, steps = bot.simulate()
            
            end = time.time()

            data = (seed, bot_class.NAME, param, steps)
            with open(bot_file, mode = "a", newline = "") as f:
                writer = csv.writer(f)
                writer.writerow(data)
                      
            print("({}, {}, {}, {}, {})".format(seed, bot_class.NAME, param, steps, end - start))


for i, seed in enumerate(SEEDS[0:16]):
    print("Simulating Seed {}: {}".format(i, seed))
    #simulate_bots(deterministic_bots, k_values, seed)
    simulate_bots(probabalistic_bots, alpha_values, seed)

Simulating Seed 0: 148141254
converting to bot four
(148141254, bot_9, 0.0071428571428571435, 169, 12.138997316360474)
converting to bot four
(148141254, bot_9, 0.014285714285714287, 791, 28.863045692443848)
converting to bot four
(148141254, bot_9, 0.02142857142857143, 114, 144.05255937576294)
converting to bot four
(148141254, bot_9, 0.028571428571428574, 148, 10.219405889511108)
converting to bot four
(148141254, bot_9, 0.03571428571428572, 152, 10.645976066589355)
converting to bot four
(148141254, bot_9, 0.04285714285714286, 172, 9.68723201751709)
converting to bot four
(148141254, bot_9, 0.05, 172, 10.040601253509521)
converting to bot four
(148141254, bot_9, 0.05714285714285715, 172, 10.828520774841309)
converting to bot four
(148141254, bot_9, 0.0642857142857143, 172, 10.688075542449951)
converting to bot four
(148141254, bot_9, 0.07142857142857144, 294, 10.336910247802734)
converting to bot four
(148141254, bot_9, 0.07857142857142858, 126, 27.45141053199768)
converting to bot 

KeyboardInterrupt: 

In [None]:
ship.display(bot.path)