From 7e94679ccb636e1257f28750acf7478a65f7b78d Mon Sep 17 00:00:00 2001 From: Ryan Yuan Date: Tue, 12 Feb 2019 17:14:34 +1100 Subject: [PATCH 1/2] [AIRFLOW-3874] mprove BigQueryHook.run_with_configuration's location support Assign location from the resp of insert() back to job.get() --- airflow/contrib/hooks/bigquery_hook.py | 11 +++++--- tests/contrib/hooks/test_bigquery_hook.py | 33 +++++++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 9d7c5fb38f4ac..23dfd12c4a89c 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -1204,22 +1204,27 @@ def run_with_configuration(self, configuration): """ jobs = self.service.jobs() job_data = {'configuration': configuration} - + print(1) # Send query and wait for reply. query_reply = jobs \ .insert(projectId=self.project_id, body=job_data) \ .execute() self.running_job_id = query_reply['jobReference']['jobId'] + if 'location' in query_reply['jobReference']: + location = query_reply['jobReference']['location'] + else: + location = self.location + print(location) # Wait for query to finish. keep_polling_job = True while keep_polling_job: try: - if self.location: + if location: job = jobs.get( projectId=self.project_id, jobId=self.running_job_id, - location=self.location).execute() + location=location).execute() else: job = jobs.get( projectId=self.project_id, diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py index 9bae4abb2a642..c0a845ebe7144 100644 --- a/tests/contrib/hooks/test_bigquery_hook.py +++ b/tests/contrib/hooks/test_bigquery_hook.py @@ -823,5 +823,38 @@ def test_location_propagates_properly(self, run_with_config): self.assertEqual(bq_cursor.location, 'US') +class TestBigQueryHookRunWithConfiguration(unittest.TestCase): + def test_run_with_configuration_location(self): + project_id = 'bq-project' + running_job_id = 'job_vjdi28vskdui2onru23' + location = 'asia-east1' + + mock_service = mock.Mock() + method = (mock_service.jobs.return_value.get) + + mock_service.jobs.return_value.insert.return_value.execute.return_value = { + 'jobReference': { + 'jobId': running_job_id, + 'location': location + } + } + + mock_service.jobs.return_value.get.return_value.execute.return_value = { + 'status': { + 'state': 'DONE' + } + } + + cursor = hook.BigQueryBaseCursor(mock_service, project_id) + cursor.running_job_id = running_job_id + cursor.run_with_configuration({}) + + method.assert_called_once_with( + projectId=project_id, + jobId=running_job_id, + location=location + ) + + if __name__ == '__main__': unittest.main() From 1dea85dce0b06857b3dd73a768e562e30b8bb466 Mon Sep 17 00:00:00 2001 From: Ryan Yuan Date: Tue, 12 Feb 2019 19:40:09 +1100 Subject: [PATCH 2/2] [AIRFLOW-3874] Improve BigQueryHook.run_with_configuration's location support Remove unused code --- airflow/contrib/hooks/bigquery_hook.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 23dfd12c4a89c..be7afd83c1636 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -1204,7 +1204,7 @@ def run_with_configuration(self, configuration): """ jobs = self.service.jobs() job_data = {'configuration': configuration} - print(1) + # Send query and wait for reply. query_reply = jobs \ .insert(projectId=self.project_id, body=job_data) \ @@ -1214,7 +1214,6 @@ def run_with_configuration(self, configuration): location = query_reply['jobReference']['location'] else: location = self.location - print(location) # Wait for query to finish. keep_polling_job = True