Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add arg to tronctl retry for waiting for dependencies #867

Merged
merged 8 commits into from
Apr 13, 2022
103 changes: 78 additions & 25 deletions bin/tronctl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import argparse
import asyncio
import datetime
import logging
import pprint
import sys
from collections import defaultdict
from typing import Any
Expand All @@ -34,29 +35,46 @@ from tron.commands.client import RequestError
from tron.commands.cmd_utils import ExitCode
from tron.commands.cmd_utils import suggest_possibilities
from tron.commands.cmd_utils import tron_jobs_completer
from tron.commands.retry import parse_deps_timeout
from tron.commands.retry import print_retries_table
from tron.commands.retry import retry_actions
from tron.commands.retry import RetryAction

COMMAND_HELP = (
("start", "Start the selected job, job run, or action. Creates a new job run if starting a job.",),
("rerun", "Start a new job run with the same start time command context as the given job run.",),
(
"start",
"job name, job run id, or action id",
"Start the selected job, job run, or action. Creates a new job run if starting a job.",
),
("rerun", "job run id", "Start a new job run with the same start time command context as the given job run.",),
(
"retry",
"action id",
"Re-run a job action within an existing job run. Uses latest code/config except the command by default. Add --use-latest-command to use the latest command.",
),
("recover", "Ask Tron to start tracking an UNKNOWN action run again"),
("cancel", "Cancel the selected job run."),
("backfill", "Start job runs for a particular date range",),
("disable", "Disable selected job and cancel any outstanding runs"),
("enable", "Enable the selected job and schedule the next run"),
("fail", "Mark an UNKNOWN job or action as failed. Does not publish action triggers.",),
("success", "Mark an UNKNOWN job or action as having succeeded. Will publish action triggers.",),
("skip", "Skip a failed action, unblocks dependent actions. Does *not* publish action triggers.",),
("skip-and-publish", "Skip a failed action, unblocks dependent actions. *Does* publish action triggers.",),
("stop", "Stop the action run (SIGTERM)"),
("kill", "Force kill the action run (SIGKILL)"),
("move", "Rename a job"),
("publish", "Publish actionrun trigger to kick off downstream jobs"),
("discard", "Discard existing actionrun trigger"),
("version", "Print tron client and server versions"),
("recover", "action id", "Ask Tron to start tracking an UNKNOWN action run again"),
("cancel", "job run id", "Cancel the selected job run."),
("backfill", "job name", "Start job runs for a particular date range",),
("disable", "job name", "Disable selected job and cancel any outstanding runs"),
("enable", "job name", "Enable the selected job and schedule the next run"),
("fail", "job run or action id", "Mark an UNKNOWN job or action as failed. Does not publish action triggers.",),
(
"success",
"job run or action id",
"Mark an UNKNOWN job or action as having succeeded. Will publish action triggers.",
),
("skip", "action id", "Skip a failed action, unblocks dependent actions. Does *not* publish action triggers.",),
(
"skip-and-publish",
"action id",
"Skip a failed action, unblocks dependent actions. *Does* publish action triggers.",
),
("stop", "action id", "Stop the action run (SIGTERM)"),
("kill", "action id", "Force kill the action run (SIGKILL)"),
("move", "job name", "Rename a job"),
("publish", "action id", "Publish actionrun trigger to kick off downstream jobs"),
("discard", "trigger id", "Discard existing actionrun trigger"),
("version", None, "Print tron client and server versions"),
)

log = logging.getLogger("tronctl")
Expand All @@ -68,15 +86,16 @@ def parse_date(date_string):

def parse_cli():
parser = cmd_utils.build_option_parser()

subparsers = parser.add_subparsers(dest="command", title="commands", help="Tronctl command to run")
subparsers.required = True # add_subparsers only supports required arg from py37

