In [3]:
import itertools

class Action:

    def __init__(self, name, parameters, positive_preconditions, negative_preconditions, add_effects, del_effects):
        self.name = name
        self.parameters = parameters
        self.positive_preconditions = positive_preconditions
        self.negative_preconditions = negative_preconditions
        self.add_effects = add_effects
        self.del_effects = del_effects

    def __str__(self):
        return 'action: ' + self.name + \
        '\n  parameters: ' + str(self.parameters) + \
        '\n  positive_preconditions: ' + str(self.positive_preconditions) + \
        '\n  negative_preconditions: ' + str(self.negative_preconditions) + \
        '\n  add_effects: ' + str(self.add_effects) + \
        '\n  del_effects: ' + str(self.del_effects) + '\n'

    def __eq__(self, other): 
        return self.__dict__ == other.__dict__

    def groundify(self, objects):
        if not self.parameters:
            yield self
            return
        type_map = []
        variables = []
        for var, type in self.parameters:
            type_map.append(objects[type])
            variables.append(var)
        for assignment in itertools.product(*type_map):
            positive_preconditions = self.replace(self.positive_preconditions, variables, assignment)
            negative_preconditions = self.replace(self.negative_preconditions, variables, assignment)
            add_effects = self.replace(self.add_effects, variables, assignment)
            del_effects = self.replace(self.del_effects, variables, assignment)
            yield Action(self.name, assignment, positive_preconditions, negative_preconditions, add_effects, del_effects)

    def replace(self, group, variables, assignment):
        g = []
        for pred in group:
            pred = list(pred)
            iv = 0
            for v in variables:
                while v in pred:
                    pred[pred.index(v)] = assignment[iv]
                iv += 1
            g.append(pred)
        return g

