Add the LULESH sample specification to TravisCI (#101)
* Refactoring and addition of running Maestro in the foreground.

* Addition of LULESH testing to tox.ini

* Indentation fix.

* Tweaks to environments and envlist.

* Tweak to reduce time between conductor loops

* Addition of travis config to tox.ini

* Elevated DAG logging messages to debug

* Conversion of some more info logging to debug.

* Added an override for output path.

* Update to point to a fixed output path.
FrankD412 committed May 21, 2018
1 parent f8d99da commit faf9648
Showing 5 changed files with 129 additions and 67 deletions.
91 changes: 53 additions & 38 deletions maestrowf/conductor.py
@@ -123,36 +123,13 @@ def setup_logging(args, name):
logger.debug("DEBUG Logging Level -- Enabled")


def main():
"""Run the main segment of the conductor."""
# Set up and parse the ArgumentParser
parser = setup_argparser()
args = parser.parse_args()

# Unpickle the ExecutionGraph
study_pkl = glob.glob(os.path.join(args.directory, "*.pkl"))
# We expect only a single pickle file.
if len(study_pkl) == 1:
dag = ExecutionGraph.unpickle(study_pkl[0])
else:
if len(study_pkl) > 1:
msg = "More than one pickle found. Expected only one. Aborting."
status = 2
else:
msg = "No pickle found. Aborting."
status = 1

sys.stderr.write(msg)
sys.exit(status)

# Set up logging
setup_logging(args, dag.name)
# Use ExecutionGraph API to determine next jobs to be launched.
logger.info("Checking the ExecutionGraph for study '%s' located in "
"%s...", dag.name, study_pkl[0])
logger.info("Study Description: %s", dag.description)

cancel_lock_path = os.path.join(args.directory, ".cancel.lock")
def monitor_study(dag, pickle_path, cancel_lock_path, sleep_time):
"""Monitor a running study."""
logger.debug("\n -------- Calling monitor study -------\n"
"pkl path = %s"
"cancel path = %s"
"sleep time = %s",
pickle_path, cancel_lock_path, sleep_time)

study_complete = False
while not study_complete:
@@ -173,18 +150,56 @@ def main():
# Execute steps that are ready
study_complete = dag.execute_ready_steps()
# Re-pickle the ExecutionGraph.
dag.pickle(study_pkl[0])
dag.pickle(pickle_path)
# Write out the state
dag.write_status(os.path.split(study_pkl[0])[0])
dag.write_status(os.path.split(pickle_path)[0])
# Sleep for SLEEPTIME in args
sleep(args.sleeptime)
sleep(sleep_time)

logger.info("Cleaning up...")
dag.cleanup()
logger.info("Squeaky clean!")

# Explicitly return a 0 status.
sys.exit(0)
def main():
"""Run the main segment of the conductor."""
try:
# Set up and parse the ArgumentParser
parser = setup_argparser()
args = parser.parse_args()

# Unpickle the ExecutionGraph
study_pkl = glob.glob(os.path.join(args.directory, "*.pkl"))
# We expect only a single pickle file.
if len(study_pkl) == 1:
dag = ExecutionGraph.unpickle(study_pkl[0])
else:
if len(study_pkl) > 1:
msg = "More than one pickle found. Expected one. Aborting."
status = 2
else:
msg = "No pickle found. Aborting."
status = 1

sys.stderr.write(msg)
sys.exit(status)

# Set up logging
setup_logging(args, dag.name)
# Use ExecutionGraph API to determine next jobs to be launched.
logger.info("Checking the ExecutionGraph for study '%s' located in "
"%s...", dag.name, study_pkl[0])
logger.info("Study Description: %s", dag.description)

cancel_lock_path = os.path.join(args.directory, ".cancel.lock")
logger.info("Starting to monitor '%s'", dag.name)
monitor_study(dag, study_pkl[0], cancel_lock_path, args.sleeptime)

logger.info("Cleaning up...")
dag.cleanup()
logger.info("Squeaky clean!")

# Explicitly return a 0 status.
sys.exit(0)
except Exception as e:
logger.error(e.message, exc_info=True)
raise e


if __name__ == "__main__":
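The substance of the conductor.py change is that the polling loop formerly inlined in main() now lives in monitor_study(dag, pickle_path, cancel_lock_path, sleep_time), which maestro.py's new foreground mode can call directly. A minimal sketch of driving it outside the nohup'd conductor entry point (the study directory and pickle file name below are hypothetical; the import paths follow the file layout shown in this commit):

```python
# Sketch only: drive the monitoring loop directly instead of through the
# nohup'd "conductor" entry point. The directory and pickle name are made up.
import os

from maestrowf.conductor import monitor_study
from maestrowf.datastructures.core.executiongraph import ExecutionGraph

study_dir = "./testing/lulesh"                             # hypothetical output path
pkl_path = os.path.join(study_dir, "lulesh_sample1.pkl")   # hypothetical pickle name

# Restore the pickled ExecutionGraph and poll it until every step completes
# or a .cancel.lock file appears in the study directory.
dag = ExecutionGraph.unpickle(pkl_path)
cancel_lock = os.path.join(study_dir, ".cancel.lock")
monitor_study(dag, pkl_path, cancel_lock, sleep_time=10)
```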
4 changes: 2 additions & 2 deletions maestrowf/datastructures/core/executiongraph.py
@@ -745,7 +745,7 @@ def execute_ready_steps(self):
logger.debug("'%s' found to be initialized. Checking "
"dependencies. ", key)

logger.info(
logger.debug(
"Unfulfilled dependencies: %s",
self._dependencies[record.name])

@@ -754,7 +754,7 @@
self._dependencies[record.name])
self._dependencies[record.name] = \
self._dependencies[record.name] - set(s_completed)
logger.info(
logger.debug(
"Completed dependencies: %s\n"
"Remaining dependencies: %s",
s_completed, self._dependencies[record.name])
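For context on the two messages demoted to debug here: each step record keeps its outstanding dependencies as a set, and every pass through execute_ready_steps subtracts the steps that have completed. A toy illustration of that bookkeeping (the step names are made up):

```python
# Toy illustration of the dependency bookkeeping whose log messages move to
# debug above. The step names are hypothetical.
dependencies = {"run-lulesh": {"make-lulesh", "setup-env"}}
s_completed = ["make-lulesh"]   # steps found complete on this pass

dependencies["run-lulesh"] = dependencies["run-lulesh"] - set(s_completed)
print(dependencies["run-lulesh"])   # {'setup-env'} remains unfulfilled
```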
6 changes: 3 additions & 3 deletions maestrowf/datastructures/dag.py
@@ -32,7 +32,7 @@
from collections import deque, OrderedDict
import logging

from ..abstracts import Graph
from maestrowf.abstracts.graph import Graph

logger = logging.getLogger(__name__)

@@ -94,12 +94,12 @@ def add_edge(self, src, dest):
return

if dest in self.adjacency_table[src]:
logger.info("Edge (%s, %s) already in DAG. Returning.", src, dest)
logger.debug("Edge (%s, %s) already in DAG. Returning.", src, dest)
return

# If dest is not already an edge from src, add it.
self.adjacency_table[src].append(dest)
logging.info("Edge (%s, %s) added.", src, dest)
logging.debug("Edge (%s, %s) added.", src, dest)

# Check to make sure we've not created a cycle.
if self.detect_cycle():
81 changes: 59 additions & 22 deletions maestrowf/maestro.py
@@ -40,6 +40,7 @@
import tabulate
import time

from maestrowf.conductor import monitor_study
from maestrowf.datastructures import YAMLSpecification
from maestrowf.datastructures.core import Study
from maestrowf.datastructures.environment import Variable
@@ -97,18 +98,41 @@ def run_study(args):

# Set up the output directory.
out_dir = environment.remove("OUTPUT_PATH")
if out_dir is None:
# If we don't find OUTPUT_PATH in the environment, assume pwd.
out_dir = os.path.abspath("./")
else:
# We just take the value from the environment.
out_dir = os.path.abspath(out_dir.value)
if args.out:
# If out is specified in the args, ignore OUTPUT_PATH.
output_path = os.path.abspath(make_safe_path(args.out))

# If we are automatically launching, just set the input as yes.
if os.path.exists(output_path):
if args.autoyes:
uinput = "y"
elif args.autono:
uinput = "n"
else:
uinput = six.moves.input(
"Output path already exists. Would you like to overwrite "
"it? [yn] ")

if uinput.lower() in ACCEPTED_INPUT:
print("Cleaning up existing out path...")
shutil.rmtree(output_path)
else:
print("Opting to quit -- not cleaning up old out path.")
sys.exit(0)

out_name = "{}_{}".format(
spec.name.replace(" ", "_"),
time.strftime("%Y%m%d-%H%M%S")
)
output_path = make_safe_path(out_dir, out_name)
else:
if out_dir is None:
# If we don't find OUTPUT_PATH in the environment, assume pwd.
out_dir = os.path.abspath("./")
else:
# We just take the value from the environment.
out_dir = os.path.abspath(out_dir.value)

out_name = "{}_{}".format(
spec.name.replace(" ", "_"),
time.strftime("%Y%m%d-%H%M%S")
)
output_path = make_safe_path(out_dir, out_name)

# Now that we know outpath, set up logging.
setup_logging(args, output_path, spec.name)
@@ -166,26 +190,33 @@ def run_study(args):
raise NotImplementedError("The 'dryrun' mode is in development.")

# Pickle up the DAG
exec_dag.pickle(os.path.join(path, "{}.pkl".format(study.name)))
pkl_path = os.path.join(path, "{}.pkl".format(study.name))
exec_dag.pickle(pkl_path)

# If we are automatically launching, just set the input as yes.
if args.autoyes:
uinput = "y"
elif args.autono:
uinput = "n"
else:
uinput = six.moves.input("Would you like to launch the study?[yn] ")
uinput = six.moves.input("Would you like to launch the study? [yn] ")

if uinput.lower() in ACCEPTED_INPUT:
# Launch manager with nohup
cmd = ["nohup", "conductor",
"-t", str(args.sleeptime),
"-d", str(args.debug_lvl),
path,
"&>", "{}.txt".format(os.path.join(
study.output_path, exec_dag.name))]
LOGGER.debug(" ".join(cmd))
Popen(" ".join(cmd), shell=True, stdout=PIPE, stderr=PIPE)
if args.fg:
# Launch in the foreground.
LOGGER.info("Running Maestro Conductor in the foreground.")
cancel_path = os.path.join(path, ".cancel.lock")
monitor_study(exec_dag, pkl_path, cancel_path, args.sleeptime)
else:
# Launch manager with nohup
cmd = ["nohup", "conductor",
"-t", str(args.sleeptime),
"-d", str(args.debug_lvl),
path,
"&>", "{}.txt".format(os.path.join(
study.output_path, exec_dag.name))]
LOGGER.debug(" ".join(cmd))
Popen(" ".join(cmd), shell=True, stdout=PIPE, stderr=PIPE)

return 0

@@ -228,6 +259,12 @@ def setup_argparser():
run.add_argument("-d", "--dryrun", action="store_true", default=False,
help="Generate the directory structure and scripts for a "
"study but do not launch it. [Default: %(default)s]")
run.add_argument("-o", "--out", type=str,
help="Output path to place study in. [NOTE: overrides "
"OUTPUT_PATH in the specified specification]")
run.add_argument("-fg", action="store_true", default=False,
help="Runs the backend conductor in the foreground "
"instead of using nohup. [Default: %(default)s]")

prompt_opts = run.add_mutually_exclusive_group()
prompt_opts.add_argument(
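Taken together, the run_study changes above define a precedence for where a study lands: -o/--out wins outright, otherwise OUTPUT_PATH from the specification is used, otherwise the current directory, with a timestamped study name appended in the two non-override cases. The sketch below condenses those branches into a standalone helper; it is not part of maestro.py, and it omits the overwrite prompt and the make_safe_path handling:

```python
# Hypothetical helper that mirrors the output-path branches above. It is NOT
# in maestro.py and skips the overwrite prompt and make_safe_path handling.
import os
import time


def resolve_output_path(cli_out, env_output_path, spec_name):
    if cli_out:
        # -o/--out overrides OUTPUT_PATH from the specification entirely.
        return os.path.abspath(cli_out)

    # Otherwise fall back to OUTPUT_PATH, or the current directory if unset,
    # and append a timestamped, space-free study name.
    out_dir = os.path.abspath(env_output_path) if env_output_path else os.path.abspath("./")
    out_name = "{}_{}".format(spec_name.replace(" ", "_"),
                              time.strftime("%Y%m%d-%H%M%S"))
    return os.path.join(out_dir, out_name)


print(resolve_output_path("./testing/lulesh", None, "lulesh sample1"))
print(resolve_output_path(None, "/scratch/studies", "lulesh sample1"))
```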
14 changes: 12 additions & 2 deletions tox.ini
@@ -4,10 +4,20 @@
# and then run "tox" from this directory.

[tox]
envlist = py27, py36
envlist = py{27,34,35,36}, lulesh, nose
skip_missing_interpreters=True

[testenv]
[travis]
python =
2.7: py27, nose, lulesh
3.4: py34, nose, lulesh
3.5: py35, nose, lulesh
3.6: py36, nose, lulesh

[testenv:nose]
deps = nose
commands =
nosetests

[testenv:lulesh]
commands = maestro run -fg -y -s 10 ./samples/lulesh/lulesh_sample1_unix.yaml -o ./testing/lulesh
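
The new [travis] section maps each Travis CI Python version onto the tox environments to run (a mapping format presumably consumed by the tox-travis plugin, which is not shown in this diff), and the lulesh environment exercises the features added in this commit end to end: foreground execution (-fg), automatic yes to the prompts (-y), a short conductor sleep time (-s 10), and a fixed output path (-o) so the CI run lands in a predictable directory. The same invocation driven from Python rather than a shell, with the flag meanings inferred from the diff above:

```python
# Python equivalent of the lulesh tox command above, run via subprocess.
# Flag meanings are inferred from this commit's argparse additions.
import subprocess

subprocess.check_call([
    "maestro", "run",
    "-fg",                     # run the conductor in the foreground (new here)
    "-y",                      # auto-answer "yes" to the overwrite/launch prompts
    "-s", "10",                # seconds the conductor sleeps between loops
    "-o", "./testing/lulesh",  # fixed output path overriding the spec's OUTPUT_PATH
    "./samples/lulesh/lulesh_sample1_unix.yaml",
])
```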