cmd_parsers = {}
for cmd_name, desc in COMMAND_HELP:
for cmd_name, id_help_text, desc in COMMAND_HELP:
cmd_parsers[cmd_name] = subparsers.add_parser(cmd_name, help=desc, description=desc)
cmd_parsers[cmd_name].add_argument(
"id", nargs="*", help="job name, job run id, or action id",
).completer = cmd_utils.tron_jobs_completer
if id_help_text:
cmd_parsers[cmd_name].add_argument(
"id", nargs="*", help=id_help_text
).completer = cmd_utils.tron_jobs_completer
kawaiwanyelp marked this conversation as resolved.
Show resolved Hide resolved

# start
cmd_parsers["start"].add_argument(
Expand Down Expand Up @@ -153,12 +172,28 @@ def parse_cli():
)

# retry
cmd_parsers["retry"].add_argument(
retry_parser = cmd_parsers["retry"]
retry_parser.add_argument(
"--use-latest-command",
action="store_true",
default=False,
help="Use the latest command in tronfig rather than the original command when the action run was created",
)
retry_parser.add_argument(
"--wait-for-deps",
type=parse_deps_timeout,
default=0,
dest="deps_timeout",
help=(
"Max duration to wait for upstream dependencies (upstream triggers "
"and/or same job actions) before attempting to retry. "
"If all dependencies are not done when the timeout expires, "
"this command will exit with an error, and the action will NOT be retried. "
"Must be either an int number of seconds, a human-readable/"
"pytimeparse-parsable string, or 'infinity' to wait forever. "
"Defaults to 0 (don't wait)."
),
)

argcomplete.autocomplete(parser)
args = parser.parse_args()
Expand Down Expand Up @@ -256,11 +291,28 @@ def control_objects(args: argparse.Namespace):
data = dict(command=args.command)
if args.command == "start" and args.run_date:
data["run_time"] = str(args.run_date)
if args.command == "retry":
data["use_latest_command"] = int(args.use_latest_command)
yield request(urljoin(args.server, tron_id.url), data)


def retry(args):
    """Retry the action runs listed in ``args.id`` and report the outcome.

    When ``args.deps_timeout`` differs from ``RetryAction.NO_TIMEOUT``, a notice
    describing how long dependencies will be waited for is printed first,
    followed by the ids that will be retried.

    Yields:
        A single bool: True only if every retry reports ``succeeded``.
    """
    timeout = args.deps_timeout
    if timeout != RetryAction.NO_TIMEOUT:
        if timeout > 0:
            deps_timeout_str = "up to " + str(datetime.timedelta(seconds=timeout))
        else:
            # timeout = -1 (RetryAction.WAIT_FOREVER)
            deps_timeout_str = "forever"
        notice = (
            f"We will wait {deps_timeout_str} for all upstream triggers to be published "
            "and required actions to finish successfully before issuing retries for the "
            "following actions:"
        )
        print(notice)
        print()
        pprint.pprint(args.id)
        print()

    retries = retry_actions(args.server, args.id, args.use_latest_command, timeout)
    print_retries_table(retries)
    outcomes = [r.succeeded for r in retries]
    yield all(outcomes)


def move(args):
try:
old_name = args.id[0]
Expand Down Expand Up @@ -338,6 +390,7 @@ COMMANDS: Dict[str, Callable[[argparse.Namespace], Generator[bool, None, None]]]
discard=event_discard,
backfill=backfill,
move=move,
retry=retry,
version=tron_version,
)

Expand Down
2 changes: 2 additions & 0 deletions cluster_itests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
environment:
- MESOS_SYSTEMD_ENABLE_SUPPORT=false

tronmaster:
build:
Expand Down
204 changes: 204 additions & 0 deletions tests/commands/retry_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import random

import mock
import pytest

from tron.commands import client
from tron.commands import retry


async def _empty_coro(*args, **kwargs):
    """No-op coroutine: accept any arguments and resolve to ``None``."""
    return


