# Sokoban AI Solver

"""
This module implements an AI-based motion planning agent for solving 
weighted Sokoban puzzles. The agent uses informed search algorithms to 
determine optimal sequences of actions to move weighted boxes to their 
goal positions in a warehouse-like environment.

The module adheres to a predefined interface expected by external components, 
including puzzle representations and evaluation scripts.

Key features:
- Support for dynamic action costs based on box weights
- Integration with a warehouse simulation environment
- Compatibility with external puzzle and search modules

Note: Function and class interfaces must remain unchanged to ensure 
interoperability with the system. This module was originally developed 
in an academic setting but is presented here as a demonstration of AI planning techniques.

Last updated: 2025-03-25
"""


In [15]:
# Set up and import useful functionality 

# https://saturncloud.io/blog/how-to-import-jupyter-notebooks-to-another-jupyter-notebook/
!pip install import_ipynb # should only need to run once
import import_ipynb # allows you to use other *.ipynb files
import search       # such as search tools written by course textbook and similar to week 3
import sokoban     # defines the problem domain
import itertools # please see https://docs.python.org/3/library/itertools.html
import time # please investigate external files

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -





Taboo cells:

    Identify the taboo cells of a warehouse. A "taboo cell" is by definition
    a cell inside a warehouse such that whenever a box gets pushed onto such 
    a cell then the puzzle becomes unsolvable. 
    
    Cells outside the warehouse are not taboo. It is a fail to tag one as taboo.
    
    When determining the taboo cells, you must ignore all the existing boxes, 
    only consider the walls and the target  cells.  
    Use only the following rules to determine the taboo cells:
     Rule 1: if a cell is a corner and not a target, then it is a taboo cell.
     Rule 2: all the cells between two corners along a wall are taboo if none of 
             these cells is a target.
    
    @param warehouse: 
    
        a Warehouse object with a worker inside the warehouse

    @return
    
       A string representing the warehouse with only the wall cells marked with 
       a '#' and the taboo cells marked with a 'X'.  
       The returned string should NOT have marks for the worker, the targets,
       and the boxes.  


In [18]:
# Hint: This is a great place to start the assignment
# Think about the 'out there' world in terms of a grid. 
# Inspect some of the given warehouses to determine their properties, e.g. are they all square or rectangular
# Are there certain cells in the grid that we should not consider in the problem domain, i.e. they are unreachable by the agent?
# Identify the subset of cells that should be investigated to whether they are Taboo or not.

# Move away from the computer for 15 minutes so that you can get a pen and paper: draw lots of sketches, diagrams and pseudocode (and then come back!)
# Write the pseudocode as comments

# Then build actual code under the comments testing as you go to check each line works as intended where possible
# Programming hint: Either write all code in function below or create and call new functions (e.g. CellReachability(*argument*))
# a small number of well-commented new functions is likely the easiest approach for clarity of code, which helps debugging
def is_corner(r, c, grid):
    """Tell whether the non-wall cell (r, c) sits in a corner formed by walls.

    A corner has a wall directly above or below it AND a wall directly to its
    left or right. Wall cells themselves are never corners.

    @param r: row index of the cell
    @param c: column index of the cell
    @param grid: 2D grid of single characters where '#' marks a wall; the
                 column bound is taken from the first row

    @return True if the cell is a corner, False otherwise
    """
    if grid[r][c] == '#':
        return False

    def blocked(row, col):
        # Positions outside the grid count as open space, not as walls.
        in_bounds = 0 <= row < len(grid) and 0 <= col < len(grid[0])
        return in_bounds and grid[row][col] == '#'

    # Quadrants in order: up-left, up-right, down-left, down-right.
    return any(
        blocked(r + d_row, c) and blocked(r, c + d_col)
        for d_row, d_col in ((-1, -1), (-1, 1), (1, -1), (1, 1))
    )
    
