Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ jobs:
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
python -m pip install julia
python -m pip install pycall
python -m pip install -r requirements.txt
- name: Lint with flake8
run: |
python -m pip install flake8
Expand Down
5 changes: 2 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ jobs:
version: '1.4.1'
- name: Install Julia dependencies
run: |
julia --color=yes -e 'using Pkg; Pkg.add(Pkg.PackageSpec(path="https://github.com/APLA-Toolbox/PDDL.jl"))'
julia --color=yes -e 'using Pkg; Pkg.add(Pkg.PackageSpec(path="https://github.com/JuliaPy/PyCall.jl"))'
julia --color=yes -e 'using Pkg; Pkg.add(Pkg.PackageSpec(path="https://github.com/APLA-Toolbox/PDDL.jl"))'
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
python -m pip install julia
python -m pip install pycall
python -m pip install -r requirements.txt
- name: Lint with flake8
run: |
python -m pip install flake8
Expand Down
6 changes: 3 additions & 3 deletions .mergify.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
pull_request_rules:
- name: Assign the main reviewers
conditions:
- check-success=tests
- check-success=build
- check-success=CodeFactor
actions:
request_reviews:
teams:
- "@APLA-Toolbox/reviewers"
users:
- guilyx
- sampreets3
- name: Automatic merge on approval
conditions:
- "#approved-reviews-by>=1"
Expand Down
30 changes: 12 additions & 18 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import src.automated_planner as parser
from src.automated_planner import AutomatedPlanner
import argparse
import logging
import os


Expand All @@ -13,22 +12,17 @@ def main():
args_parser.add_argument("problem", type=str, help="PDDL problem file")
args_parser.add_argument("-v", "--verbose", help="Increases the output's verbosity")
args = args_parser.parse_args()
logging.basicConfig(
filename="logs/main.log",
format="%(levelname)s:%(message)s",
filemode="w",
level=logging.INFO,
) # Creates the log file
apla_tbx = parser.AutomatedPlanner(args.domain, args.problem)
logging.info("Starting the tool")
path, time = apla_tbx.depth_first_search(time_it=True)
logging.info(apla_tbx.get_actions_from_path(path))
logging.info("Computation time: %.2f seconds" % time)
logging.info("Tool finished")
# Output the log (to show something in the output screen)
logfile = open("logs/main.log", "r")
print(logfile.read())
logfile.close()
apla_tbx = AutomatedPlanner(args.domain, args.problem)
apla_tbx.logger.info("Starting the planning script")
apla_tbx.logger.debug(
"Available heuristics: " + str(apla_tbx.available_heuristics.keys())
)

path, computation_time = apla_tbx.dijktra_best_first_search(time_it=True)
apla_tbx.logger.debug(apla_tbx.get_actions_from_path(path))

apla_tbx.logger.debug("Computation time: %.2f seconds" % computation_time)
apla_tbx.logger.info("Terminate with grace...")


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
julia==0.5.6
coloredlogs==15.0
82 changes: 82 additions & 0 deletions src/a_star.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from .heuristics import zero_heuristic
from .node import Node
import logging
import math
from time import time as now
from datetime import datetime as timestamp


class AStarBestFirstSearch:
def __init__(self, automated_planner, heuristic_function):
self.automated_planner = automated_planner
self.init = Node(
self.automated_planner.initial_state,
automated_planner,
is_closed=False,
is_open=True,
heuristic=heuristic_function,
)
self.heuristic_function = heuristic_function
self.open_nodes_n = 1
self.nodes = dict()
self.nodes[self.__hash(self.init)] = self.init

def __hash(self, node):
sep = ", Dict{Symbol,Any}"
string = str(node.state)
return string.split(sep, 1)[0] + ")"

def search(self):
time_start = now()
self.automated_planner.logger.debug(
"Search started at: " + str(timestamp.now())
)
while self.open_nodes_n > 0:
current_key = min(
[n for n in self.nodes if self.nodes[n].is_open],
key=(lambda k: self.nodes[k].f_cost),
)
current_node = self.nodes[current_key]

if self.automated_planner.satisfies(
self.automated_planner.problem.goal, current_node.state
):
computation_time = now() - time_start
self.automated_planner.logger.debug(
"Search finished at: " + str(timestamp.now())
)
return current_node, computation_time

current_node.is_closed = True
current_node.is_open = False
self.open_nodes_n -= 1

actions = self.automated_planner.available_actions(current_node.state)
for act in actions:
child = Node(
state=self.automated_planner.transition(current_node.state, act),
automated_planner=self.automated_planner,
parent_action=act,
parent=current_node,
heuristic=self.heuristic_function,
is_closed=False,
is_open=True,
)
child_hash = self.__hash(child)
if child_hash in self.nodes:
if self.nodes[child_hash].is_closed:
continue
if not self.nodes[child_hash].is_open:
self.nodes[child_hash] = child
self.open_nodes_n += 1
else:
if child.g_cost < self.nodes[child_hash].g_cost:
self.nodes[child_hash] = child
self.open_nodes_n += 1