@pytest.fixture(autouse=True)
def mock_sleep():
    """Swap ``asyncio.sleep`` for ``_empty_coro`` during every test.

    Autouse, so anything awaiting ``asyncio.sleep`` returns immediately.
    """
    sleep_patch = mock.patch("asyncio.sleep", _empty_coro, autospec=None)
    with sleep_patch:
        yield


@pytest.fixture(autouse=True)
def mock_client():
    """Patch ``client.Client`` with an autospec'd mock for every test.

    Instances produced by the mocked class report ``url_base`` as
    ``http://localhost``. Yields the mocked class itself.
    """
    with mock.patch.object(client, "Client", autospec=True) as client_cls:
        fake_instance = client_cls.return_value
        fake_instance.url_base = "http://localhost"
        yield client_cls


@pytest.fixture(autouse=True)
def mock_urlopen():
    """Patch ``urllib.request.urlopen`` so no real requests are ever made."""
    urlopen_patch = mock.patch("urllib.request.urlopen", autospec=True)
    with urlopen_patch as fake_urlopen:
        yield fake_urlopen


@pytest.fixture
def mock_client_request():
    """Patch ``client.request`` to return a successful, empty response.

    Yields the mock so tests can tweak the response or inspect calls.
    """
    # Stand-in response object: no error, empty content.
    ok_response = mock.Mock(error=False, content={})
    with mock.patch.object(client, "request", autospec=True) as fake_request:
        fake_request.return_value = ok_response
        yield fake_request


@mock.patch.object(
    client,
    "get_object_type_from_identifier",
    return_value=client.TronObjectIdentifier("JOB_RUN", "/a_job_run"),
    autospec=True,
)
def test_retry_action_init_not_an_action(mock_get_obj_type, mock_client):
    """Constructing a RetryAction raises ValueError when the id resolves to a JOB_RUN."""
    with pytest.raises(ValueError):
        retry.RetryAction(mock_client.return_value, "a_fake_action_run")


@pytest.fixture
def fake_retry_action(mock_client):
    """Yield a ``RetryAction`` for ``a_fake_job.0.a_fake_action`` backed by mocks.

    The mocked client reports an action run with two required actions and two
    triggers (only the first marked done), and a job run whose ``runs`` list
    ends with that very same action-run dict.
    """
    tron_client = mock_client.return_value

    action_run = {
        "action_name": "a_fake_action",
        "requirements": ["required_action_0", "required_action_1"],
        "triggered_by": "a_fake_trigger_0 (done), a_fake_trigger_1",
    }
    tron_client.action_runs.return_value = action_run

    job_run = {
        "job_name": "a_fake_job",
        "run_num": 1234,
        "runs": [
            {"action_name": "required_action_0", "state": "succeeded"},
            {"action_name": "non_required_action", "state": "succeeded"},
            {"action_name": "required_action_1", "state": "failed"},
            {"action_name": "upstream_action_0", "trigger_downstreams": "a_fake_trigger_0"},
            {"action_name": "upstream_action_1", "trigger_downstreams": "a_fake_trigger_1"},
            # Same object as action_runs.return_value, so in-place edits show up in both.
            action_run,
        ],
    }
    tron_client.job_runs.return_value = job_run

    # First lookup resolves to the action run, the second to its job run.
    resolved_ids = [
        client.TronObjectIdentifier("ACTION_RUN", "/a_fake_job/0/a_fake_action"),
        client.TronObjectIdentifier("JOB_RUN", "/a_fake_job/0"),
    ]
    with mock.patch.object(
        client, "get_object_type_from_identifier", side_effect=resolved_ids, autospec=True,
    ):
        yield retry.RetryAction(tron_client, "a_fake_job.0.a_fake_action", use_latest_command=True)


