Skip to content

Commit

Permalink
Fix catching 409 error (#33173)
Browse files Browse the repository at this point in the history
  • Loading branch information
VladaZakharova committed Aug 7, 2023
1 parent 8542cdd commit 0e076dc
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
27 changes: 16 additions & 11 deletions airflow/providers/google/cloud/hooks/datafusion.py
Expand Up @@ -39,6 +39,12 @@
Operation = Dict[str, Any]


class ConflictException(AirflowException):
"""Exception to catch 409 error."""

pass


class PipelineStates:
"""Data Fusion pipeline states."""

Expand Down Expand Up @@ -163,6 +169,8 @@ def _cdap_request(
def _check_response_status_and_data(response, message: str) -> None:
if response.status == 404:
raise AirflowNotFoundException(message)
elif response.status == 409:
raise ConflictException("Conflict: Resource is still in use.")
elif response.status != 200:
raise AirflowException(message)
if response.data is None:
Expand Down Expand Up @@ -356,21 +364,18 @@ def delete_pipeline(
if version_id:
url = os.path.join(url, "versions", version_id)

response = self._cdap_request(url=url, method="DELETE", body=None)
# Check for 409 error: the previous step for starting/stopping pipeline could still be in progress.
# Waiting some time before retry.
for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
for time_to_wait in exponential_sleep_generator(initial=1, maximum=10):
try:
response = self._cdap_request(url=url, method="DELETE", body=None)
self._check_response_status_and_data(
response, f"Deleting a pipeline failed with code {response.status}: {response.data}"
)
break
except AirflowException as exc:
if "409" in str(exc):
sleep(time_to_wait)
response = self._cdap_request(url=url, method="DELETE", body=None)
else:
raise
if response.status == 200:
break
except ConflictException as exc:
self.log.info(exc)
sleep(time_to_wait)
continue

def list_pipelines(
self,
Expand Down
22 changes: 22 additions & 0 deletions tests/providers/google/cloud/hooks/test_datafusion.py
Expand Up @@ -52,6 +52,12 @@
)


class MockResponse:
def __init__(self, status, data=None):
self.status = status
self.data = data


@pytest.fixture
def hook():
with mock.patch(
Expand Down Expand Up @@ -255,6 +261,22 @@ def test_delete_pipeline_should_fail_if_status_not_200(self, mock_request, hook)
body=None,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_delete_pipeline_should_fail_if_status_409(self, mock_request, hook, caplog):
mock_request.side_effect = [
MockResponse(status=409, data="Conflict: Resource is still in use."),
MockResponse(status=200, data="Success"),
]
hook.delete_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)

assert mock_request.call_count == 2
assert "Conflict: Resource is still in use." in caplog.text
mock_request.assert_called_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
method="DELETE",
body=None,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_list_pipelines(self, mock_request, hook):
data = {"data": "test"}
Expand Down

0 comments on commit 0e076dc

Please sign in to comment.