Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 69 additions & 43 deletions databricks_cli/dbfs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import tempfile

import re
import functools
import click

from tenacity import retry, wait_random_exponential, retry_if_exception_type, stop_after_attempt
Expand All @@ -39,10 +40,6 @@
from databricks_cli.dbfs.exceptions import LocalFileExistsException, RateLimitException

BUFFER_SIZE_BYTES = 2**20
EXPONENTIAL_BACKOFF_MULTIPLIER = 1
MAX_SECONDS_WAIT = 60
MAX_RETRY_ATTEMPTS = 8
time_for_last_retry = 0


class ParseException(Exception):
Expand Down Expand Up @@ -83,46 +80,75 @@ class DbfsErrorCodes(object):
TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS'


def before_sleep_on_429(retry_state):
global time_for_last_retry
if retry_state.attempt_number < 1:
click.echo("Warning: Unexpected retry_state.attempt_number={}.".format(
retry_state.attempt_number))
click.echo("Received 429 REQUEST_LIMIT_EXCEEDED. Retrying with exponential backoff.")
else:
# Initialize time_for_last_retry on the first attempt.
if retry_state.attempt_number == 1:
time_for_last_retry = 0
# Note: Here idle_for represents the total time spent sleeping in all retries so far +
# the time that we will sleep until the next retry. We determined this empirically,
# as it is not clearly stated in the Tenacity docs.
time_until_next_retry = retry_state.idle_for - time_for_last_retry
class CustomRetryState(object):
def __init__(self):
self.time_for_last_retry = 0

def reset(self):
self.time_for_last_retry = 0


class Retry429(object):
EXPONENTIAL_BACKOFF_MULTIPLIER = 1
MAX_SECONDS_WAIT = 60
MAX_RETRY_ATTEMPTS = 8

def __init__(self, func):
"""
If there are no decorator arguments, the function to be decorated is passed to
the constructor. It is called only once for each function decorated with it.
"""
self.retry_state_429 = CustomRetryState()

@retry(wait=wait_random_exponential(multiplier=self.EXPONENTIAL_BACKOFF_MULTIPLIER,
max=self.MAX_SECONDS_WAIT), retry=retry_if_exception_type(RateLimitException),
stop=stop_after_attempt(self.MAX_RETRY_ATTEMPTS), reraise=True,
before_sleep=lambda retry_state: self.before_sleep_on_429(retry_state,
self.retry_state_429))
def wrapped_function(*args, **kwargs):
try:
return func(*args, **kwargs)
except HTTPError as e:
if e.response.status_code == 429:
raise RateLimitException("429 Too Many Requests")
raise e

self.func = wrapped_function

def __call__(self, *args, **kwargs):
"""
The __call__ method is called every time a decorated function is called.
"""
self.retry_state_429.reset()

return self.func(*args, **kwargs)

def __get__(self, obj, objtype):
"""
Making this decorator a descriptor such that we can use it on class methods.
See https://stackoverflow.com/a/3296318/12359607
"""
return functools.partial(self.__call__, obj)

@staticmethod
def before_sleep_on_429(retry_state, retry_state_429):
"""
Note: Here idle_for represents the total time spent sleeping in all retries so far +
the time that we will sleep until the next retry. We determined this empirically,
as it is not clearly stated in the Tenacity docs.
"""
time_until_next_retry = retry_state.idle_for - retry_state_429.time_for_last_retry
click.echo(("Received 429 REQUEST_LIMIT_EXCEEDED for attempt {}. "
"Retrying in {:.2f} seconds.").format(retry_state.attempt_number,
time_until_next_retry))
time_for_last_retry = retry_state.idle_for


def retry_429(func):
@retry(wait=wait_random_exponential(multiplier=EXPONENTIAL_BACKOFF_MULTIPLIER,
max=MAX_SECONDS_WAIT), retry=retry_if_exception_type(RateLimitException),
stop=stop_after_attempt(MAX_RETRY_ATTEMPTS), reraise=True,
before_sleep=before_sleep_on_429)
def wrapped_function(*args, **kwargs):
try:
return func(*args, **kwargs)
except HTTPError as e:
if e.response.status_code == 429:
raise RateLimitException("429 Too Many Requests")
raise e
return wrapped_function
retry_state_429.time_for_last_retry = retry_state.idle_for


class DbfsApi(object):
def __init__(self, api_client):
self.client = DbfsService(api_client)

@retry_429
@Retry429
Copy link
Copy Markdown
Collaborator Author

@bogdanghita-db bogdanghita-db Sep 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a class now, so name must be camel case.

def list_files(self, dbfs_path, headers=None):
list_response = self.client.list(dbfs_path.absolute_path, headers=headers)
if 'files' in list_response:
Expand All @@ -143,20 +169,20 @@ def file_exists(self, dbfs_path, headers=None):
raise e
return True

@retry_429
@Retry429
def get_status(self, dbfs_path, headers=None):
json = self.client.get_status(dbfs_path.absolute_path, headers=headers)
return FileInfo.from_json(json)

@retry_429
@Retry429
def create(self, dbfs_path, overwrite, headers):
return self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)

@retry_429
@Retry429
def add_block(self, handle, contents, headers):
self.client.add_block(handle, contents, headers=headers)

@retry_429
@Retry429
def close(self, handle, headers):
self.client.close(handle, headers=headers)

Expand All @@ -171,7 +197,7 @@ def put_file(self, src_path, dbfs_path, overwrite, headers=None):
self.add_block(handle, b64encode(contents).decode(), headers=headers)
self.close(handle, headers=headers)

@retry_429
@Retry429
def read(self, dbfs_path, offset, headers):
return self.client.read(dbfs_path.absolute_path, offset, BUFFER_SIZE_BYTES, headers=headers)

Expand Down Expand Up @@ -204,7 +230,7 @@ def get_num_files_deleted(partial_delete_error):
message))
return int(m.group(1))

@retry_429
@Retry429
def delete(self, dbfs_path, recursive, headers=None):
num_files_deleted = 0
while True:
Expand Down Expand Up @@ -232,11 +258,11 @@ def delete(self, dbfs_path, recursive, headers=None):
break
click.echo("\rDelete finished successfully.\033[K")

@retry_429
@Retry429
def mkdirs(self, dbfs_path, headers=None):
self.client.mkdirs(dbfs_path.absolute_path, headers=headers)

@retry_429
@Retry429
def move(self, dbfs_src, dbfs_dst, headers=None):
self.client.move(dbfs_src.absolute_path, dbfs_dst.absolute_path, headers=headers)

Expand Down
3 changes: 1 addition & 2 deletions tests/dbfs/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
'file_size': 1
}
TEST_FILE_INFO = api.FileInfo(TEST_DBFS_PATH, False, 1)
MAX_RETRY_ATTEMPTS = 8


def get_resource_does_not_exist_exception():
Expand Down Expand Up @@ -141,7 +140,7 @@ def test_mkdirs_stop_retrying(self, dbfs_api):
dbfs_api.client.mkdirs = mock.Mock(side_effect=exception_sequence)
with pytest.raises(RateLimitException):
dbfs_api.mkdirs(DbfsPath('dbfs:/test/mkdir'))
assert dbfs_api.client.mkdirs.call_count == MAX_RETRY_ATTEMPTS
assert dbfs_api.client.mkdirs.call_count == api.Retry429.MAX_RETRY_ATTEMPTS

def test_file_exists_true(self, dbfs_api):
dbfs_api.client.get_status.return_value = TEST_FILE_JSON
Expand Down