In [102]:
#execution requirements.txt
#! pip install -r requirements.txt

# Librairie pddl

### Librairies

In [103]:
from pddl.parser.domain import DomainParser
from pddl.parser.problem import ProblemParser
import pddl
from itertools import product, permutations
import copy
import heapq

# domain_str = open("data/domain.pddl").read()
# problem_str = open("data/problem.pddl").read() 
domain_str = open("data/taquin_domain.pddl").read()
problem_str = open("data/taquin-size3x3-conf_6.pddl").read()  


domain = DomainParser()(domain_str)
problem = ProblemParser()(problem_str)

init_state = copy.deepcopy(list(problem.init))
goal_state = copy.deepcopy(list(problem.goal.__getstate__()['_operands']))


### Optimisation all actions

In [104]:
# print("Predicates : ", list(domain.predicates))
# print("Predicates 0 : ", list(domain.predicates)[0])
# print("Actions : ", list(domain.actions))
# print("Action 0 Name : ",list((domain.actions))[0].name)
# print("Action 0 Precondition : ",list(list((domain.actions))[0].precondition.operands))
# print("Action 0 Effect : ",list(list((domain.actions))[0].effect.operands))
# print("Action 0 Parameters : ",list(list((domain.actions))[0].parameters))

effects_name=[]  
for action in domain.actions:
    effects = list(action.effect.operands)
    for effect in effects:
        if isinstance(effect, pddl.logic.predicates.Predicate):
            effects_name.append(str(effect.__getstate__()['_name']))
        elif isinstance(effect, pddl.logic.base.Not):
            effects_name.append(effect.__getstate__()['_arg'].__getstate__()['_name'])
effects_name = list(set(effects_name))

print("Effects Actions : ", effects_name)

immuable_predicates = []  
init_state_tuple = tuple(copy.deepcopy(init_state))  
for predicate in init_state_tuple:
    if predicate.__getstate__()['_name'] not in effects_name:
        immuable_predicates.append(predicate)

print("Immuable Predicates : ", immuable_predicates)

def is_possible_actions(dict_action):
    for precondition in dict_action['preconditions']:
        if isinstance(precondition, pddl.logic.predicates.Predicate):
            name=precondition.__getstate__()['_name']
        elif isinstance(precondition, pddl.logic.base.Not):
            name=precondition.__getstate__()['_arg'].__getstate__()['_name']

        if name not in effects_name: #si le nom de la precondition n'est pas dans les effects des actions, donc il ne sera jamais modifié
            if precondition not in init_state: #si la precondition n'est pas dans l'état initial, donc l'action n'est pas possible
                return False
    return True
        

Effects Actions :  ['empty', 'on']
Immuable Predicates :  [Predicate(touch, cell_7, cell_8), Predicate(touch, cell_8, cell_5), Predicate(touch, cell_4, cell_1), Predicate(touch, cell_4, cell_7), Predicate(touch, cell_1, cell_0), Predicate(touch, cell_8, cell_7), Predicate(touch, cell_5, cell_4), Predicate(touch, cell_4, cell_3), Predicate(touch, cell_6, cell_7), Predicate(touch, cell_5, cell_8), Predicate(touch, cell_2, cell_5), Predicate(touch, cell_6, cell_3), Predicate(touch, cell_3, cell_4), Predicate(touch, cell_0, cell_3), Predicate(touch, cell_7, cell_4), Predicate(touch, cell_5, cell_2), Predicate(touch, cell_3, cell_6), Predicate(touch, cell_7, cell_6), Predicate(touch, cell_3, cell_0), Predicate(touch, cell_1, cell_2), Predicate(touch, cell_0, cell_1), Predicate(touch, cell_4, cell_5), Predicate(touch, cell_2, cell_1), Predicate(touch, cell_1, cell_4)]


### Fonction pour générer toutes les actions possibles du problème

In [105]:
def generate_klist(variables, k):
    return list(permutations(variables, k))

