# HW 2.2 Starter Code

See pset 1 for dependency installation instructions and see the problem set for deliverables.

## Before You Begin

Review the first two sections of this notebook ("Logical Sentences" and "Operators for Constructing Logical Sentences"):
https://github.com/aimacode/aima-python/blob/master/logic.ipynb

We will be using the data structures and logical operations from that codebase in this problem set. If you are interested, feel free to read the rest of the notebook as well, but note that we make several departures from what is described beyond the logical sentences part.

In [None]:
# Install dependencies (run this once ever 12 hours)
!git clone https://github.com/MIT-6-882/HW utils
!pip install --upgrade git+https://github.com/tomsilver/pddlgym # Install most recent PDDLGym (must be from source!)
!pip install tabulate
!pip install python-sat # modern SAT solvers (do not pip install pysat; that is a different package)

In [None]:
from utils.logic_utils import Expr, to_cnf, conjuncts, disjuncts, equiv, implies, check_if_satisfiable
from utils.planning_utils import AStar
from pddlgym.rendering.myopic_posar import posar_render_from_layout, ROBOT, PERSON, SMOKE, FIRE, EMPTY
from pddlgym.custom.searchandrescue import MyopicPOSAREnv
import abc
import itertools
import pddlgym
import numpy as np
import time
import matplotlib.pyplot as plt
from collections import defaultdict, namedtuple
from tabulate import tabulate

In [None]:
# For convenience, give global names to the actions in the environment
UP, DOWN, LEFT, RIGHT, PICKUP = MyopicPOSAREnv.up, MyopicPOSAREnv.down, MyopicPOSAREnv.left, \
                                MyopicPOSAREnv.right, MyopicPOSAREnv.pickup

## Logical Inference

In [None]:
# For convenience, we define a number of functions for creating
# named propositions
def smoke(r, c):
    return Expr("Smoke", r, c)

def fire(r, c):
    return Expr("Fire", r, c)

def empty(r, c):
    return Expr("Empty", r, c)

def person(r, c):
    return Expr("Person", r, c)

def robot(r, c, t):
    return Expr("Robot", r, c, t)

def rescued(t):
    return Expr("Rescued", t)

def up(t):
    return Expr("Up", t)

def down(t):
    return Expr("Down", t)

def left(t):
    return Expr("Left", t)

def right(t):
    return Expr("Right", t)

def pickup(t):
    return Expr("Pickup", t)

