Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
Checking mergeability… Don't worry, you can still create the pull request.
  • 15 commits
  • 25 files changed
  • 0 commit comments
  • 1 contributor
Commits on May 20, 2012
@dnephin dnephin Replace override_current_time with proper patching. Resolves #141. 89a227c
@dnephin dnephin Some cleanup of sandbox. It now uses the commands.client. aaec024
@dnephin dnephin Sandbox cleanup.
Check for SSH_AUTH_SOCK on startup
Remove time.sleep in trond_test
trond_test now runs in 7-10s
1c3cf86
@dnephin dnephin Check for $PYTHONPATH in sandbox as well. f297743
@dnephin dnephin tronfig use run_command, and ClientProxy catch RequestErrors and return False.
6c3df31
Commits on May 24, 2012
@dnephin dnephin Attempt to check for a tty width even if sys.stdout.isatty() returns false. Needed for grepping the output of tronview.
e1cdb9e
Commits on May 26, 2012
@dnephin dnephin Job runs which are restored to unknown status will now have an end time. Moves Job run start and end time to be a computed value based on its runs.
3c75e90
@dnephin dnephin Add missing iteration module and tests. 1b219be
@dnephin dnephin Merge branch 'master' of github.com:Yelp/Tron into release_0.4.1 c450a86
Commits on May 29, 2012
@dnephin dnephin Job context should not be overridden on reconfigure. 14b6293
Commits on May 30, 2012
@dnephin dnephin Manually cancelled jobs should still continue to schedule new runs. f4e522f
@dnephin dnephin Merge pull request #157 from dnephin/fix_testing_sandbox
Fix testing sandbox
925635d
@dnephin dnephin Merge pull request #156 from dnephin/release_0.4.1
Release 0.4.1
b9d1776
@dnephin dnephin Merge pull request #159 from dnephin/r_0.4.1_schedule_on_cancel
Manually cancelled jobs should still continue to schedule new runs.
001894e
@dnephin dnephin Bump version to 0.4.1, and add release notes. 7f7d8f6
View
8 debian/changelog
@@ -1,3 +1,11 @@
+tron (0.4.1) unstable; urgency=low
+
+ * tronview will once again attempt to find the tty width even when stdout is not a tty.
+ * Fixed last_success for job context.
+ * Job runs which are manually cancelled will now continue to schedule new runs.
+
+ -- Daniel Nephin <dnephin@yelp.com> Wed, 30 May 2012 16:35:44 -0700
+
tron (0.4.0) unstable; urgency=low
* Jobs now continue to run all possible actions after one of its actions fail
View
6 docs/whats-new.rst
@@ -1,6 +1,12 @@
What's New
==========
+0.4.1
+-----
+* :ref:`tronview` will once again attempt to find the tty width even when stdout is not a tty.
+* Fixed last_success for job context.
+* Job runs which are manually cancelled will now continue to schedule new runs.
+
0.4.0
-----
View
22 tests/core/actionrun_test.py
@@ -2,9 +2,9 @@
import shutil
import tempfile
-from testify import run, setup, TestCase, assert_equal, turtle
+from testify import run, setup, TestCase, assert_equal, turtle, teardown
from testify.assertions import assert_raises, assert_in
-from testify.test_case import class_setup, class_teardown, teardown
+from tests import testingutils
from tests.assertions import assert_length
from tests.mocks import MockNode
from tests.testingutils import Turtle
@@ -17,16 +17,9 @@
from tron.serialize import filehandler
from tron.utils import timeutils
-class ActionRunContextTestCase(TestCase):
+class ActionRunContextTestCase(testingutils.MockTimeTestCase):
- @class_setup
- def freeze_time(self):
- timeutils.override_current_time(datetime.datetime.now())
- self.now = timeutils.current_time()
-
- @class_teardown
- def unfreeze_time(self):
- timeutils.override_current_time(None)
+ now = datetime.datetime.now()
@setup
def build_context(self):
@@ -338,7 +331,9 @@ def test__getattr__missing_attribute(self):
self.action_run.__getattr__, 'is_not_a_real_state')
-class ActionRunStateRestoreTestCase(TestCase):
+class ActionRunStateRestoreTestCase(testingutils.MockTimeTestCase):
+
+ now = datetime.datetime(2012, 3, 14, 15, 19)
@setup
def setup_action_run(self):
@@ -377,12 +372,15 @@ def test_from_state_running(self):
action_run = ActionRun.from_state(self.state_data,
self.parent_context, self.output_path, self.run_node)
assert action_run.is_unknown
+ assert_equal(action_run.exit_status, 0)
+ assert_equal(action_run.end_time, self.now)
def test_from_state_queued(self):
self.state_data['state'] = 'queued'
action_run = ActionRun.from_state(self.state_data, self.parent_context,
self.output_path, self.run_node)
assert action_run.is_failed
+ assert_equal(action_run.end_time, self.now)
def test_from_state_no_node_name(self):
del self.state_data['node_name']
View
17 tests/core/job_test.py
@@ -4,11 +4,11 @@
from tests import mocks
from tests.assertions import assert_length, assert_call
from tests.mocks import MockNode
-from tests.testingutils import MockReactorTestCase, Turtle
+from tests.testingutils import Turtle
+from tests import testingutils
from tron import node, event
from tron.core import job, jobrun
from tron.core.actionrun import ActionRun
-from tron.utils import timeutils
class JobContextTestCase(TestCase):
@@ -358,7 +358,9 @@ def __call__(self, *args, **kwargs):
return [self.manual_run]
-class JobSchedulerManualStartTestCase(TestCase):
+class JobSchedulerManualStartTestCase(testingutils.MockTimeTestCase):
+
+ now = datetime.datetime.now()
@setup
def setup_job(self):
@@ -375,13 +377,6 @@ def setup_job(self):
self.manual_run = Turtle()
self.job.build_new_runs = MockRunBuilder(manual_run=self.manual_run)
- self.now = datetime.datetime.now()
- timeutils.override_current_time(self.now)
-
- @teardown
- def teardown_timeutils(self):
- timeutils.override_current_time(None)
-
def test_manual_start(self):
manual_runs = self.job_scheduler.manual_start()
@@ -398,7 +393,7 @@ def test_manual_start_with_run_time(self):
assert_length(self.manual_run.start.calls, 1)
-class JobSchedulerScheduleTestCase(MockReactorTestCase):
+class JobSchedulerScheduleTestCase(testingutils.MockReactorTestCase):
module_to_mock = job
View
26 tests/core/jobrun_test.py
@@ -1,12 +1,12 @@
import datetime
import pytz
-from testify import TestCase, setup, assert_equal, teardown
+from testify import TestCase, setup, assert_equal
from testify.assertions import assert_in
from tests.assertions import assert_length, assert_raises, assert_call
from tests.mocks import MockNode
from tron.core import jobrun, actionrun
from tests.testingutils import Turtle
-from tron.utils import timeutils
+from tests import testingutils
class JobRunContextTestCase(TestCase):
@@ -23,11 +23,11 @@ def test_cleanup_job_status(self):
def test_cleanup_job_status_failure(self):
self.jobrun.action_runs.is_failed = True
- assert_equal(self.context.cleanup_job_status, 'FAILURE')
+class JobRunTestCase(testingutils.MockTimeTestCase):
-class JobRunTestCase(TestCase):
+ now = datetime.datetime(2012, 3, 14, 15, 9, 20)
@setup
def setup_jobrun(self):
@@ -44,10 +44,6 @@ def setup_jobrun(self):
self.job_run.watch = Turtle()
self.job_run.notify = Turtle()
- @teardown
- def teardown_jobrun(self):
- timeutils.override_current_time(None)
-
def test__init__(self):
assert_equal(self.job_run.job_name, 'jobname')
assert_equal(self.job_run.run_time, self.run_time)
@@ -103,15 +99,11 @@ def test_set_action_runs_duplicate(self):
self.job_run._set_action_runs, run_collection)
def test_seconds_until_run_time(self):
- now = datetime.datetime(2012, 3, 14, 15, 9, 20)
- timeutils.override_current_time(now)
seconds = self.job_run.seconds_until_run_time()
assert_equal(seconds, 6)
def test_seconds_until_run_time_with_tz(self):
self.job_run.run_time = self.run_time.replace(tzinfo=pytz.utc)
- now = datetime.datetime(2012, 3, 14, 15, 9, 20)
- timeutils.override_current_time(now)
seconds = self.job_run.seconds_until_run_time()
assert_equal(seconds, 6)
@@ -139,12 +131,10 @@ def test_start_no_startable_action_runs(self):
assert_length(self.job_run.notify.calls, 1)
def test_do_start(self):
- timeutils.override_current_time(self.run_time)
startable_runs = [Turtle(), Turtle(), Turtle()]
self.job_run.action_runs.get_startable_action_runs = lambda: startable_runs
assert self.job_run._do_start()
- assert_equal(self.job_run.start_time, self.run_time)
assert_call(self.job_run.action_runs.ready, 0)
for i, startable_run in enumerate(startable_runs):
assert_call(startable_run.start, 0)
@@ -153,19 +143,15 @@ def test_do_start(self):
assert_call(self.job_run.notify, 0, self.job_run.EVENT_STARTED)
def test_do_start_all_failed(self):
- timeutils.override_current_time(self.run_time)
self.job_run._start_action_runs = lambda: [None]
assert not self.job_run._do_start()
- assert_equal(self.job_run.start_time, self.run_time)
assert_length(self.job_run.notify.calls, 0)
def test_do_start_some_failed(self):
- timeutils.override_current_time(self.run_time)
self.job_run._start_action_runs = lambda: [True, None]
assert self.job_run._do_start()
- assert_equal(self.job_run.start_time, self.run_time)
assert_length(self.job_run.notify.calls, 1)
assert_call(self.job_run.notify, 0, self.job_run.EVENT_STARTED)
@@ -251,19 +237,15 @@ def test_state_with_no_action_runs(self):
assert_equal(self.job_run.state, actionrun.ActionRun.STATE_UNKNOWN)
def test_finalize(self):
- timeutils.override_current_time(self.run_time)
self.job_run.action_runs.is_failed = False
self.job_run.finalize()
assert_call(self.job_run.notify, 0, self.job_run.EVENT_SUCCEEDED)
assert_call(self.job_run.notify, 1, self.job_run.NOTIFY_DONE)
- assert_equal(self.job_run.end_time, self.run_time)
def test_finalize_failure(self):
- timeutils.override_current_time(self.run_time)
self.job_run.finalize()
assert_call(self.job_run.notify, 0, self.job_run.EVENT_FAILED)
assert_call(self.job_run.notify, 1, self.job_run.NOTIFY_DONE)
- assert_equal(self.job_run.end_time, self.run_time)
def test_cleanup(self):
self.job_run.clear_observers = Turtle()
View
10 tests/data/logging.conf
@@ -8,26 +8,26 @@ keys=fileHandler
keys=defaultFormatter
[logger_root]
-level=DEBUG
+level=WARN
handlers=fileHandler
[logger_twisted]
-level=WARNING
+level=WARN
handlers=fileHandler
qualname=twisted
propagate=0
[logger_tron]
-level=INFO
+level=WARN
handlers=fileHandler
qualname=tron
propagate=0
[handler_fileHandler]
class=logging.FileHandler
-level=DEBUG
+level=WARN
formatter=defaultFormatter
-args=('tron.log',)
+args=('{0}',)
[formatter_defaultFormatter]
format=%(asctime)s %(name)s %(levelname)s %(message)s
View
4 tests/data/test_config.yaml
@@ -162,11 +162,11 @@ jobs:
schedule: "daily 16:00:00"
actions:
- name: "task0"
- command: "%(ECHO)s %(actionname)s"
+ command: "%(ECHO)s %(actionname)s 1 && false"
node: box2
requires: ["task1"]
- name: "task1"
- command: "sleep 10 && %(ECHO)s %(actionname)s"
+ command: "sleep 10 && %(ECHO)s %(actionname)s %(last_success:shortdate)s"
node: pool0
cleanup_action:
command: "%(ECHO)s %(actionname)s %(cleanup_job_status)s"
View
1  tests/mcp_reconfigure_test.py
@@ -237,6 +237,7 @@ def test_job_changed(self):
assert new_job_sched.job.runs.runs[1].is_starting
assert new_job_sched.job.runs.runs[0].is_scheduled
assert_equal(job_sched.job.context['a_variable'], 'is_constant')
+ assert new_job_sched.job.context.base.job is new_job_sched.job
@suite('integration')
def test_job_new(self):
View
21 tests/mcp_test.py
@@ -3,31 +3,26 @@
import shutil
import StringIO
import tempfile
+import time
+import yaml
-from testify import TestCase, class_setup, class_teardown, setup, teardown
+from testify import TestCase, setup, teardown
from testify import assert_raises, assert_equal, suite, run
from testify.utils import turtle
-import time
-import yaml
+
+from tests import testingutils
from tests.assertions import assert_length
from tests.mocks import MockNode
+
import tron
from tron.config import config_parse
-
from tron.core import job, actionrun
from tron import mcp, scheduler, event, node, service
-from tron.utils import timeutils
-class StateHandlerIntegrationTestCase(TestCase):
- @class_setup
- def class_setup_time(self):
- timeutils.override_current_time(datetime.datetime.now())
- self.now = timeutils.current_time()
+class StateHandlerIntegrationTestCase(testingutils.MockTimeTestCase):
- @class_teardown
- def class_teardown_time(self):
- timeutils.override_current_time(None)
+ now = datetime.datetime.now()
def _advance_run(self, job_run, state):
for action_run in job_run.action_runs:
View
328 tests/sandbox.py
@@ -1,51 +1,37 @@
-from __future__ import with_statement
-
import logging
import os
import shutil
import signal
+import socket
from subprocess import Popen, PIPE, CalledProcessError
import sys
import tempfile
import time
+import contextlib
+import functools
-from testify import turtle
+from testify import TestCase, setup, teardown, turtle
-from tron import commands
from tron.commands import client
-# Used for getting the locations of the executables
+# Used for getting the locations of the executable
_test_folder, _ = os.path.split(__file__)
-_repo_root, _ = os.path.split(_test_folder)
+_repo_root, _ = os.path.split(_test_folder)
log = logging.getLogger(__name__)
-def wait_for_sandbox_success(func, delay=0.1, max_wait=5.0):
- """Call *func* repeatedly until it stops throwing TronSandboxException.
- Wait increasing amounts from *start_delay* but wait no more than a total
- of *stop_at* seconds
+def wait_on_sandbox(func, delay=0.1, max_wait=5.0):
+ """Poll for func() to return True. Sleeps `delay` seconds between polls
+ up to a max of `max_wait` seconds.
"""
start_time = time.time()
while time.time() - start_time < max_wait:
time.sleep(delay)
- try:
- return func()
- except TronSandboxException:
- pass
- raise
-
-
-def make_file_existence_sandbox_exception_thrower(path):
- def func():
- if not os.path.exists(path):
- raise TronSandboxException('File does not exist: %s' % path)
- return func
-
-def wait_for_file_to_exist(path, max_wait=5.0):
- func = make_file_existence_sandbox_exception_thrower(path)
- wait_for_sandbox_success(func, max_wait=max_wait)
+ if func():
+ return
+ raise TronSandboxException("Failed %s" % func.__name__)
def handle_output(cmd, (stdout, stderr), returncode):
@@ -56,207 +42,155 @@ def handle_output(cmd, (stdout, stderr), returncode):
log.info("%s: %s", cmd, stdout)
if stderr:
log.warning("%s: %s", cmd, stderr)
- if returncode != 0:
+ if returncode:
raise CalledProcessError(returncode, cmd)
+def find_unused_port():
+ """Return a port number that is not in use."""
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ with contextlib.closing(sock) as sock:
+ sock.bind(('localhost', 0))
+ _, port = sock.getsockname()
+ return port
+
+
class TronSandboxException(Exception):
pass
-class MockConfigOptions(object):
+class SandboxTestCase(TestCase):
- def __init__(self, server):
- self.server = server
+ _suites = ['sandbox']
+ @setup
+ def make_sandbox(self):
+ self.sandbox = TronSandbox()
-class TronSandbox(object):
+ @teardown
+ def delete_sandbox(self):
+ self.sandbox.delete()
+ self.sandbox = None
- def __init__(self):
- """Set up a temp directory and store paths to relevant binaries"""
- super(TronSandbox, self).__init__()
- self.tmp_dir = tempfile.mkdtemp(prefix='tron-')
- self.tron_bin = os.path.join(_repo_root, 'bin')
- self.tronctl_bin = os.path.join(self.tron_bin, 'tronctl')
- self.trond_bin = os.path.join(self.tron_bin, 'trond')
- self.tronfig_bin = os.path.join(self.tron_bin, 'tronfig')
- self.tronview_bin = os.path.join(self.tron_bin, 'tronview')
- self.log_file = 'tron.log'
- self.log_conf = 'tests/data/logging.conf'
+class ClientProxy(object):
+ """Wrap calls to client and raise a TronSandboxException on connection
+ failures.
+ """
- self.pid_file = os.path.join(self.tmp_dir, 'tron.pid')
- self.config_file = os.path.join(self.tmp_dir, 'tron_config.yaml')
+ def __init__(self, client, log_filename):
+ self.client = client
+ self.log_filename = log_filename
- self.port = 8089
- self.host = 'localhost'
+ def log_contents(self):
+ """Return the contents of the log file."""
+ with open(self.log_filename, 'r') as f:
+ return f.read()
- self.run_time = None
+ def wrap(self, func, *args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except client.RequestError, e:
+ log.warn("%r, Log:\n%s" % (e, self.log_contents()))
+ return False
- self.trond_debug_args = ['--working-dir=%s' % self.tmp_dir,
- '--pid-file=%s' % self.pid_file,
- '--port=%d' % self.port,
- '--host=%s' % self.host,
- '--config=%s' % self.config_file,
- '--log-conf=%s' % self.log_conf,
- '--verbose', '--verbose']
+ def __getattr__(self, name):
+ attr = getattr(self.client, name)
+ if not callable(attr):
+ return attr
- self.tron_server_address = '%s:%d' % (self.host, self.port)
- self.tron_server_uri = 'http://%s' % self.tron_server_address
- self.tron_server_arg = '--server=%s' % self.tron_server_address
+ return functools.partial(self.wrap, attr)
- # mock a config object
- self.config_obj = MockConfigOptions(self.tron_server_uri)
- commands.save_config(self.config_obj)
- self._last_trond_launch_args = []
+class TronSandbox(object):
+ """A sandbox for running trond and tron commands in subprocesses."""
- def log_contents(self):
- with open(self.log_file, 'r') as f:
- return f.read()
+ def __init__(self):
+ """Set up a temp directory and store paths to relevant binaries"""
+ self.verify_environment()
+ self.tmp_dir = tempfile.mkdtemp(prefix='tron-')
+ cmd_path_func = functools.partial(os.path.join, _repo_root, 'bin')
+ cmds = 'tronctl', 'trond', 'tronfig', 'tronview'
+ self.commands = dict((cmd, cmd_path_func(cmd)) for cmd in cmds)
+ self.log_file = self.abs_path('tron.log')
+ self.log_conf = self.abs_path('logging.conf')
+ self.pid_file = self.abs_path('tron.pid')
+ self.config_file = self.abs_path('tron_config.yaml')
+ self.port = find_unused_port()
+ self.host = 'localhost'
+ self.api_uri = 'http://%s:%s' % (self.host, self.port)
+ client_config = turtle.Turtle(server=self.api_uri,
+ warn=False, num_displays=100)
+ cclient = client.Client(client_config)
+ self.client = ClientProxy(cclient, self.log_file)
+ self.setup_logging_conf()
+
+ def abs_path(self, filename):
+ """Return the absolute path for a file in the sandbox."""
+ return os.path.join(self.tmp_dir, filename)
+
+ def setup_logging_conf(self):
+ config_template = os.path.join(_repo_root, 'tests/data/logging.conf')
+ with open(config_template, 'r') as fh:
+ config = fh.read()
+
+ with open(self.log_conf, 'w') as fh:
+ fh.write(config.format(self.log_file))
+
+ def verify_environment(self):
+ ssh_sock = 'SSH_AUTH_SOCK'
+ msg = "Missing $%s in test environment."
+ if not os.environ.get(ssh_sock):
+ raise TronSandboxException(msg % ssh_sock)
+
+ path = 'PYTHONPATH'
+ if not os.environ.get(path):
+ raise TronSandboxException(msg % path)
def delete(self):
- """Delete the temp directory and its contents"""
+ """Delete the temp directory and shutdown trond."""
if os.path.exists(self.pid_file):
- self.stop_trond()
+ with open(self.pid_file, 'r') as f:
+ os.kill(int(f.read()), signal.SIGKILL)
shutil.rmtree(self.tmp_dir)
- os.unlink(self.log_file)
- self.tmp_dir = None
- self.tron_bin = None
- self.tronctl_bin = None
- self.trond_bin = None
- self.tronfig_bin = None
- self.tronview_bin = None
- self.tron_server_uri = None
def save_config(self, config_text):
- """Save a tron configuration to tron_config.yaml. Mainly useful for
- setting trond's initial configuration.
- """
+ """Save the initial tron configuration."""
with open(self.config_file, 'w') as f:
f.write(config_text)
- return config_text
-
- ### trond control ###
-
- def start_trond(self, args=None):
- """Start trond"""
- args = args or []
- self._last_trond_launch_args = args
- command = [sys.executable, self.trond_bin] + self.trond_debug_args + args
- p = Popen(command, stdout=PIPE, stderr=PIPE)
-
- handle_output(command, p.communicate(), p.returncode)
-
- # make sure trond has actually launched
- wait_for_sandbox_success(self.list_all)
-
- # (but p.communicate() already waits for the process to exit... -Steve)
- return p.wait()
-
- def stop_trond(self):
- """Stop trond based on the tron.pid in the temp directory"""
- with open(self.pid_file, 'r') as f:
- os.kill(int(f.read()), signal.SIGKILL)
-
- def restart_trond(self, args=None):
- """Stop and start trond"""
- if args == None:
- args = self._last_trond_launch_args
- self.stop_tron()
- self.start_tron(args=args)
-
- ### www API ###
-
- def _check_call_api(self, uri, data=None):
- commands.load_config(self.config_obj)
- status, content = client.request(self.tron_server_uri, uri, data=data)
-
- if status != client.OK or not content:
- log.warning('trond appears to have crashed. Log:')
- log.warning(self.log_contents())
- raise TronSandboxException("Error connecting to tron server at %s%s" % (self.tron_server_uri, uri))
-
- return content
-
- def upload_config(self, config_text):
- """Upload a tron configuration to the server"""
- return self._check_call_api('/config', {'config': config_text})
-
- def get_config(self):
- """Get the text of the current configuration"""
- return self._check_call_api('/config')['config']
-
- def ctl(self, command, arg=None, run_time=None):
- """Call the www API like tronctl does. ``command`` can be one of
- ``(start, cancel, disable, enable, disableall, enableall, fail, succeed)``.
- ``run_time`` should be of the form ``YYYY-MM-DD HH:MM:SS``.
- """
- data = {'command': command}
-
- if run_time is not None:
- data['run_time'] = run_time
-
- if arg is not None:
- options = turtle.Turtle(server=self.tron_server_uri)
- cclient = client.Client(options)
- full_uri = cclient.get_url_from_identifier(arg)
-
- else:
- full_uri = '/jobs'
-
- self._check_call_api(full_uri, data=data)
-
- def list_all(self):
- """Call the www API to list jobs and services."""
- return self._check_call_api('/')
-
- def list_events(self):
- """Call the www API to list all events."""
- return self._check_call_api('/events')
-
- def list_job(self, job_name):
- """Call the www API to list all runs of one job."""
- return self._check_call_api('/jobs/%s' % job_name)
-
- def list_job_events(self, job_name):
- """Call the www API to list all events of one job."""
- return self._check_call_api('/jobs/%s/_events' % job_name)
-
- def list_job_run(self, job_name, run_number):
- """Call the www API to list all actions of one job run."""
- return self._check_call_api('/jobs/%s/%d' % (job_name, run_number))
-
- def list_job_run_events(self, job_name, run_number):
- """Call the www API to list all actions of one job run."""
- return self._check_call_api('/jobs/%s/%d/_events' % (job_name, run_number))
-
- def list_action_run(self, job_name, run_number, action_name, num_lines=100):
- """Call the www API to display the results of an action."""
- return self._check_call_api('/jobs/%s/%d/%s?num_lines=%d' % (job_name, run_number, action_name, num_lines))
-
- def list_service(self, service_name):
- return self._check_call_api('/services/%s' % service_name)
-
- def list_service_events(self, service_name):
- return self._check_call_api('/services/%s/_events' % service_name)
- ### Basic subprocesses ###
+ def run_command(self, command_name, args=None, stdin_lines=None):
+ """Run the command by name and return (stdout, stderr)."""
+ args = args or []
+ command = [sys.executable, self.commands[command_name]] + args
+ stdin = PIPE if stdin_lines else None
+ proc = Popen(command, stdout=PIPE, stderr=PIPE, stdin=stdin)
+ streams = proc.communicate(stdin_lines)
+ handle_output(command, streams, proc.returncode)
+ return streams
def tronctl(self, args=None):
- """Call tronctl with args and return ``(stdout, stderr)``"""
- args = args or []
- command = [sys.executable, self.tronctl_bin] + args
- p = Popen(command, stdout=PIPE, stderr=PIPE)
- retval = p.communicate()
- handle_output(command, retval, p.returncode)
- return retval
+ args = list(args) if args else []
+ return self.run_command('tronctl', args + ['--server', self.api_uri])
def tronview(self, args=None):
- """Call tronview with args and return ``(stdout, stderr)``"""
- args = args or ['--nocolor']
- command = [sys.executable, self.tronview_bin] + args
- p = Popen(command, stdout=PIPE, stderr=PIPE)
- retval = p.communicate()
- handle_output(command, retval, p.returncode)
- return retval
+ args = list(args) if args else []
+ args += ['--nocolor', '--server', self.api_uri]
+ return self.run_command('tronview', args)
+
+ def trond(self, args=None):
+ args = list(args) if args else []
+ args += ['--working-dir=%s' % self.tmp_dir,
+ '--pid-file=%s' % self.pid_file,
+ '--port=%d' % self.port,
+ '--host=%s' % self.host,
+ '--config=%s' % self.config_file,
+ '--log-conf=%s' % self.log_conf]
+
+ self.run_command('trond', args)
+ wait_on_startup = lambda: bool(self.client.home())
+ wait_on_sandbox(wait_on_startup)
+
+ def tronfig(self, config_content):
+ args = ['--server', self.api_uri, '-']
+ return self.run_command('tronfig', args, stdin_lines=config_content)
View
100 tests/scheduler_test.py
@@ -2,41 +2,39 @@
import datetime
import pytz
-from testify import setup, teardown, run, TestCase, assert_equal
+from testify import setup, run, assert_equal
from testify import assert_gte, assert_lte, assert_gt, assert_lt
+from tests import testingutils
from tron import scheduler
from tron.config.schedule_parse import parse_daily_expression as parse_daily
from tron.utils import timeutils
-class ConstantSchedulerTest(TestCase):
+class ConstantSchedulerTest(testingutils.MockTimeTestCase):
+
+ now = datetime.datetime(2012, 3, 14)
@setup
def build_scheduler(self):
self.scheduler = scheduler.ConstantScheduler()
def test_next_run_time(self):
- current_time = timeutils.current_time()
scheduled_time = self.scheduler.next_run_time(None)
- assert current_time <= scheduled_time <= timeutils.current_time()
+ assert_equal(scheduled_time, self.now)
def test__str__(self):
assert_equal(str(self.scheduler), "CONSTANT")
-class DailySchedulerTest(TestCase):
+class DailySchedulerTestCase(testingutils.MockTimeTestCase):
+
+ now = datetime.datetime.now().replace(hour=15, minute=0)
@setup
def build_scheduler(self):
- self.now = datetime.datetime.now().replace(hour=15, minute=0)
- timeutils.override_current_time(self.now)
self.scheduler = scheduler.DailyScheduler(timestr='14:30')
- @teardown
- def unset_time(self):
- timeutils.override_current_time(None)
-
def test_next_run_time(self):
one_day = datetime.timedelta(days=1)
today = self.now.date()
@@ -53,22 +51,18 @@ def test__str__(self):
assert_equal(str(self.scheduler), "DAILY")
-class DailySchedulerTimeTestBase(TestCase):
+class DailySchedulerTimeTestBase(testingutils.MockTimeTestCase):
+
+ now = datetime.datetime(2012, 3, 14, 15, 9, 26)
+
@setup
def build_scheduler(self):
self.scheduler = scheduler.DailyScheduler(timestr='14:30')
- @teardown
- def unset_time(self):
- timeutils.override_current_time(None)
-
class DailySchedulerTodayTest(DailySchedulerTimeTestBase):
- @setup
- def set_time(self):
- self.now = datetime.datetime.now().replace(hour=12, minute=0)
- timeutils.override_current_time(self.now)
+ now = datetime.datetime.now().replace(hour=12, minute=0)
def test(self):
# If we schedule a job for later today, it should run today
@@ -83,10 +77,7 @@ def test(self):
class DailySchedulerTomorrowTest(DailySchedulerTimeTestBase):
- @setup
- def set_time(self):
- self.now = datetime.datetime.now().replace(hour=15, minute=0)
- timeutils.override_current_time(self.now)
+ now = datetime.datetime.now().replace(hour=15, minute=0)
def test(self):
# If we schedule a job for later today, it should run today
@@ -102,10 +93,7 @@ def test(self):
class DailySchedulerLongJobRunTest(DailySchedulerTimeTestBase):
- @setup
- def set_time(self):
- self.now = datetime.datetime.now().replace(hour=12, minute=0)
- timeutils.override_current_time(self.now)
+ now = datetime.datetime.now().replace(hour=12, minute=0)
def test_long_jobs_dont_wedge_scheduler(self):
# Advance days twice as fast as they are scheduled, demonstrating
@@ -118,16 +106,12 @@ def test_long_jobs_dont_wedge_scheduler(self):
assert_equal(next_run, last_run + datetime.timedelta(days=1))
self.now += datetime.timedelta(days=2)
- timeutils.override_current_time(self.now)
-
last_run = next_run
-class DailySchedulerDSTTest(TestCase):
+class DailySchedulerDSTTest(testingutils.MockTimeTestCase):
- @teardown
- def unset_time(self):
- timeutils.override_current_time(None)
+ now = datetime.datetime(2011, 11, 6, 1, 10, 0)
def hours_until_time(self, run_time, sch):
tz = sch.time_zone
@@ -140,13 +124,8 @@ def hours_diff_at_datetime(self, sch, *args, **kwargs):
"""Return the number of hours until the next *two* runs of a job with
the given scheduler
"""
- # if you need to print a datetime with tz info, use this:
- # fmt = '%Y-%m-%d %H:%M:%S %Z%z'
- # my_datetime.strftime(fmt)
-
- now = datetime.datetime(*args, **kwargs)
- timeutils.override_current_time(now)
- next_run = sch.next_run_time(now)
+ self.now = datetime.datetime(*args, **kwargs)
+ next_run = sch.next_run_time(self.now)
t1 = self.hours_until_time(next_run, sch)
next_run = sch.next_run_time(next_run.replace(tzinfo=None))
t2 = self.hours_until_time(next_run, sch)
@@ -181,10 +160,7 @@ def test_fall_back(self):
def test_correct_time(self):
sch = scheduler.DailyScheduler(time_zone=pytz.timezone('US/Pacific'))
-
- now = datetime.datetime(2011, 11, 6, 1, 10, 0)
- timeutils.override_current_time(now)
- next_run_time = sch.next_run_time(now)
+ next_run_time = sch.next_run_time(self.now)
assert_equal(next_run_time.hour, 0)
def test_spring_forward(self):
@@ -211,16 +187,9 @@ def test_spring_forward(self):
self._assert_range(s1a - s2a, -0.61, -0.59)
-class ComplexParserTest(TestCase):
-
- @setup
- def build_scheduler(self):
- self.today = datetime.datetime(2011, 6, 1)
- timeutils.override_current_time(self.today)
+class ComplexParserTest(testingutils.MockTimeTestCase):
- @teardown
- def unset_time(self):
- timeutils.override_current_time(None)
+ now = datetime.datetime(2011, 6, 1)
def test_parse_all(self):
cfg = parse_daily('1st,2nd,3rd,4th monday,Tue of march,apr,September at 00:00')
@@ -270,7 +239,7 @@ def test_daily(self):
sch = scheduler.DailyScheduler(**cfg._asdict())
next_run_date = sch.next_run_time(None)
- assert_gte(next_run_date, self.today)
+ assert_gte(next_run_date, self.now)
assert_equal(next_run_date.month, 6)
assert_equal(next_run_date.day, 2)
assert_equal(next_run_date.hour, 0)
@@ -280,8 +249,8 @@ def test_daily_with_time(self):
sch = scheduler.DailyScheduler(**cfg._asdict())
next_run_date = sch.next_run_time(None)
- assert_gte(next_run_date, self.today)
- assert_equal(next_run_date.year, self.today.year)
+ assert_gte(next_run_date, self.now)
+ assert_equal(next_run_date.year, self.now.year)
assert_equal(next_run_date.month, 6)
assert_equal(next_run_date.day, 1)
assert_equal(next_run_date.hour, 2)
@@ -292,7 +261,7 @@ def test_weekly(self):
sch = scheduler.DailyScheduler(**cfg._asdict())
next_run_date = sch.next_run_time(None)
- assert_gte(next_run_date, self.today)
+ assert_gte(next_run_date, self.now)
assert_equal(calendar.weekday(next_run_date.year,
next_run_date.month,
next_run_date.day), 0)
@@ -302,8 +271,8 @@ def test_weekly_in_month(self):
sch = scheduler.DailyScheduler(**cfg._asdict())
next_run_date = sch.next_run_time(None)
- assert_gte(next_run_date, self.today)
- assert_equal(next_run_date.year, self.today.year+1)
+ assert_gte(next_run_date, self.now)
+ assert_equal(next_run_date.year, self.now.year+1)
assert_equal(next_run_date.month, 1)
assert_equal(next_run_date.hour, 0)
assert_equal(next_run_date.minute, 1)
@@ -316,11 +285,13 @@ def test_monthly(self):
sch = scheduler.DailyScheduler(**cfg._asdict())
next_run_date = sch.next_run_time(None)
- assert_gt(next_run_date, self.today)
+ assert_gt(next_run_date, self.now)
assert_equal(next_run_date.month, 7)
-class IntervalSchedulerTest(TestCase):
+class IntervalSchedulerTest(testingutils.MockTimeTestCase):
+
+ now = datetime.datetime(2012, 3, 14)
@setup
def build_scheduler(self):
@@ -328,9 +299,8 @@ def build_scheduler(self):
self.scheduler = scheduler.IntervalScheduler(self.interval)
def test_next_run_time(self):
- current_time = timeutils.current_time()
- run_time = self.scheduler.next_run_time(current_time)
- assert_gte(current_time + self.interval, run_time)
+ run_time = self.scheduler.next_run_time(self.now)
+ assert_equal(self.now + self.interval, run_time)
def test__str__(self):
assert_equal(str(self.scheduler), "INTERVAL:%s" % self.interval)
View
22 tests/testingutils.py
@@ -3,6 +3,8 @@
from testify import TestCase, setup
from testify import class_setup, class_teardown
+from testify import teardown
+from tron.utils import timeutils
log = logging.getLogger(__name__)
@@ -43,7 +45,7 @@ class MockReactorTestCase(TestCase):
@class_setup
def class_setup_patched_reactor(self):
- msg = "%s must set a reactor_to_mock field" % self.__class__
+ msg = "%s must set a module_to_mock field" % self.__class__
assert self.module_to_mock, msg
self.old_reactor = getattr(self.module_to_mock, 'reactor')
@@ -57,6 +59,24 @@ def setup_mock_reactor(self):
setattr(self.module_to_mock, 'reactor', self.reactor)
+class MockTimeTestCase(TestCase):
+
+ now = None
+
+ @setup
+ def setup_current_time(self):
+ assert self.now, "%s must set a now field" % self.__class__
+ self.old_current_time = timeutils.current_time
+ timeutils.current_time = lambda: self.now
+
+ @teardown
+ def teardown_current_time(self):
+ timeutils.current_time = self.old_current_time
+ # Reset 'now' back to what was set on the class because some test may
+ # have changed it
+ self.now = self.__class__.now
+
+
class Turtle(object):
"""A more complete Mock implementation."""
def __init__(self, *args, **kwargs):
View
258 tests/trond_test.py
@@ -1,14 +1,10 @@
import datetime
import os
-import sys
from textwrap import dedent
-import time
-from testify import TestCase, setup, teardown, assert_equal, assert_in, suite
+from testify import assert_equal
from testify import assert_gt
-
-from tests.sandbox import TronSandbox, TronSandboxException
-from tests.sandbox import wait_for_file_to_exist
+from tests import sandbox
BASIC_CONFIG = """
@@ -31,91 +27,77 @@
DOUBLE_ECHO_CONFIG = SINGLE_ECHO_CONFIG + """
- name: "another_echo_action"
- command: "echo 'Today is %(shortdate)s, which is the same as %(year)s-%(month)s-%(day)s' && false" """
+ command: "echo 'Today is %(shortdate)s, which is the same
+ as %(year)s-%(month)s-%(day)s' && false" """
TOUCH_CLEANUP_FMT = """
cleanup_action:
- command: "touch %s.%%(runid)s" """
-
-
-class SandboxTestCase(TestCase):
-
- @setup
- def make_sandbox(self):
- self.sandbox = TronSandbox()
-
- @teardown
- def delete_sandbox(self):
- self.sandbox.delete()
- self.sandbox = None
+ command: "echo 'at last'"
+"""
-class BasicTronTestCase(SandboxTestCase):
+class TrondTestCase(sandbox.SandboxTestCase):
- @suite('sandbox')
def test_end_to_end_basic(self):
+ client = self.sandbox.client
# start with a basic configuration
self.sandbox.save_config(SINGLE_ECHO_CONFIG)
- self.sandbox.start_trond()
+ self.sandbox.trond()
# make sure it got in
- assert_equal(self.sandbox.get_config(), SINGLE_ECHO_CONFIG)
+ assert_equal(client.config(), SINGLE_ECHO_CONFIG)
# reconfigure and confirm results
- canary = os.path.join(self.sandbox.tmp_dir, 'end_to_end_done')
- second_config = DOUBLE_ECHO_CONFIG + TOUCH_CLEANUP_FMT % canary
- self.sandbox.upload_config(second_config)
- assert_equal(self.sandbox.list_events()['data'][0]['name'], 'restoring')
- assert_equal(self.sandbox.list_events()['data'][1]['name'], 'run_created')
- assert_equal(self.sandbox.get_config(), second_config)
-
- expected = {'jobs': [
- {
- 'action_names': ['echo_action', 'cleanup', 'another_echo_action'],
- 'status': 'ENABLED',
- 'href': '/jobs/echo_job',
- 'last_success': None,
- 'name': 'echo_job',
- 'scheduler': 'INTERVAL:1:00:00',
- 'node_pool': ['localhost'],
- 'runs': None
- }
- ],
+ second_config = DOUBLE_ECHO_CONFIG + TOUCH_CLEANUP_FMT
+ self.sandbox.tronfig(second_config)
+ events = client.events()
+ assert_equal(events[0]['name'], 'restoring')
+ assert_equal(events[1]['name'], 'run_created')
+ assert_equal(client.config(), second_config)
+
+ job = {
+ 'action_names': ['echo_action', 'cleanup', 'another_echo_action'],
+ 'status': 'ENABLED',
+ 'href': '/jobs/echo_job',
+ 'last_success': None,
+ 'name': 'echo_job',
+ 'scheduler': 'INTERVAL:1:00:00',
+ 'node_pool': ['localhost'],
+ 'runs': None
+ }
+ expected = {
+ 'jobs': [job],
'status_href': '/status',
'jobs_href': '/jobs',
'config_href': '/config',
'services': [],
'services_href': '/services'
}
- result = self.sandbox.list_all()
+ result = self.sandbox.client.home()
assert_equal(result, expected)
# run the job and check its output
- self.sandbox.ctl('start', 'echo_job')
-
- try:
- wait_for_file_to_exist(canary + '.echo_job.1')
- except TronSandboxException:
- print >> sys.stderr, "trond appears to have crashed. Log:"
- print >> sys.stderr, self.sandbox.log_contents()
- raise
-
- echo_action_run = self.sandbox.list_action_run(
- 'echo_job', 1, 'echo_action')
- another_echo_action_run = self.sandbox.list_action_run(
- 'echo_job', 1, 'another_echo_action')
+ self.sandbox.tronctl(['start', 'echo_job'])
+
+ def wait_on_cleanup():
+ return (len(client.job('echo_job')['runs']) >= 2 and
+ client.action('echo_job.1.echo_action')['state'] == 'SUCC')
+ sandbox.wait_on_sandbox(wait_on_cleanup)
+
+ echo_action_run = client.action('echo_job.1.echo_action')
+ other_act_run = client.action('echo_job.1.another_echo_action')
assert_equal(echo_action_run['state'], 'SUCC')
assert_equal(echo_action_run['stdout'], ['Echo!'])
- assert_equal(another_echo_action_run['state'], 'FAIL')
- assert_equal(another_echo_action_run['stdout'],
- [datetime.datetime.now().strftime(
- 'Today is %Y-%m-%d, which is the same as %Y-%m-%d')])
- assert_equal(self.sandbox.list_job_run('echo_job', 1)['state'],
- 'FAIL')
-
- @suite('sandbox')
+ assert_equal(other_act_run['state'], 'FAIL')
+
+ now = datetime.datetime.now()
+ stdout = now.strftime('Today is %Y-%m-%d, which is the same as %Y-%m-%d')
+ assert_equal(other_act_run['stdout'], [stdout])
+
+ assert_equal(client.job_runs('echo_job.1')['state'], 'FAIL')
+
def test_tronview_basic(self):
self.sandbox.save_config(SINGLE_ECHO_CONFIG)
- self.sandbox.start_trond()
+ self.sandbox.trond()
expected = """\nServices:\nNo Services\n\n\nJobs:
Name State Scheduler Last Success
@@ -128,20 +110,19 @@ def remove_line_space(s):
actual = self.sandbox.tronview()[0]
assert_equal(remove_line_space(actual), remove_line_space(expected))
- @suite('sandbox')
def test_tronctl_basic(self):
- canary = os.path.join(self.sandbox.tmp_dir, 'tronctl_basic_done')
- self.sandbox.save_config(SINGLE_ECHO_CONFIG + TOUCH_CLEANUP_FMT % canary)
- self.sandbox.start_trond()
-
- # run the job and check its output
+ client = self.sandbox.client
+ self.sandbox.save_config(SINGLE_ECHO_CONFIG + TOUCH_CLEANUP_FMT)
+ self.sandbox.trond()
self.sandbox.tronctl(['start', 'echo_job'])
- wait_for_file_to_exist(canary + '.echo_job.1')
- assert_equal(self.sandbox.list_action_run('echo_job', 1, 'echo_action')['state'], 'SUCC')
- assert_equal(self.sandbox.list_job_run('echo_job', 1)['state'], 'SUCC')
+ def wait_on_cleanup():
+ return client.action('echo_job.1.cleanup')['state'] == 'SUCC'
+ sandbox.wait_on_sandbox(wait_on_cleanup)
+
+ assert_equal(client.action('echo_job.1.echo_action')['state'], 'SUCC')
+ assert_equal(client.job_runs('echo_job.1')['state'], 'SUCC')
- @suite('sandbox')
def test_tronctl_service_zap(self):
SERVICE_CONFIG = dedent("""
nodes:
@@ -156,47 +137,53 @@ def test_tronctl_service_zap(self):
monitor_interval: 0.1
""" % {'pid': os.getpid()})
- self.sandbox.start_trond()
- self.sandbox.upload_config(SERVICE_CONFIG)
- time.sleep(1)
+ client = self.sandbox.client
+ self.sandbox.trond()
+ self.sandbox.tronfig(SERVICE_CONFIG)
+
+ wait_on_config = lambda: 'fake_service' in client.config()
+ sandbox.wait_on_sandbox(wait_on_config)
- self.sandbox.ctl('start', 'fake_service')
self.sandbox.tronctl(['start', 'fake_service'])
- time.sleep(1)
+ def wait_on_start():
+ return client.service('fake_service')['state'] == 'STARTING'
+ sandbox.wait_on_sandbox(wait_on_start)
+
self.sandbox.tronctl(['zap', 'fake_service'])
- assert_equal('DOWN', self.sandbox.list_service('fake_service')['state'])
+ assert_equal('DOWN', client.service('fake_service')['state'])
- @suite('sandbox')
def test_cleanup_on_failure(self):
- canary = os.path.join(self.sandbox.tmp_dir, 'end_to_end_done')
-
FAIL_CONFIG = BASIC_CONFIG + dedent("""
jobs:
- name: "failjob"
node: local
- schedule: "interval 1 seconds"
+ schedule: "constant"
actions:
- name: "failaction"
command: "failplz"
- """) + TOUCH_CLEANUP_FMT % canary
+ """) + TOUCH_CLEANUP_FMT
- # start with a basic configuration
+ client = self.sandbox.client
self.sandbox.save_config(FAIL_CONFIG)
- self.sandbox.start_trond()
+ self.sandbox.trond()
- time.sleep(3)
+ def wait_on_failaction():
+ return client.action('failjob.0.failaction')['state'] == 'FAIL'
+ sandbox.wait_on_sandbox(wait_on_failaction)
- assert os.path.exists(canary + '.failjob.0')
- assert_gt(len(self.sandbox.list_job('failjob')['runs']), 1)
+ def wait_on_cleanup():
+ return client.action('failjob.1.cleanup')['state'] == 'SUCC'
+ sandbox.wait_on_sandbox(wait_on_cleanup)
+
+ assert_gt(len(client.job('failjob')['runs']), 1)
- @suite('sandbox')
def test_skip_failed_actions(self):
CONFIG = BASIC_CONFIG + dedent("""
jobs:
- name: "multi_step_job"
node: local
- schedule: "interval 1 seconds"
+ schedule: "constant"
actions:
- name: "broken"
command: "failingcommand"
@@ -205,61 +192,46 @@ def test_skip_failed_actions(self):
requires: broken
""")
+ client = self.sandbox.client
self.sandbox.save_config(CONFIG)
- self.sandbox.start_trond()
- time.sleep(2)
+ self.sandbox.trond()
+
+ def build_wait_func(state):
+ def wait_on_multi_step_job():
+ action_name = 'multi_step_job.0.broken'
+ return client.action(action_name)['state'] == state
+ return wait_on_multi_step_job
+ sandbox.wait_on_sandbox(build_wait_func('FAIL'))
self.sandbox.tronctl(['skip', 'multi_step_job.0.broken'])
- action_run = self.sandbox.list_action_run('multi_step_job', 0, 'broken')
- assert_equal(action_run['state'], 'SKIP')
- time.sleep(1)
+ assert_equal(client.action('multi_step_job.0.broken')['state'], 'SKIP')
- action_run = self.sandbox.list_action_run('multi_step_job', 0, 'works')
- assert_equal(action_run['state'], 'SUCC')
- job_run = self.sandbox.list_job_run('multi_step_job', 0)
- assert_equal(job_run['state'], 'SUCC')
+ sandbox.wait_on_sandbox(build_wait_func('SKIP'))
+ assert_equal(client.action('multi_step_job.0.works')['state'], 'SUCC')
+ assert_equal(client.job_runs('multi_step_job.0')['state'], 'SUCC')
- @suite('sandbox')
def test_failure_on_multi_step_job_doesnt_wedge_tron(self):
- # WARNING: This test may be flaky.
- FAIL_CONFIG = dedent("""
- ssh_options: !SSHOptions
- agent: true
-
- nodes:
- - &local
- hostname: 'localhost'
+ FAIL_CONFIG = BASIC_CONFIG + dedent("""
jobs:
- - &random_job
- name: "random_failure_job"
- node: *local
- queueing: true
- schedule: "interval 1 seconds"
- actions:
- - &fa
- name: "fa"
- command: "sleep 1.1; failplz"
- - &sa
- name: "sa"
- command: "echo 'you will never see this'"
- requires: [*fa]
+ - name: "random_failure_job"
+ node: local
+ queueing: true
+ schedule: "constant"
+ actions:
+ - name: "fa"
+ command: "sleep 0.1; failplz"
+ - name: "sa"
+ command: "echo 'you will never see this'"
+ requires: [fa]
""")
- with open(self.sandbox.config_file, 'w') as f:
- f.write(FAIL_CONFIG)
- self.sandbox.start_trond()
-
- # Wait a little to give things time to explode
- time.sleep(1.5)
- jerb = self.sandbox.list_job('random_failure_job')
- total_tries = 0
- while ((len(jerb['runs']) < 3 or
- jerb['runs'][-1][u'state'] not in [u'FAIL', u'SUCC']) and
- total_tries < 30):
- time.sleep(0.2)
- jerb = self.sandbox.list_job('random_failure_job')
- total_tries += 1
-
- assert_equal(jerb['runs'][-1][u'state'], u'FAIL')
- assert_in(jerb['runs'][-2][u'state'], [u'FAIL', u'RUNN'])
- assert_equal(jerb['runs'][0][u'state'], u'SCHE')
+ client = self.sandbox.client
+ self.sandbox.save_config(FAIL_CONFIG)
+ self.sandbox.trond()
+
+ def wait_on_random_failure_job():
+ return len(client.job('random_failure_job')['runs']) >= 4
+ sandbox.wait_on_sandbox(wait_on_random_failure_job)
+
+ job_runs = client.job('random_failure_job')['runs']
+ assert_equal([run['state'] for run in job_runs[-3:]], ['FAIL'] * 3)
View
51 tests/utils/iteration_test.py
@@ -0,0 +1,51 @@
+from testify import TestCase, assert_equal, setup, run
+from tron.utils.iteration import min_filter, max_filter
+
+class FilterFuncTestCase(TestCase):
+
+ __test__ = False
+
+ @setup
+ def setup_seq(self):
+ self.test_func = None
+
+ def test_filter_empty_seq(self):
+ assert_equal(self.test_func([]), None)
+
+ def test_filter_all_nones(self):
+ assert_equal(self.test_func([None, None, None]), None)
+
+ def test_filter_none(self):
+ assert_equal(self.test_func(None), None)
+
+ def test_filter_single_item(self):
+ assert_equal(self.test_func([1]), 1)
+
+ def test_filter_single_item_with_nones(self):
+ assert_equal(self.test_func([None, 4, None, None]), 4)
+
+
+class FilteredMinTestCase(FilterFuncTestCase):
+
+ @setup
+ def setup_func(self):
+ self.test_func = min_filter
+
+ def test_min_filter(self):
+ seq = [None, 2, None, 7, None, 9, 10, 12, 1]
+ assert_equal(min_filter(seq), 1)
+
+
+class FilteredMaxTestCase(FilterFuncTestCase):
+
+ @setup
+ def setup_func(self):
+ self.test_func = max_filter
+
+ def test_max_filter(self):
+ seq = [None, 2, None, 7, None, 9, 10, 12, 1]
+ assert_equal(max_filter(seq), 12)
+
+
+if __name__ == "__main__":
+ run()
View
18 tests/utils/timeutils_test.py
@@ -1,6 +1,6 @@
import datetime
from testify import TestCase, assert_equal, setup
-from testify.test_case import class_setup, class_teardown
+from tests import testingutils
from tron.utils import timeutils
from tron.utils.timeutils import duration, macro_timedelta, DateArithmetic
@@ -110,19 +110,11 @@ def test(self):
assert_equal(delta_seconds, expected)
-class DateArithmeticTestCase(TestCase):
+class DateArithmeticTestCase(testingutils.MockTimeTestCase):
- @class_setup
- def freeze_time(self):
- # Set a date with days less then 28, otherwise some tests will fail
- # when run on days > 28.
- dt = datetime.datetime(2012, 3, 20)
- timeutils.override_current_time(dt)
- self.now = timeutils.current_time()
-
- @class_teardown
- def unfreeze_time(self):
- timeutils.override_current_time(None)
+ # Set a date with days less than 28, otherwise some tests will fail
+ # when run on days > 28.
+ now = datetime.datetime(2012, 3, 20)
def _cmp_date(self, item, dt):
assert_equal(DateArithmetic.parse(item), dt.strftime("%Y-%m-%d"))
View
2  tron/__init__.py
@@ -1,4 +1,4 @@
-__version_info__ = (0, 4, 0, 3)
+__version_info__ = (0, 4, 1)
__version__ = ".".join("%s" % v for v in __version_info__)
__author__ = 'Rhett Garber <rhettg@gmail.com>'
View
19 tron/commands/client.py
@@ -44,6 +44,10 @@ def request(host, path, data=None):
return OK, result
+class RequestError(ValueError):
+ """Raised when there is a connection failure."""
+
+
class Client(object):
"""A client used in commands to make requests to the tron.www """
@@ -62,8 +66,11 @@ def config(self, data=None):
return self.request('/config', dict(config=data))
return self.request('/config', )['config']
+ def home(self):
+ return self.request('/')
+
def index(self):
- content = self.request('/')
+ content = self.home()
def name_href_dict(source):
return dict((i['name'], i['href']) for i in source)
@@ -134,9 +141,9 @@ def action_events(self, action_id):
return self.request('/jobs/%s/_events' % action_id)['data']
def request(self, url, data=None):
- status, content = request(self.options.server, url, data)
+ server = self.options.server
+ status, content = request(server, url, data)
if not status == OK:
- err_msg = "Failed to request %s%s: %s %s" % (
- self.options.server, url, content, data or '')
- raise ValueError(err_msg)
- return content
+ err_msg = "%s%s: %s %s"
+ raise RequestError(err_msg % (server, url, content, data or ''))
+ return content
View
6 tron/commands/display.py
@@ -3,7 +3,6 @@
"""
from operator import itemgetter
import os
-import sys
class Color(object):
@@ -76,9 +75,10 @@ def __init__(self, options=None):
self.num_cols = self.console_width()
def console_width(self):
- if not sys.stdout.isatty():
+ console_sizes = os.popen('stty size', 'r').read().split()
+ if not console_sizes or len(console_sizes) != 2:
return 80
- return int(os.popen('stty size', 'r').read().split()[1])
+ return int(console_sizes[1])
def banner(self):
if not self.title:
View
7 tron/core/actionrun.py
@@ -10,7 +10,7 @@
from tron import node
from tron.actioncommand import ActionCommand
-from tron.utils import state, timeutils, proxy
+from tron.utils import state, timeutils, proxy, iteration
from tron.utils.observer import Observer
log = logging.getLogger(__name__)
@@ -235,7 +235,7 @@ def from_state(cls, state_data, parent_context, output_path,
# Transition running to fail unknown because exit status was missed
if run.is_running:
- run.machine.transition('fail_unknown')
+ run._done('fail_unknown')
if run.is_queued or run.is_starting:
run.fail(None)
return run
@@ -400,7 +400,6 @@ class ActionRunCollection(object):
def __init__(self, action_graph, run_map):
self.action_graph = action_graph
self.run_map = run_map
-
# Setup proxies
self.proxy_action_runs_with_cleanup = proxy.CollectionProxy(
self.get_action_runs_with_cleanup, [
@@ -416,6 +415,8 @@ def __init__(self, action_graph, run_map):
('success', all, True),
('fail', all, True),
('ready', all, True),
+ ('start_time', iteration.min_filter, False),
+ ('end_time', iteration.max_filter, False),
])
def action_runs_for_actions(self, actions):
View
7 tron/core/job.py
@@ -127,7 +127,6 @@ def update_from_job(self, job):
self.action_graph = job.action_graph
self.enabled = job.enabled
self.output_path = job.output_path
- self.context = job.context
self.notify(self.EVENT_RECONFIGURED)
@property
@@ -332,8 +331,10 @@ def handle_job_events(self, _observable, event):
if queued_run:
reactor.callLater(0, self.run_job, queued_run, run_queued=True)
- if self.job.scheduler.schedule_on_complete:
- self.schedule()
+ # Attempt to schedule a new run. This will only schedule a run if the
+ # previous run was cancelled from a scheduled state, or if the job
+ # scheduler is `schedule_on_complete`.
+ self.schedule()
handler = handle_job_events
def get_runs_to_schedule(self, ignore_last_run_time=False):
View
19 tron/core/jobrun.py
@@ -1,5 +1,5 @@
"""
- tron.core.jobrun
+ Classes to manage job runs.
"""
from collections import deque
@@ -51,8 +51,8 @@ class JobRun(Observable, Observer):
EVENT_SUCCEEDED = event.EventType(event.LEVEL_OK, "succeeded")
def __init__(self, job_name, run_num, run_time, node, output_path=None,
- base_context=None, action_runs=None, start_time=None,
- end_time=None, action_graph=None, manual=None):
+ base_context=None, action_runs=None,
+ action_graph=None, manual=None):
super(JobRun, self).__init__()
self.job_name = job_name
self.run_num = run_num
@@ -60,8 +60,6 @@ def __init__(self, job_name, run_num, run_time, node, output_path=None,
self.node = node
self.output_path = output_path or filehandler.OutputPath()
self.output_path.append(self.id)
- self.start_time = start_time
- self.end_time = end_time
self.action_runs_proxy = None
self._action_runs = None
self.action_graph = action_graph
@@ -108,8 +106,6 @@ def from_state(cls, state_data, action_graph, output_path, context,
state_data['run_num'],
state_data['run_time'],
run_node,
- end_time=state_data['end_time'],
- start_time=state_data['start_time'],
action_graph=action_graph,
manual=state_data.get('manual', False),
output_path=output_path,
@@ -129,8 +125,6 @@ def state_data(self):
'run_time': self.run_time,
'node_name': self.node.name if self.node else None,
'runs': self.action_runs.state_data,
- 'start_time': self.start_time,
- 'end_time': self.end_time,
'cleanup_run': self.action_runs.cleanup_action_state_data,
'manual': self.manual,
}
@@ -164,6 +158,8 @@ def _set_action_runs(self, run_collection):
'is_scheduled',
'is_skipped',
'is_starting',
+ 'start_time',
+ 'end_time'
])
def _del_action_runs(self):
@@ -186,7 +182,6 @@ def start(self):
def _do_start(self):
log.info("Starting JobRun %s", self.id)
- self.start_time = timeutils.current_time()
self.action_runs.ready()
started_runs = self._start_action_runs()
@@ -242,9 +237,8 @@ def finalize(self):
completes or if the job has no cleanup action, called once all action
runs have reached a 'done' state.
- Sets end_time and triggers an event to notifies the Job that is is done.
+ Triggers an event to notify the Job that it is done.
"""
- self.end_time = timeutils.current_time()
failure = self.action_runs.is_failed
event = self.EVENT_FAILED if failure else self.EVENT_SUCCEEDED
self.notify(event)
@@ -353,6 +347,7 @@ def cancel_pending(self):
def remove_pending(self):
"""Remove pending runs from the run list."""
for pending in list(self.get_pending()):
+ pending.clear_observers()
pending.cancel()
pending.cleanup()
self.runs.remove(pending)
View
3  tron/mcp.py
@@ -2,7 +2,6 @@
import logging
import os
import shutil
-import time
import yaml
from twisted.conch.client.options import ConchOptions
@@ -191,7 +190,7 @@ def _load_data_file(self, data_file):
def state_data(self):
data = {
'version': tron.__version_info__,
- 'create_time': int(time.time()),
+ 'create_time': timeutils.current_timestamp(),
'jobs': {},
'services': {},
}
View
2  tron/node.py
@@ -242,7 +242,7 @@ def run(self, run):
else:
log.info("Delaying execution of %s for %.2f secs",
run.id, fudge_factor)
- reactor.callLater(fudge_factor, lambda: self._do_run(run))
+ reactor.callLater(fudge_factor, self._do_run, run)
# We return the deferred here, but really we're trying to keep the rest
# of the world from getting too involved with twisted.
View
16 tron/utils/iteration.py
@@ -0,0 +1,16 @@
+"""Iteration utility functions."""
+
+
+def build_filtered_func(func):
+ def filtered_func(seq):
+ """Filter out Nones and return the result of func."""
+ if not seq:
+ return None
+ seq = filter(None, seq)
+ if not seq:
+ return None
+ return func(seq)
+ return filtered_func
+
+min_filter = build_filtered_func(min)
+max_filter = build_filtered_func(max)
View
34 tron/utils/timeutils.py
@@ -1,33 +1,17 @@
-"""Time utilites"""
+"""Functions for working with dates and timestamps."""
from __future__ import division
import datetime
import re
import time
-
-# Global time override
-_current_time_override = None
-
-
-def override_current_time(current_time):
- """Interface for overriding the current time
-
- This is a test hook for forcing the app to think it's a specific time.
- """
- global _current_time_override
- _current_time_override = current_time
-
-
def current_time():
- """Standard interface for generating the current time."""
- if _current_time_override is not None:
- return _current_time_override
- else:
- return datetime.datetime.now()
+ """Return the current datetime."""
+ return datetime.datetime.now()
def current_timestamp():
+ """Return the current time as a timestamp."""
return to_timestamp(current_time())
@@ -35,11 +19,13 @@ def to_timestamp(time_val):
"""Generate a unix timestamp for the given datetime instance"""
return time.mktime(time_val.timetuple())
+
def delta_total_seconds(td):
- """Equivalent to timedelta.total_seconds() which is only available in 2.7.
+ """Equivalent to timedelta.total_seconds() available in Python 2.7.
"""
- return (
- td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
+ microseconds, seconds, days = td.microseconds, td.seconds, td.days
+ return (microseconds + (seconds + days * 24 * 3600) * 10**6) / 10**6
+
def macro_timedelta(start_date, years=0, months=0, days=0):
"""Since datetime doesn't provide timedeltas at the year or month level,
@@ -110,4 +96,4 @@ def parse(cls, date_str, dt=None):
return int(to_timestamp(dt)) + delta
if attr == 'daynumber':
- return dt.toordinal() + delta
+ return dt.toordinal() + delta

No commit comments for this range

Something went wrong with that request. Please try again.