From 23cdb5239b0252b41e341f094a5665f46036c8e1 Mon Sep 17 00:00:00 2001 From: Bogdan Ghita Date: Wed, 1 Apr 2020 18:27:08 +0200 Subject: [PATCH 1/3] Bump version to 0.10.0 --- databricks_cli/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks_cli/version.py b/databricks_cli/version.py index 100c0517..2c21bfc5 100644 --- a/databricks_cli/version.py +++ b/databricks_cli/version.py @@ -21,7 +21,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -version = '0.9.2.dev0' # NOQA +version = '0.10.0' # NOQA def print_version_callback(ctx, param, value): # NOQA From 6e5a67420087fa3c1a905d972c3b59588cd3ed0d Mon Sep 17 00:00:00 2001 From: Bogdan Ghita Date: Wed, 1 Apr 2020 19:19:45 +0200 Subject: [PATCH 2/3] Revert "Bump version to 0.10.0" This reverts commit 23cdb5239b0252b41e341f094a5665f46036c8e1. --- databricks_cli/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks_cli/version.py b/databricks_cli/version.py index 2c21bfc5..100c0517 100644 --- a/databricks_cli/version.py +++ b/databricks_cli/version.py @@ -21,7 +21,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -version = '0.10.0' # NOQA +version = '0.9.2.dev0' # NOQA def print_version_callback(ctx, param, value): # NOQA From 019f8f8513e55b5108de7ce6bfed8ae430c8c093 Mon Sep 17 00:00:00 2001 From: Bogdan Ghita Date: Wed, 23 Sep 2020 15:51:46 +0200 Subject: [PATCH 3/3] Wrapped retry logic in a class. --- databricks_cli/dbfs/api.py | 112 +++++++++++++++++++++++-------------- tests/dbfs/test_api.py | 3 +- 2 files changed, 70 insertions(+), 45 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index 51e7233a..18edd668 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -28,6 +28,7 @@ import tempfile import re +import functools import click from tenacity import retry, wait_random_exponential, retry_if_exception_type, stop_after_attempt @@ -39,10 +40,6 @@ from databricks_cli.dbfs.exceptions import LocalFileExistsException, RateLimitException BUFFER_SIZE_BYTES = 2**20 -EXPONENTIAL_BACKOFF_MULTIPLIER = 1 -MAX_SECONDS_WAIT = 60 -MAX_RETRY_ATTEMPTS = 8 -time_for_last_retry = 0 class ParseException(Exception): @@ -83,46 +80,75 @@ class DbfsErrorCodes(object): TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS' -def before_sleep_on_429(retry_state): - global time_for_last_retry - if retry_state.attempt_number < 1: - click.echo("Warning: Unexpected retry_state.attempt_number={}.".format( - retry_state.attempt_number)) - click.echo("Received 429 REQUEST_LIMIT_EXCEEDED. Retrying with exponential backoff.") - else: - # Initialize time_for_last_retry on the first attempt. - if retry_state.attempt_number == 1: - time_for_last_retry = 0 - # Note: Here idle_for represents the total time spent sleeping in all retries so far + - # the time that we will sleep until the next retry. We determined this empirically, - # as it is not clearly stated in the Tenacity docs. - time_until_next_retry = retry_state.idle_for - time_for_last_retry +class CustomRetryState(object): + def __init__(self): + self.time_for_last_retry = 0 + + def reset(self): + self.time_for_last_retry = 0 + + +class Retry429(object): + EXPONENTIAL_BACKOFF_MULTIPLIER = 1 + MAX_SECONDS_WAIT = 60 + MAX_RETRY_ATTEMPTS = 8 + + def __init__(self, func): + """ + If there are no decorator arguments, the function to be decorated is passed to + the constructor. It is called only once for each function decorated with it. + """ + self.retry_state_429 = CustomRetryState() + + @retry(wait=wait_random_exponential(multiplier=self.EXPONENTIAL_BACKOFF_MULTIPLIER, + max=self.MAX_SECONDS_WAIT), retry=retry_if_exception_type(RateLimitException), + stop=stop_after_attempt(self.MAX_RETRY_ATTEMPTS), reraise=True, + before_sleep=lambda retry_state: self.before_sleep_on_429(retry_state, + self.retry_state_429)) + def wrapped_function(*args, **kwargs): + try: + return func(*args, **kwargs) + except HTTPError as e: + if e.response.status_code == 429: + raise RateLimitException("429 Too Many Requests") + raise e + + self.func = wrapped_function + + def __call__(self, *args, **kwargs): + """ + The __call__ method is called every time a decorated function is called. + """ + self.retry_state_429.reset() + + return self.func(*args, **kwargs) + + def __get__(self, obj, objtype): + """ + Making this decorator a descriptor such that we can use it on class methods. + See https://stackoverflow.com/a/3296318/12359607 + """ + return functools.partial(self.__call__, obj) + + @staticmethod + def before_sleep_on_429(retry_state, retry_state_429): + """ + Note: Here idle_for represents the total time spent sleeping in all retries so far + + the time that we will sleep until the next retry. We determined this empirically, + as it is not clearly stated in the Tenacity docs. + """ + time_until_next_retry = retry_state.idle_for - retry_state_429.time_for_last_retry click.echo(("Received 429 REQUEST_LIMIT_EXCEEDED for attempt {}. " "Retrying in {:.2f} seconds.").format(retry_state.attempt_number, time_until_next_retry)) - time_for_last_retry = retry_state.idle_for - - -def retry_429(func): - @retry(wait=wait_random_exponential(multiplier=EXPONENTIAL_BACKOFF_MULTIPLIER, - max=MAX_SECONDS_WAIT), retry=retry_if_exception_type(RateLimitException), - stop=stop_after_attempt(MAX_RETRY_ATTEMPTS), reraise=True, - before_sleep=before_sleep_on_429) - def wrapped_function(*args, **kwargs): - try: - return func(*args, **kwargs) - except HTTPError as e: - if e.response.status_code == 429: - raise RateLimitException("429 Too Many Requests") - raise e - return wrapped_function + retry_state_429.time_for_last_retry = retry_state.idle_for class DbfsApi(object): def __init__(self, api_client): self.client = DbfsService(api_client) - @retry_429 + @Retry429 def list_files(self, dbfs_path, headers=None): list_response = self.client.list(dbfs_path.absolute_path, headers=headers) if 'files' in list_response: @@ -143,20 +169,20 @@ def file_exists(self, dbfs_path, headers=None): raise e return True - @retry_429 + @Retry429 def get_status(self, dbfs_path, headers=None): json = self.client.get_status(dbfs_path.absolute_path, headers=headers) return FileInfo.from_json(json) - @retry_429 + @Retry429 def create(self, dbfs_path, overwrite, headers): return self.client.create(dbfs_path.absolute_path, overwrite, headers=headers) - @retry_429 + @Retry429 def add_block(self, handle, contents, headers): self.client.add_block(handle, contents, headers=headers) - @retry_429 + @Retry429 def close(self, handle, headers): self.client.close(handle, headers=headers) @@ -171,7 +197,7 @@ def put_file(self, src_path, dbfs_path, overwrite, headers=None): self.add_block(handle, b64encode(contents).decode(), headers=headers) self.close(handle, headers=headers) - @retry_429 + @Retry429 def read(self, dbfs_path, offset, headers): return self.client.read(dbfs_path.absolute_path, offset, BUFFER_SIZE_BYTES, headers=headers) @@ -204,7 +230,7 @@ def get_num_files_deleted(partial_delete_error): message)) return int(m.group(1)) - @retry_429 + @Retry429 def delete(self, dbfs_path, recursive, headers=None): num_files_deleted = 0 while True: @@ -232,11 +258,11 @@ def delete(self, dbfs_path, recursive, headers=None): break click.echo("\rDelete finished successfully.\033[K") - @retry_429 + @Retry429 def mkdirs(self, dbfs_path, headers=None): self.client.mkdirs(dbfs_path.absolute_path, headers=headers) - @retry_429 + @Retry429 def move(self, dbfs_src, dbfs_dst, headers=None): self.client.move(dbfs_src.absolute_path, dbfs_dst.absolute_path, headers=headers) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index 77c6cf78..1ad2c4b9 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -40,7 +40,6 @@ 'file_size': 1 } TEST_FILE_INFO = api.FileInfo(TEST_DBFS_PATH, False, 1) -MAX_RETRY_ATTEMPTS = 8 def get_resource_does_not_exist_exception(): @@ -141,7 +140,7 @@ def test_mkdirs_stop_retrying(self, dbfs_api): dbfs_api.client.mkdirs = mock.Mock(side_effect=exception_sequence) with pytest.raises(RateLimitException): dbfs_api.mkdirs(DbfsPath('dbfs:/test/mkdir')) - assert dbfs_api.client.mkdirs.call_count == MAX_RETRY_ATTEMPTS + assert dbfs_api.client.mkdirs.call_count == api.Retry429.MAX_RETRY_ATTEMPTS def test_file_exists_true(self, dbfs_api): dbfs_api.client.get_status.return_value = TEST_FILE_JSON