def get_reachable_area(grid, warehouse):
    """Flood-fill from the worker and collect every non-wall cell it can reach.

    Boxes are not obstacles here: only '#' cells and the grid boundary stop
    the fill.

    @param grid: 2D grid of single characters where '#' marks a wall
    @param warehouse: object whose 'worker' attribute is an (x, y) position

    @return a set of (row, col) tuples; empty if the worker starts on a wall
            or outside the grid
    """
    height, width = len(grid), len(grid[0])

    # The warehouse stores (x, y); the grid is indexed as [row][col].
    origin = warehouse.worker[::-1]

    visited = set()
    frontier = [origin]
    while frontier:
        r, c = frontier.pop()
        inside = 0 <= r < height and 0 <= c < width
        if (r, c) in visited or not inside or grid[r][c] == '#':
            continue

        visited.add((r, c))
        # Queue the four orthogonal neighbours; they are validated when popped.
        frontier.extend(((r - 1, c), (r + 1, c), (r, c - 1), (r, c + 1)))

    return visited

def mark_corners(grid, reachable_cells, warehouse):
    """Collect the Rule 1 taboo cells: reachable corner cells that are not targets.

    @param grid: 2D grid of single characters where '#' marks a wall
    @param reachable_cells: collection of (row, col) cells the worker can reach
    @param warehouse: object whose 'targets' attribute holds (x, y) positions

    @return a set of (row, col) tuples
    """
    num_rows, num_cols = len(grid), len(grid[0])

    # Targets are stored as (x, y), hence the swapped lookup (c, r).
    return {
        (r, c)
        for r in range(num_rows)
        for c in range(num_cols)
        if (r, c) in reachable_cells
        and (c, r) not in warehouse.targets
        and is_corner(r, c, grid)
    }

def mark_taboo_lines(grid, taboo_cells, warehouse):
    """Apply Rule 2: mark the cells between two taboo corners that run along a wall.

    Every row and then every column is scanned. For each pair of consecutive
    taboo corners on a line, the cells strictly between them are marked 'X'
    when every one of them
      - is not a wall,
      - is not a target, and
      - has a wall on at least one side perpendicular to the line.

    Existing boxes, the worker and any other non-wall glyph in the grid are
    ignored: only walls and targets decide whether a line is taboo. Target
    positions are taken from warehouse.targets, not from glyphs in the grid.

    @param grid: 2D list of lists of single characters, '#' marking walls;
                 modified in place
    @param taboo_cells: set of (row, col) taboo corner cells (Rule 1)
    @param warehouse: object whose 'targets' attribute holds (x, y) positions

    @return the same grid object, with the Rule 2 cells set to 'X'
    """
    num_rows = len(grid)
    num_cols = len(grid[0]) if grid else 0
    # Build the target lookup once instead of scanning a list per cell.
    targets = {tuple(target) for target in warehouse.targets}

    def is_wall(r, c):
        """Bounds-safe wall test; positions outside the grid are not walls."""
        return 0 <= r < num_rows and 0 <= c < len(grid[r]) and grid[r][c] == '#'

    def fill_lines(line_count, line_length, cell_at, flank):
        """Mark qualifying segments on each of 'line_count' parallel lines.

        cell_at(line, pos) maps a line index and a position along that line to
        a (row, col) cell; flank is the (d_row, d_col) step perpendicular to
        the line, used to look for the supporting wall on either side.
        """
        d_row, d_col = flank
        for line in range(line_count):
            corners = [pos for pos in range(line_length)
                       if cell_at(line, pos) in taboo_cells]
            # Consecutive corners delimit the candidate segments.
            for start, end in zip(corners, corners[1:]):
                segment = [cell_at(line, pos) for pos in range(start + 1, end)]
                if all(
                    not is_wall(r, c)
                    and (c, r) not in targets
                    and (is_wall(r + d_row, c + d_col)
                         or is_wall(r - d_row, c - d_col))
                    for r, c in segment
                ):
                    for r, c in segment:
                        grid[r][c] = 'X'

    # Horizontal lines: the supporting wall is above or below.
    fill_lines(num_rows, num_cols, lambda row, col: (row, col), (1, 0))
    # Vertical lines: the supporting wall is to the left or right.
    fill_lines(num_cols, num_rows, lambda col, row: (row, col), (0, 1))

    return grid

