# Search in non-deterministic environments

In this assignment we will work with **non-deterministic environments**, that is, environments where taking an action can lead to one of several possible outcomes. For convenience, we will assume that they are **fully observable**. The class `NonDeterministicProblem` defined below is very similar to the earlier `Problem` class and differs in a single aspect: the function `take_action` is replaced by `possible_outcomes`, which returns a collection of states instead of a single state.

In [1]:
from collections.abc import Collection

class NonDeterministicProblem:
    @property
    def initial_state(self):
        # ...
        raise NotImplementedError
        
    def available_actions(self, state: 'State') -> Collection['Action']:
        # ...
        raise NotImplementedError
        
    def possible_outcomes(self, state: 'State', action: 'Action') -> Collection['State']:
        # ...
        # return possible_states
        raise NotImplementedError
    
    def is_goal(self, state: 'State') -> bool:
        # ...
        raise NotImplementedError

We will consider two variants of the vacuum problem of an arbitrary size. To that end, we begin by defining the class `VacuumWorldBase`, which implements everything except `possible_outcomes`. The number of rooms is no longer fixed at two; instead, it is given by the constructor parameter `n`. We still assume the rooms are arranged in a single row, so every room has at most two neighbors: one to the left and one to the right.

In [2]:
class VacuumWorldBase(NonDeterministicProblem):
    def __init__(self, n:int = 2):
        self.n = n
        
    @property
    def initial_state(self):
        return (0, (True,)*self.n)
    
    def available_actions(self, state):
        return ["Left", "Suck", "Right"]

    def is_goal(self, state) -> bool:
        return not any(state[1])

The class `ErraticVacuumWorld` below implements the erratic vacuum world, where movement is deterministic, but cleaning (the action `Suck`) is not:

* If the room is dirty, cleaning it may also clean all the neighboring rooms.
* If the room is clean, cleaning it may make it dirty.

In [3]:
class ErraticVacuumWorld(VacuumWorldBase):
    def __init__(self, n:int=2):
        super().__init__(n)

    def possible_outcomes(self, state, action):
        robot, dirty = state
        if action == "Left":
            return {(max(robot-1, 0), dirty)}
        elif action == "Right":
            return {(min(robot+1, len(dirty)-1), dirty)}
        elif action == "Suck":
            if not dirty[robot]:
                new_dirty = list(dirty)
                new_dirty[robot] = True
                return {state, (robot, tuple(new_dirty))}
            else:
                new_dirty1 = list(dirty)
                new_dirty1[robot] = False
                new_dirty2 = list(new_dirty1)
                new_dirty2[max(robot-1, 0)] = False
                new_dirty2[min(robot+1, len(dirty)-1)] = False
                return {(robot, tuple(new_dirty1)), (robot, tuple(new_dirty2))}    

In the cell below you can see that calling the action `Suck` in a clean room yields two possible outcomes.

In [4]:
world = ErraticVacuumWorld()
world.possible_outcomes((0, (False, True)), "Suck")

{(0, (False, True)), (0, (True, True))}

Sometimes the behaviour is deterministic: if there are only two rooms, one dirty and one clean, cleaning the dirty room is certain to get us to the goal, because both possible outcomes coincide.

In [5]:
world.possible_outcomes((0, (True, False)), "Suck")

{(0, (False, False))}
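
The neighbor-cleaning rule is easier to see with more than two rooms. The sketch below copies only the `Suck` branch into a standalone function, here called `suck_outcomes` (a hypothetical name used just for this illustration), and applies it to the middle room of a three-room world in which every room is dirty.

```python
def suck_outcomes(state):
    """Outcomes of `Suck` in the erratic world (standalone copy of the rule)."""
    robot, dirty = state
    if not dirty[robot]:
        # Clean room: it either stays clean or becomes dirty.
        made_dirty = list(dirty)
        made_dirty[robot] = True
        return {state, (robot, tuple(made_dirty))}
    # Dirty room: the room is cleaned, possibly together with its neighbors.
    only_here = list(dirty)
    only_here[robot] = False
    with_neighbors = list(only_here)
    with_neighbors[max(robot - 1, 0)] = False
    with_neighbors[min(robot + 1, len(dirty) - 1)] = False
    return {(robot, tuple(only_here)), (robot, tuple(with_neighbors))}


middle = suck_outcomes((1, (True, True, True)))
print(sorted(middle))
```

Either only the middle room is cleaned, or the middle room and both of its neighbors are cleaned, which in this case reaches the goal in a single step.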

The erratic, slippery vacuum world defined below extends the non-determinism to the movements: the robot may stay in the current position instead of moving.

In [6]:
class ErraticSlipperyVacuumWorld(VacuumWorldBase):
    def __init__(self, n:int=2):
        super().__init__(n)

    def possible_outcomes(self, state, action):
        robot, dirty = state
        if action == "Left":
            return {state, (max(robot-1, 0), dirty)}
        elif action == "Right":
            return {state, (min(robot+1, len(dirty)-1), dirty)}
        elif action == "Suck":
            if not dirty[robot]:
                new_dirty = list(dirty)
                new_dirty[robot] = True
                return {state, (robot, tuple(new_dirty))}
            else:
                new_dirty1 = list(dirty)
                new_dirty1[robot] = False
                new_dirty2 = list(new_dirty1)
                new_dirty2[max(robot-1, 0)] = False
                new_dirty2[min(robot+1, len(dirty)-1)] = False
                return {(robot, tuple(new_dirty1)), (robot, tuple(new_dirty2))}    

For example, if the robot is in the leftmost room and takes the action `Right` it either gets to the next room or not.

In [7]:
world = ErraticSlipperyVacuumWorld()
world.possible_outcomes((0, (True, True)), "Right")

{(0, (True, True)), (1, (True, True))}

Conversely, if it takes the action `Left`, it stays where it was, because it is already in the leftmost room. Both outcomes coincide, hence the action is deterministic in this state.

In [8]:
world = ErraticSlipperyVacuumWorld()
world.possible_outcomes((0, (True, True)), "Left")

{(0, (True, True))}

For the agent, we will follow the pattern established in the first assignment. The agent performs **off-line** planning, i.e., it is given the definition of the problem in the constructor and performs all the planning there. The plan therefore must be **conditional**, i.e., during the execution, the agent must be able to choose the correct path depending on what it receives from the environment. We thus now assume that `percepts` contain the state the agent is in. The agent must retrieve the appropriate action for the given percepts (state) and return it.
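
As a minimal sketch of what "retrieve the appropriate action for the given state" can mean, the hypothetical `TableAgent` below simply looks the observed state up in a table. The table is written by hand for the two-room erratic vacuum world and is only an illustration; it is not produced by any search, and the class name is an assumption of this example.

```python
class TableAgent:
    """Hypothetical agent that maps each observed state to one action."""

    def __init__(self, policy):
        # policy: dict from state to action, prepared before execution starts
        self.policy = policy

    def next_action(self, percepts):
        # The percepts are the current state, so a lookup is enough.
        return self.policy[percepts]


# Hand-written table for the two-room erratic world: state -> action.
policy = {
    (0, (True, True)): "Suck",
    (0, (False, True)): "Right",
    (1, (False, True)): "Suck",
}

agent = TableAgent(policy)
print(agent.next_action((0, (True, True))))
print(agent.next_action((0, (False, True))))
```

The agent never needs to know which outcome the environment chose in advance: it observes the resulting state and reacts to it, which is exactly what makes the plan conditional.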

In [9]:
class Agent:
    def next_action(self, percepts: 'State') -> 'Action':
        # ...
        # return action
        0

## Task 1: AND-OR Agent

Complete the following class `AndOrAgent` by implementing the AND-OR search algorithm. Use `self.problem` to get the details of the problem. Fill in the variable `self.plan` and use it in `next_action`. You may add new functions to `AndOrAgent`. It is also permissible to modify `self.plan` in `next_action`, similarly to how we did it in the first assignment - for example, for convenience, you may remove from the plan the action that will be returned. You can assume states are hashable.

**Be careful!** `AndOrAgent` must be capable of solving both `ErraticVacuumWorld` and `ErraticSlipperyVacuumWorld` (and other similar problems). Implementing the pseudocode from the lecture will only get you halfway, as it supports conditions, but it does not support loops. However, extending it is fully within your capabilities.

In [None]:
class AndOrAgent(Agent):
    def __init__(self, problem: NonDeterministicProblem):
        self.problem = problem
        self.plan = self.or_search(problem.initial_state, [], {})
        self.current_branch = self.plan
        
    def or_search(self, state, path, memo):
        if state in memo:
            return memo[state]
        if self.problem.is_goal(state):
            memo[state] = []
            return []
        if state in path:
            memo[state] = ["LOOP"]
            return ["LOOP"]
        for action in self.problem.available_actions(state):
            outcomes = list(self.problem.possible_outcomes(state, action))
            conditional_plan = self.and_search(outcomes, path + [state], memo)
            if conditional_plan is not None:
                has_non_loop = any(
                    not (isinstance(subplan, list) and subplan and subplan[0] == "LOOP")
                    for subplan in conditional_plan.values()
                )
                if has_non_loop:
                    memo[state] = [action, conditional_plan]
                    return [action, conditional_plan]
        memo[state] = None
        return None
        
    def and_search(self, states, path, memo):
        plan = {}
        for state in states:
            subplan = self.or_search(state, path, memo)
            if subplan is None:
                return None
            plan[state] = subplan
        return plan
        
    @staticmethod
    def _is_loop(subplan):
        # A ["LOOP"] subplan marks a state that already occurs earlier on the path.
        return isinstance(subplan, list) and bool(subplan) and subplan[0] == "LOOP"

    @staticmethod
    def _loop_action(percepts):
        # NOTE: this fallback is hard-coded for the two-room vacuum worlds,
        # where a state is a (robot, dirty) pair; it is not a general loop handler.
        robot, dirty = percepts
        if robot == 0 and dirty[1]:
            return "Right"
        if robot == 1 and dirty[0]:
            return "Left"
        return "Suck"

    def next_action(self, percepts):
        branch = self.current_branch
        # OR node: [action, {outcome: subplan}] or the ["LOOP"] marker.
        if isinstance(branch, list) and branch and isinstance(branch[0], str):
            if branch[0] == "LOOP":
                return self._loop_action(percepts)
            self.current_branch = branch[1] if len(branch) > 1 else None
            return branch[0]
        # AND node: pick the subplan for the observed state, falling back to
        # the first branch if the state is not listed.
        if isinstance(branch, dict) and branch:
            key = percepts if percepts in branch else next(iter(branch))
            subplan = branch[key]
            if self._is_loop(subplan):
                return self._loop_action(percepts)
            self.current_branch = subplan
            return self.next_action(percepts)
        # No plan left (goal reached or search failed): default to the first action.
        return self.problem.available_actions(percepts)[0]

The plan for `ErraticVacuumWorld` should look like this:
        
1. `Suck`
2. If `(0, (False, True))`, then
   1. `Right`
   2. `Suck`
3. If `(0, (False, False))`, then terminate

Compare it with the output of the cell below.

In [22]:
AndOrAgent(ErraticVacuumWorld()).plan

['Suck',
 {(0, (False, True)): ['Right',
   {(1, (False, True)): ['Suck', {(1, (False, False)): []}]}],
  (0, (False, False)): []}]

The function `count_paths` checks the plan against every possible choice in the world (returning to any previous state at most once). It returns the number of different paths to the goal that the agent found and raises an exception if the agent would get stuck in an infinite loop.

In [23]:
import copy

def count_paths(world, agent):
    queue = [(world.initial_state, agent, {})]    
    goal = 0
    successful = set()
    ever_visited = set()
    while len(queue) > 0:
        state, agent, visited = queue.pop()        
        if world.is_goal(state):
            successful |= visited.keys()
            goal += 1
            continue        
        ever_visited.add(state)
        visited = dict(visited)
        visited[state] = visited.get(state, 0) + 1
        action = agent.next_action(state)

        states = set(world.possible_outcomes(state, action)) - {k for k, v in visited.items() if v >= 2}
        if len(states) > 1:
            for state in states:
                new_agent = copy.deepcopy(agent)
                queue.append((state, new_agent, visited))
        elif len(states) == 1:            
            state = next(iter(states))
            queue.append((state, agent, visited))     
    assert ever_visited == successful, "Some states were visited yet did not lead to the goal"
    return goal

For `ErraticVacuumWorld` there are two such paths, hence the expected output of the following cell is `2`.

In [24]:
world = ErraticVacuumWorld()
agent = AndOrAgent(world)
count_paths(world, agent)

2

For the slippery variant, the plan should look like this:

1. `Suck`
2. If `(0, (False, True))`, then
   1. `Right` (call this step A)
   2. If `(0, (False, True))`, then go to A
   3. If `(1, (False, True))`, then `Suck`
3. If `(0, (False, False))`, then terminate

Compare it with the output of the cell below. Of course, your representation does not need to use `go to` and you may handle loops differently.

In [25]:
AndOrAgent(ErraticSlipperyVacuumWorld()).plan

['Suck',
 {(0, (False, True)): ['Right',
   {(0, (False, True)): ['LOOP'],
    (1, (False, True)): ['Suck', {(1, (False, False)): []}]}],
  (0, (False, False)): []}]
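
One possible way to read the `['LOOP']` marker is "do what the plan already prescribes for this state". Under that reading the nested plan can be flattened into a state-to-action table, as in the sketch below. The helper `plan_to_policy` is a hypothetical name introduced for this illustration, and the sketch assumes that every looping state has already received an action higher up in the plan.

```python
def plan_to_policy(state, plan, policy=None):
    """Flatten a nested [action, {outcome: subplan}] plan into {state: action}."""
    if policy is None:
        policy = {}
    # [] marks a goal state; ['LOOP'] refers back to a state that is
    # already in the table, so neither adds a new entry.
    if not plan or plan == ["LOOP"]:
        return policy
    action, branches = plan
    policy[state] = action
    for outcome, subplan in branches.items():
        plan_to_policy(outcome, subplan, policy)
    return policy


# The plan printed above for the slippery two-room world.
slippery_plan = ['Suck',
 {(0, (False, True)): ['Right',
   {(0, (False, True)): ['LOOP'],
    (1, (False, True)): ['Suck', {(1, (False, False)): []}]}],
  (0, (False, False)): []}]

policy = plan_to_policy((0, (True, True)), slippery_plan)
for state, action in policy.items():
    print(state, "->", action)
```

With such a table, slipping on `Right` needs no special handling: the agent observes `(0, (False, True))` again and the lookup returns `Right` once more.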

`count_paths` should be able to visit 3 paths:

1. `Suck`
2. `Suck`, `Right`, `Suck`
3. `Suck`, `Right`, `Right`, `Suck`

In [26]:
world = ErraticSlipperyVacuumWorld()
agent = AndOrAgent(world)
count_paths(world, agent)

3

## Task 2: Escape room

Complete the following class `Switches` so that it implements the puzzle described below. Your agent should be capable of solving it.

### Setup:

The agent is in a room with 3 switches labeled A, B, and C.
Each switch can be in the on or off position.
There is a door, and your goal is to unlock the door. The door unlocks when all 3 switches are in the "on" position at the same time.
However, the switches are unstable and exhibit non-deterministic behavior:

* Flipping a switch doesn't always change just that switch. Sometimes, when you flip one switch, it may randomly toggle another switch or even both of the other switches.
* Every time you flip a switch, the state of the switches changes in a random but controlled way.

### Rules:

* Flipping switch A will either toggle A, or set C to "off".
* Flipping switch B will either toggle B, or set A to "on".
* Flipping switch C will either toggle C, or set all three switches to "off".
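
Read literally, the three rules can be written as a stateless function from a state and a switch to a set of outcomes. The sketch below is only one reading of the rules; the function name `flip` and the encoding of a state as a tuple `(a, b, c)` of booleans are assumptions of this example.

```python
def flip(state, switch):
    """Possible outcomes of flipping `switch` ('A', 'B' or 'C') in `state`."""
    a, b, c = state
    if switch == "A":
        # Toggle A, or set C to off.
        return {(not a, b, c), (a, b, False)}
    if switch == "B":
        # Toggle B, or set A to on.
        return {(a, not b, c), (True, b, c)}
    if switch == "C":
        # Toggle C, or set all three switches to off.
        return {(a, b, not c), (False, False, False)}
    raise ValueError(f"unknown switch: {switch!r}")


start = (False, False, False)
for switch in "ABC":
    print(switch, sorted(flip(start, switch)))
```

Note that some outcomes leave the state unchanged: flipping A while C is already off may return the same state, so a plan for this puzzle may have to repeat an action.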

### Objective:

The agent should generate a plan that will ensure that all three switches are "on" at the same time, and thereby unlock the door.

### Implementation remarks

Make sure your states are hashable.

In [27]:
class Switches:
    def __init__(self):
        self.occurred_states = set()
        
    @property
    def initial_state(self):
        self.occurred_states = set()
        return (False, False, False)
        
    def available_actions(self, state: 'State') -> Collection['Action']:
        return ["A", "B", "C"]
        
    def possible_outcomes(self, state: 'State', action: 'Action') -> Collection['State']:
        a, b, c = state
        
        self.occurred_states.add(state)
        
        if action == "A":
            outcomes = {
                (not a, b, c),
                (a, b, False)
            }
        elif action == "B":
            outcomes = {
                (a, not b, c),
                (True, b, c)
            }
        elif action == "C":
            outcomes = {
                (a, b, not c),
                (False, False, False)
            }
        else:
            outcomes = set()
        
        filtered_outcomes = set()
        for outcome in outcomes:
            if outcome not in self.occurred_states:
                filtered_outcomes.add(outcome)

        if not filtered_outcomes:
            return outcomes
            
        return filtered_outcomes
    
    def is_goal(self, state: 'State') -> bool:
        return all(state)

Different plans, e.g., due to the different ordering of actions, may yield a different number of possible paths, but the following cell should not fail.

In [28]:
world = Switches()
agent = AndOrAgent(world)
count_paths(world, agent)

1

In [29]:
agent.plan

['A',
 {(True, False, False): ['B',
   {(True, True, False): ['A',
     {(False, True, False): ['C',
       {(False, True, True): ['A', {(True, True, True): []}]}]}]}]}]