In [1]:
# begin of installation

In [2]:
!pip install pythonnet



In [3]:
# end of installation

### Unified Planning imports ###

In [4]:
from unified_planning.io import PDDLReader
from unified_planning.model import FNode, OperatorKind, Fluent, Effect, SensingAction
from unified_planning.engines import Credits
import unified_planning.engines as engines
import unified_planning.environment as environment
from unified_planning.plans import ActionInstance, ContingentPlan, ContingentPlanNode
from unified_planning.model import ProblemKind, ContingentProblem
from unified_planning.engines.results import PlanGenerationResultStatus, PlanGenerationResult
from unified_planning.engines.mixins.compiler import CompilationKind
from typing import Optional

### pythonNet and c# imports ###


In [5]:
import clr
clr.AddReference('CPORLib')

from CPORLib.PlanningModel import Domain, Problem, ParametrizedAction, PlanningAction
from CPORLib.LogicalUtilities import Predicate, ParametrizedPredicate, GroundedPredicate, PredicateFormula, CompoundFormula, Formula
from CPORLib.Algorithms import CPORPlanner

### credits

In [6]:
# credits = Credits('CPOR planner',
#                   'BGU',
#                   'Guy Shani',
#                   'https://github.com/???',
#                   'Version 1',
#                   'CPOR planner is a lightweight STRIPS planner written in c#.',
#                   'CPOR planner is a lightweight STRIPS planner written in c#.\nPlease note that CPOR planner deliberately prefers clean code over fast code. It is designed to be used as a teaching or prototyping tool. If you use it for paper experiments, please state clearly that CPOR planner does not offer state-of-the-art performance.'
#                 )

## Engine implementation

