In [1]:
from pddl.logic import Predicate, constants, variables
from pddl.core import Domain, Problem, Action, Requirements
from pddl.formatter import domain_to_string, problem_to_string
from pddl import parse_domain, parse_problem

from pddl.logic.effects import When, AndEffect
from pddl.logic.base import Not, And

import copy


# Merge Domain

## Merge Predicates

In [2]:
#create a predicate to capture robot failures
robot_failed = Predicate("robot_failed")

In [3]:
human_domain = parse_domain('human_domain.pddl')
human_problem = parse_problem('human_problem.pddl')

robot_domain = parse_domain('robot_domain.pddl')
robot_problem = parse_problem('robot_problem.pddl')

##this is from manual merge
# m_domain = parse_domain('merged_domain.pddl')
# m_problem = parse_problem('merged_problem.pddl')

In [4]:
list_h_preds = list(human_domain.predicates)
list_r_preds = list(robot_domain.predicates)
list_h_preds, list_r_preds

([Predicate(available_raw, ?r),
  Predicate(processed_C, ?r),
  Predicate(processed_B, ?r),
  Predicate(processed_A, ?r)],
 [Predicate(processed_C, ?r),
  Predicate(no_bottleneck, ),
  Predicate(processed_B, ?r),
  Predicate(no_need_to_calibrate, ),
  Predicate(available_raw, ?r),
  Predicate(processed_A, ?r)])

In [5]:
#change robot predicates' names: add "r_"
for pred in list_r_preds:
    pred._name = "r_" + str(pred.name)

In [6]:
#merged domain's predicates
merged_preds = list_h_preds + list_r_preds
merged_preds.append(robot_failed)
merged_preds

[Predicate(available_raw, ?r),
 Predicate(processed_C, ?r),
 Predicate(processed_B, ?r),
 Predicate(processed_A, ?r),
 Predicate(r_processed_C, ?r),
 Predicate(r_no_bottleneck, ),
 Predicate(r_processed_B, ?r),
 Predicate(r_no_need_to_calibrate, ),
 Predicate(r_available_raw, ?r),
 Predicate(r_processed_A, ?r),
 Predicate(robot_failed, )]

## Merge Actions

In [7]:
list_h_actions = list(human_domain.actions)
list_r_actions = list(robot_domain.actions)
list_h_actions, list_r_actions

([<pddl.core.Action at 0x105931880>,
  <pddl.core.Action at 0x105931ac0>,
  <pddl.core.Action at 0x105956070>],
 [<pddl.core.Action at 0x1058b47f0>,
  <pddl.core.Action at 0x105924cd0>,
  <pddl.core.Action at 0x1058b4820>,
  <pddl.core.Action at 0x1058b4370>,
  <pddl.core.Action at 0x105924310>])

In [8]:
merged_action_list =[]

for h_action in list_h_actions:
    for r_action in list_r_actions:
        if str(h_action.name)==str(r_action.name):
            
            merged_effect_list = []
            
            #add r_ to robot's preconditions
            for precs in r_action.precondition.operands:
                precs._name = "r_" + precs._name
            
            #get human effects
            for h_eff_operand in h_action.effect.operands:
                merged_effect_list.append(h_eff_operand)
            
            #get robot effects with "r_" added to the predicates
            for r_eff_operand in r_action.effect.operands:
                if type(r_eff_operand)== Not:
                    r_eff_operand.argument._name = "r_"+ r_eff_operand.argument._name
                else:
                    r_eff_operand._name = "r_"+ r_eff_operand._name
            
            
            #use robot effects in conditional effects
            ce_1 = When(~(r_action.precondition), (robot_failed))
            merged_effect_list.append(ce_1)
            ce_2 = When((r_action.precondition), (r_action.effect))
            merged_effect_list.append(ce_2)
            merged_effect_list = tuple(merged_effect_list)
            
            #Careful! AndEffect takes the elements not the list
            merged_effect_list = AndEffect(*merged_effect_list)
            print(merged_effect_list)
                         
            
            act = Action(
                    str(h_action.name),
                    parameters = h_action.parameters,
                    precondition = h_action.precondition,
                    effect = merged_effect_list
            )
            
            merged_action_list.append(act)
            


