From 7dec3204a0e35cf490065e9785101843299504d8 Mon Sep 17 00:00:00 2001 From: Krishnaveni Mettu Date: Thu, 8 Dec 2016 14:38:50 -0800 Subject: [PATCH 1/4] Handle BigQuery 503 error added retry feature if BigQuery error is 500 or 503 set the retry time for 5 seconds --- airflow/contrib/hooks/bigquery_hook.py | 35 +++++++++++++++++--------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index a0cb71dabbde2..8d7918441bb79 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -473,18 +473,29 @@ def run_with_configuration(self, configuration): job = jobs.get(projectId=self.project_id, jobId=job_id).execute() # Wait for query to finish. - while not job['status']['state'] == 'DONE': - logging.info('Waiting for job to complete: %s, %s', self.project_id, job_id) - time.sleep(5) - job = jobs.get(projectId=self.project_id, jobId=job_id).execute() - - # Check if job had errors. - if 'errorResult' in job['status']: - raise Exception( - 'BigQuery job failed. Final error was: {}. The job was: {}'.format( - job['status']['errorResult'], job - ) - ) + job_status_flag = True + while (job_status_flag): + try: + job = jobs.get(projectId=self.project_id, jobId=job_id).execute() + if (job['status']['state'] == 'DONE'): + job_status_flag = False + # Check if job had errors. + if 'errorResult' in job['status']: + raise Exception( + 'BigQuery job failed. Final error was: {}. The job was: {}'.format( + job['status']['errorResult'], job + ) + ) + else: + logging.info('Waiting for job to complete : %s, %s', self.project_id, job_id) + time.sleep(5) + + except HTTPError, err: + if err.code in [500, 503]: + logging.info('%s: Retryable error, waiting for job to complete: %s',err.code, job_id) + time.sleep(5) + else:raise Exception( + 'BigQuery job status check faild. Final error was: %s', err.code) return job_id From 70f46dd288af2ef7ed9ff2e5498974ade8fc399e Mon Sep 17 00:00:00 2001 From: Krishnaveni Mettu Date: Fri, 9 Dec 2016 12:09:01 -0800 Subject: [PATCH 2/4] typo in the error class name --- airflow/contrib/hooks/bigquery_hook.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 8d7918441bb79..10479c0deb5db 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -490,7 +490,7 @@ def run_with_configuration(self, configuration): logging.info('Waiting for job to complete : %s, %s', self.project_id, job_id) time.sleep(5) - except HTTPError, err: + except HttpError, err: if err.code in [500, 503]: logging.info('%s: Retryable error, waiting for job to complete: %s',err.code, job_id) time.sleep(5) From 542fc50642cff941315a4461cfa2632755c90b42 Mon Sep 17 00:00:00 2001 From: Krishnaveni Mettu Date: Tue, 13 Dec 2016 11:10:27 -0800 Subject: [PATCH 3/4] Fix comments from code review fixed style errors and renamed variable --- airflow/contrib/hooks/bigquery_hook.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 10479c0deb5db..b3265e47e0030 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -473,12 +473,12 @@ def run_with_configuration(self, configuration): job = jobs.get(projectId=self.project_id, jobId=job_id).execute() # Wait for query to finish. - job_status_flag = True - while (job_status_flag): + keep_polling_job = True + while (keep_polling_job): try: job = jobs.get(projectId=self.project_id, jobId=job_id).execute() if (job['status']['state'] == 'DONE'): - job_status_flag = False + keep_polling_job = False # Check if job had errors. if 'errorResult' in job['status']: raise Exception( @@ -492,9 +492,10 @@ def run_with_configuration(self, configuration): except HttpError, err: if err.code in [500, 503]: - logging.info('%s: Retryable error, waiting for job to complete: %s',err.code, job_id) + logging.info('%s: Retryable error, waiting for job to complete: %s', err.code, job_id) time.sleep(5) - else:raise Exception( + else: + raise Exception( 'BigQuery job status check faild. Final error was: %s', err.code) return job_id From 29587cb8713fc7df195a15c00007cb92b2365505 Mon Sep 17 00:00:00 2001 From: Krishnaveni Mettu Date: Tue, 13 Dec 2016 13:07:30 -0800 Subject: [PATCH 4/4] bug fix - make sure jobs.get in inside try catch loop --- airflow/contrib/hooks/bigquery_hook.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index b3265e47e0030..900ec129e028a 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -470,7 +470,6 @@ def run_with_configuration(self, configuration): .insert(projectId=self.project_id, body=job_data) \ .execute() job_id = query_reply['jobReference']['jobId'] - job = jobs.get(projectId=self.project_id, jobId=job_id).execute() # Wait for query to finish. keep_polling_job = True