def all_actions(domain, problem):
    actions = []
    domain_actions_list = list(domain.actions)  # Convert to list once to avoid repeated conversions
    dict_possibility = {}
    for action in domain_actions_list:

        action_copy = copy.deepcopy(action)
        # Optimization: Generate possibilities once and use len(action_copy.parameters) directly
        if len(action_copy.parameters) not in dict_possibility:
            dict_possibility[len(action_copy.parameters)] = generate_klist(copy.deepcopy(list(problem.objects)), len(action_copy.parameters))
        
        possiblity = dict_possibility[len(action_copy.parameters)]

        
        for klist in possiblity:
            dict_action = {'name': '', 'preconditions': [], 'positive effects': [], 'negative effects': []}
            
            # Construct action name
            dict_action['name'] = action_copy.name.__str__() + '(' + ', '.join(v.__str__() for v in klist) + ')'
            dict_var = dict(zip(action_copy.parameters, klist))

            # Process preconditions
            for precondition in copy.deepcopy(action.precondition.operands):
                terms = tuple(dict_var[v] for v in precondition.__getstate__()['_terms'])
                precondition.__getstate__()['_terms'] = terms
                dict_action['preconditions'].append(precondition)

            # Process effects
            effects = copy.deepcopy(action.effect.operands)
            for effect in effects:
                if isinstance(effect, pddl.logic.predicates.Predicate):
                    terms = tuple(dict_var[v] for v in effect.__getstate__()['_terms'])
                    effect.__getstate__()['_terms'] = terms
                    dict_action['positive effects'].append(effect)
                elif isinstance(effect, pddl.logic.base.Not):
                    terms = tuple(dict_var[v] for v in effect.__getstate__()['_arg'].__getstate__()['_terms'])
                    effect.__getstate__()['_arg'].__getstate__()['_terms'] = terms
                    dict_action['negative effects'].append(effect)
            
            # Check if action is possible
            if is_possible_actions(dict_action):
                actions.append(dict_action) 
    return actions


def remove_negative_effect(actions):
    actions_copy = copy.deepcopy(actions)
    for action in actions_copy:
        action['negative effects'] = []
    return actions_copy

All_actions = all_actions(domain, problem)
Relaxed_all_actions = remove_negative_effect(All_actions)

print(len(All_actions))


360


### Application d'actions sur state

In [106]:
buffer_actions={}
def applicable_actions(state, actions): # return applicable actions for a state
    if tuple(state) not in buffer_actions:
        buffer_actions[tuple(state)]= [a for a in actions if all(p in state for p in a['preconditions'])]
    return buffer_actions[tuple(state)]
# print(applicable_actions(init_state, All_actions))


def apply_action(state, action): # return new state after applying an action
    new_state = list(copy.deepcopy(state))
    for e in action['positive effects']:
        new_state.append(e)
    for e in action['negative effects']:
        new_state.remove(e.__getstate__()['_arg'])
    return tuple(new_state)

# print(apply_action(init_state, applicable_actions(init_state, All_actions)[0]))

def is_applicable(state, action): # return True if an action is applicable for a state
    copy_state = copy.deepcopy(state)   
    
    try:
        apply_action(copy_state, action)
        return True
    except:
        return False

# action=[d for d in all_actions(domain, problem) if d['name']=='move(d0, d1, p2)'][0]
# print(is_applicable(init_state, action))

# def misapply_action(state, action): # return new state after misapplying an action
#     new_state = copy.deepcopy(state)
#     for e in action['positive effects']:
#         new_state.remove(e)
#     for e in action['negative effects']:
#         new_state.append(e.__getstate__()['_arg'])
#     return new_state


# def previous_actions(state, actions): # return previous actions for a state
#     previous_actions = []
#     for a in actions:
#         cdt1 = all([p in state for p in a['positive effects']])
#         cdt2 = all([p.__getstate__()['_arg'] not in state for p in a['negative effects']])
#         cdt3 = all([p in state for p in (set(a['preconditions'])-set([p.__getstate__()['_arg'] for p in a['negative effects']]))])
#         if cdt1 and cdt2 and cdt3:
#             previous_actions.append(a)
#     return previous_actions

