Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6485] BigQuery hook - add missing tests #7077

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
149 changes: 149 additions & 0 deletions tests/gcp/hooks/test_bigquery.py
Expand Up @@ -850,6 +850,155 @@ def test_get_dataset_tables_list(self, mock_get_service, mock_project_id):

self.assertEqual(table_list, result)

@parameterized.expand([
    ("US", None, True),
    (None, None, True),
    (
        None,
        HttpError(resp=type('', (object,), {"status": 500, })(), content=b'Internal Server Error'),
        False
    ),
    (
        None,
        HttpError(resp=type('', (object,), {"status": 503, })(), content=b'Service Unavailable'),
        False
    ),
])
@mock.patch(
    'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
    return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
def test_poll_job_complete_pass(
    self, location, exception, expected_result, mock_get_service, mock_get_creds_and_proj_id
):
    """Check the value returned by ``poll_job_complete`` for each parameterized case.

    Cases cover a job reported as ``DONE`` (with and without a hook location),
    which is expected to yield ``True``, and ``jobs.get`` raising an
    ``HttpError`` with status 500 or 503, which is expected to yield ``False``.
    """
    # Walk the mocked service chain: service.jobs().get(...).execute()
    jobs_factory = mock_get_service.return_value.jobs
    get_request = jobs_factory.return_value.get
    execute = get_request.return_value.execute
    # When ``exception`` is None the mock falls back to ``return_value``.
    execute.configure_mock(
        return_value={"status": {"state": "DONE"}},
        side_effect=exception,
    )

    # The location is only passed on when the case provides a truthy one.
    location_kwargs = {}
    if location:
        location_kwargs["location"] = location
    cursor = hook.BigQueryHook(**location_kwargs).get_cursor()

    job_is_complete = cursor.poll_job_complete(JOB_ID)

    self.assertEqual(expected_result, job_is_complete)
    get_request.assert_called_once_with(projectId=PROJECT_ID, jobId=JOB_ID, **location_kwargs)
    # Every link of the request chain must be used exactly once.
    for recorded in (jobs_factory, get_request, execute):
        assert recorded.call_count == 1

@mock.patch(
    'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
    return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
def test_pull_job_complete_on_fails(self, mock_get_service, mock_get_creds_and_proj_id):
    """Expect ``AirflowException`` when ``jobs.get`` fails with a 404 ``HttpError``."""
    # Minimal stand-in for an HTTP response carrying only status and reason.
    not_found_response = type('', (object,), {"status": 404, "reason": "Not Found"})()
    execute = mock_get_service.return_value.jobs.return_value.get.return_value.execute
    execute.side_effect = HttpError(resp=not_found_response, content=b'Not Found')

    cursor = hook.BigQueryHook().get_cursor()

    with self.assertRaisesRegex(AirflowException, "HttpError 404 \"Not Found\""):
        cursor.poll_job_complete(JOB_ID)

@mock.patch(
    'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
    return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryBaseCursor.poll_job_complete")
@mock.patch("logging.Logger.info")
def test_cancel_query_np_jobs_to_cancel(
    self, mock_logger_info, poll_job_complete, mock_get_service, mock_get_creds_and_proj_id
):
    """Check ``cancel_query`` when the running job is already reported complete.

    ``poll_job_complete`` is mocked to return ``True``; the test expects a
    single ``jobs()`` lookup, a single poll, and the "nothing to cancel"
    info message.
    """
    method_jobs = mock_get_service.return_value.jobs
    poll_job_complete.return_value = True

    bq_hook = hook.BigQueryHook()
    cursor = bq_hook.get_cursor()
    cursor.running_job_id = JOB_ID
    cursor.cancel_query()

    assert method_jobs.call_count == 1
    assert poll_job_complete.call_count == 1
    # ``has_call`` is not a Mock assertion: it silently creates a child mock and
    # verifies nothing. ``assert_any_call`` actually checks the recorded calls.
    mock_logger_info.assert_any_call("No running BigQuery jobs to cancel.")

@mock.patch(
    'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
    return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryBaseCursor.poll_job_complete")
@mock.patch("time.sleep")
@mock.patch("logging.Logger.info")
def test_cancel_query_np_cancel_timeout(
    self, mock_logger_info, mock_sleep, poll_job_complete, mock_get_service, mock_get_creds_and_proj_id
):
    """Check ``cancel_query`` when the job never reports completion.

    ``poll_job_complete`` is mocked to return ``False`` thirteen times; the
    test expects one cancel request, thirteen polls, eleven sleeps, and the
    polling-timeout info message.
    """
    method_jobs = mock_get_service.return_value.jobs
    method_jobs_cancel = method_jobs.return_value.cancel
    poll_job_complete.side_effect = [False] * 13

    bq_hook = hook.BigQueryHook()
    cursor = bq_hook.get_cursor()
    cursor.running_job_id = JOB_ID
    cursor.cancel_query()

    assert method_jobs.call_count == 1
    assert method_jobs_cancel.call_count == 1
    assert poll_job_complete.call_count == 13
    assert mock_sleep.call_count == 11

    # ``has_call`` is not a Mock assertion: it silently creates a child mock and
    # verifies nothing, so the expected message is checked explicitly instead.
    expected_message = (
        "Stopping polling due to timeout. Job with id {} "
        "has not completed cancel and may or may not finish.".format(JOB_ID)
    )
    # Render each recorded call the way logging would (msg % args), so the check
    # holds whether the hook logs a pre-formatted string or lazy %-style args.
    # NOTE(review): assumes info() is called with positional arguments - confirm.
    logged_messages = [
        args[0] % args[1:] if len(args) > 1 else args[0]
        for args, _ in mock_logger_info.call_args_list
        if args
    ]
    assert expected_message in logged_messages

@mock.patch(
    'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
    return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryBaseCursor.poll_job_complete")
@mock.patch("time.sleep")
@mock.patch("logging.Logger.info")
def test_cancel_query_np_cancel_completed(
    self, mock_logger_info, mock_sleep, poll_job_complete, mock_get_service, mock_get_creds_and_proj_id
):
    """Check ``cancel_query`` when the job reports completion on the last poll.

    ``poll_job_complete`` is mocked to return ``False`` twelve times and then
    ``True``; the test expects one cancel request, thirteen polls, eleven
    sleeps, and the "successfully canceled" info message.
    """
    method_jobs = mock_get_service.return_value.jobs
    method_jobs_cancel = method_jobs.return_value.cancel
    poll_job_complete.side_effect = [False] * 12 + [True]

    bq_hook = hook.BigQueryHook()
    cursor = bq_hook.get_cursor()
    cursor.running_job_id = JOB_ID
    cursor.cancel_query()

    assert method_jobs.call_count == 1
    assert method_jobs_cancel.call_count == 1
    assert poll_job_complete.call_count == 13
    assert mock_sleep.call_count == 11

    # ``has_call`` is not a Mock assertion: it silently creates a child mock and
    # verifies nothing, so the expected message is checked explicitly instead.
    expected_message = "Job successfully canceled: {}, {}".format(PROJECT_ID, JOB_ID)
    # Render each recorded call the way logging would (msg % args), so the check
    # holds whether the hook logs a pre-formatted string or lazy %-style args.
    # NOTE(review): assumes info() is called with positional arguments - confirm.
    logged_messages = [
        args[0] % args[1:] if len(args) > 1 else args[0]
        for args, _ in mock_logger_info.call_args_list
        if args
    ]
    assert expected_message in logged_messages

@mock.patch(
    'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
    return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
def test_get_schema(self, mock_get_service, mock_get_creds_and_proj_id):
    """Expect ``get_schema`` to return the ``"schema"`` entry of the ``tables.get`` response."""
    expected_schema = "SCHEMA"
    # Mocked request chain: service.tables().get(...).execute()
    tables_get = mock_get_service.return_value.tables.return_value.get
    tables_execute = tables_get.return_value.execute
    tables_execute.return_value = {"schema": expected_schema}

    cursor = hook.BigQueryHook().get_cursor()
    actual_schema = cursor.get_schema(dataset_id=DATASET_ID, table_id=TABLE_ID)

    tables_get.assert_called_once_with(projectId=PROJECT_ID, datasetId=DATASET_ID, tableId=TABLE_ID)
    assert tables_execute.call_count == 1
    self.assertEqual(expected_schema, actual_schema)


class TestTableDataOperations(unittest.TestCase):
@mock.patch(
Expand Down