diff --git a/unified_planning/io/ma_pddl_writer.py b/unified_planning/io/ma_pddl_writer.py index db7115649..818d6de8e 100644 --- a/unified_planning/io/ma_pddl_writer.py +++ b/unified_planning/io/ma_pddl_writer.py @@ -69,21 +69,39 @@ def __init__( str, ], agent: Optional["up.model.multi_agent.Agent"], + unfactored: Optional[bool] = False, ): ConverterToPDDLString.__init__(self, problem.environment, get_mangled_name) self._problem = problem self._agent = agent + self._unfactored = unfactored def walk_dot(self, expression, args): agent = self._problem.agent(expression.agent()) fluent = expression.args[0].fluent() objects = expression.args[0].args - return f'(a_{self.get_mangled_name(fluent)} {self.get_mangled_name(agent)} {" ".join([self.convert(obj) for obj in objects])})' + agent_name = "" + if ( + self._unfactored + and self._agent is not None + and fluent not in self._agent.public_fluents + ): + agent_name = f"_{self.get_mangled_name(agent)}" + + return f'(a_{self.get_mangled_name(fluent)}{agent_name} {self.get_mangled_name(agent)} {" ".join([self.convert(obj) for obj in objects])})' def walk_fluent_exp(self, expression, args): fluent = expression.fluent() + agent_name = "" + if ( + self._unfactored + and self._agent is not None + and fluent not in self._agent.public_fluents + ): + agent_name = f"_{self._agent.name}" + if self._agent is not None and fluent in self._agent.fluents: - return f'(a_{self.get_mangled_name(fluent)} ?{self._agent.name}{" " if len(args) > 0 else ""}{" ".join(args)})' + return f'(a_{self.get_mangled_name(fluent)}{agent_name} ?{self._agent.name}{" " if len(args) > 0 else ""}{" ".join(args)})' else: return f'({self.get_mangled_name(fluent)}{" " if len(args) > 0 else ""}{" ".join(args)})' @@ -91,16 +109,19 @@ def walk_fluent_exp(self, expression, args): class MAPDDLWriter: """ This class can be used to write a :class:`~unified_planning.model.MultiAgentProblem` in `MA-PDDL`. - The constructor of this class takes the problem to write and 2 flags: - needs_requirements determines if the printed problem must have the :requirements, - rewrite_bool_assignments determines if this writer will write - non constant boolean assignment as conditional effects. + The constructor of this class takes the problem to write and 4 flags: + + * ``explicit_false_initial_states`` determines if this writer includes initially false predicates in the problem ``:init``, + * ``unfactored`` when ``False`` this writer prints a domain and a problem for every agent, if ``True`` generates a single domain and a single problem, + * ``needs_requirements`` determines if the printed problem must have the :requirements, + * ``rewrite_bool_assignments`` determines if this writer will write non constant boolean assignment as conditional effects. """ def __init__( self, problem: "up.model.multi_agent.MultiAgentProblem", explicit_false_initial_states: Optional[bool] = False, + unfactored: Optional[bool] = False, needs_requirements: bool = True, rewrite_bool_assignments: bool = False, ): @@ -108,6 +129,7 @@ def __init__( self.problem = problem self.problem_kind = self.problem.kind self.explicit_false_initial_states = explicit_false_initial_states + self.unfactored = unfactored self.needs_requirements = needs_requirements self.rewrite_bool_assignments = rewrite_bool_assignments # otn represents the old to new renamings @@ -165,8 +187,11 @@ def _write_domain(self): out.write(f"(domain {name}-domain)\n") if self.needs_requirements: - out.write(" (:requirements :factored-privacy") - # out.write(" (:requirements :strips") + out.write(" (:requirements :multi-agent") + if not self.unfactored: + out.write(" :factored-privacy") + else: + out.write(" :unfactored-privacy") if self.problem_kind.has_flat_typing(): out.write(" :typing") if self.problem_kind.has_negative_conditions(): @@ -261,9 +286,23 @@ def _write_domain(self): predicates_environment, functions_environment, ) = self.get_predicates_functions(self.problem.ma_environment) - predicates_agent, functions_agent = self.get_predicates_functions( - ag, is_private=True - ) + + predicates_agents = [] + functions_agent = [] + nl = "\n " + if self.unfactored: + for ag in self.problem.agents: + predicates_agent, functions_agent = self.get_predicates_functions( + ag, is_private=True + ) + if len(predicates_agent) > 0: + predicates_agents.append( + f" (:private agent - {ag.name + '_type'}\n {nl.join(predicates_agent)})\n" + ) + else: + predicates_agents, functions_agent = self.get_predicates_functions( + ag, is_private=True + ) predicates_public_agent = [] functions_public_agent = [] @@ -340,7 +379,7 @@ def _write_domain(self): out.write( f" (:predicates\n " if len(predicates_environment) > 0 - or len(predicates_agent) > 0 + or len(predicates_agents) > 0 or len(predicates_agent_goal) > 0 or len(predicates_public_agent) > 0 else "" @@ -362,15 +401,21 @@ def _write_domain(self): ) nl = "\n " - out.write( - f" (:private\n {nl.join(predicates_agent)})" - if len(predicates_agent) > 0 - else "" - ) + + if self.unfactored: + out.write( + "".join(predicates_agents) if len(predicates_agents) > 0 else "" + ) + else: + out.write( + f" (:private\n {nl.join(predicates_agents)})" + if len(predicates_agents) > 0 + else "" + ) out.write( f")\n" if len(predicates_environment) > 0 - or len(predicates_agent) > 0 + or len(predicates_agents) > 0 or len(predicates_agent_goal) > 0 or len(predicates_public_agent) > 0 else "" @@ -413,114 +458,143 @@ def _write_domain(self): else "" ) - converter = ConverterToMAPDDLString( - self.problem, self._get_mangled_name, ag - ) costs: dict = {} - for a in ag.actions: - if isinstance(a, up.model.InstantaneousAction): - out.write(f" (:action {self._get_mangled_name(a)}") - out.write(f"\n :parameters (") - out.write( - f' ?{self._get_mangled_name(ag)} - {self._get_mangled_name(ag) +"_type"}' - ) - for ap in a.parameters: - if ap.type.is_user_type(): + + def write_action(ag): + converter = ConverterToMAPDDLString( + self.problem, self._get_mangled_name, ag, unfactored=self.unfactored + ) + for a in ag.actions: + if isinstance(a, up.model.InstantaneousAction): + if self.unfactored: out.write( - f" {self._get_mangled_name(ap)} - {self._get_mangled_name(ap.type)}" + f" (:action {self._get_mangled_name(a)}_{ag.name}" ) else: - raise UPTypeError( - "MA-PDDL supports only user type parameters" - ) - out.write(")") - if len(a.preconditions) > 0: - out.write(f"\n :precondition (and \n") - for p in a.preconditions: - out.write(f" {converter.convert(p)}\n") - out.write(f" )") - - if len(a.effects) > 0: - out.write("\n :effect (and\n") - for e in a.effects: - _write_effect( - e, - None, - out, - converter, - self.rewrite_bool_assignments, - self._get_mangled_name, - ) + out.write(f" (:action {self._get_mangled_name(a)}") + if self.unfactored: + out.write(f'\n :agent ?{ag.name} - {ag.name + "_type"}') - if a in costs: - out.write( - f" (increase (total-cost) {converter.convert(costs[a])})" - ) - out.write(")") - out.write(")\n") - elif isinstance(a, DurativeAction): - out.write(f" (:durative-action {self._get_mangled_name(a)}") - out.write(f"\n :parameters (") - for ap in a.parameters: - if ap.type.is_user_type(): + out.write(f"\n :parameters (") + + if not self.unfactored: out.write( - f" {self._get_mangled_name(ap)} - {self._get_mangled_name(ap.type)}" + f' ?{self._get_mangled_name(ag)} - {self._get_mangled_name(ag) +"_type"}' ) - else: - raise UPTypeError( - "MA-PDDL supports only user type parameters" - ) - out.write(")") - l, r = a.duration.lower, a.duration.upper - if l == r: - out.write(f"\n :duration (= ?duration {converter.convert(l)})") - else: - out.write(f"\n :duration (and ") - if a.duration.is_left_open(): - out.write(f"(> ?duration {converter.convert(l)})") - else: - out.write(f"(>= ?duration {converter.convert(l)})") - if a.duration.is_right_open(): - out.write(f"(< ?duration {converter.convert(r)})") - else: - out.write(f"(<= ?duration {converter.convert(r)})") - out.write(")") - if len(a.conditions) > 0: - out.write(f"\n :condition (and ") - for interval, cl in a.conditions.items(): - for c in cl: - if interval.lower == interval.upper: - if interval.lower.is_from_start(): - out.write(f"(at start {converter.convert(c)})") - else: - out.write(f"(at end {converter.convert(c)})") - else: - if not interval.is_left_open(): - out.write(f"(at start {converter.convert(c)})") - out.write(f"(over all {converter.convert(c)})") - if not interval.is_right_open(): - out.write(f"(at end {converter.convert(c)})") + for ap in a.parameters: + if ap.type.is_user_type(): + out.write( + f" {self._get_mangled_name(ap)} - {self._get_mangled_name(ap.type)}" + ) + else: + raise UPTypeError( + "MA-PDDL supports only user type parameters" + ) out.write(")") - if len(a.effects) > 0: - out.write("\n :effect (and") - for t, el in a.effects.items(): - for e in el: + if len(a.preconditions) > 0: + out.write(f"\n :precondition (and \n") + for p in a.preconditions: + out.write(f" {converter.convert(p)}\n") + out.write(f" )") + + if len(a.effects) > 0: + out.write("\n :effect (and\n") + for e in a.effects: _write_effect( e, - t, + None, out, converter, self.rewrite_bool_assignments, self._get_mangled_name, ) - if a in costs: + + if a in costs: + out.write( + f" (increase (total-cost) {converter.convert(costs[a])})" + ) + out.write(")") + out.write(")\n") + elif isinstance(a, DurativeAction): + out.write(f" (:durative-action {self._get_mangled_name(a)}") + out.write(f"\n :parameters (") + for ap in a.parameters: + if ap.type.is_user_type(): + out.write( + f" {self._get_mangled_name(ap)} - {self._get_mangled_name(ap.type)}" + ) + else: + raise UPTypeError( + "MA-PDDL supports only user type parameters" + ) + out.write(")") + l, r = a.duration.lower, a.duration.upper + if l == r: out.write( - f" (at end (increase (total-cost) {converter.convert(costs[a])}))" + f"\n :duration (= ?duration {converter.convert(l)})" ) - out.write(")") - out.write(")\n") - else: - raise NotImplementedError + else: + out.write(f"\n :duration (and ") + if a.duration.is_left_open(): + out.write(f"(> ?duration {converter.convert(l)})") + else: + out.write(f"(>= ?duration {converter.convert(l)})") + if a.duration.is_right_open(): + out.write(f"(< ?duration {converter.convert(r)})") + else: + out.write(f"(<= ?duration {converter.convert(r)})") + out.write(")") + if len(a.conditions) > 0: + out.write(f"\n :condition (and ") + for interval, cl in a.conditions.items(): + for c in cl: + if interval.lower == interval.upper: + if interval.lower.is_from_start(): + out.write( + f"(at start {converter.convert(c)})" + ) + else: + out.write( + f"(at end {converter.convert(c)})" + ) + else: + if not interval.is_left_open(): + out.write( + f"(at start {converter.convert(c)})" + ) + out.write(f"(over all {converter.convert(c)})") + if not interval.is_right_open(): + out.write( + f"(at end {converter.convert(c)})" + ) + out.write(")") + if len(a.effects) > 0: + out.write("\n :effect (and") + for t, el in a.effects.items(): + for e in el: + _write_effect( + e, + t, + out, + converter, + self.rewrite_bool_assignments, + self._get_mangled_name, + ) + if a in costs: + out.write( + f" (at end (increase (total-cost) {converter.convert(costs[a])}))" + ) + out.write(")") + out.write(")\n") + else: + raise NotImplementedError + + if self.unfactored: + for ag in self.problem.agents: + write_action(ag) + else: + write_action(ag) + out.write(")\n") ag_domains[self._get_mangled_name(ag)] = out.getvalue() @@ -528,6 +602,8 @@ def _write_domain(self): self.domain_objects = None self.domain_objects_agents = {} # self.all_public_fluents = [] + if self.unfactored: + break return ag_domains @@ -576,7 +652,7 @@ def _write_problem(self): out.write("\n )\n") converter = ConverterToMAPDDLString( - self.problem, self._get_mangled_name, ag + self.problem, self._get_mangled_name, ag, unfactored=self.unfactored ) out.write(" (:init") @@ -585,36 +661,45 @@ def _write_problem(self): if f.is_dot(): fluent = f.args[0].fluent() args = f.args - if ( - fluent in self.all_public_fluents - or fluent in ag.fluents - and f.agent() == ag.name - ): - out.write(f"\n {converter.convert(f)}") - elif f.agent() != ag.name and fluent in self.all_public_fluents: - out.write(f"\n {converter.convert(f)}") - else: - out.write(f"") - else: - out.write(f"\n {converter.convert(f)}") - elif v.is_false(): - if self.explicit_false_initial_states: - if f.is_dot(): - fluent = f.args[0].fluent() - args = f.args + if not self.unfactored: if ( fluent in self.all_public_fluents or fluent in ag.fluents and f.agent() == ag.name ): - out.write(f"\n (not {converter.convert(f)})") + out.write(f"\n {converter.convert(f)}") elif ( f.agent() != ag.name and fluent in self.all_public_fluents ): - out.write(f"\n (not {converter.convert(f)})") + out.write(f"\n {converter.convert(f)}") else: out.write(f"") + else: + out.write(f"\n {converter.convert(f)}") + else: + out.write(f"\n {converter.convert(f)}") + elif v.is_false(): + if self.explicit_false_initial_states: + if f.is_dot(): + fluent = f.args[0].fluent() + args = f.args + if not self.unfactored: + if ( + fluent in self.all_public_fluents + or fluent in ag.fluents + and f.agent() == ag.name + ): + out.write(f"\n (not {converter.convert(f)})") + elif ( + f.agent() != ag.name + and fluent in self.all_public_fluents + ): + out.write(f"\n (not {converter.convert(f)})") + else: + out.write(f"") + else: + out.write(f"\n (not {converter.convert(f)})") else: out.write(f"\n (not {converter.convert(f)})") else: @@ -634,6 +719,8 @@ def _write_problem(self): self.domain_objects = None self.domain_objects_agents = {} # self.all_public_fluents = [] + if self.unfactored: + break return ag_problems def print_ma_domain_agent(self, agent_name): @@ -676,7 +763,11 @@ def write_ma_domain(self, directory_name): outdir_ma_pddl = "ma_pddl_" + directory_name osy.makedirs(outdir_ma_pddl, exist_ok=True) for ag, domain in ag_domains.items(): - path_ma_pddl = osy.path.join(outdir_ma_pddl, ag + "_domain.pddl") + if not self.unfactored: + name_domain = ag + else: + name_domain = _get_pddl_name(self.problem) + path_ma_pddl = osy.path.join(outdir_ma_pddl, name_domain + "_domain.pddl") with open(path_ma_pddl, "w") as f: f.write(domain) @@ -686,7 +777,11 @@ def write_ma_problem(self, directory_name): outdir_ma_pddl = "ma_pddl_" + directory_name osy.makedirs(outdir_ma_pddl, exist_ok=True) for ag, problem in ag_problems.items(): - path_ma_pddl = osy.path.join(outdir_ma_pddl, ag + "_problem.pddl") + if not self.unfactored: + name_problem = ag + else: + name_problem = _get_pddl_name(self.problem) + path_ma_pddl = osy.path.join(outdir_ma_pddl, name_problem + "_problem.pddl") with open(path_ma_pddl, "w") as f: f.write(problem) @@ -718,7 +813,10 @@ def get_predicates_functions( else: raise UPTypeError("MA-PDDL supports only user type parameters") if isinstance(obj, up.model.multi_agent.Agent): - expression = f'({prefix}{self._get_mangled_name(f)} ?agent - {"ag"}{"".join(params)})' + if self.unfactored: + expression = f'({prefix}{self._get_mangled_name(f)}_{obj.name} ?agent - {obj.name + "_type"}{"".join(params)})' + else: + expression = f'({prefix}{self._get_mangled_name(f)} ?agent - {"ag"}{"".join(params)})' else: expression = f'({prefix}{self._get_mangled_name(f)}{"".join(params)})' if f.type.is_bool_type():