# print(len(previous_actions(goal_state, all_actions(domain, problem))))

# print(misapply_action(problem.goal.__getstate__()['_operands'], previous_actions(problem.goal.__getstate__()['_operands'], all_actions(domain, problem))[0]))



### Vérfication de l'action

In [107]:
def valid_plan(plan): # return True if a plan is valid
    state = copy.deepcopy(init_state)
    for a in plan:
        print(a['name'])
        if is_applicable(state, a):
            state = apply_action(state, a)
        else:
            return False
    return set(goal_state).issubset(set(state))

### Heuristiques

In [108]:

def h_apply_action(state, action):
    new_state = list(copy.deepcopy(state))
    for e in action['positive effects']:
        if e not in new_state:
            new_state.append(e)
    return tuple(new_state)


def h_relaxed(state, goal_state):
    frontier = [(0, state)]  # Use a heap for frontier to implement A* search
    visited = set()
    state_to_cost = {tuple(state): 0}

    def heuristic(state, goal_state):
        # Count the number of missing goals in the current state
        return len(set(goal_state) - set(state))

    while frontier:
        _, current_state = heapq.heappop(frontier)  # Use heappop to extract the minimum element
        current_state_tuple = tuple(current_state)

        if any(set(current_state).issubset(s) for s in visited):
            continue

        visited.add(current_state_tuple)

        if set(goal_state).issubset(set(current_state)):
            return state_to_cost[current_state_tuple]

        for action in applicable_actions(current_state, All_actions):
            new_state = apply_action(current_state, action)
            new_state_tuple = tuple(new_state)

            if new_state_tuple not in state_to_cost or state_to_cost[new_state_tuple] > state_to_cost[current_state_tuple] + 1:
                state_to_cost[new_state_tuple] = state_to_cost[current_state_tuple] + 1
                f = state_to_cost[new_state_tuple] + heuristic(new_state, goal_state)  # Calculate the evaluation function
                heapq.heappush(frontier, (f, new_state))  # Use heappush to add the new element to the heap

    return float('inf')





### Algo de recherche

In [109]:
import heapq  # Importe le module heapq pour utiliser la file de priorité
import copy

def reconstruct_path(came_from, start, goal):
    current = goal
    plan = []

    while current != start:
        found = False  # Indicateur pour vérifier si l'état courant est trouvé dans les clés
        for key in came_from.keys():
            if set(current).issubset(set(key)):
                prev_state, action = came_from[key]  # Récupère l'état précédent et l'action
                plan.append(action)  # Ajoute le nom de l'action au plan
                current = prev_state  # Met à jour l'état courant pour continuer à remonter
                found = True  # Met à jour l'indicateur pour montrer que nous avons trouvé un match
                break  # Sort de la boucle car nous avons trouvé l'état courant dans les clés

        if not found:  # Si après avoir vérifié toutes les clés, aucun match n'est trouvé
            return None  # Retourne None car un chemin complet ne peut pas être reconstruit

    plan.reverse()  # Inverse le plan pour qu'il commence par l'état initial
    return plan

def astar_search(init_state, goal_state, actions):
    init_state_tuple = tuple(copy.deepcopy(init_state))
    goal_state_tuple = tuple(copy.deepcopy(goal_state))

    print("État initial :", init_state_tuple)
    print("État objectif :", goal_state_tuple)
    
    open_list = []
    heapq.heappush(open_list, (h_relaxed(init_state_tuple, copy.deepcopy(goal_state)), 0, init_state_tuple))
    
    came_from = {}
    cost_so_far = {init_state_tuple: 0}

    iteration = 0  # Ajout pour suivre le nombre d'itérations
    while open_list:
        _, current_cost, current_state = heapq.heappop(open_list)
        
        print(f"Iteration {iteration}")
        iteration += 1

        if set(goal_state).issubset(set(current_state)):
            print("Objectif atteint !")
            return reconstruct_path(came_from, init_state_tuple, goal_state_tuple)
        
        for action in applicable_actions(list(current_state), actions):
            new_state = tuple(apply_action(list(current_state), action))
            # print(f"    Applique l'action: {action['name']} -> Nouvel état: {new_state}")
            new_cost = current_cost + 1  # Supposons un coût constant par action
            if new_state not in cost_so_far or new_cost < cost_so_far[new_state]:
                cost_so_far[new_state] = new_cost
                h = h_relaxed(new_state, copy.deepcopy(goal_state))
                print(h)
                priority = new_cost + h
                heapq.heappush(open_list, (priority, new_cost, new_state))
                came_from[new_state] = (current_state, action)

        if iteration > 100000:  # Condition de sortie pour éviter la boucle infinie pendant le débogage
            print("Arrêt")
            return None
                
    print("Aucun chemin trouvé.")
    return None

