## NextFLAP Example

This Python notebook shows how to use NextFLAP within the Unified Planning library.

[![Open In GitHub](https://img.shields.io/badge/see-Github-579aca?logo=github)](https://github.com/aiplan4eu/up-nextflap/blob/master/example/NextFLAP.ipynb)
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/aiplan4eu/up-nextflap/blob/master/example/NextFLAP.ipynb)


In particular, we will go through the following steps:

 - create a numeric and temporal planning problem;
 - call the planner to solve the problem.


# Setup

First, we install the `up_nextflap` library and its dependencies from PyPI. The `--pre` flag selects the latest development build.

In [None]:
!pip install --pre up_nextflap

### Imports
The basic imports we need for this demo are abstracted in the `shortcuts` package.

In [2]:
import unified_planning as up
from unified_planning.engines import PlanGenerationResultStatus
from unified_planning.shortcuts import (Problem, UserType, Object, Variable,
                                        BoolType, RealType,
                                        StartTiming, EndTiming, ClosedTimeInterval, Timing,
                                        Equals, GE, GT, LT, LE, Not,
                                        Plus, Minus, Times, Div,
                                        Forall)
from unified_planning.model.timing import Timepoint, TimepointKind

## Importing and registering NextFLAP

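We register NextFLAP as a planning engine in the Unified Planning environment. The import makes `NextFLAPImpl` available in the current module (`__name__`), which is the module name passed to `add_engine`: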

In [3]:
from up_nextflap import NextFLAPImpl

env = up.environment.get_environment()
env.factory.add_engine('nextflap', __name__, 'NextFLAPImpl')

### Problem representation

In this example, we model a problem in which robots have to fill several water tanks up to a given level.

#### Types

The first thing to do is to introduce the required types. In our case, we will define the types `robot`, `location` and `jar`:

In [4]:
robot = UserType('robot')
location = UserType('location')
jar = UserType('jar')

## Predicates

Now we define the problem and the predicates (boolean fluents) that we need:
*  `at-robot`: with two parameters (`robot` and `location`), it models the position of the robot.
*  `at-jar`: with two parameters (`jar` and `location`), it models the position of a jar.
* `at-tap`: with one parameter (`location`), indicates the locations where there is a tap (necessary to fill the jar).
*  `at-tank`: with one parameter (`location`), it models the locations where there is a water tank.
*  `at-station`: with one parameter (`location`), it models the locations that have a charging station. At these stations, the robots can recharge their batteries.
*  `attached-to-station`: with one parameter (`robot`), it indicates when a robot is docked to a station. The battery can only be recharged while the robot is docked.
*  `empty`: with one parameter (`robot`), indicates that the robot does not carry a jar.
*  `has`: with two parameters (`robot` and `jar`), indicates that the robot is carrying that jar. A robot can only carry one jar at a time.
*  `using-jar`: with one parameter (`robot`), indicates that the robot is using its jar. It is used to prevent the robot from filling and emptying its jar at the same time.
*  `using-tap`: with one parameter (`location`), indicates that the tap at that location is being used. It is needed to prevent multiple robots from using the same tap at the same time.
*  `tank-ready`: with one parameter (`location`), it represents that the water tank at this location has reached the desired level of water. Once the tank is ready, its level can no longer be modified.

In [5]:
p = Problem('WaterTanks')

# Boolean fluents
def add_predicate(p, name, **kwargs):
    predicate = up.model.Fluent(name, BoolType(), **kwargs)
    return p.add_fluent(predicate, default_initial_value=False)

at_robot = add_predicate(p, 'at-robot', r=robot, l=location)
at_jar = add_predicate(p, 'at-jar', j=jar, l=location)
at_tap = add_predicate(p, 'at-tap', l=location)
at_tank = add_predicate(p, 'at-tank', l=location)
at_station = add_predicate(p, 'at-station', l=location)
attached_to_station = add_predicate(p, 'attached-to-station', r=robot)
empty = add_predicate(p, 'empty', r=robot)
has = add_predicate(p, 'has', r=robot, j=jar)
using_jar = add_predicate(p, 'using-jar', r=robot)
using_tap = add_predicate(p, 'using-tap', l=location)
tank_ready = add_predicate(p, 'tank-ready', l=location)

## Numeric fluents

Now we define the numeric fluents we need:
*   `jar-level`: with one parameter (`jar`), it models the amount of water in the jar.
*   `max-jar-level`: with one parameter (`jar`), it represents the maximum capacity of the jar.
*   `tank-level`: with one parameter (`location`), it models the amount of water contained in the tank located at the given location.
*   `tank-goal`: with one parameter (`location`), it represents the desired amount of water for the tank located at that location.
*   `battery-level`: with one parameter (`robot`), it models the battery level of the robot (real number from 0% to 100%).

In [6]:
def add_function(p, name, **kwargs):
    return p.add_fluent(up.model.Fluent(name, RealType(), **kwargs))

jar_level = add_function(p, 'jar-level', j=jar)
max_jar_level = add_function(p, 'max-jar-level', j=jar)
tank_level = add_function(p, 'tank-level', l=location)
tank_goal = add_function(p, 'tank-goal', l=location)
battery_level = add_function(p, 'battery-level', r=robot)

## Action `move`

Using a `move` action, a robot can move from one location to another. This action applies only when the robot is not carrying a jar. It takes 10 time units and consumes 10% of the battery.

In [7]:
move = up.model.DurativeAction('move', r=robot, l1=location, l2=location)
r, l1, l2 = move.parameters
move.set_fixed_duration(10)
move.add_condition(StartTiming(), Not(Equals(l1, l2)))
move.add_condition(StartTiming(), GE(battery_level(r), 10))
move.add_condition(StartTiming(), empty(r))
move.add_condition(StartTiming(), at_robot(r, l1))
move.add_effect(StartTiming(), at_robot(r, l1), False)
move.add_effect(EndTiming(), at_robot(r, l2), True)
move.add_decrease_effect(EndTiming(), battery_level(r), 10)
p.add_action(move)

## Action `move-with-jar`

Similar to the previous action, but used when the robot transports a jar. The **duration of this action depends on the weight transported**, that is, on the amount of water the robot carries in its jar: `10 + jar-level(j)/10`. This action also consumes 10% of the battery.

In [8]:
move_with_jar = up.model.DurativeAction('move-with-jar', r=robot, j=jar, l1=location, l2=location)
r, j, l1, l2 = move_with_jar.parameters
move_with_jar.set_fixed_duration(Plus(10, Div(jar_level(j), 10)))
move_with_jar.add_condition(StartTiming(), Not(Equals(l1, l2)))
move_with_jar.add_condition(StartTiming(), GE(battery_level(r), 10))
move_with_jar.add_condition(StartTiming(), has(r, j))
move_with_jar.add_condition(StartTiming(), at_robot(r, l1))
move_with_jar.add_effect(StartTiming(), at_robot(r, l1), False)
move_with_jar.add_effect(EndTiming(), at_robot(r, l2), True)
move_with_jar.add_decrease_effect(EndTiming(), battery_level(r), 10)
p.add_action(move_with_jar)
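
As a quick check of the duration expression above, the following plain-Python sketch evaluates `10 + jar-level(j)/10` for a given jar level. The helper `move_with_jar_duration` is a hypothetical name used only for illustration; it is not part of Unified Planning or NextFLAP.

```python
from fractions import Fraction

def move_with_jar_duration(jar_level):
    """Duration of move-with-jar: 10 + jar-level(j) / 10 (hypothetical helper)."""
    # Fraction keeps the arithmetic exact, as rational durations would be.
    return 10 + Fraction(jar_level) / 10

print(move_with_jar_duration(0))   # empty jar: 10
print(move_with_jar_duration(10))  # 10 liters: 11
print(move_with_jar_duration(25))  # 25 liters: 25/2
```

A heavier jar therefore makes every trip slower, which gives the planner a reason to weigh how much water to carry per trip.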

## Action `take-jar`

Using a `take-jar` action, an empty robot can pick up one of the jars. This action consumes 1% of the battery.

In [9]:
take_jar = up.model.DurativeAction('take-jar', r=robot, j=jar, l=location)
r, j, l = take_jar.parameters
take_jar.set_fixed_duration(1)
take_jar.add_condition(StartTiming(), GE(battery_level(r), 1))
take_jar.add_condition(ClosedTimeInterval(StartTiming(), EndTiming()), at_robot(r, l))
take_jar.add_condition(StartTiming(), at_jar(j, l))
take_jar.add_condition(StartTiming(), empty(r))
take_jar.add_effect(StartTiming(), empty(r), False)
take_jar.add_effect(StartTiming(), at_jar(j, l), False)
take_jar.add_effect(EndTiming(), has(r, j), True)
take_jar.add_decrease_effect(EndTiming(), battery_level(r), 1)
p.add_action(take_jar)

## Action `drop-jar`

Using a `drop-jar` action, a robot can leave the jar it is carrying. This action consumes 1% of the battery.

In [10]:
drop_jar = up.model.DurativeAction('drop-jar', r=robot, j=jar, l=location)
r, j, l = drop_jar.parameters
drop_jar.set_fixed_duration(1)
drop_jar.add_condition(StartTiming(), GE(battery_level(r), 1))
drop_jar.add_condition(ClosedTimeInterval(StartTiming(), EndTiming()), at_robot(r, l))
drop_jar.add_condition(StartTiming(), has(r, j))
drop_jar.add_effect(StartTiming(), has(r, j), False)
drop_jar.add_effect(EndTiming(), at_jar(j, l), True)
drop_jar.add_effect(EndTiming(), empty(r), True)
drop_jar.add_decrease_effect(EndTiming(), battery_level(r), 1)
p.add_action(drop_jar)

## Action `empty-jar`

Using an `empty-jar` action, a robot can pour all the water it carries in its jar into a tank. The duration of this action is directly proportional to the amount of water poured. The action leaves the jar empty and increases the level of the tank by the amount poured; it is only applicable if the resulting tank level does not exceed 100 liters. This action consumes 5% of the battery.

In [11]:
empty_jar = up.model.DurativeAction('empty-jar', r=robot, j=jar, l=location)
r, j, l = empty_jar.parameters
empty_jar.set_fixed_duration(jar_level(j))
empty_jar.add_condition(StartTiming(), GE(battery_level(r), 5))
empty_jar.add_condition(StartTiming(), LE(Plus(jar_level(j), tank_level(l)), 100))
empty_jar.add_condition(StartTiming(), Not(using_jar(r)))
empty_jar.add_condition(StartTiming(), at_tank(l))
empty_jar.add_condition(ClosedTimeInterval(StartTiming(), EndTiming()), at_robot(r, l))
empty_jar.add_condition(ClosedTimeInterval(StartTiming(), EndTiming()), Not(tank_ready(l)))
empty_jar.add_condition(ClosedTimeInterval(StartTiming(), EndTiming()), has(r, j))
empty_jar.add_effect(StartTiming(), using_jar(r), True)
empty_jar.add_decrease_effect(EndTiming(), battery_level(r), 5)
empty_jar.add_increase_effect(EndTiming(), tank_level(l), jar_level(j))
empty_jar.add_effect(EndTiming(), jar_level(j), 0)
empty_jar.add_effect(EndTiming(), using_jar(r), False)
p.add_action(empty_jar)
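
The numeric part of `empty-jar` can be sketched in plain Python as a state transition. This is only an illustration under simplifying assumptions: the function `apply_empty_jar` is hypothetical, and it ignores the boolean conditions (`at-robot`, `has`, `using-jar`, `tank-ready`) that the real action also checks.

```python
def apply_empty_jar(jar_level, tank_level, battery):
    """Return (duration, new_jar_level, new_tank_level, new_battery),
    or None when a numeric start condition of empty-jar fails."""
    if battery < 5:                    # GE(battery-level(r), 5)
        return None
    if jar_level + tank_level > 100:   # LE(jar-level(j) + tank-level(l), 100)
        return None
    duration = jar_level               # the duration equals the amount poured
    # End effects: the tank gains the jar content, the jar becomes empty,
    # and the battery drops by 5.
    return duration, 0, tank_level + jar_level, battery - 5

print(apply_empty_jar(10, 10, 9))    # (10, 0, 20, 4)
print(apply_empty_jar(10, 95, 50))   # None: the tank would exceed 100
```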

## Action `fill-jar`

Using a `fill-jar` action, a robot can fill its jar. The fourth parameter of the action (`amount`) is a **numeric parameter**, which allows the planner to choose how much water to pour into the jar. The duration of this action is directly proportional to the amount of water poured. Regardless of the amount, this action consumes 5% of the battery.

In [12]:
fill_jar = up.model.DurativeAction('fill-jar', r=robot, j=jar, l=location, amount=RealType())
r, j, l, amount = fill_jar.parameters
fill_jar.set_fixed_duration(amount)
fill_jar.add_condition(StartTiming(), GE(amount, 1))
fill_jar.add_condition(StartTiming(), LE(amount, Minus(max_jar_level(j), jar_level(j))))
fill_jar.add_condition(StartTiming(), GE(battery_level(r), 5))
fill_jar.add_condition(StartTiming(), Not(using_tap(l)))
fill_jar.add_condition(StartTiming(), Not(using_jar(r)))
fill_jar.add_condition(StartTiming(), at_tap(l))
fill_jar.add_condition(ClosedTimeInterval(StartTiming(), EndTiming()), at_robot(r, l))
fill_jar.add_condition(ClosedTimeInterval(StartTiming(), EndTiming()), has(r, j))
fill_jar.add_effect(StartTiming(), using_tap(l), True)
fill_jar.add_effect(StartTiming(), using_jar(r), True)
fill_jar.add_decrease_effect(EndTiming(), battery_level(r), 5)
fill_jar.add_increase_effect(EndTiming(), jar_level(j), amount)
fill_jar.add_effect(EndTiming(), using_tap(l), False)
fill_jar.add_effect(EndTiming(), using_jar(r), False)
p.add_action(fill_jar)
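
The two conditions on `amount` define the interval from which the planner may pick a value. A minimal sketch of that interval, using a hypothetical helper `fill_amount_bounds` that is not part of the library:

```python
def fill_amount_bounds(jar_level, max_jar_level):
    """Closed interval of valid `amount` values for fill-jar, or None if empty."""
    lower = 1                           # GE(amount, 1)
    upper = max_jar_level - jar_level   # LE(amount, max-jar-level(j) - jar-level(j))
    return (lower, upper) if lower <= upper else None

print(fill_amount_bounds(0, 20))    # (1, 20)
print(fill_amount_bounds(10, 40))   # (1, 30)
print(fill_amount_bounds(20, 20))   # None: the jar is already full
```

Because the lower bound is 1, a jar with less than one liter of free capacity cannot be filled any further.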

## Action `close-tank`

This action closes a tank when the water it contains has reached the desired level. Once closed, its water level can no longer be modified.

In [13]:
close_tank = up.model.DurativeAction('close-tank', l=location)
l = close_tank.parameter('l')
close_tank.set_fixed_duration(1)
close_tank.add_condition(StartTiming(), at_tank(l))
close_tank.add_condition(StartTiming(), Equals(tank_level(l), tank_goal(l)))
close_tank.add_effect(StartTiming(), tank_ready(l), True)
p.add_action(close_tank)

## Action `attach`

Using an `attach` action, a robot can dock to a charging station. The station must be free, since only one robot can use it at a time. We model this with a **Forall** expression stating that no robot is currently attached.
The planner decides how long the robot remains docked to the station (between 1 and 100 minutes). This is represented by a **duration interval**.

In [14]:
attach = up.model.DurativeAction('attach', r=robot, l=location)
r, l = attach.parameters
rob = Variable('rob', robot)
attach.set_closed_duration_interval(1, 100)
attach.add_condition(StartTiming(), at_station(l))
attach.add_condition(StartTiming(), Forall(Not(attached_to_station(rob)), rob))
attach.add_condition(ClosedTimeInterval(StartTiming(), EndTiming()), at_robot(r, l))
attach.add_effect(StartTiming(), attached_to_station(r), True)
attach.add_effect(EndTiming(), attached_to_station(r), False)
p.add_action(attach)
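
For intuition, the `Forall` condition corresponds to a universally quantified check over all robots. In plain Python this might look like the sketch below, where `station_is_free` and the dictionary of attachment flags are illustrative assumptions rather than library constructs:

```python
def station_is_free(attached_to_station):
    """True when no robot is attached: Forall rob. not attached-to-station(rob)."""
    return all(not attached for attached in attached_to_station.values())

print(station_is_free({'r1': False, 'r2': False}))  # True
print(station_is_free({'r1': False, 'r2': True}))   # False
```

Note that the quantifier ranges over every robot and does not mention a location, so this encoding treats all stations as one shared resource. That is sufficient here because the scenario has a single charging station.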

## Action `charge`

Using a `charge` action, a robot can recharge its battery. The third parameter of the action is a **numeric parameter**, which allows the planner to choose the percentage of battery to recharge.

The duration of this action is a **non-linear expression**, `percentage + battery-level(r)^2 * percentage / 5000`, which models the fact that batteries charge more slowly as they get full.

In [15]:
charge = up.model.DurativeAction('charge', r=robot, l=location, percentage=RealType())
r, l, percentage = charge.parameters
charge.set_fixed_duration(Plus(percentage, Div(Times(battery_level(r),
                          Times(battery_level(r), percentage)), 5000)))
charge.add_condition(ClosedTimeInterval(StartTiming(), EndTiming()), attached_to_station(r))
charge.add_condition(ClosedTimeInterval(StartTiming(), EndTiming()), at_robot(r, l))
charge.add_condition(StartTiming(), LT(battery_level(r), 100))
charge.add_condition(StartTiming(), LE(percentage, Minus(100, battery_level(r))))
charge.add_condition(StartTiming(), GT(percentage, 0))
charge.add_increase_effect(EndTiming(), battery_level(r), percentage)
p.add_action(charge)
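
To see the non-linear effect, the sketch below evaluates the duration expression for the same recharge of 10% at two different battery levels. The function `charge_duration` is a hypothetical helper, and it assumes the battery level used in the expression is the one at the start of the action.

```python
from fractions import Fraction

def charge_duration(battery_level, percentage):
    """percentage + battery_level * (battery_level * percentage) / 5000."""
    b = Fraction(battery_level)
    p = Fraction(percentage)
    return p + (b * (b * p)) / 5000

print(charge_duration(10, 10))  # 51/5, i.e. 10.2 time units
print(charge_duration(90, 10))  # 131/5, i.e. 26.2 time units
```

Recharging the same 10% takes more than twice as long when the battery is at 90% as when it is at 10%.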

## Objects

We are now going to create a scenario with two robots (`r1` and `r2`), three locations (`a`, `b` and `c`) and two jars (`jar1` and `jar2`).

In [16]:
a = p.add_object(Object('a', location))
b = p.add_object(Object('b', location))
c = p.add_object(Object('c', location))
r1 = p.add_object(Object('r1', robot))
r2 = p.add_object(Object('r2', robot))
jar1 = p.add_object(Object('jar1', jar))
jar2 = p.add_object(Object('jar2', jar))

## Initial state

In our example problem, we have a charging station at location `a`, two taps (at locations `a` and `b`) and three water tanks (one at each location).

The taps at `a` and `b` are initially in use, but will become available at times 100 and 200, respectively. We model this using TILs (**timed initial literals**).

Robots `r1` and `r2` are initially at locations `a` and `c`, respectively, both with a battery level of 10%.

`jar1` is empty and has a capacity of 20 liters. `jar2` already contains 10 liters of water and its capacity is 40 liters.

The tank at location `a` is initially empty and must be filled up to 10 liters. The tank at `b` is also empty and must be filled up to 20 liters. Finally, the tank at `c` contains 10 liters and must be filled up to 20 liters.

In [17]:
p.set_initial_value(at_robot(r1, a), True)
p.set_initial_value(at_robot(r2, c), True)
p.set_initial_value(empty(r1), True)
p.set_initial_value(empty(r2), True)
p.set_initial_value(battery_level(r1), 10)
p.set_initial_value(battery_level(r2), 10)
p.set_initial_value(at_jar(jar1, a), True)
p.set_initial_value(at_jar(jar2, c), True)
p.set_initial_value(jar_level(jar1), 0)
p.set_initial_value(jar_level(jar2), 10)
p.set_initial_value(max_jar_level(jar1), 20)
p.set_initial_value(max_jar_level(jar2), 40)
p.set_initial_value(at_station(a), True)
p.set_initial_value(at_tap(a), True)
p.set_initial_value(at_tap(b), True)
p.set_initial_value(using_tap(a), True)
p.set_initial_value(using_tap(b), True)
p.add_timed_effect(Timing(100, Timepoint(TimepointKind.GLOBAL_START)), using_tap(a), False)
p.add_timed_effect(Timing(200, Timepoint(TimepointKind.GLOBAL_START)), using_tap(b), False)
p.set_initial_value(at_tank(a), True)
p.set_initial_value(at_tank(b), True)
p.set_initial_value(at_tank(c), True)
p.set_initial_value(tank_level(a), 0)
p.set_initial_value(tank_level(b), 0)
p.set_initial_value(tank_level(c), 10)
p.set_initial_value(tank_goal(a), 10)
p.set_initial_value(tank_goal(b), 20)
p.set_initial_value(tank_goal(c), 20)
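
With only 10% of battery at the start, the robots cannot get far without recharging. The following sketch walks through a sequence of actions and reports the first one whose battery condition fails. It is a simplified illustration: `BATTERY_COST` and `first_blocked_step` are hypothetical names, actions are assumed to run one after another, and `charge` is left out.

```python
# Battery consumed by each action, taken from the action definitions above.
BATTERY_COST = {
    'move': 10, 'move-with-jar': 10,
    'take-jar': 1, 'drop-jar': 1,
    'empty-jar': 5, 'fill-jar': 5,
}

def first_blocked_step(battery, actions):
    """Index of the first action whose battery condition fails, or None."""
    for index, name in enumerate(actions):
        cost = BATTERY_COST[name]
        if battery < cost:      # each action requires battery-level >= its cost
            return index
        battery -= cost         # the decrease is applied when the action ends
    return None

print(first_blocked_step(10, ['take-jar', 'move-with-jar']))  # 1
print(first_blocked_step(10, ['take-jar', 'empty-jar']))      # None
```

After picking up a jar, a robot has 9% left and can no longer move, so a plan has to schedule `attach` and `charge` actions early.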

## Goal

Finally, the objective is for all the tanks to be ready, that is, to be closed once they have reached their desired level.

In [18]:
p.add_goal(tank_ready(a))
p.add_goal(tank_ready(b))
p.add_goal(tank_ready(c))
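
Since `close-tank` requires the tank level to equal its goal exactly, it can help to compute how much water each tank is still missing. A small sketch using the initial values above (the function `missing_water` is a hypothetical helper):

```python
def missing_water(levels, goals):
    """Liters still needed per tank so that tank-level equals tank-goal."""
    return {loc: goals[loc] - levels[loc] for loc in goals}

tank_levels = {'a': 0, 'b': 0, 'c': 10}
tank_goals = {'a': 10, 'b': 20, 'c': 20}

missing = missing_water(tank_levels, tank_goals)
print(missing)                 # {'a': 10, 'b': 20, 'c': 10}
print(sum(missing.values()))   # 40
```

The tank at `c` needs exactly the 10 liters that `jar2` already holds at that location, so it can be completed before any tap becomes available.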

## Solving the problem

We now select the NextFLAP planner to solve the problem we have defined and show the plan obtained.

In [None]:
with env.factory.OneshotPlanner(name='nextflap') as planner:
    result = planner.solve(p, timeout=60.0)
    if result.status == PlanGenerationResultStatus.SOLVED_SATISFICING:
        print(f'; {planner.name} found a plan!')
        if result.plan.kind == up.plans.plan.PlanKind.PARTIAL_ORDER_PLAN:
            seqPlan = result.plan.convert_to(up.plans.plan.PlanKind.SEQUENTIAL_PLAN, p)
            for action in seqPlan.actions:
                print(action)
        else:
            for time, action, dur in result.plan.timed_actions:
                print(f'{time.numerator/time.denominator:.3f}: {action} [{dur.numerator/dur.denominator:.3f}]')
    else:
        print('No plan found!')
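
The start times and durations in a time-triggered plan are rational numbers, which is why the loop above divides numerator by denominator. The formatting can be factored into a small helper, sketched below in plain Python. `format_timed_action` is a hypothetical function, and the handling of a missing duration is an assumption for plans that contain instantaneous actions (this problem only has durative ones).

```python
from fractions import Fraction

def format_timed_action(start, action, duration=None):
    """Format one plan step as 'start: action [duration]' with 3 decimals."""
    line = f'{float(start):.3f}: {action}'
    if duration is not None:    # assumed to be None for instantaneous actions
        line += f' [{float(duration):.3f}]'
    return line

print(format_timed_action(Fraction(1, 3), 'move(r1, a, b)', Fraction(10)))
# 0.333: move(r1, a, b) [10.000]
```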