Skip to content

Commit

Permalink
Refactor of the Study class to breakdown complex APIs (#118)
Browse files Browse the repository at this point in the history
* Minor docstring correction.

* Change to study setup API to break out workspace creation.

* Update Maestro frontend to use new Study method.

* Renamed Study.setup to be better communicate its functionality.

* Correction to style for configure_study method.

* Moved environment application to add_step.

* Split out acquiring environment elements to its own method.

* Updates to staging checks.

* Renamed _setup_linear and _setup_parameterized from setup to stage for clarity.
  • Loading branch information
FrankD412 committed Jul 20, 2018
1 parent b05d4c8 commit a08cf6e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 52 deletions.
84 changes: 41 additions & 43 deletions maestrowf/datastructures/core/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"""Class related to the construction of study campaigns."""
import copy
import logging
import os
import re

from maestrowf.abstracts import SimObject
Expand Down Expand Up @@ -228,6 +229,11 @@ def add_step(self, step):
"""
# Add the node to the DAG.
self.add_node(step.name, step)
logger.info(
"Adding step '%s' to study '%s'...", step.name, self.name)
# Apply the environment to the incoming step.
step.__dict__ = \
apply_function(step.__dict__, self.environment.apply_environment)

# If the step depends on a prior step, create an edge.
if "depends" in step.run and step.run["depends"]:
Expand Down Expand Up @@ -260,14 +266,29 @@ def walk_study(self, src=SOURCE):
for node in path:
yield parents[node], node, self.values[node]

def setup(self, submission_attempts=1, restart_limit=1, throttle=0,
use_tmp=False):
def setup_workspace(self):
"""Set up the study's main workspace directory."""
try:
logger.info("Setting up study workspace in '%s'", self._out_path)
create_parentdir(self._out_path)
except Exception as e:
logger.error(e.message)
return False

def setup_environment(self):
"""Set up the environment by acquiring outside dependencies."""
# Set up the environment if it hasn't been already.
if not self.environment.is_set_up:
logger.info("Environment is setting up.")
self.environment.acquire_environment()

def configure_study(self, submission_attempts=1, restart_limit=1,
throttle=0, use_tmp=False):
"""
Perform initial setup of a study.
Perform initial configuration of a study.
The method is used for going through and actually acquiring each
dependency, substituting variables, sources and labels. Also sets up
the folder structure for the study.
dependency, substituting variables, sources and labels.
:param submission_attempts: Number of attempted submissions before
marking a step as failed.
Expand All @@ -279,11 +300,6 @@ def setup(self, submission_attempts=1, restart_limit=1, throttle=0,
ExecutionGraph dumps its information into a temporary directory.
:returns: True if the Study is successfully setup, False otherwise.
"""
# If the study has been set up, just return.
if self._issetup:
logger.info("%s is already set up, returning.")
return True

self._submission_attempts = submission_attempts
self._restart_limit = restart_limit
self._submission_throttle = throttle
Expand All @@ -301,32 +317,7 @@ def setup(self, submission_attempts=1, restart_limit=1, throttle=0,
use_tmp
)

# Set up the environment if it hasn't been already.
if not self.environment.is_set_up:
logger.info("Environment is setting up.")
self.environment.acquire_environment()

try:
logger.info("Environment is setting up.")
create_parentdir(self._out_path)
except Exception as e:
logger.error(e.message)
return False

# Apply all environment artifcacts and acquire everything.
for key, node in self.values.items():
logger.info("Applying to step '%s' of the study '%s'...",
key, node)
if node:
node.__dict__ = apply_function(
node.__dict__,
self.environment.apply_environment)

# Flag the study as set up.
self._issetup = True
return True

def _setup_parameterized(self):
def _stage_parameterized(self):
"""
Set up the ExecutionGraph of a parameterized study.
Expand Down Expand Up @@ -622,7 +613,7 @@ def _setup_parameterized(self):

return self._out_path, dag

def _setup_linear(self):
def _stage_linear(self):
"""
Execute a linear workflow without parameters.
Expand Down Expand Up @@ -708,10 +699,17 @@ def stage(self):
:param throttle: Maximum number of in progress jobs allowed.
:returns: An ExecutionGraph object with the expanded workflow.
"""
# If not set up, return None.
if not self._issetup:
msg = "Study {} is not set up for staging. Run setup before " \
"attempting to stage.".format(self.name)
# If the workspace doesn't exist, raise an exception.
if not os.path.exists(self._out_path):
msg = "Study {} is not set up for staging. Workspace does not " \
"exists (Output Dir = {}).".format(self.name, self._out_path)
logger.error(msg)
raise Exception(msg)

# If the environment isn't set up, raise an exception.
if not self.environment.is_set_up:
msg = "Study {} is not set up for staging. Environment is not " \
"set up. Aborting.".format(self.name)
logger.error(msg)
raise Exception(msg)

Expand Down Expand Up @@ -739,6 +737,6 @@ def stage(self):
# 2. A linear, execute as specified workflow
# NOTE: This scheme could be how we handle derived use cases.
if self.parameters:
return self._setup_parameterized()
return self._stage_parameterized()
else:
return self._setup_linear()
return self._stage_linear()
4 changes: 1 addition & 3 deletions maestrowf/datastructures/core/studyenvironment.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,7 @@ def remove(self, key):
return None

def acquire_environment(self):
"""
Acquire any environment items that may be stored remotely.
"""
"""Acquire any environment items that may be stored remotely."""
if self._is_set_up:
logger.info("Environment already set up. Returning.")
return
Expand Down
12 changes: 6 additions & 6 deletions maestrowf/maestro.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ def run_study(args):
LOGGER.error(_msg)
raise ArgumentError(_msg)

study.setup(
throttle=args.throttle,
submission_attempts=args.attempts,
restart_limit=args.rlimit,
use_tmp=args.usetmp
)
# Set up the study workspace and configure it for execution.
study.setup_workspace()
study.setup_environment()
study.configure_study(
throttle=args.throttle, submission_attempts=args.attempts,
restart_limit=args.rlimit, use_tmp=args.usetmp)

# Stage the study.
path, exec_dag = study.stage()
Expand Down

0 comments on commit a08cf6e

Please sign in to comment.