def format_output(grid_with_taboo):
    """Render the grid as text that shows only walls ('#') and taboo cells ('X').

    Every other cell is printed as a space, trailing whitespace is removed from
    each row, and each row (including the last) ends with a newline.

    @param grid_with_taboo: iterable of rows, each an iterable of characters

    @return the rendered multi-line string ('' for an empty grid)
    """
    visible = ('#', 'X')
    lines = []
    for cells in grid_with_taboo:
        rendered = ''.join(ch if ch in visible else ' ' for ch in cells)
        lines.append(rendered.rstrip() + '\n')
    return ''.join(lines)

def taboo_cells(warehouse):
    """Identify the taboo cells of a warehouse using only walls and targets.

    Rule 1: a reachable corner cell that is not a target is taboo.
    Rule 2: all cells between two taboo corners along a wall are taboo if none
            of them is a target.

    Boxes and the worker are ignored when deciding which cells are taboo, so
    every non-wall glyph is blanked before the rules are applied. Target
    positions are read from warehouse.targets by the helper functions.

    @param warehouse: a Warehouse object with a worker inside the warehouse

    @return a string representing the warehouse with only the wall cells
            marked with '#' and the taboo cells marked with 'X'
    """
    # Keep the wall layout only: a box glyph left in the grid would otherwise
    # stop a Rule 2 line from being recognised.
    layout = [
        ['#' if cell == '#' else ' ' for cell in line]
        for line in str(warehouse).splitlines()
    ]

    # Pad ragged rows so every row can be indexed with the same column range.
    width = max(len(line) for line in layout)
    for line in layout:
        line.extend([' '] * (width - len(line)))

    reachable = get_reachable_area(layout, warehouse)
    corner_cells = mark_corners(layout, reachable, warehouse)

    for r, c in corner_cells:
        layout[r][c] = 'X'

    mark_taboo_lines(layout, corner_cells, warehouse)

    return format_output(layout)

In [20]:
# Demo: load a sample warehouse, then print it followed by its taboo-cell map.
wh = sokoban.Warehouse()
wh.load_warehouse("./warehouses/warehouse_11.txt")
answer = taboo_cells(wh)  # string containing only '#' walls and 'X' taboo cells
print(wh,"\n")
print(answer)

  ###### 
  #    # 
  # ##@##
### # $ #
# ..# $ #
#       #
#  ######
####      

  ######
  #XXXX#
  # ## ##
### #X X#
#X  #  X#
#X     X#
#XX######
####




    An instance of the class 'SokobanPuzzle' represents a Sokoban puzzle.
    An instance contains information about the walls, the targets, the boxes
    and the worker.

    Your implementation should be fully compatible with the search functions of 
    the provided module 'search.ipynb'