In [None]:
class KnowledgeBase:
    """
    This knowledge base keeps track of the agent's current
    beliefs/knowledge about the MyopicPOSAR environment.
    
    The number of facts will depend on the height and width
    of the environment because we need one proposition per
    location that the agent might be at, for example.
    """
    def __init__(self, height, width):
        self.height = height
        self.width = width
        # Map propositions to IDs
        self._proposition_to_id = {}
        self._id_count = itertools.count(1)
        # Keep track of clauses in CNF form
        # This will be directly passed to pysat solver
        self._clauses = []

        # Initialize background knowledge
        self._initialize_background_knowledge()

    def check(self, query, assumptions=None, debug=False):
        """Check whether a query is true, in light of the knowledge that we have so far.
        
        Important note: if check(query) returns False, this means: "we cannot prove
        the query". It does not mean that query is False. For example, if we do not
        yet know where the person is, then check(person(0, 0)) would return False,
        and check(~person(0, 0)) would _also_ return False (where ~person(0, 0)) is
        the proposition "the person is not at (0, 0)" because we cannot prove its
        truth or falsity. See the unit tests below for additional examples.
        """
        if assumptions is None:
            assumptions = []
        else:
            assumptions = [c for s in assumptions for c in self._sentence_to_clauses(s)]
        # Check if it's possible that the query is _not_ true in light of the clauses
        cnf_query = self._sentence_to_clauses(~query)
        return not check_if_satisfiable(cnf_query + assumptions + self._clauses, debug=debug)

    def _update(self, sentences):
        """Add new sentences: convert to CNF and store
        in self._clauses
        """
        new_clauses = []
        for sentence in sentences:
            new_clauses += self._sentence_to_clauses(sentence)
        self._clauses.extend(new_clauses)

        # Sanity checks
        assert self.check(equiv(Expr("P"), Expr("P"))), "Cannot prove True; KB is malformed"
        assert not self.check(equiv(Expr("P"), ~Expr("P"))), "Cannot prove ~False; KB is malformed"

    def _sentence_to_clauses(self, s):
        """Convert a sentence into a format friendly to SAT solver
        """
        # Convert to cnf
        cnf_sentence = conjuncts(to_cnf(s))

        # Collect ID-ified clauses
        clauses = []

        for clause in cnf_sentence:
            # Get the disjuncts, which should be individual propositions
            props = disjuncts(clause)
            ids = [] # integer ids for props
            signs = [] # -1 means negative, +1 means positive
            for prop in props:
                # Proposition is negated
                if prop.op == '~':
                    # Get the nonnegated prop
                    prop = prop.args[0]
                    sign = -1
                else:
                    sign = 1
                # Update map of prop to id
                if prop not in self._proposition_to_id:
                    self._proposition_to_id[prop] = next(self._id_count)
                ids.append(self._proposition_to_id[prop])
                signs.append(sign)
            signed_clause = [i*s for i, s in zip(ids, signs)]
            clauses.append(signed_clause)

        return clauses

    def _initialize_background_knowledge(self):
        """Called once at the beginning, add atemporal axioms
        for the domain
        """
        bk = []
        
        # Background knowledge should include...
        # 1. The relationship between smoke and fire
        # 2. The fact that there cannot be a person in the fire
        raise NotImplementedError("Implement me!")

        self._update(bk)

    def update_from_observation(self, obs, t):
        """Update the knowledge base from seeing obs at time t
        """
        new_sentences = []

        ## We'll start you off with some of this implementation,
        ## but there is much left for you to complete in this method
            
        # Add percepts
        obs = dict(obs)

        (r, c) = obs["robot"]
        if obs["cell"] == "fire":
            new_sentences.append(fire(r, c))
        else:
            new_sentences.append(~fire(r, c))
        if obs["cell"] == "person":
            new_sentences.append(person(r, c))
        else:
            new_sentences.append(~person(r, c))
        if obs["cell"] == "empty":
            new_sentences.append(empty(r, c))
        else:
            new_sentences.append(~empty(r, c))
        if obs["smoke"]:
            new_sentences.append(smoke(r, c))
        else:
            new_sentences.append(~smoke(r, c))
        if obs["rescued"]:
            new_sentences.append(rescued(t))
        else:
            new_sentences.append(~rescued(t))

        # Robot location perceived directly
        for r in range(self.height):
            for c in range(self.width):
                if (r, c) == obs["robot"]:
                    new_sentences.append(robot(r, c, t))
                else:
                    new_sentences.append(~robot(r, c, t))
                    
        ## Here, you should add rules for the transition between times t and t+1
        # Specifically, your rules will need to capture what happens when you
        # take each action. You will need to think about rescued(t+1) and robot(_,_t+1).
        raise NotImplementedError("Implement me!")
        
        self._update(new_sentences)

    def update_from_action(self, act, t):
        """Update the knowledge base from taking action at time t
        """
        # No need to modify this method
        new_sentences = []

        # Add new sentences for action that was taken
        if act == UP:
            new_sentences.append(up(t))
        if act == DOWN:
            new_sentences.append(down(t))
        if act == LEFT:
            new_sentences.append(left(t))
        if act == RIGHT:
            new_sentences.append(right(t))
        if act == PICKUP:
            new_sentences.append(pickup(t))

        self._update(new_sentences)

### Make sure you pass these unit tests before moving on