def test_retry_action_init_ok(fake_retry_action):
    """A freshly built RetryAction exposes the expected params, names, and ids."""
    ra = fake_retry_action
    tron_client = ra.tron_client

    assert ra.retry_params == {"command": "retry", "use_latest_command": 1}
    assert ra.full_action_name == "a_fake_job.0.a_fake_action"

    # Action-run details were fetched exactly once.
    tron_client.action_runs.assert_called_once_with("/a_fake_job/0/a_fake_action", num_lines=0)
    assert ra.action_name == "a_fake_action"
    assert ra.action_run_id.url == "/a_fake_job/0/a_fake_action"

    # Job-run details were fetched exactly once as well.
    tron_client.job_runs.assert_called_once_with("/a_fake_job/0")
    assert ra.job_run_name == "a_fake_job.0"
    assert ra.job_run_id.url == "/a_fake_job/0"

    # Indices are positions within the job run's "runs" list.
    assert ra._required_action_indices == {"required_action_0": 0, "required_action_1": 2}


def test_check_trigger_statuses(fake_retry_action, event_loop):
    """Only the trigger marked "(done)" is reported as published."""
    statuses = event_loop.run_until_complete(fake_retry_action.check_trigger_statuses())
    assert {"a_fake_trigger_0": True, "a_fake_trigger_1": False} == statuses

    # 0th call is in init; the status check is the second one.
    action_runs_calls = fake_retry_action.tron_client.action_runs.call_args_list
    assert action_runs_calls[1] == mock.call("/a_fake_job/0/a_fake_action", num_lines=0)


def test_check_required_actions_statuses(fake_retry_action, event_loop):
    """The succeeded required action maps to True, the failed one to False."""
    statuses = event_loop.run_until_complete(fake_retry_action.check_required_actions_statuses())
    assert {"required_action_0": True, "required_action_1": False} == statuses

    # 0th call is in init; the status check is the second one.
    job_runs_calls = fake_retry_action.tron_client.job_runs.call_args_list
    assert job_runs_calls[1] == mock.call("/a_fake_job/0")


@pytest.mark.parametrize(
    "expected,triggered_by,required_action_1_state",
    [
        # unpublished triggers
        (False, "a_fake_trigger_0 (done), a_fake_trigger_1", "skipped"),
        # required not succeeded
        (False, "a_fake_trigger_0 (done), a_fake_trigger_1 (done)", "failed"),
        # all done
        (True, "a_fake_trigger_0 (done), a_fake_trigger_1 (done)", "succeeded"),
    ],
)
def test_can_retry(fake_retry_action, event_loop, expected, triggered_by, required_action_1_state):
    """``can_retry`` matches the expected result for each trigger/state combination."""
    tron_client = fake_retry_action.tron_client
    tron_client.action_runs.return_value["triggered_by"] = triggered_by
    # Index 2 in "runs" is required_action_1.
    tron_client.job_runs.return_value["runs"][2]["state"] = required_action_1_state

    outcome = event_loop.run_until_complete(fake_retry_action.can_retry())
    assert expected == outcome


def test_wait_for_deps_timeout(fake_retry_action, event_loop):
    """With dependencies never done, ``wait_for_deps`` gives up after the 3s timeout."""
    waiting = fake_retry_action.wait_for_deps(deps_timeout_s=3, poll_interval_s=1)
    deps_done = event_loop.run_until_complete(waiting)

    assert not deps_done
    assert fake_retry_action._elapsed.seconds == 3
    # 1 in init, 4 in this test
    assert fake_retry_action.tron_client.action_runs.call_count == 5


