Azure blockblob backend parametrized connection/read timeouts (celery…
…#6978)

* Initial hardcoded (sorry) change to the celery azure block blob backend.

This is required to check if this change has any influence.
If it does, I will make it a proper config option in celery itself.

* Add sensible defaults for azure block blob backend.

The problem we hit in production is that on certain network errors (we suspect network partitioning) the client gets stuck waiting on the default read timeout of an SSL socket.
In the Azure SDK this is defined in `/azure/storage/blob/_shared/constants.py` as READ_TIMEOUT = 80000 (seconds) for python versions > 3.5.

This means that for those python versions the operation can stay stuck for roughly 22 hours (80000 s / 3600 ≈ 22.2 h) until it times out, which is obviously not ideal :).

This sets the timeouts to 20s for connection (which is the current default) and 120s for read on all python versions, which should be sufficient with modern connections.

If we think it should be higher - I can increase it but we definitely should give the user an option to set their own timeouts based on file sizes and bandwidths they are operating on.
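
As a rough illustration of picking a read timeout from file size and bandwidth, the sketch below pairs the two new settings with a small estimate. `estimate_read_timeout`, the 50 MiB blob size, the 512 KiB/s bandwidth, and the safety factor are all hypothetical values chosen for the example, and the connection string is a placeholder. Treating the read timeout as an upper bound on a whole transfer is a conservative assumption, not something the SDK requires.

```python
import math


def estimate_read_timeout(max_blob_bytes, min_bandwidth_bytes_per_s,
                          safety_factor=2.0):
    """Hypothetical helper (not part of Celery or the Azure SDK).

    Returns the whole number of seconds needed to move the largest
    expected blob at the slowest expected bandwidth, padded by a factor.
    """
    if max_blob_bytes < 0:
        raise ValueError("max_blob_bytes must be non-negative")
    if min_bandwidth_bytes_per_s <= 0:
        raise ValueError("min_bandwidth_bytes_per_s must be positive")
    if safety_factor < 1:
        raise ValueError("safety_factor must be at least 1")
    seconds = max_blob_bytes / min_bandwidth_bytes_per_s * safety_factor
    return math.ceil(seconds)


# celeryconfig.py-style module-level settings.
# The connection string below is a placeholder, not a working value.
result_backend = "azureblockblob://<your-connection-string>"

# Keep the default introduced by this change for connection setup.
azureblockblob_connection_timeout = 20

# Assumed workload: results up to 50 MiB over a link no slower than
# 512 KiB/s. Never go below the 120 s default.
azureblockblob_read_timeout = max(
    120,
    estimate_read_timeout(50 * 1024 ** 2, 512 * 1024),
)
```

With these assumed numbers the transfer takes 100 s at worst-case bandwidth, so the padded estimate of 200 s replaces the 120 s default. Smaller results would simply keep the default.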

* Update docs a bit.

* Update docs/userguide/configuration.rst

Co-authored-by: Omer Katz <omer.drow@gmail.com>

* Add test confirming azure blob client is configured correctly based on values supplied from configuration dictionary.

Co-authored-by: tomaszkluczkowski <tomasz.kluczkowski@flexciton.com>
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
Co-authored-by: Omer Katz <omer.drow@gmail.com>
4 people committed Oct 6, 2021
1 parent d377322 commit 4945291
Showing 4 changed files with 65 additions and 1 deletion.
2 changes: 2 additions & 0 deletions celery/app/defaults.py
@@ -133,6 +133,8 @@ def __repr__(self):
retry_increment_base=Option(2, type='int'),
retry_max_attempts=Option(3, type='int'),
base_path=Option('', type='string'),
connection_timeout=Option(20, type='int'),
read_timeout=Option(120, type='int'),
),
control=Namespace(
queue_ttl=Option(300.0, type='float'),
10 changes: 9 additions & 1 deletion celery/backends/azureblockblob.py
@@ -44,6 +44,10 @@ def __init__(self,
conf["azureblockblob_container_name"])

self.base_path = conf.get('azureblockblob_base_path', '')
self._connection_timeout = conf.get(
'azureblockblob_connection_timeout', 20
)
self._read_timeout = conf.get('azureblockblob_read_timeout', 120)

@classmethod
def _parse_url(cls, url, prefix="azureblockblob://"):
@@ -61,7 +65,11 @@ def _blob_service_client(self):
the container is created if it doesn't yet exist.
"""
- client = BlobServiceClient.from_connection_string(self._connection_string)
+ client = BlobServiceClient.from_connection_string(
+     self._connection_string,
+     connection_timeout=self._connection_timeout,
+     read_timeout=self._read_timeout
+ )

try:
client.create_container(name=self._container_name)
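
The lookup-then-forward pattern in this file can be sketched in isolation. The example below is a simplified stand-in, not the backend itself: `build_client` is a hypothetical function, `conf` is a plain dict rather than a Celery configuration object, and a `Mock` takes the place of `BlobServiceClient.from_connection_string` so the snippet runs without the Azure SDK installed.

```python
from unittest.mock import Mock


def build_client(conf, connection_string, factory):
    """Hypothetical stand-in for the backend's client construction.

    Reads both timeouts from ``conf`` with the same fallbacks as the
    change above (20 s connection, 120 s read) and forwards them to
    ``factory`` as keyword arguments.
    """
    connection_timeout = conf.get('azureblockblob_connection_timeout', 20)
    read_timeout = conf.get('azureblockblob_read_timeout', 120)
    return factory(
        connection_string,
        connection_timeout=connection_timeout,
        read_timeout=read_timeout,
    )


# A Mock stands in for BlobServiceClient.from_connection_string.
factory = Mock(name="from_connection_string")

# No overrides: the fallback values are forwarded.
build_client({}, "connection_string", factory)
factory.assert_called_once_with(
    "connection_string", connection_timeout=20, read_timeout=120
)

# Overriding only the read timeout leaves the connection timeout alone.
factory.reset_mock()
build_client({'azureblockblob_read_timeout': 300}, "connection_string", factory)
factory.assert_called_once_with(
    "connection_string", connection_timeout=20, read_timeout=300
)
```

Passing the factory in as an argument is only a convenience for this sketch; the real backend calls the SDK class directly.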
18 changes: 18 additions & 0 deletions docs/userguide/configuration.rst
@@ -1599,6 +1599,24 @@ Default: 3.

The maximum number of retry attempts.

.. setting:: azureblockblob_connection_timeout

``azureblockblob_connection_timeout``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Default: 20.

Timeout in seconds for establishing the azure block blob connection.

.. setting:: azureblockblob_read_timeout

``azureblockblob_read_timeout``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Default: 120.

Timeout in seconds for reading of an azure block blob.

.. _conf-elasticsearch-result-backend:

Elasticsearch backend settings
36 changes: 36 additions & 0 deletions t/unit/backends/test_azureblockblob.py
@@ -61,6 +61,42 @@ def test_create_client(self, mock_blob_service_factory):
assert backend._blob_service_client is not None
assert mock_blob_service_client_instance.create_container.call_count == 1

@patch(MODULE_TO_MOCK + ".BlobServiceClient")
def test_configure_client(self, mock_blob_service_factory):

connection_timeout = 3
read_timeout = 11
self.app.conf.update(
{
'azureblockblob_connection_timeout': connection_timeout,
'azureblockblob_read_timeout': read_timeout,
}
)

mock_blob_service_client_instance = Mock()
mock_blob_service_factory.from_connection_string.return_value = (
mock_blob_service_client_instance
)

base_url = "azureblockblob://"
connection_string = "connection_string"
backend = AzureBlockBlobBackend(
app=self.app, url=f'{base_url}{connection_string}'
)

client = backend._blob_service_client
assert client is mock_blob_service_client_instance

(
mock_blob_service_factory
.from_connection_string
.assert_called_once_with(
connection_string,
connection_timeout=connection_timeout,
read_timeout=read_timeout
)
)

@patch(MODULE_TO_MOCK + ".AzureBlockBlobBackend._blob_service_client")
def test_get(self, mock_client, base_path):
self.backend.base_path = base_path