Skip to content

Commit

Permalink
Fix DataFusionAsyncHook catch 404 (#32855)
Browse files Browse the repository at this point in the history
  • Loading branch information
VladaZakharova committed Aug 2, 2023
1 parent 077fa4d commit d9121a7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 22 deletions.
31 changes: 19 additions & 12 deletions airflow/providers/google/cloud/hooks/datafusion.py
Expand Up @@ -511,17 +511,25 @@ def _base_url(instance_url: str, namespace: str) -> str:
return urljoin(f"{instance_url}/", f"v3/namespaces/{quote(namespace)}/apps/")

async def _get_link(self, url: str, session):
    """Fetch the pipeline resource at *url*, retrying while it is not yet visible.

    A freshly created Data Fusion pipeline may not be queryable immediately,
    so a 404 response is retried with exponential backoff; any other error
    is re-raised unchanged.

    :param url: full REST URL of the pipeline resource.
    :param session: aiohttp client session wrapped into an ``AioSession``.
    :return: the raw response object for the pipeline.
    :raises AirflowException: if no pipeline response was obtained.
    """
    # Local import: asyncio is only needed for the non-blocking retry sleep.
    import asyncio

    pipeline = None
    # Retry with exponential backoff to catch 404 in case the pipeline
    # was not visible in the system during the first attempt.
    for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
        async with Token(scopes=self.scopes) as token:
            session_aio = AioSession(session)
            headers = {
                "Authorization": f"Bearer {await token.get()}",
            }
            try:
                pipeline = await session_aio.get(url=url, headers=headers)
                break
            except Exception as exc:
                if "404" in str(exc):
                    # BUG FIX: a plain (blocking) sleep() here would stall the
                    # entire event loop; use the non-blocking asyncio sleep.
                    await asyncio.sleep(time_to_wait)
                else:
                    raise
    if pipeline:
        return pipeline
    raise AirflowException("Could not retrieve pipeline. Aborting.")

async def get_pipeline(
self,
Expand Down Expand Up @@ -567,7 +575,6 @@ async def get_pipeline_status(
pipeline_id=pipeline_id,
session=session,
)
self.log.info("Response pipeline: %s", pipeline)
pipeline = await pipeline.json(content_type=None)
current_pipeline_state = pipeline["status"]

Expand Down
Expand Up @@ -42,21 +42,22 @@
# [START howto_data_fusion_env_variables]
# Service account the Data Fusion instance uses for its Dataproc jobs.
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
# ENV_ID suffixes resource names so concurrent system-test runs don't collide.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
LOCATION = "europe-north1"
# NOTE(review): the next four lines look like unified-diff residue — the old
# hard-coded values immediately followed by the new ENV_ID-based ones.  At
# runtime the later assignment wins, so the f-string variants take effect;
# confirm against the upstream file and drop the stale pair.
DAG_ID = "example_data_fusion"
INSTANCE_NAME = "test-instance"
DAG_ID = "example_datafusion"
# GCP resource names forbid underscores, hence the replace().
INSTANCE_NAME = f"df-{ENV_ID}".replace("_", "-")
INSTANCE = {
"type": "BASIC",
"displayName": INSTANCE_NAME,
"dataprocServiceAccount": SERVICE_ACCOUNT,
}

# NOTE(review): same diff residue — the two hard-coded bucket names are
# superseded by the env-scoped f-string versions below them.
BUCKET_NAME_1 = "test-datafusion-1"
BUCKET_NAME_2 = "test-datafusion-2"
BUCKET_NAME_1 = f"bucket1-{DAG_ID}-{ENV_ID}".replace("_", "-")
BUCKET_NAME_2 = f"bucket2-{DAG_ID}-{ENV_ID}".replace("_", "-")
BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"

# NOTE(review): diff residue again — the second assignment is the live one.
PIPELINE_NAME = "test-pipe"
PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-")
PIPELINE = {
"artifact": {
"name": "cdap-data-pipeline",
Expand Down
Expand Up @@ -41,21 +41,22 @@
# [START howto_data_fusion_env_variables]
# Service account the Data Fusion instance uses for its Dataproc jobs.
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
# NOTE(review): the stale hard-coded DAG_ID/INSTANCE_NAME lines interleaved
# below look like unified-diff residue (old line kept next to its
# replacement).  At runtime the later assignment wins; confirm against the
# upstream file and remove the stale pair.
DAG_ID = "example_data_fusion_async"
# ENV_ID suffixes resource names so concurrent system-test runs don't collide.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_datafusion_async"
LOCATION = "europe-north1"
INSTANCE_NAME = "test-instance-async"
# GCP resource names forbid underscores, hence the replace().
INSTANCE_NAME = f"async-df-{ENV_ID}".replace("_", "-")
INSTANCE = {
"type": "BASIC",
"displayName": INSTANCE_NAME,
"dataprocServiceAccount": SERVICE_ACCOUNT,
}

# NOTE(review): same diff residue — the env-scoped f-string bucket names
# below supersede the two hard-coded ones.
BUCKET_NAME_1 = "test-datafusion-async-1"
BUCKET_NAME_2 = "test-datafusion-async-2"
BUCKET_NAME_1 = f"a-bucket1-{DAG_ID}-{ENV_ID}".replace("_", "-")
BUCKET_NAME_2 = f"a-bucket2-{DAG_ID}-{ENV_ID}".replace("_", "-")
BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"

# NOTE(review): diff residue — the second assignment is the live one.
PIPELINE_NAME = "test-pipe"
PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-")
PIPELINE = {
"artifact": {
"name": "cdap-data-pipeline",
Expand Down

0 comments on commit d9121a7

Please sign in to comment.