In [None]:
# Unit tests
def test_knowledge_base():
    """Run some unit tests
    """
    print("Running tests...")
    start_time = time.time()

    ##  Test w/ initial state: [Fire, Smoke, Robot, Empty, Person]
    kb = KnowledgeBase(1, 5)
    obs0 = (("robot", (0, 2)), ("smoke", False), ("rescued", False), ("cell", "empty"))
    kb.update_from_observation(obs0, 0)
    
    assert kb.check(robot(0, 2, 0)) == True
    assert kb.check(~robot(0, 2, 0)) == False
    assert kb.check(robot(0, 1, 0)) == False
    assert kb.check(~robot(0, 1, 0)) == True
    assert kb.check(robot(0, 2, 1)) == False, "We don't know anything about time 1 yet"
    assert kb.check(~robot(0, 2, 1)) == False, "We don't know anything about time 1 yet"
    assert kb.check(smoke(0, 2)) == False
    assert kb.check(~smoke(0, 2)) == True
    assert kb.check(fire(0, 2)) == False
    assert kb.check(~fire(0, 2)) == True
    assert kb.check(fire(0, 1)) == False
    assert kb.check(~fire(0, 1)) == True, "There's no smoke, so there can be no fire"
    assert kb.check(fire(0, 3)) == False
    assert kb.check(~fire(0, 3)) == True, "There's no smoke, so there can be no fire"
    assert kb.check(empty(0, 2)) == True
    assert kb.check(~empty(0, 2)) == False

    act0 = PICKUP # should do nothing
    kb.update_from_action(act0, 0)
    # We haven't observed the effect yet, but we should know that the robot
    # will be in the same place (because the grid is height 1)
    assert kb.check(robot(0, 2, 1)) == True
    assert kb.check(~robot(0, 2, 1)) == False
    assert kb.check(robot(0, 1, 1)) == False
    assert kb.check(robot(0, 2, 2)) == False, "We don't know anything about time 2 yet"
    assert kb.check(~robot(0, 2, 2)) == False, "We don't know anything about time 2 yet"
    # Don't forget what we already know...
    assert kb.check(smoke(0, 2)) == False
    assert kb.check(~smoke(0, 2)) == True
    assert kb.check(fire(0, 2)) == False
    assert kb.check(~fire(0, 2)) == True
    assert kb.check(fire(0, 1)) == False
    assert kb.check(~fire(0, 1)) == True
    assert kb.check(fire(0, 3)) == False
    assert kb.check(~fire(0, 3)) == True
    assert kb.check(empty(0, 2)) == True
    assert kb.check(~empty(0, 2)) == False

    obs1 = (("robot", (0, 2)), ("smoke", False), ("rescued", False), ("cell", "empty"))
    kb.update_from_observation(obs1, 1)
    act1 = LEFT
    kb.update_from_action(act1, 1)
    # Anticipate the robot moving left, but we don't know what we will see yet
    assert kb.check(robot(0, 1, 2)) == True
    assert kb.check(~robot(0, 1, 2)) == False
    assert kb.check(robot(0, 2, 2)) == False
    assert kb.check(smoke(0, 1)) == False
    assert kb.check(~smoke(0, 1)) == False

    # Now we get the observation of smoke
    obs2 = (("robot", (0, 1)), ("smoke", True), ("rescued", False), ("cell", "empty"))
    kb.update_from_observation(obs2, 2)
    assert kb.check(smoke(0, 1)) == True
    assert kb.check(~smoke(0, 1)) == False
    # So we should be able to assume fire on the left
    assert kb.check(fire(0, 0)) == True
    assert kb.check(~fire(0, 0)) == False
    # We still don't know about fires elsewhere
    assert kb.check(fire(0, 4)) == False
    assert kb.check(~fire(0, 4)) == False

    # If we move left again, we're gonna get stuck in the fire
    act2 = LEFT
    kb.update_from_action(act2, 2)
    obs3 = (("robot", (0, 0)), ("smoke", False), ("rescued", False), ("cell", "fire"))
    kb.update_from_observation(obs3, 3)
    assert kb.check(robot(0, 0, 3)) == True
    assert kb.check(~robot(0, 0, 3)) == False
    assert kb.check(robot(0, 2, 3)) == False
    assert kb.check(~robot(0, 2, 3)) == True
    act3 = RIGHT
    kb.update_from_action(act3, 3)
    # We should know that we're stuck now
    assert kb.check(robot(0, 0, 4)) == True
    assert kb.check(~robot(0, 0, 4)) == False
    assert kb.check(robot(0, 2, 4)) == False
    assert kb.check(~robot(0, 2, 4)) == True

    ## Test w/ initial state: [Person, Empty, Robot, Smoke, Fire]
    kb = KnowledgeBase(1, 5)
    obs0 = (("robot", (0, 2)), ("smoke", False), ("rescued", False), ("cell", "empty"))
    kb.update_from_observation(obs0, 0)
    act0 = LEFT
    kb.update_from_action(act0, 0)
    obs1 = (("robot", (0, 1)), ("smoke", False), ("rescued", False), ("cell", "empty"))
    kb.update_from_observation(obs1, 1)
    act1 = LEFT
    kb.update_from_action(act1, 1)
    obs2 = (("robot", (0, 0)), ("smoke", False), ("rescued", False), ("cell", "person"))
    kb.update_from_observation(obs2, 2)
    # We should see that there is a person here now
    assert kb.check(robot(0, 0, 2)) == True
    assert kb.check(~robot(0, 0, 2)) == False
    assert kb.check(robot(0, 2, 2)) == False
    assert kb.check(~robot(0, 2, 2)) == True
    assert kb.check(person(0, 0)) == True
    assert kb.check(rescued(0)) == False
    # We should anticipate that picking will rescue
    act2 = PICKUP
    kb.update_from_action(act2, 2)
    assert kb.check(rescued(0)) == False
    assert kb.check(rescued(1)) == False
    assert kb.check(rescued(2)) == False
    assert kb.check(rescued(3)) == True

    ## Test w/ initial state: [Smoke, Empty, Empty, Empty, Empty]
    ##                        [Fire,  Smoke, Robot, Empty, Person]
    kb = KnowledgeBase(2, 5)
    trajectory = [
        ((("robot", (1, 2)), ("smoke", False), ("rescued", False), ("cell", "empty")), LEFT),
        ((("robot", (1, 1)), ("smoke", True), ("rescued", False), ("cell", "empty")), RIGHT),
        ((("robot", (1, 2)), ("smoke", False), ("rescued", False), ("cell", "empty")), UP),
        ((("robot", (0, 2)), ("smoke", False), ("rescued", False), ("cell", "empty")), None),
    ]

    # Run through the trajectory and then assert some things above the last state
    for t, (obs, act) in enumerate(trajectory):
        kb.update_from_observation(obs, t)
        if t < len(trajectory) - 1:
            kb.update_from_action(act, t)

    # We should know everything about (1, 1), (1, 2), (0, 2)
    # and we should know that there is a fire at (1, 0)
    # and we should not know anything about
    # (1, 1) has smoke
    assert kb.check(fire(1, 1)) == False
    assert kb.check(~fire(1, 1)) == True
    assert kb.check(smoke(1, 1)) == True
    assert kb.check(~smoke(1, 1)) == False
    assert kb.check(empty(1, 1)) == True
    assert kb.check(~empty(1, 1)) == False
    assert kb.check(person(1, 1)) == False
    assert kb.check(~person(1, 1)) == True
    # (1, 2) is empty
    assert kb.check(fire(1, 2)) == False
    assert kb.check(~fire(1, 2)) == True
    assert kb.check(smoke(1, 2)) == False
    assert kb.check(~smoke(1, 2)) == True
    assert kb.check(empty(1, 2)) == True
    assert kb.check(~empty(1, 2)) == False
    assert kb.check(person(1, 2)) == False
    assert kb.check(~person(1, 2)) == True
    # (0, 2) is empty
    assert kb.check(fire(0, 2)) == False
    assert kb.check(~fire(0, 2)) == True
    assert kb.check(smoke(0, 2)) == False
    assert kb.check(~smoke(0, 2)) == True
    assert kb.check(empty(0, 2)) == True
    assert kb.check(~empty(0, 2)) == False
    assert kb.check(person(0, 2)) == False
    assert kb.check(~person(0, 2)) == True
    # (1, 0) has fire
    assert kb.check(fire(1, 0)) == True
    assert kb.check(~fire(1, 0)) == False

    ## Test w/ initial state: [Empty, Smoke, Empty, Empty, Empty]
    ##                        [Smoke, Fire,  Smoke, Robot, Empty]
    ##                        [Empty, Smoke, Empty, Empty, Person]
    kb = KnowledgeBase(3, 5)
    trajectory = [
        ((("robot", (1, 3)), ("smoke", False), ("rescued", False), ("cell", "empty")), LEFT),
        ((("robot", (1, 2)), ("smoke", True), ("rescued", False), ("cell", "empty")), UP),
        ((("robot", (0, 2)), ("smoke", False), ("rescued", False), ("cell", "empty")), DOWN),
        ((("robot", (1, 2)), ("smoke", True), ("rescued", False), ("cell", "empty")), DOWN),
        ((("robot", (2, 2)), ("smoke", False), ("rescued", False), ("cell", "empty")), DOWN),
    ]

    # We should not figure out that there is fire at (1, 1) until the last time step
    for t, (obs, act) in enumerate(trajectory):
        kb.update_from_observation(obs, t)
        if t < len(trajectory) - 1:
            kb.update_from_action(act, t)
            assert kb.check(fire(1, 1)) == False
            assert kb.check(~fire(1, 1)) == False
        else:
            assert kb.check(fire(1, 1)) == True
            assert kb.check(~fire(1, 1)) == False

    print(f"Tested passed in {time.time() - start_time} s.")