In [4]:
import re
class PDDL_Parser:

    def scan_tokens(self, filename):
        with open(filename,'r') as f:
            str = f.read().lower()
            #print(str)
        # Tokenize
        #print(str)
        stack = []
        list = []
        for t in re.findall(r'[()]|[^\s()]+', str):
            #print(t)
            if t == '(':
                stack.append(list)
                list = []
            elif t == ')':
                if stack:
                    l = list
                    list = stack.pop()
                    list.append(l)
            else:
                list.append(t)
        #print(list)
        return list[0]

    #-----------------------------------------------
    # Parse domain
    #-----------------------------------------------

    def parse_domain(self, domain_filename):
        tokens = self.scan_tokens(domain_filename)
        #print(tokens)
        if type(tokens) is list and tokens.pop(0) == 'define':
            self.domain_name = 'unknown'
            self.requirements = []
            self.types = []
            self.actions = []
            self.predicates = {}
            while tokens:
                group = tokens.pop(0)
                t = group.pop(0)
                if   t == 'domain':
                    self.domain_name = group[0]
                elif t == ':requirements':
                    #for req in group:
                    #    if not req in self.SUPPORTED_REQUIREMENTS:
                    #        raise Exception('Requirement ' + req + ' not supported')
                    self.requirements = group
                elif t == ':predicates':
                    self.parse_predicates(group)
                elif t == ':types':
                    self.types = group
                elif t == ':action':
                    self.parse_action(group)
                else: print(str(t) + ' is not recognized in domain')
        else:
            raise Exception('File ' + domain_filename + ' does not match domain pattern')

    #-----------------------------------------------
    # Parse predicates
    #-----------------------------------------------

    def parse_predicates(self, group):
        for pred in group:
            predicate_name = pred.pop(0)
            if predicate_name in self.predicates:
                raise Exception('Predicate ' + predicate_name + ' redefined')
            arguments = {}
            untyped_variables = []
            while pred:
                t = pred.pop(0)
                if t == '-':
                    if not untyped_variables:
                        raise Exception('Unexpected hyphen in predicates')
                    type = pred.pop(0)
                    while untyped_variables:
                        arguments[untyped_variables.pop(0)] = type
                else:
                    untyped_variables.append(t)
            while untyped_variables:
                arguments[untyped_variables.pop(0)] = 'object'
            self.predicates[predicate_name] = arguments

    #-----------------------------------------------
    # Parse action
    #-----------------------------------------------

    def parse_action(self, group):
        name = group.pop(0)
        if not type(name) is str:
            raise Exception('Action without name definition')
        for act in self.actions:
            if act.name == name:
                raise Exception('Action ' + name + ' redefined')
        parameters = []
        positive_preconditions = []
        negative_preconditions = []
        add_effects = []
        del_effects = []
        while group:
            t = group.pop(0)
            if t == ':parameters':
                if not type(group) is list:
                    raise Exception('Error with ' + name + ' parameters')
                parameters = []
                untyped_parameters = []
                p = group.pop(0)
                while p:
                    t = p.pop(0)
                    if t == '-':
                        if not untyped_parameters:
                            raise Exception('Unexpected hyphen in ' + name + ' parameters')
                        ptype = p.pop(0)
                        while untyped_parameters:
                            parameters.append([untyped_parameters.pop(0), ptype])
                    else:
                        untyped_parameters.append(t)
                while untyped_parameters:
                    parameters.append([untyped_parameters.pop(0), 'object'])
            elif t == ':precondition':
                self.split_predicates(group.pop(0), positive_preconditions, negative_preconditions, name, ' preconditions')
            elif t == ':effect':
                self.split_predicates(group.pop(0), add_effects, del_effects, name, ' effects')
            else: print(str(t) + ' is not recognized in action')
        self.actions.append(Action(name, parameters, positive_preconditions, negative_preconditions, add_effects, del_effects))

    #-----------------------------------------------
    # Parse problem
    #-----------------------------------------------

    def parse_problem(self, problem_filename):
        tokens = self.scan_tokens(problem_filename)
        if type(tokens) is list and tokens.pop(0) == 'define':
            self.problem_name = 'unknown'
            self.objects = dict()
            self.state = []
            self.positive_goals = []
            self.negative_goals = []
            while tokens:
                group = tokens.pop(0)
                t = group[0]
                if   t == 'problem':
                    self.problem_name = group[-1]
                elif t == ':domain':
                    if self.domain_name != group[-1]:
                        raise Exception('Different domain specified in problem file')
                elif t == ':requirements':
                    pass # Ignore requirements in problem, parse them in the domain
                elif t == ':objects':
                    group.pop(0)
                    object_list = []
                    while group:
                        if group[0] == '-':
                            group.pop(0)
                            self.objects[group.pop(0)] = object_list
                            object_list = []
                        else:
                            object_list.append(group.pop(0))
                    if object_list:
                        if not 'object' in self.objects:
                            self.objects['object'] = []
                        self.objects['object'] += object_list
                elif t == ':init':
                    group.pop(0)
                    self.state = group
                elif t == ':goal':
                    self.split_predicates(group[1], self.positive_goals, self.negative_goals, '', 'goals')
                else: print(str(t) + ' is not recognized in problem')
        else:
            raise Exception('File ' + problem_filename + ' does not match problem pattern')

    #-----------------------------------------------
    # Split predicates
    #-----------------------------------------------

    def split_predicates(self, group, pos, neg, name, part):
        #print(group,pos,neg,name,part)
        if not type(group) is list:
            raise Exception('Error with ' + name + part)
        if group[0] == 'and':
            group.pop(0)
        else:
            group = [group]
        for predicate in group:
            if predicate[0] == 'not':
                if len(predicate) != 2:
                    raise Exception('Unexpected not in ' + name + part)
                neg.append(predicate[-1])
            else:
                pos.append(predicate)

# ==========================================
# Main
# ==========================================
# if __name__ == '__main__':
#     import pprint
#     domain = './test4/test4_domain.txt'
#     problem = './test4/test4_problem.txt'
#     parser = PDDL_Parser()
#     print('----------------------------')
#     #pprint.pprint(parser.scan_tokens(domain))
#     print('----------------------------')
#     #pprint.pprint(parser.scan_tokens(problem))
#     print('----------------------------')
#     parser.parse_domain(domain)
#     parser.parse_problem(problem)
#     print('Domain name: ' + parser.domain_name)
#     for act in parser.actions:
#         print(act)
#     print('----------------------------')
#     print('Problem name: ' + parser.problem_name)
#     print('Objects: ' + str(parser.objects))
#     print('State: ' + str(parser.state))
#     print('Positive goals: ' + str(parser.positive_goals))
#     print('Negative goals: ' + str(parser.negative_goals))

