From b5fa5c1f5ccfc2fb70b89ee489c391820267bc75 Mon Sep 17 00:00:00 2001 From: Becky Smith Date: Mon, 8 Jan 2024 13:33:04 +0000 Subject: [PATCH 1/2] Add a command to remove a job by id If some unexpected error is encountered during the running of a job, the job may be updated in the database to indicate that it's running, but never complete or fail. This prevents the job being retried, and any further attempts to run the job only update the job scheduled to run after the current one is done. This adds a command to remove specific jobs by id (identified by calling status first). --- ebmbot/bot.py | 28 ++++++++++++++++++++++++++++ tests/test_bot.py | 26 ++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/ebmbot/bot.py b/ebmbot/bot.py index cd5f6e17..682e0548 100644 --- a/ebmbot/bot.py +++ b/ebmbot/bot.py @@ -153,6 +153,10 @@ def _listener(event, say): handle_status(event, say) return + if text.startswith("remove job id"): + handle_remove_job(app, event, say, text) + return + for slack_config in config["slack"]: if slack_config["regex"].match(text): handle_command(app, event, say, slack_config) @@ -235,6 +239,24 @@ def handle_status(message, say): say(status, thread_ts=message.get("thread_ts")) +@log_call +def handle_remove_job(app, message, say, text): + """Remove a job from the database so it can be rerun.""" + app.client.reactions_add( + channel=message["channel"], timestamp=message["ts"], name="crossed_fingers" + ) + job_id = int(text.split("remove job id ")[1]) + jobs_ids = [job["id"] for job in scheduler.get_jobs()] + if job_id not in jobs_ids: + say( + f"Job id [{job_id}] not found in running or scheduled jobs", + thread_ts=message.get("thread_ts"), + ) + else: + scheduler.mark_job_done(job_id) + say(f"Job id [{job_id}] removed", thread_ts=message.get("thread_ts")) + + def _build_status(): running_jobs = [] scheduled_jobs = [] @@ -410,6 +432,12 @@ def handle_help(message, say, help_configs, include_apology): lines.append( f"Enter `{prefix}[namespace] help` (eg `{prefix}{random.choice(list(help_configs))} help`) for more help" ) + lines.append(f"Enter `{prefix}status` to see running and scheduled jobs") + lines.append( + f"Enter `{prefix}remove job id [id]` to remove a job; this will not " + "cancel jobs that are in progress, but will let you retry a job that " + "appears to have stalled." + ) say("\n".join(lines), thread_ts=message.get("thread_ts")) diff --git a/tests/test_bot.py b/tests/test_bot.py index f654f8b0..ed4527d3 100644 --- a/tests/test_bot.py +++ b/tests/test_bot.py @@ -474,6 +474,32 @@ def test_new_channel_created(mock_app): ] == {"channel": "C0NEW", "users": "U1234"} +def test_remove_job(mock_app): + recorder = mock_app.recorder + handle_message(mock_app, "<@U1234> test do job 10", reaction_count=1) + jobs = scheduler.get_jobs_of_type("test_good_job") + assert len(jobs) == 1 + job_id = jobs[0]["id"] + handle_message(mock_app, f"<@U1234> remove job id {job_id}", reaction_count=2) + assert not scheduler.get_jobs_of_type("test_good_job") + + post_message = recorder.mock_received_requests_kwargs["/chat.postMessage"][0] + assert ( + "text", + "Job id [1] removed", + ) in post_message.items() + + +def test_remove_non_existent_job(mock_app): + recorder = mock_app.recorder + handle_message(mock_app, "<@U1234> remove job id 10", reaction_count=1) + post_message = recorder.mock_received_requests_kwargs["/chat.postMessage"][0] + assert ( + "text", + "Job id [10] not found in running or scheduled jobs", + ) in post_message.items() + + def handle_message( mock_app, text, From 8b69286a5c70b23e98df9e53aecb0c4763ba0023 Mon Sep 17 00:00:00 2001 From: Becky Smith Date: Mon, 8 Jan 2024 13:37:41 +0000 Subject: [PATCH 2/2] Retry slack notifications in case of unexpected errors If the slack client errors in some unexpected way, it can cause a scheduled job to error without returning any success/fail status, which means the job gets stuck in the "running" state, even though it's only been reserved and isn't actually running. Now we retry a max of 3 times and log the error if we can't do the notification. This could mean that jobs run and never report back, if the slack client is more than transiently broken, but then the chances are that simple commands like help are also broken and at least the output of jobs should be accessible in the job logs. --- ebmbot/slack.py | 27 +++++++++++++++++++++++---- tests/conftest.py | 27 +++++++++++++++++++++------ tests/test_dispatcher.py | 20 ++++++++++++++++++++ 3 files changed, 64 insertions(+), 10 deletions(-) diff --git a/ebmbot/slack.py b/ebmbot/slack.py index ee0604fe..c9dbd38e 100644 --- a/ebmbot/slack.py +++ b/ebmbot/slack.py @@ -1,8 +1,10 @@ +from time import sleep + from .logger import logger def notify_slack( - slack_client, channel, message_text, thread_ts=None, message_format=None + slack_client, channel, message_text, thread_ts=None, message_format=None, fail=False ): """Send message to Slack.""" msg_kwargs = {"text": str(message_text)} @@ -12,7 +14,24 @@ def notify_slack( logger.info( "Sending message", channel=channel, message=message_text, thread_ts=thread_ts ) - resp = slack_client.chat_postMessage( - channel=channel, thread_ts=thread_ts, **msg_kwargs + # In case of any unexpected transient exception posting to slack, retry up to 3 + # times and then log the error, to avoid errors in scheduled jobs. + attempts = 0 + while attempts < 3: + try: + resp = slack_client.chat_postMessage( + channel=channel, thread_ts=thread_ts, **msg_kwargs + ) + return resp.data + except Exception as err: + attempts += 1 + sleep(1) + error = err + + logger.error( + "Could not notify slack", + channel=channel, + message=message_text, + thread_ts=thread_ts, + error=error, ) - return resp.data diff --git a/tests/conftest.py b/tests/conftest.py index 356804bc..e11c8d34 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -26,6 +26,11 @@ def reset_db(): pass +class WebClientWithSlackException(WebClient): + def chat_postMessage(self, *args, **kwargs): + raise Exception("Error notifying slack") + + @dataclass class MockRecordingClient: client: WebClient @@ -38,19 +43,29 @@ class MockRecordingApp: recorder: Mock -@pytest.fixture -def mock_client(): - test_recorder = Mock() +def _get_mock_recording_client(client_class, test_recorder): setup_mock_web_api_server(test_recorder) mock_api_server_base_url = "http://localhost:8888" - - yield MockRecordingClient( - client=WebClient( + return MockRecordingClient( + client=client_class( token="xoxb-valid", base_url=mock_api_server_base_url, ), recorder=test_recorder, ) + + +@pytest.fixture +def mock_client(): + test_recorder = Mock() + yield _get_mock_recording_client(WebClient, test_recorder) + cleanup_mock_web_api_server(test_recorder) + + +@pytest.fixture +def mock_client_with_slack_exception(): + test_recorder = Mock() + yield _get_mock_recording_client(WebClientWithSlackException, test_recorder) cleanup_mock_web_api_server(test_recorder) diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py index 867850d5..b94b79bf 100644 --- a/tests/test_dispatcher.py +++ b/tests/test_dispatcher.py @@ -153,6 +153,26 @@ def test_job_success_with_no_report(mock_client): assert f.read() == "" +def test_job_success_with_slack_exception(mock_client_with_slack_exception): + # Test that the job still succeeds even if notifying slack errors + log_dir = build_log_dir("test_good_job") + + scheduler.schedule_job("test_good_job", {}, "channel", TS, 0) + job = scheduler.reserve_job() + + do_job(mock_client_with_slack_exception.client, job) + assert_slack_client_sends_messages( + mock_client_with_slack_exception.recorder, + messages_kwargs=[], + ) + + with open(os.path.join(log_dir, "stdout")) as f: + assert f.read() == "the owl and the pussycat\n" + + with open(os.path.join(log_dir, "stderr")) as f: + assert f.read() == "" + + def test_job_failure(mock_client): log_dir = build_log_dir("test_bad_job")