In [1]:
import numpy as np
from time import time

# Graph Node Class
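Represents a node in the search graph.

- `parent`: The node from which this node was generated (`None` for the start node).
- `state`: The 7x7 numpy array representing the marble board.
- `action`: Text description of the move that produced this state (`None` for the start node).
- `pcost`: Path cost $g(n)$.
- `hcost`: Heuristic cost $h(n)$.
- `cost`: Total cost $f(n) = g(n) + h(n)$.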
    

In [None]:
class Node:
    """A node in the search graph.

    Attributes:
        parent: Node this one was generated from (None for the root).
        state: numpy array holding the board configuration.
        action: Label of the move that produced ``state`` (None by default).
        pcost: Path cost g(n).
        hcost: Heuristic cost h(n).
        cost: Total cost f(n) = pcost + hcost, computed once at construction.

    Two nodes are equal when their states hold the same values; parent,
    action and costs do not take part in equality or hashing.
    """

    def __init__(self, parent, state, pcost, hcost, action = None):
        self.parent = parent
        self.state = state
        self.action = action
        self.pcost = pcost
        self.hcost = hcost
        self.cost = pcost + hcost

    def __hash__(self):
        # Hash the raw array bytes rather than str(array): the printed form
        # depends on numpy's print options and is abbreviated with "..." for
        # large arrays, which would make distinct states share one hash.
        return hash(self.state.tobytes())

    def __str__(self):
        return str(self.state)

    def __eq__(self, other):
        # Comparing against a non-Node (e.g. ``node == None``) must not raise;
        # returning NotImplemented lets Python fall back to its default.
        if not isinstance(other, Node):
            return NotImplemented
        return np.array_equal(self.state, other.state)

    def __ne__(self, other):
        outcome = self.__eq__(other)
        # Never negate NotImplemented: propagate it so the fallback applies.
        if outcome is NotImplemented:
            return outcome
        return not outcome

# Priority Queue Class: 
The priority queue stores the frontier nodes and pops the node with the lowest total cost (`node.cost`). It serves as the frontier for both the A* agent and the Best First Search agent.

In [None]:
class PriorityQueue():
    """Minimum-cost frontier backed by a plain list.

    ``queue`` holds the pending nodes in insertion order. ``hashes`` records
    the hash of every node that has ever been accepted by ``push``; entries
    are not removed on ``pop``, so a state is admitted at most once over the
    lifetime of the queue.
    """

    def __init__(self):
        self.queue = []
        self.hashes = {}

    def push(self, node):
        """Append ``node`` unless a node with the same hash was pushed before."""
        key = hash(node)
        if key in self.hashes:
            return
        self.hashes[key] = 1
        self.queue.append(node)

    def pop(self):
        """Remove and return the node with the smallest ``cost``.

        Ties go to the node that was pushed first. If no node has a cost
        below infinity, the most recently pushed node is returned.

        Raises:
            IndexError: if the queue is empty.
        """
        if len(self.queue) == 0:
            raise IndexError("pop from empty PriorityQueue")

        best_pos = -1
        best_cost = float('inf')
        for pos, candidate in enumerate(self.queue):
            # Strict comparison keeps the earliest node among equal costs.
            if candidate.cost < best_cost:
                best_pos, best_cost = pos, candidate.cost

        return self.queue.pop(best_pos)

    def is_empty(self):
        """Return True when no nodes are pending."""
        return not len(self.queue)

    def __str__(self):
        return str([str(entry.state) for entry in self.queue])

    def __len__(self):
        return len(self.queue)

# Environment

In [None]:
class Environment():
    """Marble-board environment used by the search agents.

    Boards are 7x7 float arrays where -1 marks the four 2x2 corner regions
    that are not part of the board, 1 marks a marble and 0 marks an empty
    hole.
    """

    def __init__(self, start_state = None, goal_state = None):
        self.actions = [1, 2, 3, 4] # Placeholder for move directions
        self.goal_state = self.generate_goal_state() if goal_state is None else goal_state
        self.start_state = self.generate_start_state() if start_state is None else start_state

    def generate_start_state(self):
        """Build the opening board: every valid cell filled, centre (3, 3) empty."""
        board = np.ones((7, 7))
        corner = [0, 1, 5, 6]
        for row in corner:
            board[row, corner] = -1
        board[3][3] = 0
        return board

    def generate_goal_state(self):
        """Build the target board: every valid cell empty, one marble at (3, 3)."""
        board = np.zeros((7, 7))
        corner = [0, 1, 5, 6]
        for row in corner:
            board[row, corner] = -1
        board[3][3] = 1
        return board

    def get_start_state(self):
        return self.start_state

    def get_goal_state(self):
        return self.goal_state

    def get_next_states(self, state):
        """Return every board reachable from ``state`` with one jump.

        Each result is a ``(new_state, action)`` tuple; ``action`` is the text
        ``'(src_row, src_col) -> (dst_row, dst_col)'``. Holes are visited in
        row-major order and, for each hole, jumps are tried from the cell two
        rows above, two rows below, two columns left, then two columns right.
        ``state`` itself is left untouched.
        """
        empty_cells = [
            (row, col)
            for row in range(7)
            for col in range(7)
            if state[row][col] == 0
        ]

        # Offset from the hole to the jumping marble, in the order tried.
        offsets = ((-2, 0), (2, 0), (0, -2), (0, 2))

        successors = []
        for hole_r, hole_c in empty_cells:
            for d_row, d_col in offsets:
                src_r, src_c = hole_r + d_row, hole_c + d_col
                if not (0 <= src_r <= 6 and 0 <= src_c <= 6):
                    continue
                # The jumped-over marble sits halfway between source and hole.
                mid_r, mid_c = hole_r + d_row // 2, hole_c + d_col // 2
                if state[src_r][src_c] == 1 and state[mid_r][mid_c] == 1:
                    board = state.copy()
                    board[src_r][src_c] = 0    # jumping marble leaves its cell
                    board[mid_r][mid_c] = 0    # jumped-over marble is removed
                    board[hole_r][hole_c] = 1  # hole is now occupied
                    move = f'({src_r}, {src_c}) -> ({hole_r}, {hole_c})'
                    successors.append((board, move))

        return successors

    def reached_goal(self, state):
        """Return True when ``state`` matches the goal board element-wise."""
        return np.array_equal(self.goal_state, state)

# Heuristic Functions

### Heuristic 0 (Null Heuristic for A*)

In [5]:
def heuristic0(curr_state):
    """Null heuristic: estimate zero for every board, ignoring ``curr_state``."""
    return 0

### Heuristic 1 (Manhattan Distance)

In [12]:
def heuristic1(curr_state):
    """Sum of Manhattan distances from every marble to the centre cell (3, 3).

    Only cells equal to 1 within the 7x7 grid contribute to the total.
    """
    centre = 3
    return sum(
        abs(row - centre) + abs(col - centre)
        for row in range(7)
        for col in range(7)
        if curr_state[row][col] == 1
    )

### Heuristic 2 (Exponential Distance)

In [6]:
def heuristic2(curr_state):
    """Exponential distance heuristic.

    Each marble (cell equal to 1) in the 7x7 grid contributes
    ``2 ** max(|row - 3|, |col - 3|)``, i.e. two raised to its larger
    per-axis distance from the centre cell (3, 3).
    """
    centre = 3
    total = 0
    for row in range(7):
        row_offset = abs(row - centre)
        for col in range(7):
            if curr_state[row][col] != 1:
                continue
            col_offset = abs(col - centre)
            total += 2 ** max(row_offset, col_offset)
    return total


# Agent Class (A-star Search)

In [7]:
class Agent:
    """A* search agent.

    Frontier nodes are ordered by ``Node.cost`` (path cost plus heuristic);
    every move adds 1 to the path cost. ``explored`` maps the hash of each
    expanded node to that node, and ``goal_node`` is set once the goal board
    is popped from the frontier.
    """

    def __init__(self, env, heuristic):
        self.env = env
        self.heuristic = heuristic
        self.start_state = env.get_start_state()
        self.goal_state = env.get_goal_state()
        self.frontier = PriorityQueue()
        self.explored = dict()
        self.goal_node = None

    def run(self):
        """Search from the start state until the goal is found or the frontier empties.

        Prints "Reached goal!" on success, then prints and returns the elapsed
        wall-clock time in seconds (measured after the root node is queued).
        """
        root = Node(
            parent = None,
            state = self.start_state,
            pcost = 0,
            hcost = self.heuristic(self.start_state),
        )
        self.frontier.push(root)
        started = time()

        while not self.frontier.is_empty():
            current = self.frontier.pop()
            key = hash(current)

            # A state that has already been expanded is never expanded again.
            if key in self.explored:
                continue
            self.explored[key] = current

            if self.env.reached_goal(current.state):
                print("Reached goal!")
                self.goal_node = current
                break

            child_pcost = current.pcost + 1  # each move costs one unit
            for entry in self.env.get_next_states(current.state):
                successor, move = entry[0], entry[1]
                child = Node(
                    parent=current,
                    state=successor,
                    pcost=child_pcost,
                    hcost=self.heuristic(successor),
                    action=move,
                )
                self.frontier.push(child)

        elapsed = time() - started
        print(elapsed)
        return elapsed

    def print_nodes(self):
        """Print the step number and move for each node on the path to the goal.

        Prints nothing when no goal node has been recorded.
        """
        path = []
        cursor = self.goal_node
        while cursor is not None:
            path.append(cursor)
            cursor = cursor.parent

        # The chain was collected goal-first; replay it from the start node.
        for step, visited in enumerate(reversed(path), start=1):
            print("Current step: ", step)
            if visited.action is not None:
                print(visited.action)


# A-star Search with Heuristic 2

In [8]:
# Benchmark A* (Agent) with heuristic2: solve the default board `runs` times,
# accumulating in `t` the elapsed time that each run() call returns.
t = 0
runs = 5
for i in range(runs):
    # A fresh Environment and Agent per iteration, so each run starts with an
    # empty frontier and explored map.
    agent = Agent(Environment(), heuristic2)
    t += agent.run()

# `agent` is rebound on every iteration, so the node counts below describe
# the last run only; the average covers all runs.
print(f"Average time {t / runs}")
print(f"Total number of nodes explored: {len(agent.explored)}")
print(f"Total number of nodes in frontier: {len(agent.frontier)}")

Reached goal!
29.36630415916443
Reached goal!
29.212907314300537
Reached goal!
29.161952257156372
Reached goal!
29.217090368270874
Reached goal!
28.99008798599243
Average time 29.18966841697693
Total number of nodes explored: 33353
Total number of nodes in frontier: 213


# Agent Class (Best First Search)

In [9]:
class BestFirstAgent:
    """Best First Search agent.

    Every node is created with a path cost of 0, so the frontier orders nodes
    by their heuristic value alone. ``explored`` maps the hash of each
    expanded node to that node, and ``goal_node`` is set once the goal board
    is popped from the frontier.
    """

    def __init__(self, env, heuristic):
        self.env = env
        self.heuristic = heuristic
        self.start_state = env.get_start_state()
        self.goal_state = env.get_goal_state()
        self.frontier = PriorityQueue()
        self.explored = dict()
        self.goal_node = None

    def run(self):
        """Search from the start state until the goal is found or the frontier empties.

        Prints "Reached goal!" on success, then prints and returns the elapsed
        wall-clock time in seconds (measured after the root node is queued).
        """
        root_h = self.heuristic(self.start_state)
        self.frontier.push(
            Node(parent = None, state = self.start_state, pcost = 0, hcost = root_h)
        )
        began = time()

        while not self.frontier.is_empty():
            current = self.frontier.pop()
            fingerprint = hash(current)

            if fingerprint in self.explored:
                # Already expanded; drop this entry.
                continue
            self.explored[fingerprint] = current

            if self.env.reached_goal(current.state):
                print("Reached goal!")
                self.goal_node = current
                break

            for entry in self.env.get_next_states(current.state):
                board = entry[0]
                move = entry[1]
                # Path cost stays at 0 so only the heuristic drives ordering.
                successor = Node(
                    parent=current,
                    state=board,
                    pcost=0,
                    hcost=self.heuristic(board),
                    action=move,
                )
                self.frontier.push(successor)

        duration = time() - began
        print(duration)
        return duration

    def print_nodes(self):
        """Print the step number and move for each node on the path to the goal.

        Prints nothing when no goal node has been recorded.
        """
        chain = []
        cursor = self.goal_node
        while cursor is not None:
            chain.append(cursor)
            cursor = cursor.parent

        # Walk the collected chain backwards so output runs start -> goal.
        step = 0
        for visited in reversed(chain):
            step += 1
            print("Current step: ", step)
            if visited.action is not None:
                print(visited.action)

# Best First Search with Heuristic 2

In [10]:
# Benchmark Best First Search (BestFirstAgent) with heuristic2: solve the
# default board `runs` times, accumulating in `t` the elapsed time that each
# run() call returns. Note that this rebinds the notebook-level `t`, `runs`
# and `agent` names.
t = 0
runs = 5
for i in range(runs):
    # A fresh Environment and agent per iteration.
    agent = BestFirstAgent(Environment(), heuristic2) 
    t += agent.run()
    
# `agent` is rebound on every iteration, so the node counts below describe
# the last run only; the average covers all runs.
print(f"Average time {t / runs}")
print(f"Total number of nodes explored: {len(agent.explored)}")
print(f"Total number of nodes in frontier: {len(agent.frontier)}")

Reached goal!
36.75907325744629
Reached goal!
79.66413688659668
Reached goal!
80.28227162361145
Reached goal!
62.54608678817749
Reached goal!
31.870707988739014
Average time 58.22445530891419
Total number of nodes explored: 35997
Total number of nodes in frontier: 133


# Best First Search with Heuristic 1

In [13]:
# Benchmark Best First Search (BestFirstAgent) with heuristic1: solve the
# default board `runs` times, accumulating in `tm` the elapsed time that each
# run() call returns.
tm = 0
runs = 5
for i in range(runs):
    # A fresh Environment and agent per iteration.
    agent = BestFirstAgent(Environment(), heuristic1) 
    tm += agent.run()
    
# `agent` is rebound on every iteration, so the node counts below describe
# the last run only; the average covers all runs.
print(f"Average time {tm / runs}")
print(f"Total number of nodes explored: {len(agent.explored)}")
print(f"Total number of nodes in frontier: {len(agent.frontier)}")

Reached goal!
139.57226729393005
Reached goal!
162.16319155693054
Reached goal!
265.655553817749
Reached goal!
183.54784274101257
Reached goal!
124.5057008266449
Average time 175.0889112472534
Total number of nodes explored: 139663
Total number of nodes in frontier: 146