else:
self.nodes[child_hash] = child
self.open_nodes_n += 1
computation_time = now() - time_start
self.automated_planner.logger.warning("!!! No path found !!!")
return None, computation_time
7 changes: 0 additions & 7 deletions src/astar.py

This file was deleted.

109 changes: 34 additions & 75 deletions src/automated_planner.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,58 @@
from .modules import loading_bar_handler
from .bfs import BreadthFirstSearch
from .dfs import DepthFirstSearch
from .dijkstra import DijkstraBestFirstSearch
import logging

UI = False

if UI:
loading_bar_handler(False)
from .a_star import AStarBestFirstSearch
from .heuristics import goal_count_heuristic, zero_heuristic
import coloredlogs, logging
import julia

_ = julia.Julia(compiled_modules=False)

if UI:
loading_bar_handler(True)

_ = julia.Julia(compiled_modules=False, debug=False)
from julia import PDDL
from time import time as now

logging.getLogger("julia").setLevel(logging.WARNING)


class AutomatedPlanner:
def __init__(self, domain_path, problem_path):
def __init__(self, domain_path, problem_path, log_level="DEBUG"):
# Planning Tool
self.pddl = PDDL
self.domain = self.pddl.load_domain(domain_path)
self.problem = self.pddl.load_problem(problem_path)
self.initial_state = self.pddl.initialize(self.problem)
self.goals = self.__flatten_goal()

"""
Transition from one state to the next using an action
"""
self.available_heuristics = dict()
self.available_heuristics["goal_count"] = goal_count_heuristic
self.available_heuristics["zero"] = zero_heuristic

# Logging
logging.basicConfig(
filename="logs/main.log",
format="%(levelname)s:%(message)s",
filemode="w",
level=log_level,
) # Creates the log file
self.logger = logging.getLogger("automated_planning")
coloredlogs.install(level=log_level)

def transition(self, state, action):
return self.pddl.transition(self.domain, state, action, check=False)

"""
Returns all available actions from the given state
"""

def available_actions(self, state):
return self.pddl.available(state, self.domain)

"""
Check if a vector of terms is satisfied by the given state
"""

def satisfies(self, asserted_state, state):
return self.pddl.satisfy(asserted_state, state, self.domain)[0]

"""
Check if the term is satisfied by the state
To do: compare if it's faster to compute the check on a vector of terms in julia or python
"""

def state_has_term(self, state, term):
if self.pddl.has_term_in_state(self.domain, state, term):
return True
else:
return False

"""
Flatten the goal to a vector of terms
To do: check if we can iterate over the jl vector
"""

def __flatten_goal(self):
return self.pddl.flatten_goal(self.problem)

"""
Retrieves the linked list path
"""

def __retrace_path(self, node):
if not node:
return []
Expand All @@ -81,13 +63,9 @@ def __retrace_path(self, node):
path.reverse()
return path

"""
Returns all the actions operated to reach the goal
"""

def get_actions_from_path(self, path):
if not path:
logging.warning("Path is empty, can't operate...")
self.logger.warning("Path is empty, can't operate...")
return []
actions = []
for node in path:
Expand All @@ -99,64 +77,45 @@ def get_actions_from_path(self, path):
else:
return (actions, cost)

"""
Returns all the states that should be opened from start to goal
"""

def get_state_def_from_path(self, path):
if not path:
logging.warning("Path is empty, can't operate...")
self.logger.warning("Path is empty, can't operate...")
return []
trimmed_path = []
for node in path:
trimmed_path.append(node.state)
return trimmed_path

"""
Runs the BFS algorithm on the loaded domain/problem
"""

def breadth_first_search(self, time_it=False):
if time_it:
start_time = now()
bfs = BreadthFirstSearch(self)
last_node = bfs.search()
if time_it:
total_time = now() - start_time
last_node, total_time = bfs.search()
path = self.__retrace_path(last_node)
if time_it:
return path, total_time
else:
return path, None

"""
Runs the DFS algorithm on the domain/problem
"""

def depth_first_search(self, time_it=False):
if time_it:
start_time = now()
dfs = DepthFirstSearch(self)
last_node = dfs.search()
if time_it:
total_time = now() - start_time
last_node, total_time = dfs.search()
path = self.__retrace_path(last_node)
if time_it:
return path, total_time
else:
return path, None

"""
Runs the Dijkstra algorithm on the domain/problem
"""

def dijktra_best_first_search(self, time_it=False):
if time_it:
start_time = now()
dijkstra = DijkstraBestFirstSearch(self)
last_node = dijkstra.search()
last_node, total_time = dijkstra.search()
path = self.__retrace_path(last_node)
if time_it:
total_time = now() - start_time
return path, total_time
else:
return path, None

def astar_best_first_search(self, time_it=False, heuristic=goal_count_heuristic):
astar = AStarBestFirstSearch(self, heuristic)
last_node, total_time = astar.search()
path = self.__retrace_path(last_node)
if time_it:
return path, total_time
Expand Down
Loading