Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Mostly test fixes.

  • Loading branch information...
commit 5eb3bb72e7b6a2cc1b5c5a4b89f57a8a30155e77 1 parent c6ea04e
@dnephin dnephin authored
View
113 tests/core/actionrun_test.py
@@ -5,7 +5,7 @@
from testify.test_case import class_setup, class_teardown
from tron import node
-from tron.core.actionrun import ActionCommand, ActionRunContext, ActionRun
+from tron.core.actionrun import ActionCommand, ActionRunContext, ActionRun, ActionRunCollection
from tron.core.actionrun import InvalidStartStateError
from tron.utils import timeutils
@@ -45,30 +45,24 @@ class ActionRunTestCase(TestCase):
@setup
def setup_action_run(self):
anode = turtle.Turtle()
- output_path = "random_dir"
+ output_path = ["random_dir"]
self.command = "do command"
self.action_run = ActionRun(
- "id", anode,
- timeutils.current_time(),
- self.command,
- output_path=output_path)
+ "id",
+ "action_name",
+ anode,
+ timeutils.current_time(),
+ self.command,
+ output_path=output_path)
def test_init_state(self):
assert_equal(self.action_run.state, ActionRun.STATE_SCHEDULED)
- def test_attempt_start(self):
- assert self.action_run.attempt_start()
- assert_equal(self.action_run.state, ActionRun.STATE_STARTING)
-
- def test_attempt_start_failed(self):
- self.action_run.machine.transition('queue')
- assert not self.action_run.attempt_start()
- assert_equal(self.action_run.state, ActionRun.STATE_QUEUED)
-
def test_start(self):
self.action_run.machine.transition('ready')
assert self.action_run.start()
assert self.action_run.is_starting
+ assert self.action_run.start_time
def test_start_bad_state(self):
self.action_run.fail()
@@ -102,12 +96,14 @@ def test_build_action_command(self):
def test_watcher_running(self):
self.action_run.build_action_command()
self.action_run.machine.transition('start')
- assert self.action_run.watcher(self.action_run.action_command, ActionCommand.RUNNING)
+ assert self.action_run.watcher(
+ self.action_run.action_command, ActionCommand.RUNNING)
assert self.action_run.is_running
def test_watcher_failstart(self):
self.action_run.build_action_command()
- assert self.action_run.watcher(self.action_run.action_command, ActionCommand.FAILSTART)
+ assert self.action_run.watcher(
+ self.action_run.action_command, ActionCommand.FAILSTART)
assert self.action_run.is_failed
def test_watcher_exiting_fail(self):
@@ -145,7 +141,8 @@ def test_watcher_unhandled(self):
assert self.action_run.is_scheduled
def test_success(self):
- assert self.action_run.attempt_start()
+ assert self.action_run.ready()
+ self.action_run.machine.transition('start')
self.action_run.machine.transition('started')
assert self.action_run.is_running
@@ -173,7 +170,8 @@ def test_failure_bad_state(self):
def test_skip(self):
assert not self.action_run.is_running
- assert self.action_run.attempt_start()
+ self.action_run.ready()
+ assert self.action_run.start()
assert self.action_run.fail(-1)
assert self.action_run.skip()
@@ -201,6 +199,7 @@ def test_command_failed_render(self):
def test__getattr__(self):
assert self.action_run.is_succeeded is not None
+ assert self.action_run.cancel()
def test__getattr__missing_attribute(self):
assert_raises(AttributeError,
@@ -211,38 +210,62 @@ class ActionRunStateRestoreTestCase(TestCase):
@setup
def setup_action_run(self):
- anode = turtle.Turtle()
- output_path = "random_dir"
- self.action_run = ActionRun(
- "id", anode,
- timeutils.current_time(),
- "do command",
- output_path=output_path)
+ self.parent_context = {}
+ self.output_path = ['one', 'two']
self.state_data = {
- 'id': "newid",
- 'state': "running",
- 'run_time': "now",
- 'start_time': "now-1",
- 'end_time': None,
- 'command': "do this",
- }
-
- def test_restore_state_running(self):
- self.action_run.restore_state(self.state_data)
- assert self.action_run.is_unknown
+ 'job_run_id': 'theid',
+ 'action_name': 'theaction',
+ 'node_name': 'anode',
+ 'run_time': 'run_time',
+ 'command': 'do things',
+ 'start_time': 'start_time',
+ 'end_time': 'end_time',
+ 'state': 'succeeded'
+ }
+
+ def test_from_state(self):
+ state_data = self.state_data
+ action_run = ActionRun.from_state(
+ state_data, self.parent_context, self.output_path)
+
for key, value in self.state_data.iteritems():
- if key in ['state']:
+ if key in ['state', 'node_name']:
continue
- assert_equal(getattr(self.action_run, key), value)
+ assert_equal(getattr(action_run, key), value)
- def test_restore_state_complete(self):
- self.state_data['end_time'] = "yesterday"
- self.state_data['state'] = 'succeeded'
- self.action_run.restore_state(self.state_data)
- assert self.action_run.is_succeeded
- assert_equal(self.action_run.end_time, "yesterday")
+ assert action_run.is_succeeded
+ assert not action_run.is_cleanup
+
+ def test_from_state_running(self):
+ self.state_data['state'] = 'running'
+ action_run = ActionRun.from_state(
+ self.state_data, self.parent_context, self.output_path)
+ assert action_run.is_unknown
+
+ def test_from_state_no_node_name(self):
+ del self.state_data['node_name']
+ action_run = ActionRun.from_state(
+ self.state_data, self.parent_context, self.output_path)
+ assert action_run.node is None
+
+ def test_from_state_with_node_exists(self):
+ anode = turtle.Turtle(name="anode", hostname="box")
+ node_store = node.NodePoolStore.get_instance()
+ node_store.put(anode)
+ action_run = ActionRun.from_state(
+ self.state_data, self.parent_context, self.output_path)
+ assert_equal(action_run.node, anode)
+ node_store.clear()
+
+
+class ActionRunCollectionTestCase(TestCase):
+
+ @setup
+ def setup_runs(self):
+ self.run_map = {}
+ self.collection = ActionRunCollection(self.run_map)
if __name__ == "__main__":
run()
View
103 tests/mcp_test.py
@@ -8,11 +8,12 @@
from testify.utils import turtle
from tron.core import action, job
-from tron import mcp, scheduler, event
+from tron import mcp, scheduler, event, node
from tests import testingutils
from tron.utils import timeutils
+# TODO: This does not test anything
class TestStateHandler(TestCase):
@class_setup
def class_setup_time(self):
@@ -46,12 +47,13 @@ def teardown_mcp(self):
shutil.rmtree(self.test_dir)
event.EventManager.get_instance().clear()
+
@suite('integration')
def test_reschedule(self):
def callNow(sleep, func, run):
raise NotImplementedError(sleep)
- self.mcp.job_scheduler.next_runs(self.job)
+ #self.mcp.job_scheduler.next_runs(self.job)
#callLate = reactor.callLater
#reactor.callLater = callNow
@@ -141,50 +143,69 @@ def test(self):
assert_raises(mcp.StateFileVersionError, handler._load_data_file, self.data_file)
-class TestMasterControlProgram(TestCase):
+class MasterControlProgramTestCase(TestCase):
@setup
- def build_actions(self):
- self.nodes = turtle.Turtle()
- self.test_dir = tempfile.mkdtemp()
- self.action = action.Action("Test Action", "doit", self.nodes)
- self.job = job.Job("Test Job", self.action)
- self.job.output_path = self.test_dir
- self.mcp = mcp.MasterControlProgram(self.test_dir, "config")
- self.job.node_pool = testingutils.TestPool()
+ def setup_mcp(self):
+ self.working_dir = tempfile.mkdtemp()
+ config_file = tempfile.NamedTemporaryFile(dir=self.working_dir)
+ self.mcp = mcp.MasterControlProgram(self.working_dir, config_file.name)
@teardown
- def teardown_actions(self):
- shutil.rmtree(self.test_dir)
- event.EventManager.get_instance().clear()
+ def teardown_mcp(self):
+ self.mcp.nodes.clear()
+ self.mcp.event_manager.clear()
+
+ def test_live_reconfig(self):
+ pass
+ # TODO: some of these tests are in tests.config.reconfig_test
+
+ def test_load_config(self):
+ pass
+ # TODO
+
+ def config_lines(self):
+ # TODO:
+ pass
+
+ def test_rewrite_config(self):
+ pass
+ # TODO:
+
+ def test_apply_config(self):
+ pass
+ # TODO:
+
+ def test_apply_working_directory(self):
+ pass
+ # TODO
+
+ def test_ssh_options_from_config(self):
+ ssh_conf = turtle.Turtle(agent=False, identities=[])
+ ssh_options = self.mcp._ssh_options_from_config(ssh_conf)
+
+ assert_equal(ssh_options['agent'], False)
+ assert_equal(ssh_options.identitys, [])
+ # TODO: tests with agent and identities
+
+ def test_add_job(self):
+ job_conf = {
+
+ }
+ pass
+
+ def test_add_job_already_exists(self):
+ pass
+
+ def test_remove_job(self):
+ pass
+
+ def test_disable_all(self):
+ pass
+
+ def test_enable_all(self):
+ pass
- def test_schedule_next_run(self):
- act = action.Action("Test Action", "doit", self.nodes)
- jo = job.Job("Test Job", act)
- jo.output_path = self.test_dir
- jo.node_pool = testingutils.TestPool()
- jo.scheduler = scheduler.DailyScheduler()
-
- act.job = jo
- act.command = "Test command"
- act.node = turtle.Turtle()
-
- def call_now(time, func, next):
- next.start()
- next.action_runs[0].succeed()
-
- callLater = mcp.reactor.callLater
- mcp.reactor.callLater = call_now
- try:
- self.mcp.job_scheduler.schedule(jo)
- finally:
- mcp.reactor.callLater = callLater
- next = jo.runs[0]
-
- assert_equal(len(filter(lambda r:r.is_success, jo.runs)), 1)
- assert_equal(jo.topo_actions[0], next.action_runs[0].action)
- assert next.action_runs[0].is_success
- assert next.is_success
View
1  tests/scheduler_test.py
@@ -130,6 +130,7 @@ def unset_time(self):
timeutils.override_current_time(None)
def hours_until_time(self, run_time, sch):
+ # TODO: use timeutils
tz = sch.time_zone
now = timeutils.current_time()
now = tz.localize(now) if tz else now
View
11 tests/serialize/filehandler_test.py
@@ -235,8 +235,19 @@ def test_append(self):
self.path.append('four')
assert_equal(self.path.parts, ['two', 'three', 'four'])
+ def test_clone(self):
+ new_path = self.path.clone()
+ assert_equal(str(new_path), str(self.path))
+ self.path.append('alpha')
+ assert_equal(str(new_path), 'one/two/three')
+ new_path.append('beta')
+ assert_equal(str(self.path), 'one/two/three/alpha')
+
+ def test_clone_with_parts(self):
+ new_path = self.path.clone('seven', 'ten')
+ assert_equal(list(new_path), ['one/two/three', 'seven', 'ten'])
if __name__ == "__main__":
run()
View
29 tron/actioncommand.py
@@ -32,7 +32,7 @@ def __init__(self, id, command, serializer=None):
self.id = id
self.command = command
self.machine = state.StateMachine(
- initial_state=self.PENDING, delegate=self)
+ initial_state=self.PENDING, delegate=self)
self.exit_status = None
self.start_time = None
self.end_time = None
@@ -51,15 +51,17 @@ def attach(self):
return self.machine.attach
def started(self):
- if self.machine.transition("start"):
- self.start_time = timeutils.current_timestamp()
- return True
+ if not self.machine.check('start'):
+ return False
+ self.start_time = timeutils.current_timestamp()
+ return self.machine.transition('start')
def exited(self, exit_status):
- if self.machine.transition("exit"):
- self.end_time = timeutils.current_timestamp()
- self.exit_status = exit_status
- return True
+ if not self.machine.check('exit'):
+ return False
+ self.end_time = timeutils.current_timestamp()
+ self.exit_status = exit_status
+ return self.machine.transition('exit')
def write_stderr(self, value):
self.stderr.write(value)
@@ -68,10 +70,11 @@ def write_stdout(self, value):
self.stdout.write(value)
def done(self):
- if self.machine.transition("close"):
- self.stdout.close()
- self.stderr.close()
- return True
+ if not self.machine.check('close'):
+ return False
+ self.stdout.close()
+ self.stderr.close()
+ return self.machine.transition('close')
def handle_errback(self, result):
"""Handle an unexpected error while being run. This will likely be
@@ -79,7 +82,7 @@ def handle_errback(self, result):
something useful for debugging.
"""
log.error("Unknown failure for ActionCommand run %s: %s\n%s",
- self.id, self.command, str(result))
+ self.id, self.command, str(result))
self.exited(result)
self.done()
View
21 tron/core/actiongraph.py
@@ -5,6 +5,7 @@
log = logging.getLogger(__name__)
+# TODO: Move to tron.core.action
class ActionGraph(object):
"""A directed graph of actions and their requirements."""
@@ -46,8 +47,11 @@ def _build_dag(cls, actions, actions_config):
def actions_for_names(self, names):
return (self.action_map[name] for name in names)
+ def __getitem__(self, name):
+ return self.action_map[name]
-# TODO: this is a strange place for this class
+
+# TODO: this is a strange place for this class, move to actionrun
class ActionRunFactory(object):
"""Construct ActionRuns and ActionRunCollections for a JobRun and
ActionGraph.
@@ -96,21 +100,26 @@ def action_run_collection_from_state(cls, job_run, state_data):
@classmethod
def build_run_for_action(cls, action, job_run):
"""Create an ActionRun for a JobRun and Action."""
- id = "%s.%s" % (job_run.id, action.name)
node = action.node_pool.next() if action.node_pool else job_run.node
action_run = actionrun.ActionRun(
- id,
+ job_run.id,
+ action.name,
node,
job_run.run_time,
action.command,
parent_context=job_run.context,
- output_path=job_run.output_path,
+ output_path=job_run.output_path.clone(),
cleanup=action.is_cleanup
)
- action_run.attach(True, job_run)
+ job_run.watch(action_run)
return action_run
@classmethod
def action_run_from_state(cls, job_run, state_data):
- pass
+ """Restore an ActionRun for this JobRun from the state data."""
+ return actionrun.ActionRun.from_state(
+ state_data,
+ job_run.context,
+ job_run.output_path.clone()
+ )
View
204 tron/core/actionrun.py
@@ -3,6 +3,7 @@
"""
import logging
import traceback
+import itertools
from tron import command_context
from tron.core import action
from tron.serialize import filehandler
@@ -32,10 +33,11 @@ def __init__(self, action_run):
@property
def runid(self):
- # TODO: is this correct?
return self.action_run.id
- # TODO: actionname
+ @property
+ def actionname(self):
+ return self.action_run.action_name
@property
def node(self):
@@ -54,7 +56,7 @@ def __getitem__(self, name):
class ActionRun(Observer):
"""Tracks the state of a single run of an Action.
- ActionRuns Observer ActionCommands they create and are observed by a
+ ActionRuns observers ActionCommands they create and are observed by a
parent JobRun.
"""
STATE_CANCELLED = state.NamedEventState('cancelled')
@@ -94,23 +96,30 @@ class ActionRun(Observer):
# The set of states that are considered end states. Technically some of
# these states can be manually transitioned to other states.
END_STATES = set(
- (STATE_FAILED, STATE_SUCCEEDED, STATE_CANCELLED, STATE_SKIPPED))
+ (STATE_FAILED,
+ STATE_SUCCEEDED,
+ STATE_CANCELLED,
+ STATE_SKIPPED,
+ STATE_UNKNOWN)
+ )
FAILED_RENDER = 'false'
- def __init__(self, id, node, run_time, bare_command,
- parent_context=None, output_path=None, cleanup=False):
- self.id = id
+ def __init__(self, job_run_id, name, node, run_time, bare_command=None,
+ parent_context=None, output_path=None, cleanup=False,
+ start_time=None, end_time=None, run_state=STATE_SCHEDULED,
+ rendered_command=None):
+ self.job_run_id = job_run_id
+ self.action_name = name
self.node = node
- self.output_path = output_path # list of path parts
- self.bare_command = bare_command
+ self.output_path = output_path or filehandler.OutputPath()
+ self.bare_command = bare_command or rendered_command
self.run_time = run_time # parent JobRun start time
- self.start_time = None # ActionRun start time
- self.end_time = None
+ self.start_time = start_time # ActionRun start time
+ self.end_time = end_time
self.exit_status = None
- self.rendered_command = None
- self.machine = state.StateMachine(
- ActionRun.STATE_SCHEDULED, delegate=self)
+ self.rendered_command = rendered_command
+ self.machine = state.StateMachine(run_state, delegate=self)
context = ActionRunContext(self)
self.context = command_context.CommandContext(
context, parent_context)
@@ -124,26 +133,45 @@ def state(self):
def attach(self):
return self.machine.attach
+ @property
+ def id(self):
+ return "%s.%s" % (self.job_run_id, self.action_name)
+
def check_state(self, state):
"""Check if the state machine can be transitioned to state."""
return self.machine.check(state)
@classmethod
- def from_state(cls, state_data):
+ def from_state(cls, state_data, parent_context, output_path, cleanup=False):
"""Restore the state of this ActionRun from a serialized state."""
- # TODO:
- cls.id = state_data['id']
- cls.machine.state = state.named_event_by_name(
- cls.STATE_SCHEDULED, state_data['state'])
- cls.run_time = state_data['run_time']
- cls.start_time = state_data['start_time']
- cls.end_time = state_data['end_time']
- cls.rendered_command = state_data['command']
-
- # We were running when the state file was built, so we have no idea
- # what happened now.
- if cls.is_running:
- cls.machine.transition('fail_unknown')
+ node_pools = node.NodePoolStore.get_instance()
+
+ # Support state from older version
+ if 'id' in state_data:
+ job_run_id, action_name = state_data['id'].rsplit('.', 1)
+ else:
+ job_run_id = state_data['job_run_id']
+ action_name = state_data['action_name']
+
+ run = cls(
+ job_run_id,
+ action_name,
+ node_pools.get(state_data.get('node_name')),
+ state_data['run_time'],
+ parent_context,
+ output_path,
+ rendered_command=state_data['command'],
+ cleanup=cleanup,
+ start_time=state_data['start_time'],
+ end_time=state_data['end_time'],
+ run_state=state.named_event_by_name(
+ cls.STATE_SCHEDULED, state_data['state'])
+ )
+
+ # Transition running to fail unknown because exit status was missed
+ if run.is_running:
+ run.machine.transition('fail_unknown')
+ return run
def start(self):
"""Start this ActionRun."""
@@ -175,25 +203,9 @@ def build_action_command(self):
self.command,
filehandler.OutputStreamSerializer(self.output_path)
)
- self.watch(action_command, True)
+ self.watch(action_command)
return action_command
- def cancel(self):
- return self.machine.transition('cancel')
-
- def schedule(self):
- return self.machine.transition('schedule')
-
- def queue(self):
- return self.machine.transition('queue')
-
- def skip(self):
- """Mark the run as having been skipped."""
- return self.machine.transition('skip')
-
- def ready(self):
- return self.machine.transition('ready')
-
def watcher(self, action_command, event):
"""Observe ActionCommand state changes."""
log.debug("Action command state change: %s", action_command.state)
@@ -236,19 +248,21 @@ def fail_unknown(self):
def state_data(self):
"""This data is used to serialize the state of this action run."""
return {
- 'id': self.id,
+ 'job_run_id': self.job_run_id,
+ 'action_name': self.action_name,
'state': str(self.state),
'run_time': self.run_time,
'start_time': self.start_time,
'end_time': self.end_time,
'command': self.command,
- }
+ 'node_name': self.node.name if self.node else None
+ }
def repr_data(self, max_lines=None):
"""Return a dictionary that represents the external view of this
action run.
"""
- data = {
+ return {
'id': self.id,
'state': self.state.short_name,
'node': self.node.hostname,
@@ -259,7 +273,6 @@ def repr_data(self, max_lines=None):
'end_time': self.end_time,
'exit_status': self.exit_status,
}
- return data
def render_command(self):
"""Render our configured command using the command context."""
@@ -295,8 +308,11 @@ def is_done(self):
def __getattr__(self, name):
"""Support convenience properties for checking if this ActionRun is in
a specific state (Ex: self.is_running would check if self.state is
- STATE_RUNNING).
+ STATE_RUNNING) or for transitioning to a new state (ex: ready).
"""
+ if name in ['cancel', 'schedule', 'queue', 'skip', 'ready']:
+ return lambda: self.machine.transition(name)
+
state_name = name.replace('is_', 'state_').upper()
try:
return self.state == self.__getattribute__(state_name)
@@ -307,44 +323,49 @@ def __getattr__(self, name):
class ActionRunCollection(object):
"""A collection of ActionRuns used by a JobRun."""
- def __init__(self, run_map):
- self.run_map = run_map
+ def __init__(self, action_graph, run_map):
+ self.action_graph = action_graph
+ self.run_map = run_map
self.cleanup_action_run = self.run_map.get(action.CLEANUP_ACTION_NAME)
# Setup proxies
self.proxy_action_runs_with_cleanup = proxy.CollectionProxy(
self.action_runs_with_cleanup, [
- ('is_failure', any, False),
- ('is_starting', any, False),
- ('is_running', any, False),
- ('is_scheduled', any, False),
- ('is_unknown', any, False),
- ('is_queued', all, False),
- ('is_cancelled', all, False),
- ('is_skipped', all, False),
- ('is_done', all, False),
- ('check_state', all, True),
- ('cancel', all, True),
- ('succeed', all, True),
- ('fail', any, True)
+# ('is_failure', any, False),
+# ('is_starting', any, False),
+# ('is_running', any, False),
+# ('is_scheduled', any, False),
+# ('is_unknown', any, False),
+# ('is_queued', all, False),
+# ('is_cancelled', all, False),
+# ('is_skipped', all, False),
+# ('is_done', all, False),
+# ('check_state', all, True),
+# ('cancel', all, True),
+# ('succeed', all, True),
+# ('fail', any, True),
+ ('ready', all, True),
])
self.proxy_action_runs = proxy.CollectionProxy(
self.action_runs, [
- ('schedule', all, True),
- ('queue', all, True),
+# ('schedule', all, True),
+# ('queue', all, True),
])
def action_runs_for_names(self, names):
return (self.run_map[name] for name in names)
+ def action_runs_for_actions(self, actions):
+ return (self.run_map[a.name] for a in actions)
+
@property
def action_runs_with_cleanup(self):
- return self.run_map.values()
+ return self.run_map.itervalues()
@property
def action_runs(self):
- return [run for run in self.run_map.itervalues() if not run.is_cleanup]
+ return (run for run in self.run_map.itervalues() if not run.is_cleanup)
@property
def state_data(self):
@@ -355,12 +376,51 @@ def cleanup_action_state_data(self):
if self.cleanup_action_run:
return self.cleanup_action_run.state_data
+ def _get_runs_using(self, func, include_cleanup=False):
+ """Return an iterator of all the ActionRuns which cause func to return
+ True. func should be a callable that takes a single ActionRun and
+ returns True or False.
+ """
+ if include_cleanup:
+ action_runs = self.action_runs_with_cleanup
+ else:
+ action_runs = self.action_runs
+ return itertools.ifilter(func, action_runs)
+
def get_startable_actions(self):
- """Returns if there are any actions that are scheduled or queued
- that can be run. Otherwise returns false if all actions are done or
- are blocked on fail.
+ """Returns any actions that are scheduled or queued that can be run.
"""
+ def startable(action_run):
+ return (
+ (action_run.is_scheduled or action_run.is_queued) and
+ not self._is_blocked(action_run)
+ )
+ return self._get_runs_using(startable)
+
+
+# def get_unblocked_actions(self):
+# return self._get_runs_using(lambda ar: not self._is_blocked(ar))
+#
+# def get_blocked_actions(self):
+# return self._get_runs_using(self._is_blocked)
+
+ def _is_blocked(self, action_run):
+ """Returns True if this ActionRun is waiting on a required run to
+ finish before it can run.
+ """
+ if (action_run.is_done or
+ action_run.is_running or action_run.is_starting):
+ return False
+ required_actions = self.action_graph[action_run.name].required
+ if not required_actions:
+ return False
+
+ required_runs = self.action_runs_for_actions(required_actions)
+ return any(
+ self._is_blocked(run) or not run.is_done
+ for run in required_runs
+ )
# TODO: is this needed?
@property
View
3  tron/core/job.py
@@ -6,6 +6,7 @@
from tron.core import jobrun
from tron.core import actiongraph
from tron.core.actionrun import ActionRun
+from tron.serialize import filehandler
from tron.utils import timeutils
from tron.utils.observer import Observable, Observer
@@ -81,7 +82,7 @@ def __init__(self, name, scheduler, queueing=True, all_nodes=False,
self.node_pool = node_pool
self.context = command_context.CommandContext(
JobContext(self), parent_context)
- self.output_path = output_path or []
+ self.output_path = output_path or filehandler.OutputPath()
self.output_path.append(name)
@classmethod
View
13 tron/core/jobrun.py
@@ -9,6 +9,7 @@
from tron.core import actionrun
from tron.core.actionrun import ActionRun
from tron.core.actiongraph import ActionRunFactory
+from tron.serialize import filehandler
from tron.utils import timeutils
from tron.utils.observer import Observable, Observer
@@ -62,7 +63,7 @@ def __init__(self, job_name, run_num, run_time, node, output_path=None,
self.run_num = run_num
self.run_time = run_time
self.node = node
- self.output_path = output_path or []
+ self.output_path = output_path or filehandler.OutputPath()
self.output_path.append(self.id)
self.start_time = start_time
self.end_time = end_time
@@ -82,7 +83,7 @@ def id(self):
@classmethod
def for_job(cls, job, run_num, run_time, node):
"""Create a JobRun for a job."""
- run = cls(job.name, run_num, run_time, node, job.output_path,
+ run = cls(job.name, run_num, run_time, node, job.output_path.clone(),
job.context, action_graph=job.action_graph)
action_runs = ActionRunFactory.build_action_run_collection(run)
@@ -133,7 +134,7 @@ def register_action_runs(self, action_runs):
"""Store action runs and register callbacks."""
self.action_runs = action_runs
for action_run in action_runs:
- self.watch(action_run, True)
+ self.watch(action_run)
def seconds_until_run_time(self):
run_time = self.run_time
@@ -154,15 +155,11 @@ def _do_start(self):
self.start_time = timeutils.current_time()
try:
- for action_run in self.action_runs:
- action_run.ready()
+ self.action_runs.ready()
for action_run in self.action_runs.get_startable_actions():
action_run.start()
- if self.action_runs.cleanup_action_run:
- self.action_runs.cleanup_action_run.ready()
-
self.notify(self.EVENT_STARTED)
return True
except actionrun.Error, e:
View
2  tron/mcp.py
@@ -25,7 +25,7 @@
from tron.utils.observer import Observer, Observable
-log = logging.getLogger('tron.mcp')
+log = logging.getLogger(__name__)
STATE_FILE = 'tron_state.yaml'
STATE_SLEEP_SECS = 1
View
3  tron/node.py
@@ -91,6 +91,9 @@ def __getitem__(self, name):
def get(self, name, default=None):
return self.store.get(name, default)
+ def clear(self):
+ self.store.clear()
+
class NodePool(object):
"""A pool of Node objects."""
View
12 tron/serialize/filehandler.py
@@ -173,12 +173,12 @@ def open(self, filename):
class OutputPath(object):
- """A list like object which used to construct a file path for output. The
+ """A list like object used to construct a file path for output. The
file path is constructed by joining the base path with any additional
path elements.
"""
- def __init__(self, base, *path_parts):
+ def __init__(self, base='.', *path_parts):
self.base = base
self.parts = list(path_parts or [])
@@ -191,4 +191,10 @@ def __iter__(self):
yield p
def __str__(self):
- return os.path.join(*self)
+ return os.path.join(*self)
+
+ def clone(self, *parts):
+ """Return a new OutputPath object which has a base of the str value
+ of this object.
+ """
+ return type(self)(str(self), *parts)
Please sign in to comment.
Something went wrong with that request. Please try again.