In [None]:
test_knowledge_base() # This takes 1-2 seconds for me to run.

### It may be helpful or interesting to look at the agent's beliefs rendered as an image

In [None]:
# Rendering utility (can be very slow due to so many queries)
def myopic_sar_render_belief(kb, t):
    """
    Parameters
    ----------
    kb : KnowledgeBase
    t : int

    Returns
    -------
    img : np.ndarray
    """
    grid = np.zeros((kb.height, kb.width, 6), dtype=bool)

    print("Rendering belief...")
    for r in range(kb.height):
        for c in range(kb.width):
            if kb.check(robot(r, c, t)):
                grid[r, c, ROBOT] = True
            if kb.check(person(r, c)):
                grid[r, c, PERSON] = True
            if kb.check(smoke(r, c)):
                grid[r, c, SMOKE] = True
            if kb.check(fire(r, c)):
                grid[r, c, FIRE] = True
            if kb.check(empty(r, c)):
                grid[r, c, EMPTY] = True

    return posar_render_from_layout(grid)

def run_belief_rendering_demo():
    """A hardcoded example with belief rendering
    """
    ## Test w/ initial state: [Empty, Smoke, Empty, Empty, Empty]
    ##                        [Smoke, Fire,  Smoke, Robot, Empty]
    ##                        [Empty, Smoke, Empty, Empty, Person]
    kb = KnowledgeBase(3, 5)
    trajectory = [
        ((("robot", (1, 3)), ("smoke", False), ("rescued", False), ("cell", "empty")), LEFT),
        ((("robot", (1, 2)), ("smoke", True), ("rescued", False), ("cell", "empty")), UP),
        ((("robot", (0, 2)), ("smoke", False), ("rescued", False), ("cell", "empty")), DOWN),
        ((("robot", (1, 2)), ("smoke", True), ("rescued", False), ("cell", "empty")), DOWN),
        ((("robot", (2, 2)), ("smoke", False), ("rescued", False), ("cell", "empty")), DOWN),
    ]

    # Collect the belief images
    belief_images = []

    # We should not figure out that there is fire at (1, 1) until the last time step
    for t, (obs, act) in enumerate(trajectory):
        kb.update_from_observation(obs, t)
        
        # Render belief at time 0
        img = myopic_sar_render_belief(kb, t)
        belief_images.append(img)
        kb.update_from_action(act, t)

    # Render belief at last time
    img = myopic_sar_render_belief(kb, len(trajectory))
    belief_images.append(img)

    # Show images
    for t, img in enumerate(belief_images):
        print(f"Time step {t}")
        plt.figure()
        plt.imshow(img, interpolation='nearest')
        plt.axis('off')
        plt.show()

