Correction of how dependency ordering happens during study construction. (#97)

Introduces the ability to specify new types of dependencies and corrects study construction.

This PR introduces a bug fix for a problem that has persisted for a while: study construction previously used a BFS spanning tree to build the ExecutionGraph, but that implementation fell short because BFS does not guarantee that all parents are visited before their children, so studies with multi-step dependency chains would fail to construct. Maestro now uses a topological sort to construct the study, which guarantees that all ancestors of a step are visited before the step itself is constructed.
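For reference, the ordering guarantee comes from processing the DAG in topological order. Below is a minimal sketch of one way to produce such an ordering (Kahn's algorithm); it is illustrative only, and the function and variable names are hypothetical rather than Maestro's actual DAG code:

    from collections import deque

    def topological_sort(nodes, children):
        """Order nodes so that every parent appears before its children.

        nodes:    iterable of node names.
        children: dict mapping each node to an iterable of its children.
        """
        in_degree = {n: 0 for n in nodes}
        for parent in children:
            for child in children[parent]:
                in_degree[child] += 1

        # Seed the queue with the roots -- nodes that have no parents.
        ready = deque(n for n in nodes if in_degree[n] == 0)
        ordering = []
        while ready:
            node = ready.popleft()
            ordering.append(node)
            for child in children.get(node, ()):
                in_degree[child] -= 1
                if in_degree[child] == 0:
                    ready.append(child)

        if len(ordering) != len(in_degree):
            raise ValueError("Cycle detected; not a valid study DAG.")
        return ordering

Visiting steps in this order means a step is only constructed after every one of its ancestors has been, which is exactly the guarantee the BFS traversal lacked.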

Additionally, this PR introduces different types of dependencies between nodes. The major addition is the "_*" (or "*") notation, which we call a "parameter independent" dependency. It marks a dependency where a step requires that all parameter combinations of another step have completed; in other words, the step waits to funnel every combination into itself. Because such a step does not make use of the dependency's parameters, it is not parameterized on them and only consumes the resulting combinations.
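To make the notation concrete, here is a sketch of how a parameter-independent dependency expands (illustrative only; the helper and step names are hypothetical, not Maestro's actual expansion code):

    def expand_depends(depends, combos):
        """Expand a step's dependency list against parameter combinations.

        depends: list of step names; a "_*" suffix marks a
                 parameter-independent (funnel) dependency.
        combos:  dict mapping a step name to its parameterized instances.
        """
        expanded = []
        for dep in depends:
            if dep.endswith("_*"):
                # Funnel: wait on every parameter combination of the step.
                expanded.extend(combos[dep[:-2]])
            else:
                expanded.append(dep)
        return expanded

    # A hypothetical funnel step gathering every run of "sim":
    combos = {"sim": ["sim_N.10", "sim_N.20", "sim_N.30"]}
    print(expand_depends(["sim_*"], combos))
    # ['sim_N.10', 'sim_N.20', 'sim_N.30']

The funnel step itself is instantiated once, unparameterized; it simply gates on all of the expanded names.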

* Addition of topological sorting to the DAG.

* Correction to the construction of a linear study.

* Tweak to check for node/proc values instead of existence.

* A correction to additional kwargs for SLURM.

* Checkpoint #1 -- reimplementation of parameterization.

* Addition of hub case code.

* Complete rework of parameterized expansion.

* Moved make_safe_path to general utils.

* Tweaks and some bug fixes.

* Correction to logging bug.

* Rework of where OUTPUT_PATH is defined.

* Tweaks and refactors to fix bugs and other small items.

* Updates to samples to conform to new mechanics.

* Added tweaks to handle workspaces for funnels.

* Removal of a missed print statement.

* Updates to how dependencies are checked to account for _*

* Tweak to make use of new dependency map to avoid iteration.

* Updates to add a funnel step to samples.

* Tweaks that put linear steps into their own workspaces.

* Added different flavors of funnel steps to samples.

* Echo messages to document some nomenclature.
FrankD412 committed May 20, 2018
1 parent 7f8b237 commit 661af30
Showing 9 changed files with 501 additions and 242 deletions.
6 changes: 3 additions & 3 deletions maestrowf/abstracts/interfaces/schedulerscriptadapter.py
@@ -268,9 +268,9 @@ def get_scheduler_command(self, step):

         # If the user is requesting nodes, we need to request the nodes and
         # set up the command with scheduling.
-        _procs = step.run.get("procs")
-        _nodes = step.run.get("nodes")
-        if _procs or _nodes:
+        _nodes = step.run.get("nodes", 0)
+        _procs = step.run.get("procs", 0)
+        if _nodes or _procs:
             to_be_scheduled = True
             cmd = self._substitute_parallel_command(
                 step.run["cmd"],
68 changes: 47 additions & 21 deletions maestrowf/datastructures/core/executiongraph.py
@@ -12,6 +12,7 @@
 from maestrowf.abstracts.enums import JobStatusCode, State, SubmissionCode, \
     CancelCode
 from maestrowf.datastructures.dag import DAG
+from maestrowf.datastructures.environment import Variable
 from maestrowf.interfaces import ScriptAdapterFactory
 from maestrowf.utils import create_parentdir

Expand All @@ -29,7 +30,7 @@ class to the ExecutionGraph and maintains all information for any given
step in the DAG.
"""

def __init__(self, **kwargs):
def __init__(self, workspace, step, **kwargs):
"""
Initialize a new instance of a StepRecord.
@@ -45,13 +46,13 @@ def __init__(self, **kwargs):
         tmp_dir: A provided temp directory to write scripts to instead of step
                  workspace.
         """
-        self.workspace = kwargs.get("workspace", "")
+        self.workspace = Variable("OUTPUT_PATH", workspace)

         self.jobid = kwargs.get("jobid", [])
         self.script = kwargs.get("script", "")
         self.restart_script = kwargs.get("restart", "")
         self.to_be_scheduled = False
-        self.step = kwargs.get("step", None)
+        self.step = step
         self.restart_limit = kwargs.get("restart_limit", 3)

# Status Information
Expand All @@ -63,7 +64,7 @@ def __init__(self, **kwargs):

def setup_workspace(self):
"""Initialize the record's workspace."""
create_parentdir(self.workspace)
create_parentdir(self.workspace.value)

def generate_script(self, adapter, tmp_dir=""):
"""
@@ -76,7 +77,9 @@ def generate_script(self, adapter, tmp_dir=""):
         if tmp_dir:
             scr_dir = tmp_dir
         else:
-            scr_dir = self.workspace
+            scr_dir = self.workspace.value
+
+        self.step.run["cmd"] = self.workspace.substitute(self.step.run["cmd"])

         logger.info("Generating script for %s into %s", self.name, scr_dir)
         self.to_be_scheduled, self.script, self.restart_script = \
@@ -103,12 +106,12 @@ def restart(self, adapter):
     def _execute(self, adapter, script):
         if self.to_be_scheduled:
             retcode, jobid = adapter.submit(
-                self.step, script, self.workspace)
+                self.step, script, self.workspace.value)
         else:
             self.mark_running()
             ladapter = ScriptAdapterFactory.get_adapter("local")()
             retcode, jobid = ladapter.submit(
-                self.step, script, self.workspace)
+                self.step, script, self.workspace.value)

         return retcode, jobid

@@ -315,6 +318,12 @@ def __init__(self, submission_attempts=1, submission_throttle=0,
         self._submission_attempts = submission_attempts
         self._submission_throttle = submission_throttle

+        # A map that tracks the dependencies of a step.
+        # NOTE: I don't know how performant the Python dict structure is, but
+        # we'll use it for now. I think this may want to be changed to an AVL
+        # tree or something of that nature to guarantee worst case performance.
+        self._dependencies = {}
+
         logger.info(
             "\n------------------------------------------\n"
             "Submission attempts = %d\n"
@@ -362,8 +371,19 @@ def add_step(self, name, step, workspace, restart_limit):
             "restart_limit": restart_limit
         }
         record = _StepRecord(**data)
+        self._dependencies[name] = set()
         super(ExecutionGraph, self).add_node(name, record)

+    def add_connection(self, parent, step):
+        """
+        Add a connection between two steps in the ExecutionGraph.
+
+        :param parent: The parent step that is required to execute 'step'
+        :param step: The dependent step that relies on parent.
+        """
+        self.add_edge(parent, step)
+        self._dependencies[step].add(parent)
+
     def set_adapter(self, adapter):
         """
         Set the adapter used to interface for scheduling tasks.
@@ -544,7 +564,7 @@ def _execute_record(self, record, adapter, restart=False):
             retcode = record.restart(adapter)

         # Increment the number of restarts we've attempted.
-        logger.debug("Completed submission attempt %d")
+        logger.debug("Completed submission attempt %d", num_restarts)
         num_restarts += 1

         if retcode == SubmissionCode.OK:
@@ -579,7 +599,7 @@ def write_status(self, path):
         for key in keys:
             value = self.values[key]
             _ = [
-                value.name, os.path.split(value.workspace)[1],
+                value.name, os.path.split(value.workspace.value)[1],
                 str(value.status), value.run_time, value.elapsed_time,
                 value.time_start, value.time_submitted, value.time_end,
                 str(value.restarts)
@@ -723,18 +743,24 @@ def execute_ready_steps(self):
             # that needs consideration.
             if record.status == State.INITIALIZED:
                 logger.debug("'%s' found to be initialized. Checking "
-                             "dependencies...", key)
-                # Count the number of its dependencies have finised.
-                num_finished = 0
-                for dependency in record.step.run["depends"]:
-                    logger.debug("Checking '%s'...", dependency)
-                    if dependency in self.completed_steps:
-                        logger.debug("Found in completed steps.")
-                        num_finished += 1
-                # If the total number of dependencies finished is the same
-                # as the number of dependencies the step has, it's ready to
-                # be executed. Add it to the map.
-                if num_finished == len(record.step.run["depends"]):
+                             "dependencies. ", key)
+
+                logger.info(
+                    "Unfulfilled dependencies: %s",
+                    self._dependencies[record.name])
+
+                s_completed = filter(
+                    lambda x: x in self.completed_steps,
+                    self._dependencies[record.name])
+                self._dependencies[record.name] = \
+                    self._dependencies[record.name] - set(s_completed)
+                logger.info(
+                    "Completed dependencies: %s\n"
+                    "Remaining dependencies: %s",
+                    s_completed, self._dependencies[record.name])
+
+                # If the gating dependencies set is empty, we can execute.
+                if not self._dependencies[record.name]:
                     if key not in self.ready_steps:
                         logger.debug("All dependencies completed. Staging.")
                         self.ready_steps.append(record)
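The readiness check above boils down to set bookkeeping: each record keeps the set of parents that have not yet finished, completed parents are subtracted away on each pass, and the step stages once the set is empty. A minimal standalone sketch of that pattern (illustrative only, with hypothetical step names; it uses a list comprehension where the commit uses filter(), so the completed list can still be logged after the subtraction under Python 3 as well):

    # _dependencies maps each step name to the set of parents still pending.
    _dependencies = {"post-process": {"sim_N.10", "sim_N.20", "sim_N.30"}}
    completed_steps = {"sim_N.10", "sim_N.30"}

    name = "post-process"
    s_completed = [d for d in _dependencies[name] if d in completed_steps]
    _dependencies[name] -= set(s_completed)

    if not _dependencies[name]:
        print("All dependencies completed. Staging.")
    else:
        print("Remaining dependencies:", _dependencies[name])  # {'sim_N.20'}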
