Skip to content

Commit

Permalink
[AIRFLOW-6485] BigQuery hook - add missing test for BIgQueryBaseCurso…
Browse files Browse the repository at this point in the history
…r methods (#7077)
  • Loading branch information
TobKed authored and potiuk committed Jan 6, 2020
1 parent e863f8a commit 6cbedf8
Showing 1 changed file with 149 additions and 0 deletions.
149 changes: 149 additions & 0 deletions tests/gcp/hooks/test_bigquery.py
Expand Up @@ -850,6 +850,155 @@ def test_get_dataset_tables_list(self, mock_get_service, mock_project_id):

self.assertEqual(table_list, result)

@parameterized.expand([
("US", None, True),
(None, None, True),
(
None,
HttpError(resp=type('', (object,), {"status": 500, })(), content=b'Internal Server Error'),
False
),
(
None,
HttpError(resp=type('', (object,), {"status": 503, })(), content=b'Service Unavailable'),
False
),
])
@mock.patch(
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
def test_poll_job_complete_pass(
self, location, exception, expected_result, mock_get_service, mock_get_creds_and_proj_id
):
method_jobs = mock_get_service.return_value.jobs
method_get = method_jobs.return_value.get
method_execute = method_get.return_value.execute
method_execute.return_value = {"status": {"state": "DONE"}}
method_execute.side_effect = exception

hook_params = {"location": location} if location else {}
bq_hook = hook.BigQueryHook(**hook_params)
cursor = bq_hook.get_cursor()

result = cursor.poll_job_complete(JOB_ID)
self.assertEqual(expected_result, result)
method_get.assert_called_once_with(projectId=PROJECT_ID, jobId=JOB_ID, **hook_params)
assert method_jobs.call_count == 1
assert method_get.call_count == 1
assert method_execute.call_count == 1

@mock.patch(
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
def test_pull_job_complete_on_fails(self, mock_get_service, mock_get_creds_and_proj_id):
method_jobs = mock_get_service.return_value.jobs
method_get = method_jobs.return_value.get
method_execute = method_get.return_value.execute
resp = type('', (object,), {"status": 404, "reason": "Not Found"})()
method_execute.side_effect = HttpError(resp=resp, content=b'Not Found')

bq_hook = hook.BigQueryHook()
cursor = bq_hook.get_cursor()
with self.assertRaisesRegex(AirflowException, "HttpError 404 \"Not Found\""):
cursor.poll_job_complete(JOB_ID)

@mock.patch(
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryBaseCursor.poll_job_complete")
@mock.patch("logging.Logger.info")
def test_cancel_query_np_jobs_to_cancel(
self, mock_logger_info, poll_job_complete, mock_get_service, mock_get_creds_and_proj_id
):
method_jobs = mock_get_service.return_value.jobs
poll_job_complete.return_value = True

bq_hook = hook.BigQueryHook()
cursor = bq_hook.get_cursor()
cursor.running_job_id = JOB_ID
cursor.cancel_query()
assert method_jobs.call_count == 1
assert poll_job_complete.call_count == 1
mock_logger_info.has_call(mock.call("No running BigQuery jobs to cancel."))

@mock.patch(
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryBaseCursor.poll_job_complete")
@mock.patch("time.sleep")
@mock.patch("logging.Logger.info")
def test_cancel_query_np_cancel_timeout(
self, mock_logger_info, mock_sleep, poll_job_complete, mock_get_service, mock_get_creds_and_proj_id
):
method_jobs = mock_get_service.return_value.jobs
method_jobs_cancel = method_jobs.return_value.cancel
poll_job_complete.side_effect = [False] * 13

bq_hook = hook.BigQueryHook()
cursor = bq_hook.get_cursor()
cursor.running_job_id = JOB_ID
cursor.cancel_query()
assert method_jobs.call_count == 1
assert method_jobs_cancel.call_count == 1
assert poll_job_complete.call_count == 13
assert mock_sleep.call_count == 11
mock_logger_info.has_call(
mock.call("Stopping polling due to timeout. Job with id {} "
"has not completed cancel and may or may not finish.".format(JOB_ID))
)

@mock.patch(
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryBaseCursor.poll_job_complete")
@mock.patch("time.sleep")
@mock.patch("logging.Logger.info")
def test_cancel_query_np_cancel_completed(
self, mock_logger_info, mock_sleep, poll_job_complete, mock_get_service, mock_get_creds_and_proj_id
):
method_jobs = mock_get_service.return_value.jobs
method_jobs_cancel = method_jobs.return_value.cancel
poll_job_complete.side_effect = [False] * 12 + [True]

bq_hook = hook.BigQueryHook()
cursor = bq_hook.get_cursor()
cursor.running_job_id = JOB_ID
cursor.cancel_query()
assert method_jobs.call_count == 1
assert method_jobs_cancel.call_count == 1
assert poll_job_complete.call_count == 13
assert mock_sleep.call_count == 11
mock_logger_info.has_call(mock.call("Job successfully canceled: {}, {}".format(PROJECT_ID, JOB_ID)))

@mock.patch(
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
return_value=(CREDENTIALS, PROJECT_ID)
)
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
def test_get_schema(self, mock_get_service, mock_get_creds_and_proj_id):
schema = "SCHEMA"
method_get = mock_get_service.return_value.tables.return_value.get
method_execute = method_get.return_value.execute
method_execute.return_value = {"schema": schema}

bq_hook = hook.BigQueryHook()
cursor = bq_hook.get_cursor()
result = cursor.get_schema(dataset_id=DATASET_ID, table_id=TABLE_ID)

method_get.assert_called_once_with(projectId=PROJECT_ID, datasetId=DATASET_ID, tableId=TABLE_ID)
assert method_execute.call_count == 1
self.assertEqual(schema, result)


class TestTableDataOperations(unittest.TestCase):
@mock.patch(
Expand Down

0 comments on commit 6cbedf8

Please sign in to comment.