In [None]:
run_belief_rendering_demo()

## Planning and Acting with Partial Observability

Don't move on to this until you've passed the unit tests above.

In [None]:
class MyopicPOSARLogicalInferenceAgent:
    """An agent for the Myopic POSAR environments that
    performs logical inference in response to observations
    and actions. Also decides what actions to take.

    The height and width are needed to initialize the kb.
    """
    def __init__(self, height, width):
        self._height = height
        self._width = width
        self._kb = None # set in reset
        self._rng = None # set in seed
        self._actions = [UP, DOWN, LEFT, RIGHT, PICKUP]

    def reset(self, obs):
        """Refresh the knowledge base
        """
        # We'll ignore the observation because we're going
        # to see it again in step
        self._kb = KnowledgeBase(self._height, self._width)
        # Reset step count
        self._step_count = 0
        return {}

    def step(self, obs):
        """Update the knowledge base and suggest an action
        """
        # Update the kb based on the observation
        self._kb.update_from_observation(obs, self._step_count)
        # Get an action to take
        act, info = self._get_action(obs)
        # Update the kb based on the action
        self._kb.update_from_action(act, self._step_count)
        # Update step count
        self._step_count += 1
        return act, info

    @abc.abstractmethod
    def _get_action(self, obs):
        """Return an action to be immediately taken, based on the current
        knowledge base. This is the main thing that differentiates subclasses.

        Returns
        -------
        action : int
            The action to be taken immediately.
        info : dict
            Any useful debug info.
        """
        raise NotImplementedError("Override me")

    def seed(self, seed):
        """Seed the agent, just in case it's random
        """
        self._rng = np.random.RandomState(seed)

