# Assignment 1: Planning using Heuristic Search

**Felipe Meneguzzi**  
**Mauricio Magnaguagno (PhD Student)**  
**Leonardo Rosa Amado (PhD Student)**

Computer Science (4646A-4):
- Assigned: 30 August
- Due: 25 September

Computer Engineering (4621A-4):
- Assigned: 28 August
- Due: 23 September

## Assignment Overview

<img align="right" src="planning-assignment.png"/>

The goal of this work is to implement the core functions of an automated planner. You will implement three main functions in this assignment:
- Implement the **Max-Cost** heuristic function.
- Implement a function capable of validating a plan given a domain and a problem.
- Finally, implement the heuristic search **A\***

After implementing the required functions, you must write a 2-page paper. Deliver the entire package through GitHub: your implemented functions must be contained in this Jupyter Notebook, and the paper must be committed as a separate **pdf** file in the same GitHub repository (the template is in the ``paper`` folder).

## Experimentation

- You can test your implementation with the provided domains and problems:
  - [blocksworld](examples/blocksworld)
  - [dinner](examples/dinner)
  - [dwr](examples/dwr)
  - [tsp](examples/tsp)

- Planning tools and extra domains and problems to sanity check your own implementation:
  - [Web-Planner](https://web-planner.herokuapp.com/)
  - [editor.planning.domains](http://editor.planning.domains/)
  - IPC domains and problems can be found in [potassco/pddl-instances](https://github.com/potassco/pddl-instances)


## Grading

In order to properly evaluate your work and thought process, you will write a **2-page** report in the AAAI two-column format explaining your encoding and experiments. 
These guidelines are to be followed **exactly**. 
**Reports with less than two pages of actual content, or not in the required format, will receive 0 marks for the report criterion.** 
This report will be included in the deliverables of the assignment. 
[The formatting instructions are available at ShareLatex (AAAI Press)](https://www.sharelatex.com/templates/journals/aaai-press). 
The report must have the following sections:

- An introduction with your understanding of the problem domain, outlining the remainder of the paper;
- Three sections explaining each part of your implementation (search, heuristic, and validator);
- One experimentation section where you measure the performance of the planner using your action formalisation for each of the domains, on multiple problems.
- One conclusion section, where you will summarise your experience in encoding planning domains and discuss the performance of the planner, and any limitations encountered in solving the problems you encoded.

Grading will consider elements of your encoding, experimentation and reporting of the work done. 
The criteria and their weights in the final grade are as follows:

- Implementation (70%):
  - Heuristic function (20%);
  - Validator (20%);
  - Heuristic search (30%):
      - Correctness and optimality (20%); and
      - Runtime efficiency (10%).
- Overall report readability (20%) — how accessible and coherent your explanation of your implementation is;
- Code readability (10%).

## Collaboration Policy

You must work on this project **individually**. 
You are free to discuss high-level design issues with the people in your class, but every aspect of your actual formalisation must be entirely your own work.
Furthermore, there can be no textual similarities between the reports written by different students. 
Plagiarism, no matter the degree, will result in forfeiture of the entire grade of this assignment.

## Sections

- [Heuristic](#Heuristic)
  - [Implement the heuristic function](#Implement-the-heuristic-function)
  - [Test the heuristic function](#Test-the-heuristic-function)
- [Validator](#Validator)
  - [Implement the validate function](#Implement-the-validate-function)
  - [Test the validate function](#Test-the-validate-function)
- [Planner](#Planner)
  - [Implement the planner solve function](#Implement-the-planner-solve-function)
  - [Test planner completeness and optimality](#Test-planner-completeness-and-optimality)
  - [Test planner output time](#Test-planner-output-time)

In [1]:
from pddl.action import Action
from pddl.state import applicable, apply

# Objects example

# An action to move robot r1 from location l1 to location l2
a1 = Action(
    'move',
    ['r1', 'l1', 'l2'], # parameters
    frozenset({('at', 'r1', 'l1'), ('adjacent', 'l1', 'l2')}), # positive preconditions
    frozenset({('occupied', 'l2')}), # negative preconditions
    frozenset({('at', 'r1', 'l2'), ('occupied', 'l2')}), # add effects
    frozenset({('at', 'r1', 'l1'), ('occupied', 'l1')}) # delete effects
)

# Get each element from the action
print(a1.name)
print(a1.parameters)
print(a1.positive_preconditions)
print(a1.negative_preconditions)
print(a1.add_effects)
print(a1.del_effects)

# The list of actions contains all possible actions
actions = [
    a1,
    # ...
]

# Only positive literals are present in the initial state
initial_state = frozenset({
    ('on', 'ca', 'pallet'),
    ('at', 'r1', 'l1'),
    ('belong', 'k1', 'l1'),
    ('adjacent', 'l1', 'l2'), ('adjacent', 'l2', 'l1'), ('attached', 'q2', 'l2'),
    ('empty', 'k2'),
    ('attached', 'p1', 'l1'), ('occupied', 'l1'),
    ('empty', 'k1'),
    # ...
})

# Goal literals are split in two, positive and negative
positive_goal = frozenset({('in', 'cb', 'p1'), ('in', 'ca', 'p1')})
negative_goal = frozenset()

# The output plan from the planner is either a list of actions or failure (None)
# An empty plan is valid
plan = []
# Preconditions and effects are None when obtained from a plan file, may be filled when obtained from the planner
plan = [
    Action('take', ['k1', 'cc', 'cb', 'p1', 'l1'], None, None, None, None),
    Action('load', ['k1', 'r1', 'cc', 'l1'], None, None, None, None),
    Action('move', ['r1', 'l1', 'l2'], None, None, None, None),
    Action('unload', ['k2', 'r1', 'cc', 'l2'], None, None, None, None)
    # ...
]
# Failure
plan = None

# The validator returns True for a valid plan and False otherwise
valid_plan   = True
invalid_plan = False

move
['r1', 'l1', 'l2']
frozenset({('adjacent', 'l1', 'l2'), ('at', 'r1', 'l1')})
frozenset({('occupied', 'l2')})
frozenset({('at', 'r1', 'l2'), ('occupied', 'l2')})
frozenset({('at', 'r1', 'l1'), ('occupied', 'l1')})


In [29]:
from pddl.heuristic import Heuristic
import pddl.state

class MaxHeuristic(Heuristic):
    def are_goals_satisfied(self, state, positive_goals, negative_goals):
        return (pddl.state.applicable(state, positive_goals, negative_goals))
    
    def is_goal_satisfied(self, state, goal):
        return goal in state

    def can_apply_action_to_state(self, state, action):
        return pddl.state.applicable(state, action.positive_preconditions, action.negative_preconditions)

    def get_state_with_applied_action(self, state, action):
        return pddl.state.apply(state, action.add_effects, action.del_effects)
    
    def get_cost_to_solution_of_goal(self, initial_state, actions, goal, is_a_positive_goal):
        if (is_a_positive_goal and self.is_goal_satisfied(initial_state, goal)):
            return 0
        if (not is_a_positive_goal and not self.is_goal_satisfied(initial_state, goal)):
            return 0
        # NOTE: this is a breadth-first search over real states, one goal at a time,
        # so the value it returns can exceed the true h^max value
        # List of all past states so we never expand the same state twice
        past_states = [initial_state]
        # Cost of the path that reached each state in past_states
        path_costs = [0]
        # Costs of the paths found so far that satisfy the goal
        paths_heuristics = []
        stateId = 0
        while (stateId < len(past_states)):
            # Get the state we will expand and its cost
            state = past_states[stateId]
            cost = 1+path_costs[stateId]
            # Attempt to execute the actions so that we create new states
            for action in actions:
                if (not self.can_apply_action_to_state(state, action)):
                    continue
                new_state = self.get_state_with_applied_action(state, action)
                # Check if we satisfied the goal with this action
                if (is_a_positive_goal and self.is_goal_satisfied(new_state, goal)):
                    if (len(paths_heuristics) < 5):
                        paths_heuristics.append(cost)
                    else:
                        paths_heuristics.append(cost)
                        print("positive goal can be solved by paths with costs: ",paths_heuristics)
                        return min(paths_heuristics)
                if (not is_a_positive_goal and not self.is_goal_satisfied(new_state, goal)):
                    if (len(paths_heuristics) < 5):
                        paths_heuristics.append(cost)
                    else:
                        paths_heuristics.append(cost)
                        print("negative goal can be solved by paths with costs: ",paths_heuristics)
                        return min(paths_heuristics)
                    # return cost # paths_heuristics.append(cost)
                # Check if we have visited this node yet
                if (new_state in past_states):
                    continue
                # Add this node to the list so that we can continue the search
                past_states.append(new_state)
                path_costs.append(cost)
            stateId += 1
        return float('inf')

    def h(self, actions, initial_state, positive_goals, negative_goals, debug=False):
        if (self.are_goals_satisfied(initial_state, positive_goals, negative_goals)):
            return 0
        steps = []

        for goal in positive_goals:
            steps.append(self.get_cost_to_solution_of_goal(initial_state, actions, goal, True))

        for goal in negative_goals:
            steps.append(self.get_cost_to_solution_of_goal(initial_state, actions, goal, False))
        
        return max(steps) if (len(steps) != 0) else float('inf')

            

# Heuristic

## Implement the heuristic function

You will implement the Max-cost heuristic. 
Return the estimated distance between the current ``state`` $s$ and the ``goal`` $G$: a number between 0 (when $s \models G$) and infinity (when $G$ is unreachable).

$$
h^{max}(s,G)=\max_{g_i \in G}
\begin{cases}
0, & \text{if } g_i \in s\\
\min \{cost(a) + h^{max}(s,pre(a)) \mid a \in A \text{ and } g_i \in eff(a)\}, & \text{otherwise}
\end{cases}
$$

where $cost(a)$ is the cost of action $a$ (usually $1$), $pre(a)$ is the set of preconditions of action $a$, and $eff(a)$ is the set of effects of action $a$. Your code must be contained in the ``h(self, actions, initial_state, positive_goals, negative_goals)`` function in the cell below. You can create additional functions (do not forget to comment the code intelligibly). ``h`` takes the following inputs:
- ``actions``: list of ground actions
- ``initial_state``: initial state of the problem file
- ``positive_goals``: positive predicates of the goal
- ``negative_goals``: negative predicates of the goal
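
One common way to evaluate the recursive definition above is a fixpoint over per-fact costs: start with cost 0 for every fact in $s$, then keep lowering fact costs through actions whose preconditions are already reachable. The block below is only a minimal sketch under simplifying assumptions: it ignores negative preconditions, delete effects, and negative goals, it assumes unit action costs, and ``RelaxedAction``, ``h_max``, ``toy_state``, and ``toy_actions`` are hypothetical names that are not part of the provided ``pddl`` package.

```python
from collections import namedtuple

# Hypothetical stand-in for the notebook's Action class: only the fields this sketch needs
RelaxedAction = namedtuple('RelaxedAction', ['name', 'positive_preconditions', 'add_effects'])

def h_max(actions, state, positive_goals):
    """Max-cost estimate computed as a fixpoint over fact costs (unit action costs assumed)."""
    # Facts that already hold cost 0; any fact missing from the dict is unreachable so far
    cost = {fact: 0 for fact in state}
    changed = True
    while changed:
        changed = False
        for action in actions:
            # An action is usable only once every precondition has a finite cost
            if not all(p in cost for p in action.positive_preconditions):
                continue
            # A set of facts costs as much as its most expensive member
            pre_cost = max((cost[p] for p in action.positive_preconditions), default=0)
            for fact in action.add_effects:
                if pre_cost + 1 < cost.get(fact, float('inf')):
                    cost[fact] = pre_cost + 1
                    changed = True
    # The goal costs as much as its most expensive fact (infinity if one is unreachable)
    return max((cost.get(g, float('inf')) for g in positive_goals), default=0)

# Toy problem: walk a -> b -> c, and pick up a key at b
toy_state = frozenset({('at', 'a')})
toy_actions = [
    RelaxedAction('move-a-b', frozenset({('at', 'a')}), frozenset({('at', 'b')})),
    RelaxedAction('move-b-c', frozenset({('at', 'b')}), frozenset({('at', 'c')})),
    RelaxedAction('pick-key', frozenset({('at', 'b')}), frozenset({('have', 'key')})),
]
print(h_max(toy_actions, toy_state, {('at', 'c'), ('have', 'key')}))  # 2
```

Both goal facts have cost 2 in the toy problem, so the maximum is 2 even though a real plan needs three actions; this is why Max-cost never overestimates.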

## Test the heuristic function

We will test the Max-cost heuristic using three different domains: dinner, tsp, and dwr. The state used is the initial state of each problem. 

At each execution we show the expected value for the Max-cost heuristic.

In [30]:
from pddl.pddl_parser import PDDL_Parser
from pddl.action import Action
from pddl.state import applicable, apply

# The following should be visible to the students
# Load some domain and some problem
dwr = "examples/dwr/dwr.pddl"
pb1_dwr = "examples/dwr/pb1.pddl"
pb2_dwr = "examples/dwr/pb2.pddl"

tsp = "examples/tsp/tsp.pddl"
pb1_tsp = "examples/tsp/pb1.pddl"

dinner = "examples/dinner/dinner.pddl"
pb1_dinner = "examples/dinner/pb1.pddl"

def parse_domain_problem(domain, problem):
    parser = PDDL_Parser()
    parser.parse_domain(domain)
    parser.parse_problem(problem)
    # Grounding process
    actions = []
    for action in parser.actions:
        for act in action.groundify(parser.objects):
            actions.append(act)
    return parser, actions

def test_heuristic(domain, problem, h, expected):
    parser, actions = parse_domain_problem(domain, problem)
    v = h.h(actions, parser.state, parser.positive_goals, parser.negative_goals)
    print("Expected " + str(expected) + ", got:", str(v) + ('. Correct!' if v == expected else '. False!'))

# Apply Hmax to initial states of many problems from many domains
h = MaxHeuristic()
test_heuristic(dwr, pb1_dwr, h, 6)
test_heuristic(dwr, pb2_dwr, h, 0)
test_heuristic(tsp, pb1_tsp, h, 2)
test_heuristic(dinner, pb1_dinner, h, 1)

positive goal can be solved by paths with costs:  [7, 8, 8, 8, 8, 8]
positive goal can be solved by paths with costs:  [5, 6, 6, 6, 6, 6]
positive goal can be solved by paths with costs:  [9, 10, 10, 10, 10, 11]
Expected 6, got: 9. False!
Expected 0, got: 0. Correct!
positive goal can be solved by paths with costs:  [2, 3, 3, 3, 4, 4]
positive goal can be solved by paths with costs:  [2, 3, 3, 3, 3, 4]
positive goal can be solved by paths with costs:  [1, 2, 2, 2, 2, 2]
positive goal can be solved by paths with costs:  [1, 2, 2, 2, 3, 3]
positive goal can be solved by paths with costs:  [2, 2, 3, 3, 3, 3]
Expected 2, got: 2. Correct!
positive goal can be solved by paths with costs:  [1, 2, 2, 2, 2, 2]
positive goal can be solved by paths with costs:  [1, 2, 2, 2, 2, 2]
negative goal can be solved by paths with costs:  [1, 1, 2, 2, 2, 2]
Expected 1, got: 1. Correct!


# Validator

## Implement the validate function
You will now implement a validator that verifies whether a plan is valid for a specific domain and problem. The validator must return **True** if and only if the given plan is applicable and reaches the specified goal, and **False** if the plan is **not** applicable **or** does **not** achieve the specified goal. Your code must be contained in the ``validate(self, actions, initial_state, positive_goals, negative_goals, plan)`` function in the cell below. You can create additional functions (do not forget to comment the code intelligibly). ``validate`` takes the following inputs:
- ``actions``: list of ground actions
- ``initial_state``: initial state of the problem file
- ``positive_goals``: positive predicates of the goal
- ``negative_goals``: negative predicates of the goal
- ``plan``: plan parsed from a plan trace
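
The check described above can be sketched with plain set operations, assuming the usual STRIPS semantics: an action is applicable when all of its positive preconditions hold and none of its negative ones do, and applying it removes the delete effects and then adds the add effects. The function ``validate_sketch`` and the 4-tuple step format are hypothetical and independent of the provided ``pddl`` package; the ``move`` step reuses the literals from the objects example.

```python
def validate_sketch(initial_state, positive_goals, negative_goals, plan):
    """Return True iff every step is applicable in turn and the final state meets the goal.

    Each plan step is a hypothetical 4-tuple of frozensets:
    (positive preconditions, negative preconditions, add effects, delete effects).
    """
    state = frozenset(initial_state)
    for pos_pre, neg_pre, add_effects, del_effects in plan:
        # Applicable: all positive preconditions hold and no negative precondition holds
        if not (pos_pre <= state and neg_pre.isdisjoint(state)):
            return False
        # Apply: remove the delete effects, then add the add effects
        state = (state - del_effects) | add_effects
    # Goal test uses the same rule as applicability
    return positive_goals <= state and negative_goals.isdisjoint(state)

# Move robot r1 from l1 to l2, as in the objects example
move = (
    frozenset({('at', 'r1', 'l1'), ('adjacent', 'l1', 'l2')}),
    frozenset({('occupied', 'l2')}),
    frozenset({('at', 'r1', 'l2'), ('occupied', 'l2')}),
    frozenset({('at', 'r1', 'l1'), ('occupied', 'l1')}),
)
start = frozenset({('at', 'r1', 'l1'), ('adjacent', 'l1', 'l2'), ('occupied', 'l1')})
goal_pos = frozenset({('at', 'r1', 'l2')})
goal_neg = frozenset({('occupied', 'l1')})

print(validate_sketch(start, goal_pos, goal_neg, [move]))        # True
print(validate_sketch(start, goal_pos, goal_neg, []))            # False: goal not reached
print(validate_sketch(start, goal_pos, goal_neg, [move, move]))  # False: second move not applicable
```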

In [32]:
from pddl.pddl_parser import PDDL_Parser
from pddl.action import Action
from pddl.state import applicable, apply

class Validator:

    def parse_plan(self, filename):
        with open(filename,'r') as f:
            plan = []
            for act in f.read().splitlines():
                act = act[1:-1].split()
                plan.append(Action(act[0], tuple(act[1:]), None, None, None, None))
            return plan

    def validate_file(self, domainfile, problemfile, planfile):
        return self.validate_plan(domainfile, problemfile, self.parse_plan(planfile))

    def validate_plan(self, domainfile, problemfile, plan):
        # Parser
        parser = PDDL_Parser()
        parser.parse_domain(domainfile)
        parser.parse_problem(problemfile)
        # Grounding process
        ground_actions = []
        for action in parser.actions:
            for act in action.groundify(parser.objects):
                ground_actions.append(act)
        return self.validate(ground_actions, parser.state, parser.positive_goals, parser.negative_goals, plan)
    
    # Obvious helper methods to make my code more verbose
    def are_goals_satisfied(self, initial_state, positive_goals, negative_goals):
        return (pddl.state.applicable(initial_state, positive_goals, negative_goals))

    def can_apply_action_to_state(self, state, action):
        return pddl.state.applicable(state, action.positive_preconditions, action.negative_preconditions)

    def get_state_with_applied_action(self, state, action):
        return pddl.state.apply(state, action.add_effects, action.del_effects)

    def fix_incomplete_plan(self, plan, actions):
        """
        Actions parsed from a plan file carry only a name and parameters
        (their preconditions and effects are None), so replace each one with
        the matching ground action, which has them filled in.
        Plan steps that match no ground action are left out of the result.
        """
        correct_plan = []
        for step in plan:
            for action in actions:
                # Match on name and parameters
                if (action.name == step.name and action.parameters == step.parameters):
                    correct_plan.append(action)
                    break
        return correct_plan

    # =====================================
    # Params:
    # actions -> list of ground actions
    # initial_state -> initial state of the problem file
    # positive_goals -> positive predicates of the goal
    # negative_goals -> negative predicates of the goal
    # plan -> plan parsed from a plan trace
    # 
    # Returns:
    # True when executing the plan from the initial state ends in a state that satisfies the goals
    # =====================================
    def validate(self, actions, initial_state, positive_goals, negative_goals, plan):
        # A step that matches no ground action makes the whole plan invalid
        known = {(action.name, tuple(action.parameters)) for action in actions}
        if (any((step.name, tuple(step.parameters)) not in known for step in plan)):
            return False
        correct_plan = self.fix_incomplete_plan(plan, actions)
        state = initial_state
        for action in correct_plan:
            if (not self.can_apply_action_to_state(state, action)):
                return False
            state = self.get_state_with_applied_action(state, action)
        # The plan is only valid if the goals hold after the last step
        return self.are_goals_satisfied(state, positive_goals, negative_goals)

## Test the validate function
In this test, we verify the correctness of the implemented validator using the **[dwr](examples/dwr)** domain. Consider running more tests to ensure the correctness of the implemented function.

In [33]:
dwr = "examples/dwr/dwr.pddl"
pb1 = "examples/dwr/pb1.pddl"
plan1 = "examples/dwr/dwr_pb1_bfs.plan"
plan2 = "examples/dwr/dwr_pb1_heuristic.plan"
plan_empty = "examples/dwr/empty.plan"
val = Validator()
print("Expected True, got:", str(val.validate_file(dwr, pb1, plan1)))
print("Expected True, got:", str(val.validate_file(dwr, pb1, plan2)))
print("Expected False, got:", str(val.validate_file(dwr, pb1, plan_empty)))


Expected True, got: True
Expected True, got: True
Expected False, got: False


# Planner

## Implement the planner ```solve``` function
You will implement the **A\*** search. This search must use the implemented **Max-cost** heuristic. The search receives a **domain** PDDL file and a **problem** PDDL file (both are already parsed for you). The search must always return an optimal plan whenever the given problem has a solution in the specified domain. Your code must be contained in the ``solve(self, actions, initial_state, positive_goals, negative_goals)`` function (in the following cell). ``solve`` takes the following inputs:
- ``actions``: list of grounded actions
- ``initial_state``: initial state of the problem file
- ``positive_goals``: positive predicates of the goal
- ``negative_goals``: negative predicates of the goal
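
For orientation, here is a generic A\* skeleton on a toy graph, written as a sketch rather than a reference solution. The names ``a_star``, ``is_goal``, ``successors``, ``toy_graph``, and ``toy_h`` are hypothetical, and unit action costs are assumed. In the assignment setting, states would be the frozensets of literals, successors would come from applying each applicable ground action, and ``h`` would call the Max-cost heuristic.

```python
import heapq
from itertools import count

def a_star(initial_state, is_goal, successors, h):
    """Return a list of action names reaching a goal state, or None if none is reachable."""
    tie = count()  # tie-breaker so the heap never has to compare states or plans
    # Queue entries: (f = g + h, tie, g, state, plan so far)
    frontier = [(h(initial_state), next(tie), 0, initial_state, [])]
    best_g = {initial_state: 0}
    while frontier:
        _, _, g, state, plan = heapq.heappop(frontier)
        # Testing the goal on expansion keeps the result optimal for an admissible h
        if is_goal(state):
            return plan
        if g > best_g.get(state, float('inf')):
            continue  # stale entry: a cheaper path to this state was found later
        for action_name, next_state in successors(state):
            new_g = g + 1  # unit action cost
            if new_g < best_g.get(next_state, float('inf')):
                best_g[next_state] = new_g
                entry = (new_g + h(next_state), next(tie), new_g, next_state, plan + [action_name])
                heapq.heappush(frontier, entry)
    return None

# Toy graph: two equally short routes from A to E
toy_graph = {
    'A': [('A->B', 'B'), ('A->C', 'C')],
    'B': [('B->D', 'D')],
    'C': [('C->D', 'D')],
    'D': [('D->E', 'E')],
    'E': [],
}
toy_h = {'A': 3, 'B': 2, 'C': 2, 'D': 1, 'E': 0}  # exact distances to E, hence admissible

plan = a_star('A', lambda s: s == 'E', lambda s: toy_graph[s], lambda s: toy_h[s])
print(plan)  # ['A->B', 'B->D', 'D->E']
```

Storing the partial plan in each queue entry keeps the sketch short; keeping parent pointers and rebuilding the plan at the end uses less memory on larger problems.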

In [41]:
from pddl.pddl_planner import PDDL_Planner
from pddl.state import applicable, apply
import sys
import queue as queue

class Heuristic_Planner(PDDL_Planner):

    def __init__(self, heuristic=MaxHeuristic()):
        self.h = heuristic
        
    # Obvious helper methods to make my code more verbose
    def are_goals_satisfied(self, initial_state, positive_goals, negative_goals):
        return (pddl.state.applicable(initial_state, positive_goals, negative_goals))

    def can_apply_action_to_state(self, state, action):
        return pddl.state.applicable(state, action.positive_preconditions, action.negative_preconditions)

    def get_state_with_applied_action(self, state, action):
        return pddl.state.apply(state, action.add_effects, action.del_effects)


    # -----------------------------------------------
    # Solve
    # -----------------------------------------------
    
    # =====================================
    # Params:
    # actions -> list of grounded actions
    # initial_state -> initial state of the problem file
    # positive_goals -> positive predicates of the goal
    # negative_goals -> negative predicates of the goal
    #
    # Returns:
    # A plan (list of Actions) that solves the goal from the initial_state
    # =====================================
    
    def solve(self, actions, initial_state, positive_goals, negative_goals):
        # The empty plan solves a problem whose goals already hold
        if (self.are_goals_satisfied(initial_state, positive_goals, negative_goals)):
            return []
        # NOTE: this is a breadth-first search; self.h is never consulted, so it is not A* yet
        # List of all past states so we never expand the same state twice
        past_states = [initial_state]
        # Index of the state each past state came from, so we can backtrace the plan
        backtraces = [None]
        # Action that produced each past state
        actions_taken = [None]
        # Number of actions on the path that reached each past state
        path_costs = [0]

        stateId = 0
        limit = 100000
        while (stateId < len(past_states)):
            # Iteration limiter so we don't run out of memory!
            limit -= 1
            if (limit <= 0):
                print("Heuristic bail!")
                break
            state = past_states[stateId]
            cost = 1+path_costs[stateId]
            for action in actions:
                # Skip action if it cannot be executed at this state
                if (not self.can_apply_action_to_state(state, action)):
                    continue
                # Create new state with that action
                new_state = self.get_state_with_applied_action(state, action)
                # Check if we reached the destination
                if (self.are_goals_satisfied(new_state, positive_goals, negative_goals)):
                    # Follow the backtraces to the initial state to rebuild the plan
                    plan = [action]
                    parent = stateId
                    while (backtraces[parent] is not None):
                        plan.append(actions_taken[parent])
                        parent = backtraces[parent]
                    plan.reverse()
                    return plan
                # Skip new state if it has already been visited
                if (new_state in past_states):
                    continue
                # Add this new node to our lists
                past_states.append(new_state)
                backtraces.append(stateId)
                path_costs.append(cost)
                actions_taken.append(action)
            stateId += 1

        # No plan found within the iteration limit
        return None

## Test planner completeness and optimality
Here we perform a simple test to verify that the length of the plan found by your implementation is step-optimal. Note that this test does not verify whether the plan is valid (but we are going to test this). You can use your own implementation of the validator to verify this (highly recommended).

In [42]:
#Student_tests
dwr = "examples/dwr/dwr.pddl"
pb1 = "examples/dwr/pb1.pddl"
pb2 = "examples/dwr/pb2.pddl"
planner = Heuristic_Planner()

plan, time = planner.solve_file(dwr, pb1, False)
print("Expected 17, got:", str(len(plan)) + ('. Correct!' if len(plan) == 17 else '. False!'))
plan, time = planner.solve_file(dwr, pb2, False)
print("Expected 0, got:", str(len(plan)) + ('. Correct!' if len(plan) == 0 else '. False!'))


Expected 17, got: 17. Correct!
Expected 0, got: 0. Correct!


## Test planner output time

Here we will test the output time of the implemented search function. The **maximum** acceptable output time is **60 seconds** for the given domains. Please consider that a good implementation should take less than 20 seconds (depending on the machine) for any of the given problems.
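
If you want to time individual pieces of your own code (for example a single heuristic call) against these limits, a small wrapper around ``time.perf_counter`` is enough. This is a minimal sketch; ``timed_call`` is a hypothetical helper and is not part of the provided ``pddl`` package, whose ``solve_file`` already reports its own elapsed time.

```python
import time

def timed_call(fn, *args):
    """Call fn(*args) and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn(*args)
    elapsed = time.perf_counter() - start
    return result, elapsed

# Example with a stand-in workload
result, elapsed = timed_call(sum, range(1000))
print("Elapsed time:", str(elapsed) + (' Passed!' if elapsed <= 60.0 else ' Timeout!'))
```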

In [None]:
#Student_tests
dwr = "examples/dwr/dwr.pddl"
pb1 = "examples/dwr/pb1.pddl"
pb2 = "examples/dwr/pb2.pddl"
planner = Heuristic_Planner()

plan, time = planner.solve_file(dwr, pb1, False)
print("Elapsed time:", str(time) + (' Passed!' if time <= 60.0 else ' Timeout!'))

plan, time = planner.solve_file(dwr, pb2, False)
print("Elapsed time:", str(time) + (' Passed!' if time <= 60.0 else ' Timeout!'))
