# Problem 2: Hierarchical Planning

In *Problem 2*, we are going to solve the elevator domain using hierarchical planning.

## Setup

We will use the `unified-planning` package to model the hierarchical planning problem and the `Aries` planner to solve it.

Start by installing the unified planning library and **the hierarchical planner** `Aries`:

```
pip install unified-planning
pip install unified-planning[aries]
pip install up-aries
```

An example of using hierarchical planning with `unified-planning` to solve a logistics problem can be found [here](https://unified-planning.readthedocs.io/en/latest/notebooks/07-hierarchical-planning.html#Case-study:-Logistics-problem-(IPC-1998)).

In [1]:
pip install unified-planning

Note: you may need to restart the kernel to use updated packages.


In [30]:
pip install unified-planning\[aries\]

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install up-aries

Note: you may need to restart the kernel to use updated packages.


In [67]:
import unified_planning as up
from unified_planning.shortcuts import *
from unified_planning.model.htn import *

## Initialization

The initialization differs from *Problem 1*. We set the number of floors and the number of people as follows, where `FLOOR_LIST[i]` is the floor on which person `i` starts. Unlike in *Problem 1*, several people can start on the same floor.


In [68]:
NUM_FLOOR = 3
NUM_PEOPLE = 5
FLOOR_LIST = [1,1,2,2,2]
print('number of floors: '+str(NUM_FLOOR))
print('number of people: '+str(NUM_PEOPLE))
print('people are originally at floor: '+ str(FLOOR_LIST))

number of floors: 3
number of people: 5
people are originally at floor: [1, 1, 2, 2, 2]
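Because `FLOOR_LIST` is later used to index the list of floor objects, an inconsistent configuration only fails when the initial values are set. The following is a minimal sketch of an early sanity check; `validate_config` is a hypothetical helper, not part of the notebook or of `unified-planning`.

```python
def validate_config(num_floor, num_people, floor_list):
    """Return a list of problems found in the configuration (empty if it is consistent)."""
    errors = []
    # There must be exactly one starting floor per person.
    if len(floor_list) != num_people:
        errors.append(f"expected {num_people} entries in floor_list, got {len(floor_list)}")
    # Floors are indexed 0 .. num_floor - 1.
    for i, floor in enumerate(floor_list):
        if not 0 <= floor < num_floor:
            errors.append(f"person{i} starts on floor {floor}, outside 0..{num_floor - 1}")
    return errors


# An empty list means the configuration is consistent.
print(validate_config(3, 5, [1, 1, 2, 2, 2]))
```

Changing `NUM_FLOOR`, `NUM_PEOPLE`, or `FLOOR_LIST` and re-running the check is a cheap way to catch an out-of-range floor before building the problem.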


## Problem setup

We need to initialize a hierarchical planning problem using the `HierarchicalProblem` class, and later specify all its internal components.

After the initialization, you can print a summary of the problem, which will not include any components yet.

In [69]:
problem = HierarchicalProblem('ElevatorHTNProblem')
print(problem)

problem name = ElevatorHTNProblem

fluents = [
]

actions = [
]

initial fluents default = [
]

initial values = [
]

goals = [
]

abstract tasks = [
]

methods = [
]

task network {
  subtasks = [
  ]
}


Next, we define the types of objects in the planning problem. In this case, we have four types of objects: `Loc`, which has two subtypes, `Floor` and `Elevator`; and `Person`. We then add these objects to the problem. After adding the objects, we can print the problem to see the changes.


In [70]:
Loc = UserType("Loc")
Floor = UserType('Floor', father=Loc)
Elevator = UserType('Elevator', father=Loc)
Person = UserType('Person')


# Add objects (floors and people)
floors = [Object(f'floor{i}', Floor) for i in range(NUM_FLOOR)]
people = [Object(f'person{i}', Person) for i in range(NUM_PEOPLE)]
elevator = [Object('elevator', Elevator)]
problem.add_objects(floors + people + elevator)
print(problem)

problem name = ElevatorHTNProblem

types = [Loc, Floor - Loc, Person, Elevator - Loc]

fluents = [
]

actions = [
]

objects = [
  Loc: [floor0, floor1, floor2, elevator]
  Floor - Loc: [floor0, floor1, floor2]
  Person: [person0, person1, person2, person3, person4]
  Elevator - Loc: [elevator]
]

initial fluents default = [
]

initial values = [
]

goals = [
]

abstract tasks = [
]

methods = [
]

task network {
  subtasks = [
  ]
}


Now the problem has types to represent a floor, person, elevator, and the location (of the person). In this step, we add `fluents` to describe the state of the world in the planning problem. There are three fluents we need to define:

1. **at_person**: The location of a person, which is either a floor or the elevator.
2. **at_elevator**: The floor at which the elevator is located.
3. **elevator_door_open**: Whether the elevator door is open.

Remember to add these fluents to the problem.


In [71]:
at_person = Fluent('at_person', Loc, person=Person)
at_elevator = Fluent('at_elevator', Floor, elevator=Elevator)
elevator_door_open = Fluent('elevator_door_open', BoolType(), elevator=Elevator)

problem.add_fluent(at_person)
problem.add_fluent(at_elevator)
problem.add_fluent(elevator_door_open)
print(problem)

problem name = ElevatorHTNProblem

types = [Loc, Floor - Loc, Person, Elevator - Loc]

fluents = [
  Loc at_person[person=Person]
  Floor - Loc at_elevator[elevator=Elevator - Loc]
  bool elevator_door_open[elevator=Elevator - Loc]
]

actions = [
]

objects = [
  Loc: [floor0, floor1, floor2, elevator]
  Floor - Loc: [floor0, floor1, floor2]
  Person: [person0, person1, person2, person3, person4]
  Elevator - Loc: [elevator]
]

initial fluents default = [
]

initial values = [
]

goals = [
]

abstract tasks = [
]

methods = [
]

task network {
  subtasks = [
  ]
}


We can see that the fluents describe the location of each person, the location of the elevator, and whether the elevator door is open. We then set the initial values of these fluents.

In [72]:
for i in range(NUM_PEOPLE):
    problem.set_initial_value(at_person(people[i]), floors[int(FLOOR_LIST[i])])

problem.set_initial_value(at_elevator(elevator[0]), floors[0])
problem.set_initial_value(elevator_door_open(elevator[0]), False)

print(problem)

problem name = ElevatorHTNProblem

types = [Loc, Floor - Loc, Person, Elevator - Loc]

fluents = [
  Loc at_person[person=Person]
  Floor - Loc at_elevator[elevator=Elevator - Loc]
  bool elevator_door_open[elevator=Elevator - Loc]
]

actions = [
]

objects = [
  Loc: [floor0, floor1, floor2, elevator]
  Floor - Loc: [floor0, floor1, floor2]
  Person: [person0, person1, person2, person3, person4]
  Elevator - Loc: [elevator]
]

initial fluents default = [
]

initial values = [
  at_person(person0) := floor1
  at_person(person1) := floor1
  at_person(person2) := floor2
  at_person(person3) := floor2
  at_person(person4) := floor2
  at_elevator(elevator) := floor0
  elevator_door_open(elevator) := false
]

goals = [
]

abstract tasks = [
]

methods = [
]

task network {
  subtasks = [
  ]
}
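To make the printed initial values easier to reason about, the same state can be written as a plain Python dictionary that maps each grounded fluent to its value. This is only an illustrative sketch: `initial_state` and the tuple keys are assumptions made for this example and are independent of how `unified-planning` stores state.

```python
FLOOR_LIST = [1, 1, 2, 2, 2]


def initial_state(floor_list):
    """Build a dict that mirrors the notebook's initial values (hypothetical representation)."""
    # One at_person entry per person, valued with the person's starting floor.
    state = {("at_person", f"person{i}"): f"floor{f}" for i, f in enumerate(floor_list)}
    # The elevator starts on the ground floor with its door closed.
    state[("at_elevator", "elevator")] = "floor0"
    state[("elevator_door_open", "elevator")] = False
    return state


state = initial_state(FLOOR_LIST)
for fluent, value in state.items():
    print(fluent, ":=", value)
```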


## Task 1: Add Actions

After adding the objects and fluents to the problem, we need to create actions. The actions **differ** from those in *Problem 1*; we have combined the original `move_up` and `move_down` actions into a single action, `move_elevator`, which takes two arguments: the start floor and the end floor.

1. **move_elevator**: Moves the elevator from the start floor to the end floor if the elevator door is closed.
2. **enter_elevator**: Moves a person into the elevator if the person is on the same floor as the elevator and the elevator door is open.
3. **exit_elevator**:  Moves a person out of the elevator and onto the floor where the elevator is located if the elevator door is open.
4. **open_door**: Opens the elevator door if possible.
5. **close_door**: Closes the elevator door if possible.

To specify the preconditions and effects of the actions:
1. Use `<action>.add_precondition()` to add the preconditions. [Check the documentation here](https://unified-planning.readthedocs.io/en/latest/api/model/InstantaneousAction.html#unified_planning.model.InstantaneousAction.add_precondition)
2. Use `<action>.add_effect()` to add the effects. [Check the documentation here](https://unified-planning.readthedocs.io/en/latest/api/model/InstantaneousAction.html#unified_planning.model.InstantaneousAction.add_effect)
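The meaning of preconditions and effects can be sketched in plain Python: an action is applicable only if every precondition holds in the current state, and applying it produces a new state in which the effects overwrite the affected fluents. The helper `apply_action` and the simplified single-elevator state below are assumptions for illustration, not the `unified-planning` API.

```python
def apply_action(state, preconditions, effects):
    """Return the successor state, or None if some precondition does not hold."""
    if any(state.get(fluent) != value for fluent, value in preconditions.items()):
        return None
    successor = dict(state)    # copy so the original state is left untouched
    successor.update(effects)  # effects overwrite the affected fluents
    return successor


def move_elevator(state, start, end):
    """Plain-Python counterpart of move_elevator: door closed and elevator at `start`."""
    return apply_action(
        state,
        preconditions={"elevator_door_open": False, "at_elevator": start},
        effects={"at_elevator": end},
    )


state = {"at_elevator": "floor0", "elevator_door_open": False}
print(move_elevator(state, "floor0", "floor1"))  # applicable: elevator ends up at floor1
print(move_elevator({**state, "elevator_door_open": True}, "floor0", "floor1"))  # door open: None
```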


In [73]:
move_elevator = InstantaneousAction('move_elevator', elevator = Elevator, start=Floor, end=Floor)

# FILL IN HERE START
# COPY-FLAG-1-START

move_elevator.add_precondition(Not(elevator_door_open(move_elevator.elevator)))
move_elevator.add_precondition(Equals(at_elevator(move_elevator.elevator), move_elevator.start))

move_elevator.add_effect(at_elevator(move_elevator.elevator), move_elevator.end)

# COPY-FLAG-1-END
# FILL IN HERE END

print(move_elevator)

action move_elevator(Elevator - Loc elevator, Floor - Loc start, Floor - Loc end) {
    preconditions = [
      (not elevator_door_open(elevator))
      (at_elevator(elevator) == start)
    ]
    effects = [
      at_elevator(elevator) := end
    ]
  }


In [74]:
enter_elevator = InstantaneousAction('enter_elevator', elevator = Elevator, person=Person, floor = Floor)

# FILL IN HERE START
# COPY-FLAG-2-START

enter_elevator.add_precondition(elevator_door_open(enter_elevator.elevator))
enter_elevator.add_precondition(Equals(at_person(enter_elevator.person), enter_elevator.floor))
enter_elevator.add_precondition(Equals(at_elevator(enter_elevator.elevator), enter_elevator.floor))

enter_elevator.add_effect(at_person(enter_elevator.person), enter_elevator.elevator)

# COPY-FLAG-2-END
# FILL IN HERE END

print(enter_elevator)

exit_elevator = InstantaneousAction('exit_elevator', elevator = Elevator, person=Person, floor = Floor)

# FILL IN HERE START
# COPY-FLAG-3-START

exit_elevator.add_precondition(elevator_door_open(exit_elevator.elevator))
exit_elevator.add_precondition(Equals(at_person(exit_elevator.person), exit_elevator.elevator))
exit_elevator.add_precondition(Equals(at_elevator(exit_elevator.elevator), exit_elevator.floor))

exit_elevator.add_effect(at_person(exit_elevator.person), exit_elevator.floor)

# COPY-FLAG-3-END
# FILL IN HERE END

print(exit_elevator)

action enter_elevator(Elevator - Loc elevator, Person person, Floor - Loc floor) {
    preconditions = [
      elevator_door_open(elevator)
      (at_person(person) == floor)
      (at_elevator(elevator) == floor)
    ]
    effects = [
      at_person(person) := elevator
    ]
  }
action exit_elevator(Elevator - Loc elevator, Person person, Floor - Loc floor) {
    preconditions = [
      elevator_door_open(elevator)
      (at_person(person) == elevator)
      (at_elevator(elevator) == floor)
    ]
    effects = [
      at_person(person) := floor
    ]
  }


In [75]:
open_door = InstantaneousAction('open_door', elevator = Elevator)

# FILL IN HERE START
# COPY-FLAG-4-START

open_door.add_precondition(Not(elevator_door_open(open_door.elevator)))

open_door.add_effect(elevator_door_open(open_door.elevator), True)

# COPY-FLAG-4-END
# FILL IN HERE END

print(open_door)

close_door = InstantaneousAction('close_door', elevator = Elevator)

# FILL IN HERE START
# COPY-FLAG-5-START

close_door.add_precondition(elevator_door_open(close_door.elevator))

close_door.add_effect(elevator_door_open(close_door.elevator), False)

# COPY-FLAG-5-END
# FILL IN HERE END

print(close_door)

action open_door(Elevator - Loc elevator) {
    preconditions = [
      (not elevator_door_open(elevator))
    ]
    effects = [
      elevator_door_open(elevator) := true
    ]
  }
action close_door(Elevator - Loc elevator) {
    preconditions = [
      elevator_door_open(elevator)
    ]
    effects = [
      elevator_door_open(elevator) := false
    ]
  }


After defining the actions, we can add them to the problem. 

In [76]:
problem.add_action(move_elevator)
problem.add_action(enter_elevator)
problem.add_action(exit_elevator)
problem.add_action(open_door)
problem.add_action(close_door)


Next, we add the abstract task and its methods for the hierarchical planning problem. The abstract task is to transport a person to the end floor. It is decomposed in one of two ways:

1. If the person is not at the end floor, we need a sequence of subtasks that brings the elevator to the person's start floor and then transports the person to the end floor.
2. If the person is already at the end floor, no operation is needed.
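The two cases above can be sketched as a plain-Python decomposition function that expands one `transport_person` task into primitive steps. `decompose_transport` is a hypothetical helper written for this illustration. It mirrors the method definitions in the next cell, including their requirement that the elevator starts on a different floor than the person, but it is not how the planner searches for a decomposition.

```python
def decompose_transport(person, person_floor, elevator_floor, end_floor):
    """Expand transport_person(person, end_floor) into primitive steps.

    Returns [] for the no-op case, a list of 8 steps for the transport case,
    and None when neither method's preconditions hold.
    """
    if person_floor == end_floor:
        return []  # no-op method: the person is already at the end floor
    if elevator_floor == person_floor:
        return None  # the transport method requires the elevator to be elsewhere
    return [
        ("move_elevator", elevator_floor, person_floor),
        ("open_door",),
        ("enter_elevator", person, person_floor),
        ("close_door",),
        ("move_elevator", person_floor, end_floor),
        ("open_door",),
        ("exit_elevator", person, end_floor),
        ("close_door",),
    ]


# Expand one task per person, in order, for the notebook's configuration.
plan, elevator_floor = [], "floor0"
for i, f in enumerate([1, 1, 2, 2, 2]):
    steps = decompose_transport(f"person{i}", f"floor{f}", elevator_floor, "floor0")
    plan.extend(steps)
    if steps:
        elevator_floor = "floor0"  # the elevator finishes each transport at the end floor
print(len(plan))  # 5 people x 8 steps = 40
```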


In [77]:
def method_transport_person(transport_person):
    method = Method('method_transport_person', elevator = Elevator, person=Person, original_floor = Floor, start_floor=Floor, end_floor=Floor)
    method.set_task(transport_person, method.person, method.end_floor)
    # Conditions for the method to be applicable
    method.add_precondition(Equals(at_person(method.person), method.start_floor))
    method.add_precondition(Equals(at_elevator(method.elevator), method.original_floor))
    method.add_precondition(Not(Equals(at_person(method.person), method.end_floor)))
    method.add_precondition(Not(Equals(at_elevator(method.elevator), method.start_floor)))
    method.add_precondition(Not(elevator_door_open(method.elevator)))
    # This method decomposes into an ordered sequence of 8 subtasks
    t1 = method.add_subtask(move_elevator, method.elevator, method.original_floor, method.start_floor)
    t2 = method.add_subtask(open_door, method.elevator)
    t3 = method.add_subtask(enter_elevator, method.elevator, method.person, method.start_floor)
    t4 = method.add_subtask(close_door, method.elevator)
    t5 = method.add_subtask(move_elevator, method.elevator, method.start_floor, method.end_floor)
    t6 = method.add_subtask(open_door, method.elevator)
    t7 = method.add_subtask(exit_elevator, method.elevator, method.person, method.end_floor)
    t8 = method.add_subtask(close_door, method.elevator)
    method.set_ordered(t1, t2, t3, t4, t5, t6, t7, t8)
    # print(method)
    
    return method

def method_transport_noop(transport_person):
    method = Method('method_transport_noop', elevator = Elevator, person=Person, start_floor=Floor, end_floor=Floor)
    
    method.set_task(transport_person, method.person, method.end_floor)
    # Conditions for the method to be applicable
    method.add_precondition(Equals(at_person(method.person), method.start_floor))
    method.add_precondition(Equals(at_person(method.person), method.end_floor))
    # print(method)
    return method

In [78]:
transport_person = problem.add_task('transport_person', person=Person, end_floor=Floor)

problem.add_method(method_transport_person(transport_person))
problem.add_method(method_transport_noop(transport_person))

## Task 2: Defining the Goal

The final goal is to transport all people to floor 0, which is the ground floor. Therefore, we add a `transport_person` subtask to the initial task network for every person, with floor 0 as the end floor.

In [79]:
for p in people:
    # FILL IN HERE START
    # COPY-FLAG-6-START
    
    problem.add_goal(Equals(at_person(p), floors[0]))
    problem.task_network.add_subtask(transport_person(p,floors[0]))
    
    # COPY-FLAG-6-END
    # FILL IN HERE END
    
# print(problem)

## Solve the Problem

After modeling the elevator domain with hierarchical planning, we can solve the problem with the `solve()` helper defined below, which calls `OneshotPlanner` from the `unified-planning` package. The helper prints the plan if one is found and the planner status otherwise.

**Please note**: If a plan cannot be found, you should check your hierarchical planning model. A well-defined hierarchical planning model should always yield a plan.
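When debugging a model, it can help to replay a printed plan by hand and confirm that every step respects the action preconditions. The sketch below does this in plain Python on the textual plan lines; `parse_step` and `simulate` are hypothetical helpers for this notebook's five actions and do not use the `unified-planning` API.

```python
import re


def parse_step(line):
    """Split a line such as 'move_elevator(elevator, floor0, floor1)' into name and arguments."""
    name, args = re.fullmatch(r"\s*(\w+)\((.*)\)\s*", line).groups()
    return name, [a.strip() for a in args.split(",")]


def require(condition, line):
    if not condition:
        raise ValueError(f"precondition violated at step: {line.strip()}")


def simulate(lines, person_at, elevator_at="floor0"):
    """Replay plan lines and return the final location of each person."""
    person_at, door_open = dict(person_at), False
    for line in lines:
        name, args = parse_step(line)
        if name == "move_elevator":
            require(not door_open and elevator_at == args[1], line)
            elevator_at = args[2]
        elif name == "open_door":
            require(not door_open, line)
            door_open = True
        elif name == "close_door":
            require(door_open, line)
            door_open = False
        elif name == "enter_elevator":
            require(door_open and person_at[args[1]] == args[2] == elevator_at, line)
            person_at[args[1]] = "elevator"
        elif name == "exit_elevator":
            require(door_open and person_at[args[1]] == "elevator" and elevator_at == args[2], line)
            person_at[args[1]] = args[2]
        else:
            raise ValueError(f"unknown action: {name}")
    return person_at


first_trip = [
    "move_elevator(elevator, floor0, floor1)",
    "open_door(elevator)",
    "enter_elevator(elevator, person0, floor1)",
    "close_door(elevator)",
    "move_elevator(elevator, floor1, floor0)",
    "open_door(elevator)",
    "exit_elevator(elevator, person0, floor0)",
    "close_door(elevator)",
]
print(simulate(first_trip, {"person0": "floor1"}))
```

A `ValueError` from `simulate` points at the first step whose preconditions fail, which is usually a good place to start inspecting the corresponding action or method.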


In [80]:
def solve(pb: Problem, verbose=False):
    result = OneshotPlanner(problem_kind=pb.kind).solve(pb)
    if result.plan is not None:
        print("Plan:", repr(result.plan) if verbose else str(result.plan))
    else:
        print(result.status)

solve(problem)

Plan: Hierarchical SequentialPlan:
    move_elevator(elevator, floor0, floor1)
    open_door(elevator)
    enter_elevator(elevator, person0, floor1)
    close_door(elevator)
    move_elevator(elevator, floor1, floor0)
    open_door(elevator)
    exit_elevator(elevator, person0, floor0)
    close_door(elevator)
    move_elevator(elevator, floor0, floor1)
    open_door(elevator)
    enter_elevator(elevator, person1, floor1)
    close_door(elevator)
    move_elevator(elevator, floor1, floor0)
    open_door(elevator)
    exit_elevator(elevator, person1, floor0)
    close_door(elevator)
    move_elevator(elevator, floor0, floor2)
    open_door(elevator)
    enter_elevator(elevator, person2, floor2)
    close_door(elevator)
    move_elevator(elevator, floor2, floor0)
    open_door(elevator)
    exit_elevator(elevator, person2, floor0)
    close_door(elevator)
    move_elevator(elevator, floor0, floor2)
    open_door(elevator)
    enter_elevator(elevator, person3, floor2)
    close_door(eleva