In [7]:
class CPORImpl(engines.Engine):
    def __init__(self, bOnline = False, **options):
        self.bOnline = bOnline
        self._skip_checks = False

    @property
    def name(self) -> str:
        return "CPORPlanning"

    @staticmethod
    def supports_compilation(compilation_kind: CompilationKind) -> bool:
        return compilation_kind == CompilationKind.GROUNDING

    @staticmethod
    def supported_kind():
        # Ask what more need to be added
        supported_kind = ProblemKind()
        supported_kind.set_problem_class('CONTINGENT')
        supported_kind.set_problem_class("ACTION_BASED")
        supported_kind.set_typing('FLAT_TYPING')
        supported_kind.set_typing('HIERARCHICAL_TYPING')
        return supported_kind

    @staticmethod
    def supports(problem_kind):
        return problem_kind <= CPORImpl.supported_kind()

    def solve(self, problem: 'up.model.ContingentProblem') -> 'up.engines.results.PlanGenerationResult':

        if not self.supports(problem.kind):
            return PlanGenerationResult(PlanGenerationResultStatus.UNSOLVABLE_PROVEN, None, self.name)

        c_domain = self.__createDomain(problem)
        c_problem = self.__createProblem(problem, c_domain)

        solution = self.__createPlan(c_domain, c_problem)
        actions = self.__createActionTree(solution, problem)
        if solution is None or actions is None:
            return PlanGenerationResult(PlanGenerationResultStatus.UNSOLVABLE_PROVEN, None, self.name)

        return PlanGenerationResult(PlanGenerationResultStatus.SOLVED_SATISFICING, ContingentPlan(actions), self.name)

    def destroy(self):
        pass

    def __createProblem(self, problem, domain):
        p = Problem(problem.name, domain)

        for f, v in problem.initial_values.items():
            if v.is_true():
                gp = self.__CreatePredicate(f, False, None)
                p.AddKnown(gp)

        for c in problem.or_constraints:
            cf = self.__CreateOrFormula(c, [])
            p.AddHidden(cf)

        for c in problem.oneof_constraints:
            cf = self.__CreateOneOfFormula(c, [])
            p.AddHidden(cf)

        goal = CompoundFormula("and")
        for g in problem.goals:
            cp = self.__CreateFormula(g, [])
            goal.AddOperand(cp)
        p.Goal = goal.Simplify()

        return p

    def __createPlan(self, c_domain, c_problem):
        solver = CPORPlanner(c_domain, c_problem)
        c_plan = solver.OfflinePlanning()
        # handle errors in c# code - null?
        solver.WritePlan("plan.dot", c_plan)
        return c_plan

    def __createDomain(self, problem):
        d = Domain(problem.name)
        for t in problem.user_types:
            if t.father is None:
                d.AddType(t.name)
            else:
                d.AddType(t.name, t.father.name)

        for o in problem.all_objects:
            d.AddConstant(o.name, o.type.name)

        for f in problem.fluents:
            pp = self.__CreatePredicate(f, True, [])
            d.AddPredicate(pp)

        for a in problem.actions:
            l = []
            pa = ParametrizedAction(a.name)
            for param in a.parameters:
                l.append(param.name)
                pa.AddParameter(param.name, param.type.name)
            if not a.preconditions is None:
                for pre in a.preconditions:
                    formula = self.__CreateFormula(pre, l)
                    pa.Preconditions = formula
            if not a.effects is None and len(a.effects) > 0:
                cp = CompoundFormula("and")
                for eff in a.effects:
                    pp = self.__CreatePredicate(eff, False, l)
                    cp.SimpleAddOperand(pp)
                pa.Effects = cp
            if type(a) is SensingAction:
                if not a.observed_fluents is None:
                    for o in a.observed_fluents:
                        pf = self.__CreateFormula(o, l)
                        pa.Observe = pf

            d.AddAction(pa)
        return d

    def __CreatePredicate(self, f, bAllParameters, lActionParameters) -> ParametrizedPredicate:
        if type(f) is Fluent:
            if (not bAllParameters) and (lActionParameters is None or len(lActionParameters) == 0):
                pp = GroundedPredicate(f.name)
            else:
                pp = ParametrizedPredicate(f.name)
            for param in f.signature:
                bParam = bAllParameters or (param.name in lActionParameters)
                if bParam:
                    pp.AddParameter(param.name, param.type.name)
                else:
                    pp.AddConstant(param.name, param.type.name)
            return pp
        if type(f) is Effect:
            pp = self.__CreatePredicate(f.fluent, bAllParameters, lActionParameters)
            if str(f.value) == "false":
                pp.Negation = True
            return pp
        if type(f) is FNode:
            if (not bAllParameters) and (lActionParameters is None or len(lActionParameters) == 0):
                pp = GroundedPredicate(f.fluent().name)
            else:
                pp = ParametrizedPredicate(f.fluent().name)
            for arg in f.args:
                if arg.is_parameter_exp():
                    param = arg.parameter()
                    pp.AddParameter(param.name, param.type.name)
                if arg.is_object_exp():
                    obj = arg.object()
                    pp.AddConstant(obj.name, obj.type.name)
            return pp

    def __CreateFormula(self, n: FNode, lActionParameters) -> Formula:
        if n.node_type == OperatorKind.FLUENT_EXP:
            pp = self.__CreatePredicate(n, False, lActionParameters)
            pf = PredicateFormula(pp)
            return pf
        else:
            if n.node_type == OperatorKind.AND:
                cp = CompoundFormula("and")
            elif n.node_type == OperatorKind.OR:
                cp = CompoundFormula("or")
            elif n.node_type == OperatorKind.NOT:
                cp = self.__CreateFormula(n.args[0], lActionParameters)
                cp = cp.Negate()
                return cp
            else:
                cp = CompoundFormula("oneof")

            for nSub in n.args:
                fSub = self.__CreateFormula(nSub, lActionParameters)
                cp.SimpleAddOperand(fSub)
            return cp

    def __CreateOrFormula(self, n, lActionParameters) -> Formula:
        cp = CompoundFormula("or")
        for nSub in n:
            fSub = self.__CreateFormula(nSub, lActionParameters)
            cp.SimpleAddOperand(fSub)
        return cp

    def __CreateOneOfFormula(self, n, lActionParameters) -> Formula:
        cp = CompoundFormula("oneof")
        for nSub in n:
            fSub = self.__CreateFormula(nSub, lActionParameters)
            cp.SimpleAddOperand(fSub)
        return cp

    def __createActionTree(self, solution, problem) -> ContingentPlanNode:
        ai = self.__convert_string_to_action_instance(str(solution.Action), problem)
        if ai:
            root = ContingentPlanNode(ai)
            if solution.SingleChild:
                root.add_singleChild(self.__createActionTree(solution.SingleChild, problem))
            if solution.FalseObservationChild:
                root.add_falseObservationChild(self.__createActionTree(solution.FalseObservationChild, problem))
            if solution.TrueObservationChild:
                root.add_trueObservationChild(self.__createActionTree(solution.TrueObservationChild, problem))
            return root

    def __convert_string_to_action_instance(self, string, problem) -> 'up.plans.InstantaneousAction':
        if string != 'None':
            assert string[0] == "(" and string[-1] == ")"
            list_str = string[1:-1].replace(":","").replace('~',' ').split("\n")
            ac = list_str[0].split(" ")
            action = problem.action(ac[1])
            expr_manager = problem.env.expression_manager
            param = tuple(expr_manager.ObjectExp(problem.object(o_name)) for o_name in ac[2:])
            return ActionInstance(action, param)

