Skip to content

Commit

Permalink
[AIRFLOW-3874] Improve BigQueryHook.run_with_configuration's location…
Browse files Browse the repository at this point in the history
… support (#4695)

* [AIRFLOW-3874] mprove BigQueryHook.run_with_configuration's location support

Assign location from the resp of insert() back to job.get()

* [AIRFLOW-3874] Improve BigQueryHook.run_with_configuration's location support

Remove unused code
  • Loading branch information
ryanyuan authored and Fokko committed Feb 12, 2019
1 parent 4d34632 commit 1c06c74
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
8 changes: 6 additions & 2 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -1210,16 +1210,20 @@ def run_with_configuration(self, configuration):
.insert(projectId=self.project_id, body=job_data) \
.execute()
self.running_job_id = query_reply['jobReference']['jobId']
if 'location' in query_reply['jobReference']:
location = query_reply['jobReference']['location']
else:
location = self.location

# Wait for query to finish.
keep_polling_job = True
while keep_polling_job:
try:
if self.location:
if location:
job = jobs.get(
projectId=self.project_id,
jobId=self.running_job_id,
location=self.location).execute()
location=location).execute()
else:
job = jobs.get(
projectId=self.project_id,
Expand Down
33 changes: 33 additions & 0 deletions tests/contrib/hooks/test_bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,5 +823,38 @@ def test_location_propagates_properly(self, run_with_config):
self.assertEqual(bq_cursor.location, 'US')


class TestBigQueryHookRunWithConfiguration(unittest.TestCase):
def test_run_with_configuration_location(self):
project_id = 'bq-project'
running_job_id = 'job_vjdi28vskdui2onru23'
location = 'asia-east1'

mock_service = mock.Mock()
method = (mock_service.jobs.return_value.get)

mock_service.jobs.return_value.insert.return_value.execute.return_value = {
'jobReference': {
'jobId': running_job_id,
'location': location
}
}

mock_service.jobs.return_value.get.return_value.execute.return_value = {
'status': {
'state': 'DONE'
}
}

cursor = hook.BigQueryBaseCursor(mock_service, project_id)
cursor.running_job_id = running_job_id
cursor.run_with_configuration({})

method.assert_called_once_with(
projectId=project_id,
jobId=running_job_id,
location=location
)


if __name__ == '__main__':
unittest.main()

0 comments on commit 1c06c74

Please sign in to comment.