In [22]:
class SokobanPuzzle(search.Problem):
    """Search-problem formulation of a weighted Sokoban puzzle.

    A state is the pair (worker, boxes): 'worker' is an (x, y) tuple and
    'boxes' is a tuple of (x, y) tuples. The order of the boxes never changes,
    so the box at index i always has weight self.weights[i].
    """

    # Action name -> (dx, dy) offset in (x, y) coordinates; y grows downwards.
    _MOVES = {'Up': (0, -1), 'Down': (0, 1), 'Left': (-1, 0), 'Right': (1, 0)}

    def __init__(self, warehouse):
        """Build the problem from a Warehouse.

        @param warehouse: Warehouse providing walls, targets, boxes, weights
                          and the worker position
        """
        initial_state = (warehouse.worker, tuple(warehouse.boxes))

        # Initialise the base class first so that it cannot overwrite the
        # attributes assigned below.
        # NOTE(review): presumably search.Problem.__init__ resets self.goal
        # when no goal is passed - confirm against the search module.
        super().__init__(initial_state)

        self.warehouse = warehouse
        self.walls = set(warehouse.walls)
        self.targets = set(warehouse.targets)
        self.weights = warehouse.weights
        # Taboo cells as (x, y) positions of the 'X' marks in the taboo map.
        self.taboo = set(sokoban.find_2D_iterator(taboo_cells(warehouse).splitlines(), 'X'))

        self.initial = initial_state
        self.goal = tuple(warehouse.targets)  # for reference, but not used in goal_test

    def actions(self, state):
        """Return list of legal moves: 'Up', 'Down', 'Left', 'Right'.

        A move is rejected when the worker would step into a wall, or when it
        would push a box into a wall, another box or a taboo cell.
        """
        worker, boxes = state
        legal = []

        for action, (dx, dy) in self._MOVES.items():
            step = (worker[0] + dx, worker[1] + dy)
            if step in self.walls:
                continue
            if step in boxes:
                beyond = (step[0] + dx, step[1] + dy)
                if beyond in self.walls or beyond in boxes or beyond in self.taboo:
                    continue
            legal.append(action)

        return legal

    def result(self, state, action):
        """Return the new state after applying an action.

        The action is assumed to be one of those returned by actions(state).
        """
        dx, dy = self._MOVES[action]
        worker, boxes = state
        new_worker = (worker[0] + dx, worker[1] + dy)
        new_boxes = list(boxes)

        if new_worker in boxes:
            # Replace the pushed box in place so box indices stay aligned
            # with self.weights.
            pushed = new_boxes.index(new_worker)
            new_boxes[pushed] = (new_worker[0] + dx, new_worker[1] + dy)

        return (new_worker, tuple(new_boxes))

    def goal_test(self, state):
        """Return True if all boxes are on target locations."""
        _, boxes = state
        return set(boxes) == self.targets

    def path_cost(self, c, state1, action, state2):
        """Cost = 1 + box weight if box was pushed, otherwise 1.

        @param c: cost accumulated up to state1
        @return the cost accumulated up to state2
        """
        _, boxes_before = state1
        worker_after, _ = state2

        # The worker now stands where a box was: that box has been pushed.
        if worker_after in boxes_before:
            return c + 1 + self.weights[boxes_before.index(worker_after)]

        return c + 1

    def h(self, node):
        """Heuristic: weighted sum of Manhattan distances to the nearest target.

        A box at Manhattan distance d from its nearest target needs at least d
        pushes, and each push of box i costs 1 + weights[i]. Every push moves
        exactly one box, so the sum over all boxes never overestimates the
        remaining cost (admissible).
        """
        _, boxes = node.state
        estimate = 0
        for idx, (bx, by) in enumerate(boxes):
            nearest = min(
                (abs(bx - tx) + abs(by - ty) for tx, ty in self.targets),
                default=0,  # no targets at all: nothing to estimate
            )
            weight = self.weights[idx] if idx < len(self.weights) else 0
            estimate += nearest * (1 + weight)
        return estimate

In [27]:
def test_sokoban_puzzle(warehouse_file):
    """Load a warehouse file, solve it with A* graph search and print a report.

    The report shows the initial warehouse and, when a solution exists, the
    action sequence, its total cost, the final worker and box positions and
    the taboo cells used by the solver.

    @param warehouse_file: path of the warehouse text file to load
    """
    print(f"Loading warehouse from: {warehouse_file}")

    warehouse = sokoban.Warehouse()
    warehouse.load_warehouse(warehouse_file)
    print("Initial Warehouse:")
    print(warehouse)

    # Wrap the warehouse in the search-problem formulation.
    puzzle = SokobanPuzzle(warehouse)

    print("\nSearching for solution...")
    goal_node = search.astar_graph_search(puzzle)
    if goal_node is None:
        print("No solution found.")
        return

    plan = goal_node.solution()
    total_cost = goal_node.path_cost

    # Summary of the solution that was found.
    print("\n Solution found!")
    print(f"Action sequence: {plan}")
    print(f"Total cost: {total_cost}")
    print("\nFinal state:")
    end_state = goal_node.state
    print(f"Worker: {end_state[0]}")
    print(f"Boxes: {end_state[1]}")

    print(f"Taboo: {puzzle.taboo}")
# Run the solver demo on a small sample warehouse.
test_sokoban_puzzle('./warehouses/warehouse_01_a.txt')