### Random Actions

In [None]:
class RandomActions(MyopicPOSARLogicalInferenceAgent):
    """Take random actions
    """
    def _get_action(self, _):
        return self._rng.choice(self._actions), {}

### Single State Determinization

In [None]:
class SingleStateDeterminization(MyopicPOSARLogicalInferenceAgent):
    """Choose (wisely) a single state for Myopic POSAR that is
    consistent with the knowledge base. Determinize, then
    plan, execute, and replan when an unexpected state is found.
    """
    def __init__(self, determinization_mode, successor_fn, check_goal_fn, 
                 get_observation_fn, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._determinization_mode = determinization_mode
        self._successor_fn = successor_fn
        self._get_observation_fn = get_observation_fn
        self._determinization_mode = determinization_mode
        self._plan = []
        self._expected_observations = [] # to trigger replanning
        self._planner = AStar(successor_fn, check_goal_fn)
        self._planner.set_actions(self._actions)

    def _get_action(self, obs):
        """
        """
        info = {}
        # We should replan if the observation was unexpected
        if len(self._expected_observations) == 0:
            expected_obs = None
        else:
            expected_obs = self._expected_observations.pop(0)
        if obs != expected_obs:
            # Updates self._plan and self._expected_observations
            info = self._reset_plan()

        # If planning failed, take a random action
        if not self._plan:
            return self._rng.choice(self._actions), info
        # Take the next step in the plan
        return self._plan.pop(0), info

    def _reset_plan(self):
        """
        """
        # Determinize the state
        state = self._determinize_state()
        # Plan
        self._plan, info = self._planner(state)
        # Get expected observations
        self._expected_observations = []
        s = state
        for action in self._plan:
            s = self._successor_fn(s, action)
            self._expected_observations.append(self._get_observation_fn(s))
        return info

    def _determinize_state(self):
        """Return a state consistent with the knowledge base
        """
        state = {}
        assumptions = []

        if self._determinization_mode == "mediocre_deteriminization":
            # Get the robot location
            # Note that the robot location is fully observed, so really we are just
            # looking up the location that we have already observed.
            candidates = [(r, c) for r in range(self._height) for c in range(self._width)]
            for r, c in candidates:
                # We should know exactly where the robot is
                new_query = robot(r, c, self._step_count)
                if self._kb.check(new_query, assumptions=assumptions):
                    assumptions.append(new_query)
                    state["robot"] = (r, c)
                    break
            else:
                raise Exception("Could not recover robot location")

            # We can always assume rescued is false (otherwise we're done)
            state["rescued"] = False

            # Ignore fires...
            state["fire_locs"] = frozenset(set())

            # Get a random person location...
            state["person"] = (self._rng.choice(self._height), self._rng.choice(self._width))

        elif self._determinization_mode == "my_deteriminization":
            ## Implement a more clever state deterimnization strategy!
            raise NotImplementedError("Implement me!")

        else:
            raise Exception(f"Unrecognized determinization mode {self._determinization_mode}")

        # Make hashable
        return tuple(sorted(state.items()))

### Evaluation Pipeline

In [None]:
def run_single_test(test_env, problem_idx, model, max_horizon=30):
    print(f"Running test problem {problem_idx}")
    test_env.fix_problem_index(problem_idx)
    start_time = time.time()
    obs, info = test_env.reset()
    model_info = model.reset(obs)
    num_steps = 0
    expansions = model_info.get("node_expansions", 0)
    success = False
    for t in range(max_horizon):
        print(".", end='', flush=True)
        act, model_info = model.step(obs)
        expansions += model_info.get("node_expansions", 0)
        obs, reward, done, info = test_env.step(act)
        num_steps += 1
        if done:
            assert reward == 1
            success = True
            break
    duration = time.time() - start_time
    print(f" final duration: {duration} with num steps {num_steps} and success={success}.")
    return duration, expansions, num_steps, success

def run_single_experiment(model, env, seed=0):
    # Initialize
    model.seed(seed)
    env.seed(seed)

    # Do testing
    test_durations = [] # seconds, one per problem
    test_expansions = [] # integers
    test_num_steps = [] # integers
    test_successes = [] # boolean, True if successful
    for problem_idx in range(len(env.problems)):
        duration, expansions, num_steps, success = \
            run_single_test(env, problem_idx, model)
        test_durations.append(duration)
        test_expansions.append(expansions)
        test_num_steps.append(num_steps)
        test_successes.append(success)

    env.close()

    return test_durations, test_expansions, test_num_steps, test_successes

def get_approach(name, env, planning_timeout=10):
    if name == "random":
        return RandomActions(env.height, env.width)

    if name == "single_state_determinization1":
        return SingleStateDeterminization("mediocre_deteriminization",
            env.get_successor_state, env.check_goal, 
            env.get_observation, env.height, env.width)

    if name == "single_state_determinization2":
        return SingleStateDeterminization("my_deteriminization",
            env.get_successor_state, env.check_goal, 
            env.get_observation, env.height, env.width)

    raise Exception(f"Unrecognized approach: {name}")

def print_results_table(env_name, results_for_env):
    print(f"\n### {env_name} ###")
    mean_table = [(a, ) + tuple(np.mean(results_for_env[a], axis=0)) \
                  for a in sorted(results_for_env)]
    columns = ["Approach", "Duration", "Expansions", "Num Steps", "Successes"]
    print(tabulate(mean_table, headers=columns))

def main():
    approaches = [
        "random", 
        "single_state_determinization1",
        "single_state_determinization2",
    ]
    num_seeds_per_approach = {
        "random" : 3,
        "single_state_determinization1" : 3,
        "single_state_determinization2" : 3,
    }

    env_names = [
        "TinyMyopicPOSAR",
        "SmallMyopicPOSAR",
    ]

    all_results = {}
    for env_name in env_names:
        results_for_env = {}
        all_results[env_name] = results_for_env
        for approach in approaches:
            results_for_env[approach] = []
            env = pddlgym.make(f"{env_name}-v0")
            model = get_approach(approach, env)
            for seed in range(num_seeds_per_approach[approach]):
                results = run_single_experiment(model, env, seed=seed)
                for (dur, expansions, num_steps, succ) in zip(*results):
                    results_for_env[approach].append((dur, expansions, num_steps, succ))

        # Print per-environment results (because of impatience)
        print_results_table(env_name, results_for_env)

    # Print final results
    print("\n" + "*" * 80)
    for env_name in env_names:
        print_results_table(env_name, all_results[env_name])

In [None]:
# Only for the second part of this pset (make sure you complete the logical inference part first)
main() # This may take a long time to run, so make sure you are confident before you try to run it all