In [5]:
# parser = PDDL_Parser()
# parser.parse_domain(domain)
# parser.parse_problem(problem)
# state = parser.state
# goal_pos = parser.positive_goals
# goal_not = parser.negative_goals
# print(parser.state[0])
# print(goal_pos)
# print(goal_not)

In [22]:
import copy
class Planner:
    
    def solve(self, domain, problem):
        parser = PDDL_Parser()
        parser.parse_domain(domain)
        parser.parse_problem(problem)
        state = parser.state
        goal_pos = parser.positive_goals
        goal_not = parser.negative_goals
        if self.applicable(state, goal_pos, goal_not):
            return []
        ground_actions = []
        for action in parser.actions:
            for act in action.groundify(parser.objects):
                ground_actions.append(act)
        #print(len(ground_actions))
        # Search
        visited = [state]
        fringe = [state, None]
        #print(visited)
        #print(state)
        while fringe:
            minn = 10
            idx = -1
            for i in range(0,len(fringe),2):
                state = fringe[i]
                plan = fringe[i+1]
                value = self.heuristic(state,ground_actions,goal_pos) + (len(plan) if(type(plan) == 'list') else 0)
                #print(state,value)
                if(value < minn):
                    minn = value
                    idx = i
            state = fringe.pop(i)
            plan = fringe.pop(i)
#             state = fringe.pop(0)
#             plan = fringe.pop(0)
            for act in ground_actions:
                if self.applicable(state, act.positive_preconditions, act.negative_preconditions):
                    new_state = self.apply(state, act.add_effects, act.del_effects)
                    if new_state not in visited:
                        if self.applicable(new_state, goal_pos, goal_not):
                            #print(goal_pos,goal_not)
                            full_plan = [act]
                            while plan:
                                act, plan = plan
                                full_plan.insert(0, act)
                            return full_plan
                        visited.append(new_state)
                        fringe.append(new_state)
                        fringe.append((act, plan))
        return None
    
    def applicable(self, state, positive, negative):
        for i in positive:
            if i not in state:
                return False
        for i in negative:
            if i in state:
                return False
        #print(positive,negative,state)
        return True

    def apply(self, state, positive, negative):
        #print(positive,negative,state)
        new_state = []
        for i in state:
            if i not in negative:
                new_state.append(i)
        for i in positive:
            if i not in new_state:
                new_state.append(i)
        return new_state
    
    def heuristic(self, state,ground_actions,goal_state):
        cnt = 0
        visited = [state]
        fringe = [state,0]
        while fringe:
            #print(fringe)
            state = fringe.pop(0)
            step = fringe.pop(0)+1
            #print(state,step)
            for act in ground_actions:
                #print(state)
                if self.applicable_pos(state, act.positive_preconditions):
                    new_state = self.apply_pos(state, act.add_effects)
                    #print(new_state)
                    if new_state not in visited:
                        #print(new_state,goal_state)
                        if self.applicable_pos(new_state,goal_state):
                            #print(step)
                            return step
                        visited.append(new_state)
                        fringe.append(new_state)
                        fringe.append(step)
        return 1000
        
        
    
    def applicable_pos(self, state, positive):
        for i in positive:
            if i not in state:
                return False
        return True

    def apply_pos(self, state, positive):
        new_state = copy.deepcopy(state)
        for i in positive:
            if i not in new_state:
                new_state.append(i)
        return new_state

In [23]:
import sys, time
start_time = time.time()
for n in range(5):
    n = 0
    domain = './test{}/test{}_domain.txt'.format(n,n)
    problem = './test{}/test{}_problem.txt'.format(n,n)
    f=open('./test{}/test{}_plan.txt'.format(n,n),"w")
    planner = Planner()
    plan = planner.solve(domain, problem)
    print('Time: ' + str(time.time() - start_time) + 's')
    if plan:
        print('plan:',file=f)
        for act in plan:
            print(act,file=f)
    else:
        print('No plan was found')
    f.close()

Time: 0.0013501644134521484s
Time: 0.0025780200958251953s
Time: 0.0034589767456054688s
Time: 0.004341840744018555s
Time: 0.005115032196044922s