Loading warehouse from: ./warehouses/warehouse_01_a.txt
Initial Warehouse:
####  
# .#  
#  ###
#*@  #
#  $ #
#  ###
####  

Searching for solution...

 Solution found!
Action sequence: ['Down', 'Left', 'Up', 'Right', 'Right', 'Right', 'Down', 'Left', 'Up', 'Left', 'Left', 'Down', 'Down', 'Right', 'Up', 'Left', 'Up', 'Right', 'Up', 'Up', 'Left', 'Down', 'Right', 'Down', 'Down', 'Right', 'Right', 'Up', 'Left', 'Down', 'Left', 'Up', 'Up']
Total cost: 73

Final state:
Worker: (2, 2)
Boxes: ((1, 3), (2, 1))
Taboo: {(4, 4), (1, 5), (4, 3), (1, 1), (2, 5)}


    Determine if the sequence of actions listed in 'action_seq' is legal or not.
    
    Important notes:
      - a legal sequence of actions does not necessarily solve the puzzle.
      - an action is legal even if it pushes a box onto a taboo cell.
        
    @param warehouse: a valid Warehouse object

    @param action_seq: a sequence of legal actions.
           For example, ['Left', 'Down', 'Down', 'Right', 'Up', 'Down']
           
    @return
        The string 'Impossible', if one of the actions was not valid.
           For example, if the agent tries to push two boxes at the same time,
                        or push a box into a wall.
        Otherwise, if all actions were successful, return                 
               A string representing the state of the puzzle after applying
               the sequence of actions.  This must be the same string as the
               string returned by the method  Warehouse.__str__()

In [7]:
# Problem domains addressed by AI have *hard* and *soft* constraints
# Identify the hard constraints in this problem's definition

def move(pos, direction):
    """Return the (x, y) position one step away from 'pos' in 'direction'.

    The direction name is matched case-insensitively against 'right', 'left',
    'up' and 'down'; y grows downwards, so 'up' decreases y.

    @param pos: current (x, y) position
    @param direction: name of the direction to move in

    @return the new (x, y) tuple, or None if the direction is not recognised
    """
    heading = direction.lower()
    if heading in ('right', 'left'):
        return (pos[0] + (1 if heading == 'right' else -1), pos[1])
    if heading in ('up', 'down'):
        return (pos[0], pos[1] + (1 if heading == 'down' else -1))
    return None

def condition_valid(action, new_pos, grid):
    """Check whether the worker may step onto 'new_pos'.

    Stepping into a wall is not allowed. Stepping onto a box ('$' or '*') is
    allowed only if the cell behind the box, in the direction of the move, is
    neither a wall nor another box.

    @param action: direction name of the move being attempted
    @param new_pos: (x, y) cell the worker wants to enter
    @param grid: 2D grid of warehouse characters, indexed as grid[y][x]

    @return True if the move is allowed, False otherwise
    """
    col, row = new_pos
    occupant = grid[row][col]

    if occupant == '#':
        return False

    if occupant in ('$', '*'):
        # The box would be pushed one more step in the same direction.
        beyond_col, beyond_row = move(new_pos, action)
        return grid[beyond_row][beyond_col] not in ('$', '#', '*')

    return True

def update_grid(action, new_pos, grid, worker):
    """Apply an already-validated worker move to the character grid, in place.

    If the entered cell holds a box, the box is first pushed one cell further
    in the direction of the move. The worker is then drawn on the entered cell
    and the cell it left is restored.

    Glyphs: '$' box, '*' box on target, '.' target, '@' worker,
    '!' worker on target, ' ' empty floor.

    @param action: direction name of the move
    @param new_pos: (x, y) cell the worker moves onto
    @param grid: 2D grid of warehouse characters, indexed as grid[y][x]
    @param worker: (x, y) cell the worker is leaving
    """
    col, row = new_pos
    old_col, old_row = worker
    entered = grid[row][col]

    if entered in ('$', '*'):
        box_col, box_row = move(new_pos, action)
        # A box landing on a target is drawn as '*'.
        lands_on_target = grid[box_row][box_col] == '.'
        grid[box_row][box_col] = '*' if lands_on_target else '$'

    # The entered cell was a target if it showed '.' or a box-on-target '*'.
    grid[row][col] = '!' if entered in ('.', '*') else '@'

    # Uncover the target again if the worker had been standing on one.
    vacated = grid[old_row][old_col]
    grid[old_row][old_col] = '.' if vacated == '!' else ' '