Congratulations! You just created an integrated planning engine for the `Oneshot` operation mode! Of course, in a more realistic scenario, one would need to read the `problem` object and call an external engine or provide a more involved algorithm to provide the service, but this is already sufficient for testing out our simple engine.

## Registering the engine

In order to use our `CPORPlanning`, we need to register it among the set of planning engines available for the UP library as follows.

In [8]:
env = environment.get_env()
env.factory.add_engine('CPORPlanning', __name__, 'CPORImpl')

Essentially, we just need to give a custom name (in our case `CPORPlanning`) a module name (in this case, `__name__` as we are in the same file as the Solver class) and finally the class name that we used to define our planning engine.

Done! We are nor ready to test our planning engine!

## Testing the engine

We start by defining a simple problem in the UP syntax (alternatively we can use a parser or any other way to create a problem obejct)

In [9]:
# Creating a PDDL reader
reader = PDDLReader()

# Parsing a PDDL problem from file
problem = reader.parse_problem(
    "../unified_planning/test/pddl/wumpus/domain.pddl",
    "../unified_planning/test/pddl/wumpus/problem.pddl",
)

In [10]:
def print_plan(p_planNode, moves):
    if p_planNode is not None:
        x = p_planNode
        moves.append(x.action_instance)
        if x.singleChild is not None:
            moves.append(print_plan(x.singleChild, moves))
        elif x.trueObservationChild is not None:
            moves.append(print_plan(x.trueObservationChild, moves))
        elif x.falseObservationChild is not None:
            moves.append(print_plan(x.falseObservationChild, moves))
    return moves

In [11]:
with env.factory.Replanner(problem, name='CPORPlanning') as planner:
    result = planner.solve(problem)
    if result.status == PlanGenerationResultStatus.SOLVED_SATISFICING:
        print(f'{planner.name} found a valid plan!')
        print(f'The plan is: {print_plan(result.plan.root_node, [])}')
    else:
        print('No plan found!')

CPORPlanning found a valid plan!
The plan is: [move(p1-1, p1-2), move(p1-2, p1-3), feel-breeze(p1-3), [...], [...]]


Notes:
-Original way that ContingentPlanNode was built -> maybe need to do it that way?
-What is happening when the c# code fails? -> need to fix it -
-Fix the support condition - now it just return UNSOLVABLE_PROVEN - shuold be what the other engine returns.
-Need to add comments and change to meaningful names
-2 other problems