From 5c4364aafe57f089746a833d96a76c68c8869e20 Mon Sep 17 00:00:00 2001 From: Jordi Escrich <810325+listik@users.noreply.github.com> Date: Wed, 24 Jan 2024 01:58:29 +0100 Subject: [PATCH] Fix successful Apache Druid task submissions reported as failed (#36813) * Accept 202 status code response * Fix status message * Addresses [comment](https://github.com/apache/airflow/pull/36813#discussion_r1454463089) * Make tests cover actual behaviour --- airflow/providers/apache/druid/hooks/druid.py | 5 +++-- tests/providers/apache/druid/hooks/test_druid.py | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/providers/apache/druid/hooks/druid.py b/airflow/providers/apache/druid/hooks/druid.py index 11c415e42ca60..85bb1167fc91c 100644 --- a/airflow/providers/apache/druid/hooks/druid.py +++ b/airflow/providers/apache/druid/hooks/druid.py @@ -107,9 +107,10 @@ def submit_indexing_job( req_index = requests.post(url, data=json_index_spec, headers=self.header, auth=self.get_auth()) code = req_index.status_code - if code != 200: + not_accepted = not (200 <= code < 300) + if not_accepted: self.log.error("Error submitting the Druid job to %s (%s) %s", url, code, req_index.content) - raise AirflowException(f"Did not get 200 when submitting the Druid job to {url}") + raise AirflowException(f"Did not get 200 or 202 when submitting the Druid job to {url}") req_json = req_index.json() # Wait until the job is completed diff --git a/tests/providers/apache/druid/hooks/test_druid.py b/tests/providers/apache/druid/hooks/test_druid.py index 221371eac6cec..5a389cb710c65 100644 --- a/tests/providers/apache/druid/hooks/test_druid.py +++ b/tests/providers/apache/druid/hooks/test_druid.py @@ -65,6 +65,7 @@ def test_submit_gone_wrong(self, requests_mock): def test_submit_ok(self, requests_mock): task_post = requests_mock.post( "http://druid-overlord:8081/druid/indexer/v1/task", + status_code=200, text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}', ) status_check = requests_mock.get( @@ -81,6 +82,7 @@ def test_submit_ok(self, requests_mock): def test_submit_sql_based_ingestion_ok(self, requests_mock): task_post = requests_mock.post( "http://druid-overlord:8081/druid/v2/sql/task", + status_code=202, text='{"taskId":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}', ) status_check = requests_mock.get(