def test_wait_for_deps_all_deps_done(fake_retry_action, event_loop):
    """``wait_for_deps`` returns truthy once the last trigger shows up as done."""
    tron_client = fake_retry_action.tron_client
    tron_client.job_runs.return_value["runs"][2]["state"] = "skipped"
    tron_client.action_runs.return_value = None

    # Successive action-run responses: the second trigger is done only on the third.
    not_yet_done = "a_fake_trigger_0 (done), a_fake_trigger_1"
    all_done = "a_fake_trigger_0 (done), a_fake_trigger_1 (done)"
    responses = []
    for triggered_by in (not_yet_done, not_yet_done, all_done):
        responses.append(
            {
                "action_name": "a_fake_action",
                "requirements": ["required_action_0", "required_action_1"],
                "triggered_by": triggered_by,
            }
        )
    tron_client.action_runs.side_effect = responses

    waiting = fake_retry_action.wait_for_deps(deps_timeout_s=3, poll_interval_s=1)
    assert event_loop.run_until_complete(waiting)
    # 3rd triggered_by result returned on check at 2nd second
    assert fake_retry_action._elapsed.seconds == 2
    # 1 in init, 3 in this test
    assert tron_client.action_runs.call_count == 4


@pytest.mark.parametrize("expected,error", [(False, True), (True, False)])
def test_issue_retry(fake_retry_action, mock_client_request, event_loop, expected, error):
    """``issue_retry`` returns, and records in ``succeeded``, the inverse of the response error flag."""
    response = mock_client_request.return_value
    response.error = error

    result = event_loop.run_until_complete(fake_retry_action.issue_retry())

    assert expected == result
    assert expected == fake_retry_action.succeeded


def test_wait_for_retry_deps_not_done(fake_retry_action, mock_client_request, event_loop):
    """When dependencies never finish, ``wait_and_retry`` times out without retrying."""
    attempt = fake_retry_action.wait_and_retry(deps_timeout_s=10, poll_interval_s=1, jitter=True)
    retried = event_loop.run_until_complete(attempt)

    assert not retried
    assert fake_retry_action._elapsed.seconds == 10  # timeout
    mock_client_request.assert_not_called()  # retry not attempted


def test_wait_for_retry_deps_done(fake_retry_action, mock_client_request, event_loop):
    """With all dependencies done, ``wait_and_retry`` issues the retry request once."""
    tron_client = fake_retry_action.tron_client
    tron_client.job_runs.return_value["runs"][2]["state"] = "skipped"
    action_run = tron_client.action_runs.return_value
    action_run["triggered_by"] = "a_fake_trigger_0 (done), a_fake_trigger_1 (done)"
    mock_client_request.return_value.error = False
    random.seed(1)  # init delay is 1s

    attempt = fake_retry_action.wait_and_retry(deps_timeout_s=10, poll_interval_s=5, jitter=True)
    assert event_loop.run_until_complete(attempt)
    assert fake_retry_action._elapsed.seconds == 1  # init delay only

    expected_payload = {"command": "retry", "use_latest_command": 1}
    mock_client_request.assert_called_once_with(
        "http://localhost/a_fake_job/0/a_fake_action", data=expected_payload,
    )


@mock.patch.object(retry, "RetryAction", autospec=True)
def test_retry_actions(mock_retry_action, mock_client, event_loop):
    """``retry_actions`` builds one RetryAction per name and calls ``wait_and_retry`` on each.

    The expected calls show the first action invoked with ``jitter=False`` and
    the second with no explicit ``jitter`` argument.
    """
    fake_action = mock_retry_action.return_value
    mock_wait_and_retry = fake_action.wait_and_retry
    mock_wait_and_retry.return_value = _empty_coro()
    action_names = ["a_job.0.an_action_0", "another_job.1.an_action_1"]

    r_actions = retry.retry_actions(
        "http://localhost", action_names, use_latest_command=True, deps_timeout_s=4,
    )

    assert r_actions == [fake_action, fake_action]

    expected_init_calls = [
        mock.call(mock_client.return_value, name, use_latest_command=True) for name in action_names
    ]
    assert mock_retry_action.call_args_list == expected_init_calls

    expected_wait_calls = [
        mock.call(deps_timeout_s=4, jitter=False),
        mock.call(deps_timeout_s=4),
    ]
    assert mock_wait_and_retry.call_args_list == expected_wait_calls
Loading