Skip to content

Commit

Permalink
fix: BigQuery job error message (#34208)
Browse files Browse the repository at this point in the history
* Modified output to return a dict also containing a job message

* Modified async functions to handle the dict response from `get_job_status`

* Updated expected states to reflect dict output

* Updated tests to check dictionary output
  • Loading branch information
nathadfield committed Sep 9, 2023
1 parent f5c2748 commit 774125a
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 62 deletions.
8 changes: 4 additions & 4 deletions airflow/providers/google/cloud/hooks/bigquery.py
Expand Up @@ -3099,16 +3099,16 @@ async def get_job_instance(
with await self.service_file_as_context() as f:
return Job(job_id=job_id, project=project_id, service_file=f, session=cast(Session, session))

async def get_job_status(self, job_id: str | None, project_id: str | None = None) -> str:
async def get_job_status(self, job_id: str | None, project_id: str | None = None) -> dict[str, str]:
async with ClientSession() as s:
job_client = await self.get_job_instance(project_id, job_id, s)
job = await job_client.get_job()
status = job.get("status", {})
if status["state"] == "DONE":
if "errorResult" in status:
return "error"
return "success"
return status["state"].lower()
return {"status": "error", "message": status["errorResult"]["message"]}
return {"status": "success", "message": "Job completed"}
return {"status": status["state"].lower(), "message": "Job running"}

async def get_job_output(
self,
Expand Down
74 changes: 44 additions & 30 deletions airflow/providers/google/cloud/triggers/bigquery.py
Expand Up @@ -72,27 +72,28 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent."""
"""Gets current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
try:
while True:
job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
if job_status == "success":
if job_status["status"] == "success":
yield TriggerEvent(
{
"job_id": self.job_id,
"status": job_status,
"message": "Job completed",
"status": job_status["status"],
"message": job_status["message"],
}
)
return
elif job_status == "error":
yield TriggerEvent({"status": "error"})
elif job_status["status"] == "error":
yield TriggerEvent(job_status)
return
else:
self.log.info(
"Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
"Bigquery job status is %s. Sleeping for %s seconds.",
job_status["status"],
self.poll_interval,
)
await asyncio.sleep(self.poll_interval)
except Exception as e:
Expand Down Expand Up @@ -127,16 +128,16 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
while True:
# Poll for job execution status
job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
if job_status == "success":
if job_status["status"] == "success":
query_results = await hook.get_job_output(job_id=self.job_id, project_id=self.project_id)

records = hook.get_records(query_results)

# If empty list, then no records are available
if not records:
yield TriggerEvent(
{
"status": "success",
"status": job_status["status"],
"message": job_status["message"],
"records": None,
}
)
Expand All @@ -146,17 +147,20 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
first_record = records.pop(0)
yield TriggerEvent(
{
"status": "success",
"status": job_status["status"],
"message": job_status["message"],
"records": first_record,
}
)
return
elif job_status == "error":
yield TriggerEvent({"status": "error", "message": job_status})
elif job_status["status"] == "error":
yield TriggerEvent({"status": "error", "message": job_status["message"]})
return
else:
self.log.info(
"Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
"Bigquery job status is %s. Sleeping for %s seconds.",
job_status["status"],
self.poll_interval,
)
await asyncio.sleep(self.poll_interval)
except Exception as e:
Expand Down Expand Up @@ -198,24 +202,26 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
while True:
# Poll for job execution status
job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
if job_status == "success":
if job_status["status"] == "success":
query_results = await hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
records = hook.get_records(query_results=query_results, as_dict=self.as_dict)
self.log.debug("Response from hook: %s", job_status)
self.log.debug("Response from hook: %s", job_status["status"])
yield TriggerEvent(
{
"status": "success",
"message": job_status,
"message": job_status["message"],
"records": records,
}
)
return
elif job_status == "error":
yield TriggerEvent({"status": "error"})
elif job_status["status"] == "error":
yield TriggerEvent(job_status)
return
else:
self.log.info(
"Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
"Bigquery job status is %s. Sleeping for %s seconds.",
job_status["status"],
self.poll_interval,
)
await asyncio.sleep(self.poll_interval)
except Exception as e:
Expand Down Expand Up @@ -308,7 +314,10 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
job_id=self.second_job_id, project_id=self.project_id
)

if first_job_response_from_hook == "success" and second_job_response_from_hook == "success":
if (
first_job_response_from_hook["status"] == "success"
and second_job_response_from_hook["status"] == "success"
):
first_query_results = await hook.get_job_output(
job_id=self.first_job_id, project_id=self.project_id
)
Expand Down Expand Up @@ -352,13 +361,16 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
}
)
return
elif first_job_response_from_hook == "pending" or second_job_response_from_hook == "pending":
elif (
first_job_response_from_hook["status"] == "pending"
or second_job_response_from_hook["status"] == "pending"
):
self.log.info("Query is still running...")
self.log.info("Sleeping for %s seconds.", self.poll_interval)
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent(
{"status": "error", "message": second_job_response_from_hook, "data": None}
{"status": "error", "message": second_job_response_from_hook["message"], "data": None}
)
return

Expand Down Expand Up @@ -430,19 +442,21 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
while True:
# Poll for job execution status
response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
if response_from_hook == "success":
if response_from_hook["status"] == "success":
query_results = await hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
records = hook.get_records(query_results)
records = records.pop(0) if records else None
hook.value_check(self.sql, self.pass_value, records, self.tolerance)
yield TriggerEvent({"status": "success", "message": "Job completed", "records": records})
return
elif response_from_hook == "pending":
elif response_from_hook["status"] == "pending":
self.log.info("Query is still running...")
self.log.info("Sleeping for %s seconds.", self.poll_interval)
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent({"status": "error", "message": response_from_hook, "records": None})
yield TriggerEvent(
{"status": "error", "message": response_from_hook["message"], "records": None}
)
return
except Exception as e:
self.log.exception("Exception occurred while checking for query completion")
Expand Down Expand Up @@ -574,8 +588,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
job_id = None
while True:
if job_id is not None:
status = await hook.get_job_status(job_id=job_id, project_id=self.project_id)
if status == "success":
job_status = await hook.get_job_status(job_id=job_id, project_id=self.project_id)
if job_status["status"] == "success":
is_partition = await self._partition_exists(
hook=hook, job_id=job_id, project_id=self.project_id
)
Expand All @@ -588,8 +602,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
)
return
job_id = None
elif status == "error":
yield TriggerEvent({"status": "error", "message": status})
elif job_status["status"] == "error":
yield TriggerEvent(job_status)
return
self.log.info("Sleeping for %s seconds.", self.poll_interval)
await asyncio.sleep(self.poll_interval)
Expand Down
9 changes: 6 additions & 3 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Expand Up @@ -2142,9 +2142,12 @@ async def test_get_job_instance(self, mock_session):
@pytest.mark.parametrize(
"job_status, expected",
[
({"status": {"state": "DONE"}}, "success"),
({"status": {"state": "DONE", "errorResult": "Timeout"}}, "error"),
({"status": {"state": "running"}}, "running"),
({"status": {"state": "DONE"}}, {"status": "success", "message": "Job completed"}),
(
{"status": {"state": "DONE", "errorResult": {"message": "Timeout"}}},
{"status": "error", "message": "Timeout"},
),
({"status": {"state": "running"}}, {"status": "running", "message": "Job running"}),
],
)
@pytest.mark.asyncio
Expand Down

0 comments on commit 774125a

Please sign in to comment.