In [1]:
!pip install --pre unified-planning[tamer]

Collecting unified-planning[tamer]
  Downloading unified_planning-1.0.0.53.dev1-py3-none-any.whl (641 kB)
[K     |████████████████████████████████| 641 kB 9.0 MB/s eta 0:00:01
Collecting up-tamer==1.0.0; extra == "tamer"
  Downloading up_tamer-1.0.0-py3-none-any.whl (13 kB)
Collecting pytamer==0.1.15
  Downloading pytamer-0.1.15-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.0 MB)
[K     |████████████████████████████████| 6.0 MB 13.5 MB/s eta 0:00:01
[?25hInstalling collected packages: pytamer, up-tamer, unified-planning
Successfully installed pytamer-0.1.15 unified-planning-1.0.0.53.dev1 up-tamer-1.0.0


In [5]:
from unified_planning.shortcuts import *
from unified_planning.model.multi_agent import *
from collections import namedtuple
from unified_planning.io.ma_pddl_writer import MAPDDLWriter

# Parameters
locations_name = ["pick_area", "release_area"]

obj_types = ["w","b","o"]
n_4_type = 4

objects_name = [f"{obj_type}{k}" for obj_type in obj_types for k in range(1,n_4_type + 1)]

obj_goals = ["b1","b2","o1","o2","o3","o4"]

# Problem representation
problem = MultiAgentProblem("HRC")

# Agents definition
robot = Agent("robot", problem)
human = Agent("human", problem)
problem.add_agent(robot)
problem.add_agent(human)

# User Types
Location = UserType('Location')
Cube = UserType('Cube')
RobotCube = UserType('RobotCube', Cube)
HumanCube = UserType('HumanCube', Cube)

# Fluents
object_at = unified_planning.model.Fluent('object_at', BoolType(), obj=Cube, l=Location)
holding = unified_planning.model.Fluent('holding', BoolType(), obj=Cube)
free = unified_planning.model.Fluent('free', BoolType())    # Non prende un Cube come param in input

get_pick_duration = unified_planning.model.Fluent('get_pick_duration', IntType(), obj=Cube)    # Non prende un Cube come param in input
get_place_duration = unified_planning.model.Fluent('get_place_duration', IntType(), obj=Cube)    # Non prende un Cube come param in input

problem.ma_environment.add_fluent(object_at, default_initial_value=False)
robot.add_private_fluent(holding, default_initial_value=False)                           # default initial value None???, dovrebbe essere false per tutti
robot.add_private_fluent(free, default_initial_value=True)

locations = [Object(f"{location}", Location) for location in locations_name]
cubes = [Object(f"{object}", Cube) for object in objects_name]

problem.add_objects(locations)
problem.add_objects(cubes)

# Usefull specific locations
pick_area = next(filter(lambda location: "pick" in location.name, locations),None)
place_area = next(filter(lambda location: "release" in location.name, locations),None)

if not pick_area:
  print("No picking area exist, unable to go on")
  raise Exception("No picking area exist")
if not place_area:
  print("No place area exist, unable to go on")
  raise Exception("No place area exist")

# Robot actions

# Robot pick

pick_robot = DurativeAction('pick_robot', pick_obj=RobotCube)
pick_obj = pick_robot.parameter('pick_obj')

pick_robot.add_condition(StartTiming(),object_at(pick_obj, pick_area))
pick_robot.add_condition(StartTiming(),free)                             #come si fa a dire all(holding, obj) == False

pick_robot.set_fixed_duration(get_pick_duration(pick_obj))

pick_robot.add_effect(EndTiming(), object_at(pick_obj,pick_area), False)
pick_robot.add_effect(EndTiming(), holding(pick_obj), True)
pick_robot.add_effect(EndTiming(), free, False)

robot.add_action(pick_robot)


# Place Robot
place_robot = DurativeAction('place_robot', place_obj=HumanCube) #, l_to=Location)
place_obj = place_robot.parameter('place_obj')

place_robot.add_condition(StartTiming(),holding(place_obj))

pick_durative.set_fixed_duration(get_place_duration(place_obj))

place_robot.add_effect(EndTiming(), object_at(place_obj,place_area), True)
place_robot.add_effect(EndTiming(), holding(place_obj), False)
place_robot.add_effect(EndTiming(), free, True)

robot.add_action(place_robot)

# # # Human actions
# # human.add_action(pick_human)
# # human.add_action(place_human)

# print(f"Agents: {problem.agents}")

# Initial conditions
for obj in cubes:
  problem.set_initial_value(object_at(obj, pick_area), True)
  problem.set_initial_value(get_duration(obj), 5)

  # problem.set_initial_value(holding(obj), False)

# Task-Goal
for obj_goal in obj_goals:
  print(obj_goal)
  obj_goal_as_Obj = next(filter(lambda cube: obj_goal in cube.name, cubes),None)
  if obj_goal_as_Obj:
    problem.add_goal(object_at(obj_goal_as_Obj, place_area))


problem

b1
b2
o1
o2
o3
o4


problem name = HRC

types = [Cube, Location, RobotCube - Cube, HumanCube - Cube]

environment fluents = [
  bool object_at[obj=Cube, l=Location]
]

agents = [
  Agent name = robot

private fluents = [
 bool holding[obj=Cube]
 bool free
]

public fluents = [
]

actions = [
 durative action pick_robot(RobotCube - Cube pick_obj) {
    duration = [get_place_duration(place_obj), get_place_duration(place_obj)]
    conditions = [
      [start]:
        object_at(pick_obj, pick_area)
        free
    ]
    effects = [
      end:
        object_at(pick_obj, pick_area) := false:
        holding(pick_obj) := true:
        free := false:
    ]
    simulated effects = [
    ]
  }
 durative action place_robot(HumanCube - Cube place_obj) {
    duration = [0, 0]
    conditions = [
      [start]:
        holding(place_obj)
    ]
    effects = [
      end:
        object_at(place_obj, release_area) := true:
        holding(place_obj) := false:
        free := true:
    ]
    simulated effects = [
    ]


In [3]:
w = MAPDDLWriter(problem)
w.write_ma_domain("hrc_domain")
w.write_ma_problem("hrc_problem")

# with OneshotPlanner(name='fmap') as planner:
#     result = planner.solve(problem, None, "1")
#     if result.status == up.engines.PlanGenerationResultStatus.SOLVED_SATISFICING:
#         print("%s Returned Sequential Plans object: %s" % (planner.name, result.plan.all_sequential_plans()))
#         [print(f"{idx} Sequential Plans: {seq_plan}") for idx, seq_plan in enumerate(result.plan.all_sequential_plans())]
#         print("Adjacency list:", result.plan.get_adjacency_list)
#         print("result:", result)
#     else:
#         print("Log Error:", result)
with OneshotPlanner(problem_kind=problem.kind) as planner:
    result = planner.solve(problem)
    plan = result.plan
    if plan is not None:
        print("%s returned:" % planner.name)
        for start, action, duration in plan.timed_actions:
            print("%s: %s [%s]" % (float(start), action, float(duration)))
    else:
        print("No plan found.")

UPNoSuitableEngineAvailableException: No available engine supports all the problem features:
--------------------------------------------------------------------------------------
| Engine                  | CONTINUOUS_TIME | ACTION_BASED_MULTI_AGENT | FLAT_TYPING |
======================================================================================
| tamer                   | True            | False                    | True        |
--------------------------------------------------------------------------------------
| oversubscription[tamer] | True            | False                    | True        |
--------------------------------------------------------------------------------------

In [None]:
from unified_planning.environment import get_environment

print(str(get_environment().factory))

<unified_planning.engines.factory.Factory object at 0x7c6fcff01870>


ProblemKind(['CONTINUOUS_TIME', 'ACTION_BASED_MULTI_AGENT', 'FLAT_TYPING'])

In [None]:
from unified_planning.model.scheduling import SchedulingProblem
from unified_planning.io.pddl_writer import PDDLWriter

problem = SchedulingProblem("factory")
workers = problem.add_resource("workers", capacity=4)
machine1 = problem.add_resource("machine1", capacity=1)
machine2 = problem.add_resource("machine2", capacity=1)

a1 = problem.add_activity("a1", duration=3)
a1.uses(machine1)
a1.uses(workers, amount=2)
a1.add_deadline(14)

a2 = problem.add_activity("a2", duration=6)
a2.uses(workers)
a2.uses(machine2)

problem.add_constraint(LT(a2.end, a1.start))

# One worker is unavailable over [16, 25)
problem.add_decrease_effect(16, workers, 1)
problem.add_increase_effect(25, workers, 1)
w = PDDLWriter(problem)
print(w.write_domain("test"))

UPProblemDefinitionError: ignored

In [None]:
# from unified_planning.plot import (
#     plot_plan,  # plot_plan plots all the types of plans, but is not customizable, while specific methods
#     plot_sequential_plan,
#     plot_time_triggered_plan,
#     plot_partial_order_plan,
#     plot_contingent_plan,
#     plot_stn_plan,
#     plot_causal_graph,
# )
# plot_plan(result)


NotImplementedError: ignored

In [None]:
from unified_planning.shortcuts import *
from unified_planning.model.multi_agent import *
from collections import namedtuple
from unified_planning.io.ma_pddl_writer import MAPDDLWriter

# Parameters
locations_name = ["pick_area", "release_area"]

obj_types = ["w","b","o"]
n_4_type = 4

objects_name = [f"{obj_type}{k}" for obj_type in obj_types for k in range(1,n_4_type + 1)]

obj_goals = ["b1","b2","o1","o2","o3","o4"]

# Problem representation
problem = MultiAgentProblem("HRC")

# Agents definition
robot = Agent("robot", problem)
human = Agent("human", problem)
problem.add_agent(robot)
problem.add_agent(human)

# User Types
Location = UserType('Location')
Cube = UserType('Cube')
RobotCube = UserType('RobotCube',Cube)
HumanCube = UserType('RobotCube',Cube)

# Agent_type = UserType('Agent')

# Fluents
object_at = unified_planning.model.Fluent('object_at', BoolType(), obj=Cube, l=Location)
holding = unified_planning.model.Fluent('holding', BoolType(), obj=Cube)
free = unified_planning.model.Fluent('free', BoolType())    # Non prende un Cube come param in input
# robot_is_able = unified_planning.model.Fluent('robot_is_able', BoolType(), obj=Cube)    # Non prende un Cube come param in input
get_duration = unified_planning.model.Fluent('get_duration', IntType(), obj=Cube)    # Non prende un Cube come param in input

problem.ma_environment.add_fluent(object_at, default_initial_value=False)
robot.add_private_fluent(holding, default_initial_value=False)                           # default initial value None???, dovrebbe essere false per tutti
robot.add_private_fluent(free, default_initial_value=True)
# robot.add_private_fluent(robot_is_able)

# j_obj = Variable("j_var", Cube)
# problem.set_initial_value(Forall(holding(j_obj), j_obj), False)

print(type(robot))
# Objects
locations = [Object(f"{location}", Location) for location in locations_name]
cubes = [Object(f"{object}", Cube) for object in objects_name]

problem.add_objects(locations)
problem.add_objects(cubes)

# Usefull specific locations
pick_area = next(filter(lambda location: "pick" in location.name, locations),None)
place_area = next(filter(lambda location: "release" in location.name, locations),None)

if not pick_area:
  print("No picking area exist, unable to go on")
  raise Exception("No picking area exist")
if not place_area:
  print("No place area exist, unable to go on")
  raise Exception("No place area exist")

# Robot actions

# Durative pick action

pick_durative = DurativeAction('pick_durative', pick_obj=Cube)
pick_obj = pick_durative.parameter('pick_obj')

pick_durative.add_condition(StartTiming(),object_at(pick_obj, pick_area))
pick_durative.add_condition(StartTiming(),free)                             #come si fa a dire all(holding, obj) == False
# pick_durative.add_condition(StartTiming(),robot_is_able(pick_obj))                             #come si fa a dire all(holding, obj) == False
# pick_durative.set_fixed_duration(1)
pick_durative.set_fixed_duration(get_duration(pick_obj))

pick_durative.add_effect(EndTiming(), object_at(pick_obj,pick_area), False)
pick_durative.add_effect(EndTiming(), holding(pick_obj), True)
pick_durative.add_effect(EndTiming(), free, False)
robot.add_action(pick_durative)

# pick_durative_h = DurativeAction('pick_durative', pick_obj=Cube1)
# pick_obj = pick_durative.parameter('pick_obj')

# pick_durative_h.add_condition(StartTiming(),object_at(pick_obj, pick_area))
# pick_durative_h.add_condition(StartTiming(),free)                             #come si fa a dire all(holding, obj) == False
# pick_durative_h.add_condition(StartTiming(),robot_is_able(pick_obj))                             #come si fa a dire all(holding, obj) == False

# pick_durative_h.set_fixed_duration(10)

# pick_durative_h.add_effect(EndTiming(), object_at(pick_obj,pick_area), False)
# pick_durative_h.add_effect(EndTiming(), holding(pick_obj), True)
# pick_durative_h.add_effect(EndTiming(), free, False)
# human.add_action(pick_durative_h)


# # Pick action
# pick_robot = unified_planning.model.InstantaneousAction('pick', pick_obj=Cube) #, l_to=Location)
# pick_obj = pick_robot.parameter('pick_obj')
# # l_to = pick_robot.parameter('l_to')
# # Precondition
# pick_robot.add_precondition(object_at(pick_obj, pick_area))
# pick_robot.add_precondition(free)                             #come si fa a dire all(holding, obj) == False
# # Effects
# pick_robot.add_effect(object_at(pick_obj,pick_area), False)
# # pick_robot.add_effect(object_at(pick_obj,place_area), True)
# pick_robot.add_effect(holding(pick_obj), True)
# pick_robot.add_effect(free, False)

# robot.add_action(pick_robot)

# Place Robot
place_robot = unified_planning.model.InstantaneousAction('place', place_obj=Cube) #, l_to=Location)
place_obj = place_robot.parameter('place_obj')
# l_to = pick_robot.parameter('l_to')

pick_area = next(filter(lambda location: "pick" in location.name, locations),None)
place_area = next(filter(lambda location: "release" in location.name, locations),None)

place_robot.add_precondition(holding(place_obj))

# pick_robot.add_precondition(robot_at(l_from))
# pick_robot.add_effect(robot_at(l_from), False)
place_robot.add_effect(object_at(place_obj,place_area), True)
place_robot.add_effect(holding(place_obj), False)
place_robot.add_effect(free, True)

robot.add_action(place_robot)

# # # Human actions
# # human.add_action(pick_human)
# # human.add_action(place_human)

# print(f"Agents: {problem.agents}")

# Initial conditions
for obj in cubes:
  problem.set_initial_value(object_at(obj, pick_area), True)
  problem.set_initial_value(get_duration(obj), 5)

  # problem.set_initial_value(holding(obj), False)

# Task-Goal
for obj_goal in obj_goals:
  print(obj_goal)
  obj_goal_as_Obj = next(filter(lambda cube: obj_goal in cube.name, cubes),None)
  if obj_goal_as_Obj:
    problem.add_goal(object_at(obj_goal_as_Obj, place_area))


problem