Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed Apr 15, 2024
1 parent fa17f5b commit 1b234c4
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3399,7 +3399,7 @@ async def cancel_job(self, job_id: str, project_id: str | None, location: str |
"""
async with ClientSession() as session:
token = await self.get_token(session=session)
job = Job(job_id=job_id, project=project_id, location=location, token=token, session=session) # type: ignore
job = Job(job_id=job_id, project=project_id, location=location, token=token, session=session) # type: ignore[arg-type]

self.log.info(
"Attempting to cancel BigQuery job: %s in project: %s, location: %s",
Expand Down
1 change: 1 addition & 0 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2187,6 +2187,7 @@ async def test_get_job_output_assert_once_with(self, mock_job_instance):
assert resp == response

@pytest.mark.asyncio
@pytest.mark.db_test
@mock.patch("google.auth.default")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.Job")
async def test_cancel_job_success(self, mock_job, mock_auth_default):
Expand Down
35 changes: 35 additions & 0 deletions tests/providers/google/cloud/triggers/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,41 @@ async def test_bigquery_op_trigger_exception(self, mock_job_status, caplog, inse
actual = await generator.asend(None)
assert TriggerEvent({"status": "error", "message": "Test exception"}) == actual

@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.cancel_job")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_status")
async def test_bigquery_insert_job_trigger_cancellation(
self, mock_get_job_status, mock_cancel_job, caplog, insert_job_trigger
):
"""
Test that BigQueryInsertJobTrigger handles cancellation correctly, logs the appropriate message,
and conditionally cancels the job based on the `cancel_on_kill` attribute.
"""
insert_job_trigger.cancel_on_kill = True
insert_job_trigger.job_id = "1234"

mock_get_job_status.side_effect = [
{"status": "running", "message": "Job is still running"},
asyncio.CancelledError(),
]

mock_cancel_job.return_value = asyncio.Future()
mock_cancel_job.return_value.set_result(None)

caplog.set_level(logging.INFO)

try:
async for _ in insert_job_trigger.run():
pass
except asyncio.CancelledError:
pass

assert (
"Task was killed" in caplog.text
or "Bigquery job status is running. Sleeping for 4.0 seconds." in caplog.text
), "Expected messages about task status or cancellation not found in log."
mock_cancel_job.assert_awaited_once()


class TestBigQueryGetDataTrigger:
def test_bigquery_get_data_trigger_serialization(self, get_data_trigger):
Expand Down

0 comments on commit 1b234c4

Please sign in to comment.