def check_elem_action_seq(warehouse_init, action_seq):
    """Determine whether a sequence of actions is legal and apply it.

    The given warehouse is not modified; the actions are applied to a copy.
    Empty or whitespace-only actions are skipped. Pushing a box onto a taboo
    cell is legal here.

    @param warehouse_init: a valid Warehouse object
    @param action_seq: sequence of action names such as 'Left', 'Right',
                       'Up', 'Down' (matched case-insensitively)

    @return the string 'Impossible' if an action is not recognised, leaves
            the grid, walks into a wall, or pushes a box into a wall or
            another box; otherwise a string representing the state of the
            puzzle after applying all the actions
    """
    warehouse = warehouse_init.copy()
    grid = [list(row) for row in str(warehouse).splitlines()]

    for action in action_seq:
        if not str.strip(action):  # skip empty or whitespace-only actions
            continue

        new_pos = move(warehouse.worker, action)
        # An unrecognised action name yields None: report it, do not crash.
        if new_pos is None:
            return "Impossible"

        # A step outside the grid would otherwise wrap via a negative index
        # or raise IndexError.
        x, y = new_pos
        if not (0 <= y < len(grid) and 0 <= x < len(grid[y])):
            return "Impossible"

        if not condition_valid(action, new_pos, grid):
            return "Impossible"

        update_grid(action, new_pos, grid, warehouse.worker)
        warehouse.worker = new_pos

    return '\n'.join(''.join(row) for row in grid)

    This function analyses the given warehouse.
    It returns two items. The first item is an action sequence solution. 
    The second item is the total cost of this action sequence.
    
    @param 
     warehouse: a valid Warehouse object

    @return
    
        If puzzle cannot be solved 
            return 'Impossible', None
        
        If a solution was found, 
            return S, C 
            where S is a list of actions that solves
            the given puzzle coded with 'Left', 'Right', 'Up', 'Down'
            For example, ['Left', 'Down', 'Down', 'Right', 'Up', 'Down']
            If the puzzle is already in a goal state, simply return []
            C is the total cost of the action sequence S

In [33]:
# Now consider the weights of the boxes being pushed
# Determine how this changes the cost function in the graph search

def solve_weighted_sokoban(warehouse):
    """Solve a weighted Sokoban puzzle with A* graph search.

    @param warehouse: a valid Warehouse object

    @return ('Impossible', None) if the search finds no solution; otherwise
            (S, C), where S is the list of actions of the solution found and
            C is its total path cost
    """
    goal_node = search.astar_graph_search(SokobanPuzzle(warehouse))

    if goal_node is not None:
        return goal_node.solution(), goal_node.path_cost

    return "Impossible", None

In [34]:
#Test-code:
# Solve a sample warehouse and report the solution together with the
# wall-clock time the search took.

wh = sokoban.Warehouse()
wh.load_warehouse('./warehouses/warehouse_07.txt')
start = time.time()
solution, cost = solve_weighted_sokoban(wh)
end = time.time()
analysis_time = end - start  # elapsed seconds

print("Actions:", solution)
print("Total cost:", cost)
print("Analysis took:", analysis_time)

Actions: ['Up', 'Up', 'Right', 'Right', 'Up', 'Up', 'Left', 'Left', 'Down', 'Down', 'Right', 'Up', 'Down', 'Right', 'Down', 'Down', 'Left', 'Up', 'Down', 'Left', 'Left', 'Up', 'Left', 'Up', 'Up', 'Right']
Total cost: 26
Analysis took: 84.64380264282227


In [None]:
#Check_elem_sequence test code:
# Replays the solution computed in the previous cell on the same warehouse
# and prints the resulting state (or 'Impossible').
print(check_elem_action_seq(wh, solution))