diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index a0cb71dabbde2..900ec129e028a 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -470,21 +470,32 @@ def run_with_configuration(self, configuration): .insert(projectId=self.project_id, body=job_data) \ .execute() job_id = query_reply['jobReference']['jobId'] - job = jobs.get(projectId=self.project_id, jobId=job_id).execute() # Wait for query to finish. - while not job['status']['state'] == 'DONE': - logging.info('Waiting for job to complete: %s, %s', self.project_id, job_id) - time.sleep(5) - job = jobs.get(projectId=self.project_id, jobId=job_id).execute() - - # Check if job had errors. - if 'errorResult' in job['status']: - raise Exception( - 'BigQuery job failed. Final error was: {}. The job was: {}'.format( - job['status']['errorResult'], job - ) - ) + keep_polling_job = True + while (keep_polling_job): + try: + job = jobs.get(projectId=self.project_id, jobId=job_id).execute() + if (job['status']['state'] == 'DONE'): + keep_polling_job = False + # Check if job had errors. + if 'errorResult' in job['status']: + raise Exception( + 'BigQuery job failed. Final error was: {}. The job was: {}'.format( + job['status']['errorResult'], job + ) + ) + else: + logging.info('Waiting for job to complete : %s, %s', self.project_id, job_id) + time.sleep(5) + + except HttpError, err: + if err.code in [500, 503]: + logging.info('%s: Retryable error, waiting for job to complete: %s', err.code, job_id) + time.sleep(5) + else: + raise Exception( + 'BigQuery job status check faild. Final error was: %s', err.code) return job_id