diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py index de426bc242082..38b06602120d5 100644 --- a/airflow/providers/google/cloud/hooks/datafusion.py +++ b/airflow/providers/google/cloud/hooks/datafusion.py @@ -39,6 +39,12 @@ Operation = Dict[str, Any] +class ConflictException(AirflowException): + """Exception to catch 409 error.""" + + pass + + class PipelineStates: """Data Fusion pipeline states.""" @@ -163,6 +169,8 @@ def _cdap_request( def _check_response_status_and_data(response, message: str) -> None: if response.status == 404: raise AirflowNotFoundException(message) + elif response.status == 409: + raise ConflictException("Conflict: Resource is still in use.") elif response.status != 200: raise AirflowException(message) if response.data is None: @@ -356,21 +364,18 @@ def delete_pipeline( if version_id: url = os.path.join(url, "versions", version_id) - response = self._cdap_request(url=url, method="DELETE", body=None) - # Check for 409 error: the previous step for starting/stopping pipeline could still be in progress. - # Waiting some time before retry. - for time_to_wait in exponential_sleep_generator(initial=10, maximum=120): + for time_to_wait in exponential_sleep_generator(initial=1, maximum=10): try: + response = self._cdap_request(url=url, method="DELETE", body=None) self._check_response_status_and_data( response, f"Deleting a pipeline failed with code {response.status}: {response.data}" ) - break - except AirflowException as exc: - if "409" in str(exc): - sleep(time_to_wait) - response = self._cdap_request(url=url, method="DELETE", body=None) - else: - raise + if response.status == 200: + break + except ConflictException as exc: + self.log.info(exc) + sleep(time_to_wait) + continue def list_pipelines( self, diff --git a/tests/providers/google/cloud/hooks/test_datafusion.py b/tests/providers/google/cloud/hooks/test_datafusion.py index 58e1c048d0b9c..50ea6b19d0859 100644 --- a/tests/providers/google/cloud/hooks/test_datafusion.py +++ b/tests/providers/google/cloud/hooks/test_datafusion.py @@ -52,6 +52,12 @@ ) +class MockResponse: + def __init__(self, status, data=None): + self.status = status + self.data = data + + @pytest.fixture def hook(): with mock.patch( @@ -255,6 +261,22 @@ def test_delete_pipeline_should_fail_if_status_not_200(self, mock_request, hook) body=None, ) + @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request")) + def test_delete_pipeline_should_fail_if_status_409(self, mock_request, hook, caplog): + mock_request.side_effect = [ + MockResponse(status=409, data="Conflict: Resource is still in use."), + MockResponse(status=200, data="Success"), + ] + hook.delete_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL) + + assert mock_request.call_count == 2 + assert "Conflict: Resource is still in use." in caplog.text + mock_request.assert_called_with( + url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}", + method="DELETE", + body=None, + ) + @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request")) def test_list_pipelines(self, mock_request, hook): data = {"data": "test"}