From cc6244818c47b661af45de383f816fc53449271f Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Wed, 28 Jul 2021 16:54:08 +0000 Subject: [PATCH 01/22] initial stab --- google/cloud/bigquery/client.py | 67 +++++++++++++++++++----------- google/cloud/bigquery/job/query.py | 15 +++++++ 2 files changed, 58 insertions(+), 24 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 742ecac2e..c5890122f 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -3206,7 +3206,7 @@ def query( class. """ job_id_given = job_id is not None - job_id = _make_job_id(job_id, job_id_prefix) + job_id_save = job_id if project is None: project = self.project @@ -3214,8 +3214,6 @@ def query( if location is None: location = self.location - job_config = copy.deepcopy(job_config) - if self._default_query_job_config: if job_config: _verify_job_config_type( @@ -3225,6 +3223,8 @@ def query( # that is in the default, # should be filled in with the default # the incoming therefore has precedence + # + # Note that _fill_from_default doesn't mutate the receiver job_config = job_config._fill_from_default( self._default_query_job_config ) @@ -3233,34 +3233,53 @@ def query( self._default_query_job_config, google.cloud.bigquery.job.QueryJobConfig, ) - job_config = copy.deepcopy(self._default_query_job_config) + job_config = self._default_query_job_config - job_ref = job._JobReference(job_id, project=project, location=location) - query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config) + # Note that we haven't modified the original job_config (or + # _default_query_job_config) up to this point. + job_config_save = job_config - try: - query_job._begin(retry=retry, timeout=timeout) - except core_exceptions.Conflict as create_exc: - # The thought is if someone is providing their own job IDs and they get - # their job ID generation wrong, this could end up returning results for - # the wrong query. 
We thus only try to recover if job ID was not given. - if job_id_given: - raise create_exc + def do_query(): + # Make a copy now, so that original doesn't get changed by the process + # below and to facilitate retry + job_config = copy.deepcopy(job_config_save) + + job_id = _make_job_id(job_id_save, job_id_prefix) + job_ref = job._JobReference(job_id, project=project, location=location) + query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config) try: - query_job = self.get_job( - job_id, - project=project, - location=location, - retry=retry, - timeout=timeout, - ) - except core_exceptions.GoogleAPIError: # (includes RetryError) - raise create_exc + query_job._begin(retry=retry, timeout=timeout) + except core_exceptions.Conflict as create_exc: + # The thought is if someone is providing their own job IDs and they get + # their job ID generation wrong, this could end up returning results for + # the wrong query. We thus only try to recover if job ID was not given. + if job_id_given: + raise create_exc + + try: + query_job = self.get_job( + job_id, + project=project, + location=location, + retry=retry, + timeout=timeout, + ) + except core_exceptions.GoogleAPIError: # (includes RetryError) + raise create_exc + else: + return query_job else: return query_job + + if job_id_given: + # Can't retry (at this level) + future = do_query() else: - return query_job + future = retry(do_query)() + future.retry_do_query = do_query # in case we have to retry later + + return future def insert_rows( self, diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 2cb7ee28e..b0e309966 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1307,6 +1307,21 @@ def result( # set the self._query_results cache. 
self._reload_query_results(retry=retry, timeout=timeout) except exceptions.GoogleAPICallError as exc: + if retry is not None: + retry_do_query = getattr(self, 'retry_do_query', None) + if retry_do_query is not None: + print('RETRY', id(retry), 'IN RESULT', self.job_id, self.query) + @retry + def retry_query_and_result(): + job = retry_do_query() + self._properties["jobReference"] = ( + job._properties["jobReference"]) + result = job.result() + self._query_results = job._query_results + return result + + return retry_query_and_result() + exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self raise From 61e1c38c8f156c13b771108caf94a2f994ff31f6 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Wed, 28 Jul 2021 17:00:03 +0000 Subject: [PATCH 02/22] test start --- tests/unit/test_query_retry.py | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 tests/unit/test_query_retry.py diff --git a/tests/unit/test_query_retry.py b/tests/unit/test_query_retry.py new file mode 100644 index 000000000..97e3beaab --- /dev/null +++ b/tests/unit/test_query_retry.py @@ -0,0 +1,11 @@ +from .helpers import make_connection + +def test_rateLimitExceeded_in__begin(client): + err = dict(reason='rateLimitExceeded') + client._connection = conn = make_connection( + dict(status=dict(status='DONE', errors=[err], errorResult=err)), + dict(status=dict(status='DONE', errors=[err], errorResult=err)), + dict(status=dict(status='DONE')), + ) + breakpoint() + client.query("select 1") From 093f9b4348f2ef9d8330747dc4922c7174e58f4d Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Wed, 28 Jul 2021 18:40:35 -0400 Subject: [PATCH 03/22] Test high-level retry behavior. 
--- tests/unit/test_query_retry.py | 60 +++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/tests/unit/test_query_retry.py b/tests/unit/test_query_retry.py index 97e3beaab..344c84970 100644 --- a/tests/unit/test_query_retry.py +++ b/tests/unit/test_query_retry.py @@ -1,11 +1,55 @@ +import mock + from .helpers import make_connection -def test_rateLimitExceeded_in__begin(client): + +@mock.patch('time.sleep') +def test_retry_failed_jobs(sleep, client): + """ + Test retry of job failures, as opposed to API-invocation failures. + """ err = dict(reason='rateLimitExceeded') - client._connection = conn = make_connection( - dict(status=dict(status='DONE', errors=[err], errorResult=err)), - dict(status=dict(status='DONE', errors=[err], errorResult=err)), - dict(status=dict(status='DONE')), - ) - breakpoint() - client.query("select 1") + responses = [ + dict(status=dict(state='DONE', errors=[err], errorResult=err)), + dict(status=dict(state='DONE', errors=[err], errorResult=err)), + dict(status=dict(state='DONE', errors=[err], errorResult=err)), + dict(status=dict(state='DONE')), + dict(rows=[{'f': [{'v': '1'}]}], totalRows='1'), + ] + + def api_request(method, path, query_params=None, data=None, **kw): + response = responses.pop(0) + if data: + response['jobReference'] = data['jobReference'] + else: + response['jobReference'] = dict(jobId=path.split('/')[-1], + projectId='PROJECT') + print(response) + return response + + conn = client._connection = make_connection() + conn.api_request.side_effect = api_request + + job = client.query("select 1") + orig_job_id = job.job_id + result = job.result() + assert result.total_rows == 1 + assert not responses # We made all the calls we expected to. + + # The job adjusts it's job id based on the id of the last attempt. + assert job.job_id != orig_job_id + assert job.job_id == conn.mock_calls[3][2]['data']['jobReference']['jobId'] + + # each of the first four calls was for a different job. 
+ assert len(set(call[2]['data']['jobReference']['jobId'] + for call in conn.mock_calls[:4])) == 4 + + # We had to sleep three times + assert len(sleep.mock_calls) == 3 + + # Sleeps are random, however they're more than 0 + assert min(c[1][0] for c in sleep.mock_calls) > 0 + + # They're at most 2 * (multiplier**(number of sleeps - 1)) * initial + # The default multiplier is 2 + assert max(c[1][0] for c in sleep.mock_calls) <= 8 From b2ce74b45a8ef2992cda4039bf4611374931b659 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Wed, 28 Jul 2021 18:41:41 -0400 Subject: [PATCH 04/22] Don't retry here Retry in the query job's result method. --- google/cloud/bigquery/client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index c5890122f..a283d8bbd 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -3272,11 +3272,11 @@ def do_query(): else: return query_job - if job_id_given: - # Can't retry (at this level) - future = do_query() - else: - future = retry(do_query)() + future = do_query() + # The future might be in a failed state now, but if it's + # unrecoverable, we'll find out when we ask for it's result, at which + # point, we may retry. + if not job_id_given: future.retry_do_query = do_query # in case we have to retry later return future From d930c83387b645ba8ac3952c9bb5772cd8b3d124 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Wed, 28 Jul 2021 18:44:53 -0400 Subject: [PATCH 05/22] reworked the retry logic. --- google/cloud/bigquery/job/query.py | 50 ++++++++++++++++++------------ 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index b0e309966..5d7afde8c 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1300,28 +1300,38 @@ def result( If the job did not complete in the given timeout.
""" try: - super(QueryJob, self).result(retry=retry, timeout=timeout) + retry_do_query = getattr(self, 'retry_do_query', None) + first = True + sub_retry = retry if retry_do_query is None else None + def do_get_result(): + nonlocal first - # Since the job could already be "done" (e.g. got a finished job - # via client.get_job), the superclass call to done() might not - # set the self._query_results cache. - self._reload_query_results(retry=retry, timeout=timeout) - except exceptions.GoogleAPICallError as exc: - if retry is not None: - retry_do_query = getattr(self, 'retry_do_query', None) - if retry_do_query is not None: - print('RETRY', id(retry), 'IN RESULT', self.job_id, self.query) - @retry - def retry_query_and_result(): - job = retry_do_query() - self._properties["jobReference"] = ( - job._properties["jobReference"]) - result = job.result() - self._query_results = job._query_results - return result - - return retry_query_and_result() + if first: + first = False + else: + # Note that we won't get here if retry_do_query is + # None, because we won't use a retry. + + # The orinal job is failed. Create a new one. + job = retry_do_query() + + # Become the new job: + self.__dict__.clear() + self.__dict__.update(job.__dict__) + + super(QueryJob, self).result(retry=sub_retry, timeout=timeout) + # Since the job could already be "done" (e.g. got a finished job + # via client.get_job), the superclass call to done() might not + # set the self._query_results cache. 
+ self._reload_query_results(retry=sub_retry, timeout=timeout) + + if retry is not None and retry_do_query is not None: + do_get_result = retry(do_get_result) + + do_get_result() + + except exceptions.GoogleAPICallError as exc: exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self raise From bf7b4c635e67425ca2fed4bca530de53153c46c1 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 29 Jul 2021 11:12:24 -0400 Subject: [PATCH 06/22] if, when retrying, the new query job is complete and errored, stop right away There's no point in continuing to make calls. --- google/cloud/bigquery/job/query.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 5d7afde8c..ddf6b1c81 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1315,6 +1315,10 @@ def do_get_result(): # The orinal job is failed. Create a new one. job = retry_do_query() + # If it's already failed, we might as well stop: + if job.done() and job.exception() is not None: + raise job.exception() + # Become the new job: self.__dict__.clear() self.__dict__.update(job.__dict__) From 0598197864b0602f8743c2adeb66ef0286a9568e Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 29 Jul 2021 11:13:27 -0400 Subject: [PATCH 07/22] Added test (and to existing test) to make sure we can call result multiple times --- tests/unit/test_query_retry.py | 88 ++++++++++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 5 deletions(-) diff --git a/tests/unit/test_query_retry.py b/tests/unit/test_query_retry.py index 344c84970..0f6c91867 100644 --- a/tests/unit/test_query_retry.py +++ b/tests/unit/test_query_retry.py @@ -1,4 +1,9 @@ +import datetime + import mock +import pytest + +import google.api_core.exceptions from .helpers import make_connection @@ -24,13 +29,13 @@ def api_request(method, path, query_params=None, data=None, **kw): else: response['jobReference'] =
dict(jobId=path.split('/')[-1], projectId='PROJECT') - print(response) return response conn = client._connection = make_connection() conn.api_request.side_effect = api_request job = client.query("select 1") + orig_job_id = job.job_id result = job.result() assert result.total_rows == 1 @@ -40,10 +45,6 @@ def api_request(method, path, query_params=None, data=None, **kw): assert job.job_id != orig_job_id assert job.job_id == conn.mock_calls[3][2]['data']['jobReference']['jobId'] - # each of the first four calls was for a different job. - assert len(set(call[2]['data']['jobReference']['jobId'] - for call in conn.mock_calls[:4])) == 4 - # We had to sleep three times assert len(sleep.mock_calls) == 3 @@ -53,3 +54,80 @@ def api_request(method, path, query_params=None, data=None, **kw): # They're at most 2 * (multiplier**(number of sleeps - 1)) * initial # The default multiplier is 2 assert max(c[1][0] for c in sleep.mock_calls) <= 8 + + # We can ask for the result again: + responses = [ + dict(rows=[{'f': [{'v': '1'}]}], totalRows='1'), + ] + orig_job_id = job.job_id + result = job.result() + assert result.total_rows == 1 + assert not responses # We made all the calls we expected to. + + # We wouldn't (and didn't) fail, because we're dealing with a successful job. + # So the job id hasn't changed. + assert job.job_id == orig_job_id + + +@mock.patch('google.api_core.retry.datetime_helpers') +@mock.patch('time.sleep') +def test_retry_failed_jobs_after_retry_failed(sleep, datetime_helpers, client): + """ + If at first you don't succeed, maybe you will later. 
:) + """ + conn = client._connection = make_connection() + + datetime_helpers.utcnow.return_value = datetime.datetime(2021, 7, 29, 10, 43, 2) + + err = dict(reason='rateLimitExceeded') + def api_request(method, path, query_params=None, data=None, **kw): + calls = sleep.mock_calls + if calls: + datetime_helpers.utcnow.return_value += datetime.timedelta( + seconds=calls[-1][1][0] + ) + response = dict(status=dict(state='DONE', errors=[err], errorResult=err)) + response['jobReference'] = data['jobReference'] + return response + + conn.api_request.side_effect = api_request + + job = client.query("select 1") + orig_job_id = job.job_id + + with pytest.raises(google.api_core.exceptions.RetryError): + job.result() + + # We never fot a successful job, so the job id never changed: + assert job.job_id == orig_job_id + + # We failed because we couldn't succeed after 120 seconds. + # But we can try again: + responses = [ + dict(status=dict(state='DONE', errors=[err], errorResult=err)), + dict(status=dict(state='DONE', errors=[err], errorResult=err)), + dict(status=dict(state='DONE', errors=[err], errorResult=err)), + dict(status=dict(state='DONE')), + dict(rows=[{'f': [{'v': '1'}]}], totalRows='1'), + ] + + def api_request(method, path, query_params=None, data=None, **kw): + calls = sleep.mock_calls + if calls: + datetime_helpers.utcnow.return_value += datetime.timedelta( + seconds=calls[-1][1][0] + ) + response = responses.pop(0) + if data: + response['jobReference'] = data['jobReference'] + else: + response['jobReference'] = dict(jobId=path.split('/')[-1], + projectId='PROJECT') + return response + + conn.api_request.side_effect = api_request + result = job.result() + assert result.total_rows == 1 + assert not responses # We made all the calls we expected to. 
+ assert job.job_id != orig_job_id + From c9644e92737b827184d9cdd1a73d8a23ef5a196c Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 29 Jul 2021 11:22:08 -0400 Subject: [PATCH 08/22] Keep carrying retry_do_query, even though it shouldn't be necessary. --- google/cloud/bigquery/job/query.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index ddf6b1c81..5406e2dae 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1323,6 +1323,11 @@ def do_get_result(): self.__dict__.clear() self.__dict__.update(job.__dict__) + # This shouldn't be necessary, because once we have a good + # job, it should stay good,and we shouldn't have to retry. + # But let's be paranoid. :) + self.retry_do_query = retry_do_query + super(QueryJob, self).result(retry=sub_retry, timeout=timeout) # Since the job could already be "done" (e.g. got a finished job From 2d0e0671d65583bd451a65fee3a048fb1c03f055 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 29 Jul 2021 11:43:33 -0400 Subject: [PATCH 09/22] blacken --- google/cloud/bigquery/client.py | 2 +- google/cloud/bigquery/job/query.py | 3 +- tests/unit/test_query_retry.py | 60 +++++++++++++++--------------- 3 files changed, 34 insertions(+), 31 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index a283d8bbd..7e8f5a4c7 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -3277,7 +3277,7 @@ def do_query(): # unrecoverable, we'll find out when we ask for it's result, at which # point, we may retry. 
if not job_id_given: - future.retry_do_query = do_query # in case we have to retry later + future.retry_do_query = do_query # in case we have to retry later return future diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 5406e2dae..7722db521 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1300,9 +1300,10 @@ def result( If the job did not complete in the given timeout. """ try: - retry_do_query = getattr(self, 'retry_do_query', None) + retry_do_query = getattr(self, "retry_do_query", None) first = True sub_retry = retry if retry_do_query is None else None + def do_get_result(): nonlocal first diff --git a/tests/unit/test_query_retry.py b/tests/unit/test_query_retry.py index 0f6c91867..b151ab4c7 100644 --- a/tests/unit/test_query_retry.py +++ b/tests/unit/test_query_retry.py @@ -8,27 +8,28 @@ from .helpers import make_connection -@mock.patch('time.sleep') +@mock.patch("time.sleep") def test_retry_failed_jobs(sleep, client): """ Test retry of job failures, as opposed to API-invocation failures. 
""" - err = dict(reason='rateLimitExceeded') + err = dict(reason="rateLimitExceeded") responses = [ - dict(status=dict(state='DONE', errors=[err], errorResult=err)), - dict(status=dict(state='DONE', errors=[err], errorResult=err)), - dict(status=dict(state='DONE', errors=[err], errorResult=err)), - dict(status=dict(state='DONE')), - dict(rows=[{'f': [{'v': '1'}]}], totalRows='1'), - ] + dict(status=dict(state="DONE", errors=[err], errorResult=err)), + dict(status=dict(state="DONE", errors=[err], errorResult=err)), + dict(status=dict(state="DONE", errors=[err], errorResult=err)), + dict(status=dict(state="DONE")), + dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), + ] def api_request(method, path, query_params=None, data=None, **kw): response = responses.pop(0) if data: - response['jobReference'] = data['jobReference'] + response["jobReference"] = data["jobReference"] else: - response['jobReference'] = dict(jobId=path.split('/')[-1], - projectId='PROJECT') + response["jobReference"] = dict( + jobId=path.split("/")[-1], projectId="PROJECT" + ) return response conn = client._connection = make_connection() @@ -43,7 +44,7 @@ def api_request(method, path, query_params=None, data=None, **kw): # The job adjusts it's job id based on the id of the last attempt. 
assert job.job_id != orig_job_id - assert job.job_id == conn.mock_calls[3][2]['data']['jobReference']['jobId'] + assert job.job_id == conn.mock_calls[3][2]["data"]["jobReference"]["jobId"] # We had to sleep three times assert len(sleep.mock_calls) == 3 @@ -57,8 +58,8 @@ def api_request(method, path, query_params=None, data=None, **kw): # We can ask for the result again: responses = [ - dict(rows=[{'f': [{'v': '1'}]}], totalRows='1'), - ] + dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), + ] orig_job_id = job.job_id result = job.result() assert result.total_rows == 1 @@ -69,8 +70,8 @@ def api_request(method, path, query_params=None, data=None, **kw): assert job.job_id == orig_job_id -@mock.patch('google.api_core.retry.datetime_helpers') -@mock.patch('time.sleep') +@mock.patch("google.api_core.retry.datetime_helpers") +@mock.patch("time.sleep") def test_retry_failed_jobs_after_retry_failed(sleep, datetime_helpers, client): """ If at first you don't succeed, maybe you will later. :) @@ -79,15 +80,16 @@ def test_retry_failed_jobs_after_retry_failed(sleep, datetime_helpers, client): datetime_helpers.utcnow.return_value = datetime.datetime(2021, 7, 29, 10, 43, 2) - err = dict(reason='rateLimitExceeded') + err = dict(reason="rateLimitExceeded") + def api_request(method, path, query_params=None, data=None, **kw): calls = sleep.mock_calls if calls: datetime_helpers.utcnow.return_value += datetime.timedelta( seconds=calls[-1][1][0] ) - response = dict(status=dict(state='DONE', errors=[err], errorResult=err)) - response['jobReference'] = data['jobReference'] + response = dict(status=dict(state="DONE", errors=[err], errorResult=err)) + response["jobReference"] = data["jobReference"] return response conn.api_request.side_effect = api_request @@ -104,12 +106,12 @@ def api_request(method, path, query_params=None, data=None, **kw): # We failed because we couldn't succeed after 120 seconds. 
# But we can try again: responses = [ - dict(status=dict(state='DONE', errors=[err], errorResult=err)), - dict(status=dict(state='DONE', errors=[err], errorResult=err)), - dict(status=dict(state='DONE', errors=[err], errorResult=err)), - dict(status=dict(state='DONE')), - dict(rows=[{'f': [{'v': '1'}]}], totalRows='1'), - ] + dict(status=dict(state="DONE", errors=[err], errorResult=err)), + dict(status=dict(state="DONE", errors=[err], errorResult=err)), + dict(status=dict(state="DONE", errors=[err], errorResult=err)), + dict(status=dict(state="DONE")), + dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), + ] def api_request(method, path, query_params=None, data=None, **kw): calls = sleep.mock_calls @@ -119,10 +121,11 @@ def api_request(method, path, query_params=None, data=None, **kw): ) response = responses.pop(0) if data: - response['jobReference'] = data['jobReference'] + response["jobReference"] = data["jobReference"] else: - response['jobReference'] = dict(jobId=path.split('/')[-1], - projectId='PROJECT') + response["jobReference"] = dict( + jobId=path.split("/")[-1], projectId="PROJECT" + ) return response conn.api_request.side_effect = api_request @@ -130,4 +133,3 @@ def api_request(method, path, query_params=None, data=None, **kw): assert result.total_rows == 1 assert not responses # We made all the calls we expected to. 
assert job.job_id != orig_job_id - From 0dcac016606b62a7d21de65925d7860922e6a570 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 29 Jul 2021 11:48:12 -0400 Subject: [PATCH 10/22] removed unnecessary condition that caused a coverage fail --- tests/unit/test_query_retry.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_query_retry.py b/tests/unit/test_query_retry.py index b151ab4c7..c00defb71 100644 --- a/tests/unit/test_query_retry.py +++ b/tests/unit/test_query_retry.py @@ -115,10 +115,9 @@ def api_request(method, path, query_params=None, data=None, **kw): def api_request(method, path, query_params=None, data=None, **kw): calls = sleep.mock_calls - if calls: - datetime_helpers.utcnow.return_value += datetime.timedelta( - seconds=calls[-1][1][0] - ) + datetime_helpers.utcnow.return_value += datetime.timedelta( + seconds=calls[-1][1][0] + ) response = responses.pop(0) if data: response["jobReference"] = data["jobReference"] From 026edba979d5765ff1684ba26ca2b53b625ff3dd Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 29 Jul 2021 14:31:50 -0400 Subject: [PATCH 11/22] System test that demonstrates the retry behavior as applied to the original issues' use case --- tests/system/test_query_retry.py | 46 ++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 tests/system/test_query_retry.py diff --git a/tests/system/test_query_retry.py b/tests/system/test_query_retry.py new file mode 100644 index 000000000..475d61d06 --- /dev/null +++ b/tests/system/test_query_retry.py @@ -0,0 +1,46 @@ +import contextlib +import threading +import time + +import google.cloud.bigquery + + +def thread(func): + thread = threading.Thread(target=func, daemon=True) + thread.start() + return thread + + +def test_query_retry_539(bigquery_client, dataset_id): + """ + Test semantic retry + + See: https://github.com/googleapis/python-bigquery/issues/539 + """ + from google.api_core import exceptions + from
google.api_core.retry import if_exception_type, Retry + + table_name = f"{dataset_id}.t539" + job = bigquery_client.query(f"select count(*) from {table_name}") + job_id = job.job_id + + # We can already know that the job failed, but we're not supposed + # to find out until we call result, which is where retry happend + assert job.done() + assert job.exception() is not None + + @thread + def create_table(): + time.sleep(1) + with contextlib.closing(google.cloud.bigquery.Client()) as client: + client.query(f"create table {table_name} (id int64)") + + retry_policy = Retry(predicate=if_exception_type(exceptions.NotFound)) + [[count]] = list(job.result(retry=retry_policy)) + assert count == 0 + + # The job was retried, and thus got a new job id + assert job.job_id != job_id + + # Make sure we don't leave a thread behind: + create_table.join() From 0e764d255e3c8138ddaf595c05ba74eacf40e08b Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 29 Jul 2021 19:11:48 -0400 Subject: [PATCH 12/22] Added missing copyright --- tests/system/test_query_retry.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/system/test_query_retry.py b/tests/system/test_query_retry.py index 475d61d06..aa17e54df 100644 --- a/tests/system/test_query_retry.py +++ b/tests/system/test_query_retry.py @@ -1,3 +1,22 @@ +# Copyright (c) 2021 The PyBigQuery Authors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. 
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + import contextlib import threading import time From c96d8b35de1f55710523220c295568648f286c6e Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 29 Jul 2021 19:12:55 -0400 Subject: [PATCH 13/22] Added missing copyright --- tests/unit/test_query_retry.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/unit/test_query_retry.py b/tests/unit/test_query_retry.py index c00defb71..26ea6c78d 100644 --- a/tests/unit/test_query_retry.py +++ b/tests/unit/test_query_retry.py @@ -1,3 +1,22 @@ +# Copyright (c) 2021 The PyBigQuery Authors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + import datetime import mock From 3c53172fe4685bc9889831fadb0a54283dd1a2a5 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Wed, 4 Aug 2021 12:07:01 -0400 Subject: [PATCH 14/22] Added a leading _ to the retry_do_query query-job attribute to make it clear that this is an implementation detail --- google/cloud/bigquery/client.py | 2 +- google/cloud/bigquery/job/query.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 7e8f5a4c7..8fdcdcbb2 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -3277,7 +3277,7 @@ def do_query(): # unrecoverable, we'll find out when we ask for it's result, at which # point, we may retry. if not job_id_given: - future.retry_do_query = do_query # in case we have to retry later + future._retry_do_query = do_query # in case we have to retry later return future diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 7722db521..90d285f58 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1300,7 +1300,7 @@ def result( If the job did not complete in the given timeout. """ try: - retry_do_query = getattr(self, "retry_do_query", None) + retry_do_query = getattr(self, "_retry_do_query", None) first = True sub_retry = retry if retry_do_query is None else None @@ -1327,7 +1327,7 @@ def do_get_result(): # This shouldn't be necessary, because once we have a good # job, it should stay good,and we shouldn't have to retry. # But let's be paranoid.
:) - self.retry_do_query = retry_do_query + self._retry_do_query = retry_do_query super(QueryJob, self).result(retry=sub_retry, timeout=timeout) From d6d958f9eb89b76623d5414eee8e93408769d940 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 5 Aug 2021 13:00:25 -0400 Subject: [PATCH 15/22] fixed copyright --- tests/system/test_query_retry.py | 25 ++++++++++--------------- tests/unit/test_query_retry.py | 25 ++++++++++--------------- 2 files changed, 20 insertions(+), 30 deletions(-) diff --git a/tests/system/test_query_retry.py b/tests/system/test_query_retry.py index aa17e54df..5adb88a1d 100644 --- a/tests/system/test_query_retry.py +++ b/tests/system/test_query_retry.py @@ -1,21 +1,16 @@ -# Copyright (c) 2021 The PyBigQuery Authors +# Copyright 2021 Google LLC # -# Permission is hereby granted, free of charge, to any person obtaining a copy of -# this software and associated documentation files (the "Software"), to deal in -# the Software without restriction, including without limitation the rights to -# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -# the Software, and to permit persons to whom the Software is furnished to do so, -# subject to the following conditions: +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. +# https://www.apache.org/licenses/LICENSE-2.0 # -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR -# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import contextlib import threading diff --git a/tests/unit/test_query_retry.py b/tests/unit/test_query_retry.py index 26ea6c78d..a14ab7cb7 100644 --- a/tests/unit/test_query_retry.py +++ b/tests/unit/test_query_retry.py @@ -1,21 +1,16 @@ -# Copyright (c) 2021 The PyBigQuery Authors +# Copyright 2021 Google LLC # -# Permission is hereby granted, free of charge, to any person obtaining a copy of -# this software and associated documentation files (the "Software"), to deal in -# the Software without restriction, including without limitation the rights to -# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -# the Software, and to permit persons to whom the Software is furnished to do so, -# subject to the following conditions: +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. +# https://www.apache.org/licenses/LICENSE-2.0 # -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR -# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import datetime From 32e7050a457affe411049d3eb61d25c050a56dc5 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 5 Aug 2021 13:03:33 -0400 Subject: [PATCH 16/22] why sleep? --- tests/system/test_query_retry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/test_query_retry.py b/tests/system/test_query_retry.py index 5adb88a1d..26c7cb14c 100644 --- a/tests/system/test_query_retry.py +++ b/tests/system/test_query_retry.py @@ -45,7 +45,7 @@ def test_query_retry_539(bigquery_client, dataset_id): @thread def create_table(): - time.sleep(1) + time.sleep(1) # Give the first attempt time to fail. 
with contextlib.closing(google.cloud.bigquery.Client()) as client: client.query(f"create table {table_name} (id int64)") From 3361a8203c1e686fe3d692bd5c3fbdb104225e98 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 5 Aug 2021 13:28:26 -0400 Subject: [PATCH 17/22] use DEFAULT_RETRY for low-level requests, to retry API errors --- google/cloud/bigquery/job/query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 90d285f58..4e2bfdc15 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1302,7 +1302,7 @@ def result( try: retry_do_query = getattr(self, "_retry_do_query", None) first = True - sub_retry = retry if retry_do_query is None else None + sub_retry = retry if retry_do_query is None else DEFAULT_RETRY def do_get_result(): nonlocal first From 4c6ef5bd48b51c4b676114a46d687158b30fb8c3 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Mon, 9 Aug 2021 11:56:28 -0400 Subject: [PATCH 18/22] Separate job retry into separate option for client query and query-job result --- google/cloud/bigquery/client.py | 33 ++++++++- google/cloud/bigquery/job/query.py | 48 ++++++++++--- google/cloud/bigquery/retry.py | 12 ++++ ...{test_query_retry.py => test_job_retry.py} | 26 +++++-- ...{test_query_retry.py => test_job_retry.py} | 71 +++++++++++++++++-- tests/unit/test_retry.py | 15 ++++ 6 files changed, 181 insertions(+), 24 deletions(-) rename tests/system/{test_query_retry.py => test_job_retry.py} (66%) rename tests/unit/{test_query_retry.py => test_job_retry.py} (68%) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 8fdcdcbb2..4017fe91a 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -3163,6 +3163,7 @@ def query( project: str = None, retry: retries.Retry = DEFAULT_RETRY, timeout: float = None, + job_retry: retries.Retry = None, ) -> job.QueryJob: """Run a SQL query. 
@@ -3192,20 +3193,45 @@ def query( Project ID of the project of where to run the job. Defaults to the client's project. retry (Optional[google.api_core.retry.Retry]): - How to retry the RPC. + How to retry the RPC. This only applies to making RPC + calls. It isn't used to retry failed jobs. This has + a reasonable default that should only be overridden + with care. timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. + job_retry (Optional[google.api_core.retry.Retry]): + How to retry failed jobs. The default retries + rate-limit-exceeded errors. + + Not all jobs can be retried. If `job_id` is provided, + then the job returned by the query will not be + retryable, and an exception will be raised if + `job_retry` is also provided. + + Note that the errors aren't detected until `result()` + is called on the job returned. The `job_retry` + specified here becomes the default `job_retry` for + `result()`, where it can also be specified. Returns: google.cloud.bigquery.job.QueryJob: A new query job instance. Raises: TypeError: - If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.QueryJobConfig` - class. + If ``job_config`` is not an instance of + :class:`~google.cloud.bigquery.job.QueryJobConfig` + class, or if both `job_id` and `job_retry` are + provided. """ job_id_given = job_id is not None + if job_id_given and job_retry is not None: + raise TypeError( + "`job_retry` was provided, but the returned job is" + " not retryable, because a custom `job_id` was" + " provided." + ) + job_id_save = job_id if project is None: @@ -3278,6 +3304,7 @@ def do_query(): # point, we may retry. 
if not job_id_given: future._retry_do_query = do_query # in case we have to retry later + future._job_retry = job_retry return future diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 4e2bfdc15..5b1d6a743 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -36,7 +36,7 @@ from google.cloud.bigquery.query import ScalarQueryParameter from google.cloud.bigquery.query import StructQueryParameter from google.cloud.bigquery.query import UDFResource -from google.cloud.bigquery.retry import DEFAULT_RETRY +from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY from google.cloud.bigquery.routine import RoutineReference from google.cloud.bigquery.table import _EmptyRowIterator from google.cloud.bigquery.table import RangePartitioning @@ -1260,6 +1260,7 @@ def result( retry: "retries.Retry" = DEFAULT_RETRY, timeout: float = None, start_index: int = None, + job_retry: "retries.Retry" = None, ) -> Union["RowIterator", _EmptyRowIterator]: """Start the job and wait for it to complete and get the result. @@ -1270,9 +1271,13 @@ def result( max_results (Optional[int]): The maximum total number of rows from this request. retry (Optional[google.api_core.retry.Retry]): - How to retry the call that retrieves rows. If the job state is - ``DONE``, retrying is aborted early even if the results are not - available, as this will not change anymore. + How to retry the call that retrieves rows. This only + applies to making RPC calls. It isn't used to retry + failed jobs. This has a reasonable default that + should only be overridden with care. If the job state + is ``DONE``, retrying is aborted early even if the + results are not available, as this will not change + anymore. timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. @@ -1280,6 +1285,15 @@ def result( applies to each individual request. 
start_index (Optional[int]): The zero-based index of the starting row to read. + job_retry (Optional[google.api_core.retry.Retry]): + How to retry failed jobs. The default retries + rate-limit-exceeded errors. + + Not all jobs can be retried. If `job_id` was provided + to the query that created this job, then the job + returned by the query will not be retryable, and an + exception will be raised if `job_retry` is also + provided. Returns: google.cloud.bigquery.table.RowIterator: @@ -1295,14 +1309,28 @@ def result( Raises: google.cloud.exceptions.GoogleAPICallError: - If the job failed. + If the job failed and retries aren't successful. concurrent.futures.TimeoutError: If the job did not complete in the given timeout. + TypeError: + If `job_retry` is provided and the job is not retryable. """ try: retry_do_query = getattr(self, "_retry_do_query", None) + if retry_do_query is not None: + if job_retry is None: + job_retry = getattr(self, "_job_retry", None) + if job_retry is None: + job_retry = DEFAULT_JOB_RETRY + else: + if job_retry is not None: + raise TypeError( + "`job_retry` was provided, but this job is" + " not retryable, because a custom `job_id` was" + " provided to the query that created this job." + ) + first = True - sub_retry = retry if retry_do_query is None else DEFAULT_RETRY def do_get_result(): nonlocal first @@ -1329,15 +1357,15 @@ def do_get_result(): # But let's be paranoid. :) self._retry_do_query = retry_do_query - super(QueryJob, self).result(retry=sub_retry, timeout=timeout) + super(QueryJob, self).result(retry=retry, timeout=timeout) # Since the job could already be "done" (e.g. got a finished job # via client.get_job), the superclass call to done() might not # set the self._query_results cache. 
- self._reload_query_results(retry=sub_retry, timeout=timeout) + self._reload_query_results(retry=retry, timeout=timeout) - if retry is not None and retry_do_query is not None: - do_get_result = retry(do_get_result) + if retry_do_query is not None: + do_get_result = job_retry(do_get_result) do_get_result() diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index 2df4de08b..018b4619c 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -56,3 +56,15 @@ def _should_retry(exc): on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds, pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``. """ + + +@retry.Retry +def DEFAULT_JOB_RETRY(exc): + """ + The default job retry object. + """ + if not hasattr(exc, "errors") or len(exc.errors) == 0: + return False + + reason = exc.errors[0]["reason"] + return reason == "rateLimitExceeded" diff --git a/tests/system/test_query_retry.py b/tests/system/test_job_retry.py similarity index 66% rename from tests/system/test_query_retry.py rename to tests/system/test_job_retry.py index 26c7cb14c..520545493 100644 --- a/tests/system/test_query_retry.py +++ b/tests/system/test_job_retry.py @@ -16,7 +16,9 @@ import threading import time +import google.api_core.exceptions import google.cloud.bigquery +import pytest def thread(func): @@ -25,9 +27,10 @@ def thread(func): return thread -def test_query_retry_539(bigquery_client, dataset_id): +@pytest.mark.parametrize("job_retry_on_query", [True, False]) +def test_query_retry_539(bigquery_client, dataset_id, job_retry_on_query): """ - Test semantic retry + Test job_retry See: https://github.com/googleapis/python-bigquery/issues/539 """ @@ -35,7 +38,15 @@ def test_query_retry_539(bigquery_client, dataset_id): from google.api_core.retry import if_exception_type, Retry table_name = f"{dataset_id}.t539" - job = bigquery_client.query(f"select count(*) from {table_name}") + + # Without a custom retry, we fail: + with 
pytest.raises(google.api_core.exceptions.NotFound): + bigquery_client.query(f"select count(*) from {table_name}").result() + + retry_notfound = Retry(predicate=if_exception_type(exceptions.NotFound)) + + job_retry = dict(job_retry=retry_notfound) if job_retry_on_query else {} + job = bigquery_client.query(f"select count(*) from {table_name}", **job_retry) job_id = job.job_id # We can already know that the job failed, but we're not supposed @@ -45,12 +56,12 @@ def test_query_retry_539(bigquery_client, dataset_id): @thread def create_table(): - time.sleep(1) # Give the first attempt time to fail. + time.sleep(1) # Give the first retry attempt time to fail. with contextlib.closing(google.cloud.bigquery.Client()) as client: - client.query(f"create table {table_name} (id int64)") + client.query(f"create table {table_name} (id int64)").result() - retry_policy = Retry(predicate=if_exception_type(exceptions.NotFound)) - [[count]] = list(job.result(retry=retry_policy)) + job_retry = {} if job_retry_on_query else dict(job_retry=retry_notfound) + [[count]] = list(job.result(**job_retry)) assert count == 0 # The job was retried, and thus got a new job id @@ -58,3 +69,4 @@ def create_table(): # Make sure we don't leave a thread behind: create_table.join() + bigquery_client.query(f"drop table {table_name}").result() diff --git a/tests/unit/test_query_retry.py b/tests/unit/test_job_retry.py similarity index 68% rename from tests/unit/test_query_retry.py rename to tests/unit/test_job_retry.py index a14ab7cb7..e5e8ba93a 100644 --- a/tests/unit/test_query_retry.py +++ b/tests/unit/test_job_retry.py @@ -13,21 +13,46 @@ # limitations under the License. import datetime +import re import mock import pytest import google.api_core.exceptions +import google.api_core.retry from .helpers import make_connection +# With job_retry_on_query, we're testing 4 scenarios: +# - No `job_retry` passed, retry on default rateLimitExceeded. +# - Pass NotFound retry to `query`. 
+# - Pass NotFound retry to `result`. +# - Pass NotFound retry to both, with the value passed to `result` overriding. +@pytest.mark.parametrize("job_retry_on_query", [None, "Query", "Result", "Both"]) @mock.patch("time.sleep") -def test_retry_failed_jobs(sleep, client): +def test_retry_failed_jobs(sleep, client, job_retry_on_query): """ Test retry of job failures, as opposed to API-invocation failures. """ - err = dict(reason="rateLimitExceeded") + + retry_notfound = google.api_core.retry.Retry( + predicate=google.api_core.retry.if_exception_type( + google.api_core.exceptions.NotFound + ) + ) + retry_badrequest = google.api_core.retry.Retry( + predicate=google.api_core.retry.if_exception_type( + google.api_core.exceptions.BadRequest + ) + ) + + if job_retry_on_query is None: + reason = "rateLimitExceeded" + else: + reason = "notFound" + + err = dict(reason=reason) responses = [ dict(status=dict(state="DONE", errors=[err], errorResult=err)), dict(status=dict(state="DONE", errors=[err], errorResult=err)), @@ -49,10 +74,22 @@ def api_request(method, path, query_params=None, data=None, **kw): conn = client._connection = make_connection() conn.api_request.side_effect = api_request - job = client.query("select 1") + if job_retry_on_query == "Query": + job_retry = dict(job_retry=retry_notfound) + elif job_retry_on_query == "Both": + # This will be overridden in `result` + job_retry = dict(job_retry=retry_badrequest) + else: + job_retry = {} + job = client.query("select 1", **job_retry) orig_job_id = job.job_id - result = job.result() + job_retry = ( + dict(job_retry=retry_notfound) + if job_retry_on_query in ("Result", "Both") + else {} + ) + result = job.result(**job_retry) assert result.total_rows == 1 assert not responses # We made all the calls we expected to. @@ -146,3 +183,29 @@ def api_request(method, path, query_params=None, data=None, **kw): assert result.total_rows == 1 assert not responses # We made all the calls we expected to. 
assert job.job_id != orig_job_id + + +def test_raises_on_job_retry_on_query_with_non_retryable_jobs(client): + with pytest.raises( + TypeError, + match=re.escape( + "`job_retry` was provided, but the returned job is" + " not retryable, because a custom `job_id` was" + " provided." + ), + ): + client.query("select 42", job_id=42, job_retry=google.api_core.retry.Retry()) + + +def test_raises_on_job_retry_on_result_with_non_retryable_jobs(client): + client._connection = make_connection({}) + job = client.query("select 42", job_id=42) + with pytest.raises( + TypeError, + match=re.escape( + "`job_retry` was provided, but this job is" + " not retryable, because a custom `job_id` was" + " provided to the query that created this job." + ), + ): + job.result(job_retry=google.api_core.retry.Retry()) diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py index 6fb7f93fd..32da674ca 100644 --- a/tests/unit/test_retry.py +++ b/tests/unit/test_retry.py @@ -86,3 +86,18 @@ def test_w_unstructured_bad_gateway(self): exc = BadGateway("testing") self.assertTrue(self._call_fut(exc)) + + +def test_DEFAULT_JOB_RETRY_predicate(): + from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY + from google.api_core.exceptions import ClientError + + assert not DEFAULT_JOB_RETRY._predicate(TypeError()) + assert not DEFAULT_JOB_RETRY._predicate(ClientError("fail")) + assert not DEFAULT_JOB_RETRY._predicate( + ClientError("fail", errors=[dict(reason="idk")]) + ) + + assert DEFAULT_JOB_RETRY._predicate( + ClientError("fail", errors=[dict(reason="rateLimitExceeded")]) + ) From 9ab84e4064ec9e6bc82e9473e9c5b6850ce66423 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Tue, 10 Aug 2021 15:13:05 -0400 Subject: [PATCH 19/22] Use None job_retry to disable retry --- google/cloud/bigquery/client.py | 34 ++++++++++++++++------------ google/cloud/bigquery/job/query.py | 29 ++++++++++++------------ tests/unit/test_job_retry.py | 36 +++++++++++++++++++++++++++++- 3 files changed, 70 insertions(+), 29 
deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 4017fe91a..8142c59cd 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -86,7 +86,7 @@ from google.cloud.bigquery.model import ModelReference from google.cloud.bigquery.model import _model_arg_to_model_ref from google.cloud.bigquery.query import _QueryResults -from google.cloud.bigquery.retry import DEFAULT_RETRY +from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY from google.cloud.bigquery.routine import Routine from google.cloud.bigquery.routine import RoutineReference from google.cloud.bigquery.schema import SchemaField @@ -3163,7 +3163,7 @@ def query( project: str = None, retry: retries.Retry = DEFAULT_RETRY, timeout: float = None, - job_retry: retries.Retry = None, + job_retry: retries.Retry = DEFAULT_JOB_RETRY, ) -> job.QueryJob: """Run a SQL query. @@ -3202,17 +3202,19 @@ def query( before using ``retry``. job_retry (Optional[google.api_core.retry.Retry]): How to retry failed jobs. The default retries - rate-limit-exceeded errors. + rate-limit-exceeded errors. Passing ``None`` disables + job retry. - Not all jobs can be retried. If `job_id` is provided, - then the job returned by the query will not be - retryable, and an exception will be raised if - `job_retry` is also provided. + Not all jobs can be retried. If ``job_id`` is + provided, then the job returned by the query will not + be retryable, and an exception will be raised if a + non-``None`` (and non-default) value for ``job_retry`` + is also provided. - Note that the errors aren't detected until `result()` - is called on the job returned. The `job_retry` - specified here becomes the default `job_retry` for - `result()`, where it can also be specified. + Note that errors aren't detected until ``result()`` is + called on the job returned. 
The ``job_retry`` + specified here becomes the default ``job_retry`` for + ``result()``, where it can also be specified. Returns: google.cloud.bigquery.job.QueryJob: A new query job instance. @@ -3221,11 +3223,15 @@ def query( TypeError: If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.QueryJobConfig` - class, or if both `job_id` and `job_retry` are - provided. + class, or if both ``job_id`` and non-``None`` non-default + ``job_retry`` are provided. """ job_id_given = job_id is not None - if job_id_given and job_retry is not None: + if ( + job_id_given + and job_retry is not None + and job_retry is not DEFAULT_JOB_RETRY + ): raise TypeError( "`job_retry` was provided, but the returned job is" " not retryable, because a custom `job_id` was" diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 5b1d6a743..3ab47b0f9 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1260,7 +1260,7 @@ def result( retry: "retries.Retry" = DEFAULT_RETRY, timeout: float = None, start_index: int = None, - job_retry: "retries.Retry" = None, + job_retry: "retries.Retry" = DEFAULT_JOB_RETRY, ) -> Union["RowIterator", _EmptyRowIterator]: """Start the job and wait for it to complete and get the result. @@ -1287,13 +1287,14 @@ def result( The zero-based index of the starting row to read. job_retry (Optional[google.api_core.retry.Retry]): How to retry failed jobs. The default retries - rate-limit-exceeded errors. + rate-limit-exceeded errors. Passing ``None`` disables + job retry. - Not all jobs can be retried. If `job_id` was provided - to the query that created this job, then the job - returned by the query will not be retryable, and an - exception will be raised if `job_retry` is also - provided. + Not all jobs can be retried. 
If ``job_id`` was + provided to the query that created this job, then the + job returned by the query will not be retryable, and + an exception will be raised if non-``None`` + non-default ``job_retry`` is also provided. Returns: google.cloud.bigquery.table.RowIterator: @@ -1313,17 +1314,16 @@ def result( concurrent.futures.TimeoutError: If the job did not complete in the given timeout. TypeError: - If `job_retry` is provided and the job is not retryable. + If Non-``None`` and non-default ``job_retry`` is + provided and the job is not retryable. """ try: retry_do_query = getattr(self, "_retry_do_query", None) if retry_do_query is not None: - if job_retry is None: - job_retry = getattr(self, "_job_retry", None) - if job_retry is None: - job_retry = DEFAULT_JOB_RETRY + if job_retry is DEFAULT_JOB_RETRY: + job_retry = self._job_retry else: - if job_retry is not None: + if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY: raise TypeError( "`job_retry` was provided, but this job is" " not retryable, because a custom `job_id` was" @@ -1356,6 +1356,7 @@ def do_get_result(): # job, it should stay good,and we shouldn't have to retry. # But let's be paranoid. :) self._retry_do_query = retry_do_query + self._job_retry = job_retry super(QueryJob, self).result(retry=retry, timeout=timeout) @@ -1364,7 +1365,7 @@ def do_get_result(): # set the self._query_results cache. self._reload_query_results(retry=retry, timeout=timeout) - if retry_do_query is not None: + if retry_do_query is not None and job_retry is not None: do_get_result = job_retry(do_get_result) do_get_result() diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index e5e8ba93a..43d06c8d8 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -28,7 +28,7 @@ # - No `job_retry` passed, retry on default rateLimitExceeded. # - Pass NotFound retry to `query`. # - Pass NotFound retry to `result`. 
-# - Pass NotFound retry to both, with the value passed to `result` overriding. +# - Pass BadRequest retry to query, with the value passed to `result` overriding. @pytest.mark.parametrize("job_retry_on_query", [None, "Query", "Result", "Both"]) @mock.patch("time.sleep") def test_retry_failed_jobs(sleep, client, job_retry_on_query): @@ -121,6 +121,40 @@ def api_request(method, path, query_params=None, data=None, **kw): assert job.job_id == orig_job_id +# With job_retry_on_query, we're testing 4 scenarios: +# - Pass None retry to `query`. +# - Pass None retry to `result`. +@pytest.mark.parametrize("job_retry_on_query", ["Query", "Result"]) +@mock.patch("time.sleep") +def test_disable_retry_failed_jobs(sleep, client, job_retry_on_query): + """ + Test retry of job failures, as opposed to API-invocation failures. + """ + err = dict(reason="rateLimitExceeded") + responses = [dict(status=dict(state="DONE", errors=[err], errorResult=err)),] * 3 + + def api_request(method, path, query_params=None, data=None, **kw): + response = responses.pop(0) + response["jobReference"] = data["jobReference"] + return response + + conn = client._connection = make_connection() + conn.api_request.side_effect = api_request + + if job_retry_on_query == "Query": + job_retry = dict(job_retry=None) + else: + job_retry = {} + job = client.query("select 1", **job_retry) + + orig_job_id = job.job_id + job_retry = dict(job_retry=None) if job_retry_on_query == "Result" else {} + with pytest.raises(google.api_core.exceptions.Forbidden): + job.result(**job_retry) + + assert len(sleep.mock_calls) == 0 + + @mock.patch("google.api_core.retry.datetime_helpers") @mock.patch("time.sleep") def test_retry_failed_jobs_after_retry_failed(sleep, datetime_helpers, client): From d2bf84036d96538c1c5246fce07397c1aa2bbd8f Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Tue, 10 Aug 2021 15:23:36 -0400 Subject: [PATCH 20/22] Use a 10-minute deadline for job retry by default --- google/cloud/bigquery/retry.py | 16 
+++++++++++----- tests/unit/test_job_retry.py | 3 ++- tests/unit/test_retry.py | 6 ++++++ 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index 018b4619c..53f989273 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -32,6 +32,8 @@ auth_exceptions.TransportError, ) +_DEFAULT_JOB_DEADLINE = 60.0 * 10.0 # seconds + def _should_retry(exc): """Predicate for determining when to retry. @@ -58,13 +60,17 @@ def _should_retry(exc): """ -@retry.Retry -def DEFAULT_JOB_RETRY(exc): - """ - The default job retry object. - """ +def _job_should_retry(exc): if not hasattr(exc, "errors") or len(exc.errors) == 0: return False reason = exc.errors[0]["reason"] return reason == "rateLimitExceeded" + + +DEFAULT_JOB_RETRY = retry.Retry( + predicate=_job_should_retry, deadline=_DEFAULT_JOB_DEADLINE +) +""" +The default job retry object. +""" diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index 43d06c8d8..5f2747fae 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -131,7 +131,7 @@ def test_disable_retry_failed_jobs(sleep, client, job_retry_on_query): Test retry of job failures, as opposed to API-invocation failures. 
""" err = dict(reason="rateLimitExceeded") - responses = [dict(status=dict(state="DONE", errors=[err], errorResult=err)),] * 3 + responses = [dict(status=dict(state="DONE", errors=[err], errorResult=err))] * 3 def api_request(method, path, query_params=None, data=None, **kw): response = responses.pop(0) @@ -152,6 +152,7 @@ def api_request(method, path, query_params=None, data=None, **kw): with pytest.raises(google.api_core.exceptions.Forbidden): job.result(**job_retry) + assert job.job_id == orig_job_id assert len(sleep.mock_calls) == 0 diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py index 32da674ca..b60c3e9f0 100644 --- a/tests/unit/test_retry.py +++ b/tests/unit/test_retry.py @@ -101,3 +101,9 @@ def test_DEFAULT_JOB_RETRY_predicate(): assert DEFAULT_JOB_RETRY._predicate( ClientError("fail", errors=[dict(reason="rateLimitExceeded")]) ) + + +def test_DEFAULT_JOB_RETRY_deadline(): + from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY + + assert DEFAULT_JOB_RETRY._deadline == 600 From bf051b05a4f84e75a43e81f211f843557350edd5 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Wed, 11 Aug 2021 12:16:25 -0400 Subject: [PATCH 21/22] Added another default reason to retry jobs --- google/cloud/bigquery/retry.py | 4 +++- tests/unit/test_job_retry.py | 5 +++-- tests/unit/test_retry.py | 3 +++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index ea31b3362..e9286055c 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -59,13 +59,15 @@ def _should_retry(exc): pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``. 
""" +job_retry_reasons = "rateLimitExceeded", "backendError" + def _job_should_retry(exc): if not hasattr(exc, "errors") or len(exc.errors) == 0: return False reason = exc.errors[0]["reason"] - return reason == "rateLimitExceeded" + return reason in job_retry_reasons DEFAULT_JOB_RETRY = retry.Retry( diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index 5f2747fae..64ecb8369 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -191,10 +191,11 @@ def api_request(method, path, query_params=None, data=None, **kw): # We failed because we couldn't succeed after 120 seconds. # But we can try again: + err2 = dict(reason="backendError") # We also retry on this responses = [ + dict(status=dict(state="DONE", errors=[err2], errorResult=err2)), dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE", errors=[err], errorResult=err)), + dict(status=dict(state="DONE", errors=[err2], errorResult=err2)), dict(status=dict(state="DONE")), dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), ] diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py index b60c3e9f0..c7c25e036 100644 --- a/tests/unit/test_retry.py +++ b/tests/unit/test_retry.py @@ -101,6 +101,9 @@ def test_DEFAULT_JOB_RETRY_predicate(): assert DEFAULT_JOB_RETRY._predicate( ClientError("fail", errors=[dict(reason="rateLimitExceeded")]) ) + assert DEFAULT_JOB_RETRY._predicate( + ClientError("fail", errors=[dict(reason="backendError")]) + ) def test_DEFAULT_JOB_RETRY_deadline(): From 6b790e8b91e125fff2ed627f3b89628b045c8ff8 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 11 Aug 2021 13:02:09 -0500 Subject: [PATCH 22/22] Update tests/unit/test_job_retry.py --- tests/unit/test_job_retry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index 64ecb8369..b2095d2f2 100644 --- 
a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -186,7 +186,7 @@ def api_request(method, path, query_params=None, data=None, **kw): with pytest.raises(google.api_core.exceptions.RetryError): job.result() - # We never fot a successful job, so the job id never changed: + # We never got a successful job, so the job id never changed: assert job.job_id == orig_job_id # We failed because we couldn't succeed after 120 seconds.