(and (not (processed_B ?r)) (processed_C ?r) (when (not (and (r_processed_B ?r) (r_no_bottleneck) (r_no_need_to_calibrate))) (robot_failed)) (when (and (r_processed_B ?r) (r_no_bottleneck) (r_no_need_to_calibrate)) (and (not (r_processed_B ?r)) (r_processed_C ?r))))
(and (not (processed_A ?r)) (processed_B ?r) (when (not (and (r_processed_A ?r) (r_no_bottleneck) (r_no_need_to_calibrate))) (robot_failed)) (when (and (r_processed_A ?r) (r_no_bottleneck) (r_no_need_to_calibrate)) (and (not (r_processed_A ?r)) (r_processed_B ?r))))
(and (processed_A ?r) (not (available_raw ?r)) (when (not (and (r_available_raw ?r) (r_no_bottleneck) (r_no_need_to_calibrate))) (robot_failed)) (when (and (r_available_raw ?r) (r_no_bottleneck) (r_no_need_to_calibrate)) (and (not (r_available_raw ?r)) (r_processed_A ?r))))


## Merge Types

In [9]:
merged_domain_types = copy.deepcopy(human_domain.types)
merged_domain_types.add('othergoals')
merged_domain_types

{'othergoals', 'raw'}

## Create Merged Domain

In [10]:
merged_domain = Domain("merged_domain",
       requirements=human_domain.requirements,
       types=merged_domain_types,
       constants=human_domain.constants,
       predicates=merged_preds,
       actions=merged_action_list)

In [11]:
print(domain_to_string(merged_domain))

(define (domain merged_domain)
    (:requirements :conditional-effects :typing)
    (:types othergoals raw)
    (:predicates (available_raw ?r - raw)  (processed_A ?r - raw)  (processed_B ?r - raw)  (processed_C ?r - raw)  (r_available_raw ?r - raw)  (r_no_bottleneck) (r_no_need_to_calibrate) (r_processed_A ?r - raw)  (r_processed_B ?r - raw)  (r_processed_C ?r - raw)  (robot_failed))
    (:action load_machine_A
        :parameters (?r - raw )
        :precondition (available_raw ?r)
        :effect (and (processed_A ?r) (not (available_raw ?r)) (when (not (and (r_available_raw ?r) (r_no_bottleneck) (r_no_need_to_calibrate))) (robot_failed)) (when (and (r_available_raw ?r) (r_no_bottleneck) (r_no_need_to_calibrate)) (and (not (r_available_raw ?r)) (r_processed_A ?r))))
    )
     (:action load_machine_B
        :parameters (?r - raw )
        :precondition (processed_A ?r)
        :effect (and (not (processed_A ?r)) (processed_B ?r) (when (not (and (r_processed_A ?r) (r_no_bottleneck) 

In [12]:
# #File writer
# with open("m_1_domain.pddl", "w") as text_file:
#     text_file.write(domain_to_string(merged_domain))

# Merge Problem

## Merge problem objects

In [13]:
#Object for robot failure
rob_failure_object = constants("robot_failed", types=["othergoals"])

#Human problem objects:
human_problem_obj = list(human_problem.objects.copy())

#Merged problem's objects:
merged_objects_for_problem = rob_failure_object + human_problem_obj
merged_objects_for_problem

[Constant(robot_failed), Constant(R1)]

## Merged Problem Inits

In [14]:
#change robot init predicate names: add "r_"
for robot_init in list(robot_problem.init):
    robot_init._name = "r_" + robot_init._name


In [17]:
merged_inits_for_problem = list(human_problem.init) + list(robot_problem.init)
merged_inits_for_problem

[Predicate(available_raw, R1),
 Predicate(r_no_need_to_calibrate, ),
 Predicate(r_available_raw, R1)]

## Merged Problem Goals

In [21]:
merged_goal_list = [human_problem.goal]+ [robot_failed]

#Merge them with AND
merged_goal = And(*merged_goal_list)
print(merged_goal)

(and (processed_C R1) (robot_failed))


## Create Merged Problem

In [22]:
merged_problem = Problem("merged_problem",
        domain=merged_domain,
        requirements=human_domain.requirements,
        objects=merged_objects_for_problem,
        init=merged_inits_for_problem,
        goal=merged_goal
)

In [23]:
print(problem_to_string(merged_problem))

(define (problem merged_problem)
    (:domain merged_domain)
    (:requirements :conditional-effects :typing)
    (:objects R1 - raw robot_failed - othergoals)
    (:init (available_raw R1) (r_available_raw R1) (r_no_need_to_calibrate))
    (:goal (and (processed_C R1) (robot_failed)))
)


In [24]:
# #File writer
# with open("m_1_problem.pddl", "w") as text_file:
#     text_file.write(problem_to_string(merged_problem))