# Assurez-vous que `init_state` et `goal_state` sont convertis en tuples là où vous les définissez
import time
start_time = time.time()

path_actions = astar_search(init_state, goal_state, All_actions)
print("Plan trouvé :", path_actions)
for action in path_actions:
    print(action['name'])

print("\n")
print("Nombre d'actions dans le plan :", len(path_actions))
print("Temps d'exécution : %s secondes" % (time.time() - start_time))

État initial : (Predicate(touch, cell_7, cell_8), Predicate(on, tile_4, cell_6), Predicate(touch, cell_8, cell_5), Predicate(touch, cell_4, cell_1), Predicate(touch, cell_4, cell_7), Predicate(on, tile_5, cell_3), Predicate(touch, cell_1, cell_0), Predicate(touch, cell_8, cell_7), Predicate(touch, cell_5, cell_4), Predicate(touch, cell_4, cell_3), Predicate(on, tile_2, cell_4), Predicate(empty, cell_8), Predicate(touch, cell_6, cell_7), Predicate(touch, cell_5, cell_8), Predicate(touch, cell_2, cell_5), Predicate(on, tile_3, cell_0), Predicate(touch, cell_6, cell_3), Predicate(touch, cell_3, cell_4), Predicate(touch, cell_0, cell_3), Predicate(touch, cell_7, cell_4), Predicate(touch, cell_5, cell_2), Predicate(touch, cell_3, cell_6), Predicate(touch, cell_7, cell_6), Predicate(touch, cell_3, cell_0), Predicate(touch, cell_1, cell_2), Predicate(touch, cell_0, cell_1), Predicate(touch, cell_4, cell_5), Predicate(touch, cell_2, cell_1), Predicate(on, tile_0, cell_1), Predicate(on, tile_7,

KeyboardInterrupt: 

In [None]:
#on vérifie le plan obtenu
print(valid_plan(list(path_actions)))

move(d0, d1, p1)
move(d1, d2, p2)
move(d0, p1, d1)
move(d2, d3, p1)
move(d0, d1, d3)
move(d1, p2, d2)
move(d0, d3, d1)
move(d3, p0, p2)
move(d0, d1, d3)
move(d1, d2, p0)
move(d0, d3, d1)
move(d2, p1, d3)
move(d0, d1, p1)
move(d1, p0, d2)
move(d0, p1, d1)
True


# Librairie unified-planning

### Solveur


In [None]:
! pip install 'unified-planning[fast-downward]'


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
from unified_planning.io import PDDLReader
from unified_planning.shortcuts import *
from unified_planning.engines import PlanGenerationResultStatus

reader = PDDLReader()
pddl_problem = reader.parse_problem('data/taquin_domain.pddl', 'data/taquin-size3x3-conf_6.pddl')
# print(pddl_problem)


with OneshotPlanner(
    problem_kind=pddl_problem.kind,
    optimality_guarantee=PlanGenerationResultStatus.SOLVED_OPTIMALLY,
) as planner:
    final_report = planner.solve(pddl_problem)
    plan = final_report.plan

print("Plan found")   
print(plan)

KeyboardInterrupt: 

### Actions applicables, paramètres applicables modification de l'état

In [None]:
# print(pddl_problem)
from unified_planning.io import PDDLReader
from unified_planning.shortcuts import *
from unified_planning.plans import ActionInstance


reader = PDDLReader()
pddl_problem = reader.parse_problem('data/domain.pddl', 'data/problem.pddl')

simulator = SequentialSimulator(pddl_problem)
init_state2=simulator.get_initial_state()
goal=pddl_problem.goals

(action_name, para) = tuple(simulator.get_applicable_actions(init_state2))[0] # récup de la première fonction applicable et des paramètre qui conviennent

action_instance = ActionInstance(action_name, para) #création de l'objet pour appliquer la fonction

new_state = simulator.apply(init_state2, action_instance) #on applique la fonction sur l'état

print(new_state)


{clear(d1): true, on(d0, p1): true, on(d0, d1): false, clear(p1): false, clear(d0): true, clear(p2): true, on(d1, d2): true, on(d2, d3): true, on(d3, p0): true, smaller(d0, d1): true, smaller(d0, d2): true, smaller(d0, d3): true, smaller(d0, p0): true, smaller(d0, p1): true, smaller(d0, p2): true, smaller(d1, d2): true, smaller(d1, d3): true, smaller(d1, p0): true, smaller(d1, p1): true, smaller(d1, p2): true, smaller(d2, d3): true, smaller(d2, p0): true, smaller(d2, p1): true, smaller(d2, p2): true, smaller(d3, p0): true, smaller(d3, p1): true, smaller(d3, p2): true, clear(d2): false, clear(d3): false, clear(p0): false, on(d0, d0): false, on(d1, d0): false, on(d2, d0): false, on(d3, d0): false, on(p0, d0): false, on(p1, d0): false, on(p2, d0): false, on(d1, d1): false, on(d2, d1): false, on(d3, d1): false, on(p0, d1): false, on(p1, d1): false, on(p2, d1): false, on(d0, d2): false, on(d2, d2): false, on(d3, d2): false, on(p0, d2): false, on(p1, d2): false, on(p2, d2): false, on(d0, d3)

# Librairie pddlpy

In [None]:
import pddlpy

# Load your domain and problem PDDL files
domprob = pddlpy.DomainProblem('data/domain.pddl', 'data/problem.pddl')
print(domprob.worldobjects())
print(domprob.initialstate())
print(domprob.goals())
print(domprob.operators())
print(domprob.ground_operator('move'))
print(domprob.domain.operators['move'])

help(domprob.ground_operator('move'))

{'clear': None, 'p2': None, 'p1': None, 'on': None, 'd3': None, 'd2': None, 'smaller': None, 'd1': None, 'p0': None, 'd0': None}
{('on', 'd2', 'd3'), ('smaller', 'd1', 'p0'), ('smaller', 'd0', 'p1'), ('smaller', 'd1', 'd3'), ('on', 'd3', 'p0'), ('smaller', 'd1', 'p2'), ('smaller', 'd1', 'd2'), ('clear', 'd0'), ('on', 'd1', 'd2'), ('smaller', 'd3', 'p0'), ('smaller', 'd2', 'p0'), ('smaller', 'd0', 'd3'), ('smaller', 'd3', 'p1'), ('clear', 'p2'), ('smaller', 'd2', 'p2'), ('clear', 'p1'), ('smaller', 'd0', 'p2'), ('smaller', 'd3', 'p2'), ('smaller', 'd2', 'd3'), ('on', 'd0', 'd1'), ('smaller', 'd0', 'p0'), ('smaller', 'd1', 'p1'), ('smaller', 'd2', 'p1'), ('smaller', 'd0', 'd2'), ('smaller', 'd0', 'd1')}
{('on', 'd0', 'd1'), ('smaller', 'd1', 'p1'), ('clear', 'p1'), ('smaller', 'd0', 'd1'), ('smaller', 'd2', 'p2'), ('smaller', 'd0', 'p0'), ('on', 'd2', 'd3'), ('smaller', 'd0', 'd2'), ('smaller', 'd3', 'p1'), ('clear', 'd0'), ('smaller', 'd3', 'p0'), ('smaller', 'd0', 'd3'), ('on', 'd1', '