From 26c1243048f53e5653ff97ba88118b8546e4fc07 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 2 May 2022 23:37:07 -0700 Subject: [PATCH 1/8] Just use `lz4.block` API These have been around since python-lz4's 0.23.1 release, which came out in February 2018. Given these have been around for a while, it seems reasonable to just require them and drop the workarounds. --- distributed/protocol/compression.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 72172e90dde..f538cf80fee 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -52,18 +52,7 @@ def _fixed_snappy_decompress(data): default_compression = "snappy" with suppress(ImportError): - import lz4 - - try: - # try using the new lz4 API - import lz4.block - - lz4_compress = lz4.block.compress - lz4_decompress = lz4.block.decompress - except ImportError: - # fall back to old one - lz4_compress = lz4.LZ4_compress - lz4_decompress = lz4.LZ4_uncompress + from lz4.block import compress as lz4_compress, decompress as lz4_decompress # helper to bypass missing memoryview support in current lz4 # (fixed in later versions) From c1305103503f4d021224d481e11bfedca170714c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 2 May 2022 23:37:14 -0700 Subject: [PATCH 2/8] Drop python-lz4 workarounds for `memoryview`s In addition to including the `lz4.block` API, the python-lz4 0.23.1 release included proper support for the Python Buffer Protocol. So if we are going to require the new API, it seems reasonable to drop these workarounds as well. 
--- distributed/protocol/compression.py | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index f538cf80fee..72eb14ab1c9 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -54,30 +54,9 @@ def _fixed_snappy_decompress(data): with suppress(ImportError): from lz4.block import compress as lz4_compress, decompress as lz4_decompress - # helper to bypass missing memoryview support in current lz4 - # (fixed in later versions) - - def _fixed_lz4_compress(data): - try: - return lz4_compress(data) - except TypeError: - if isinstance(data, (memoryview, bytearray)): - return lz4_compress(bytes(data)) - else: - raise - - def _fixed_lz4_decompress(data): - try: - return lz4_decompress(data) - except (ValueError, TypeError): - if isinstance(data, (memoryview, bytearray)): - return lz4_decompress(bytes(data)) - else: - raise - compressions["lz4"] = { - "compress": _fixed_lz4_compress, - "decompress": _fixed_lz4_decompress, + "compress": lz4_compress, + "decompress": lz4_decompress, } default_compression = "lz4" From 00020a442033667bbca9ab00e85d8ecdfd7a1fa9 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 2 May 2022 23:37:21 -0700 Subject: [PATCH 3/8] Be explicit about requiring python-lz4 0.23.1+ Raise an `ImportError` if python-lz4 is too old. Since this is in a `suppress` block, the `ImportError` will be captured, but the effect will be python-lz4 won't be used in this case. 
--- distributed/protocol/compression.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 72eb14ab1c9..cab43aa85e2 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -11,6 +11,8 @@ from contextlib import suppress from typing import Literal +from packaging.version import parse as parse_version + from tlz import identity import dask @@ -52,6 +54,12 @@ def _fixed_snappy_decompress(data): default_compression = "snappy" with suppress(ImportError): + import lz4 + + # Required to use `lz4.block` APIs and Python Buffer Protocol support. + if parse_version(lz4.__version__) < parse_version("0.23.1"): + raise ImportError("Need lz4 >= 0.23.1") + from lz4.block import compress as lz4_compress, decompress as lz4_decompress compressions["lz4"] = { From 31ee08ad58160084eb9cb1ca79ba92860c9258c2 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 2 May 2022 23:52:48 -0700 Subject: [PATCH 4/8] Drop workaround for python-snappy prior to 0.5.3+ As of python-snappy 0.5.3 (released July 2018), any Python Buffer Protocol object (like `memoryview`s) is supported. Given how long this release has been out, it seems reasonable to drop the prior workaround. Should avoid unneeded copying for other objects (like `memoryview`s). 
--- distributed/protocol/compression.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index cab43aa85e2..47a61772bbb 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -41,15 +41,9 @@ with suppress(ImportError): import snappy - def _fixed_snappy_decompress(data): - # snappy.decompress() doesn't accept memoryviews - if isinstance(data, (memoryview, bytearray)): - data = bytes(data) - return snappy.decompress(data) - compressions["snappy"] = { "compress": snappy.compress, - "decompress": _fixed_snappy_decompress, + "decompress": snappy.decompress, } default_compression = "snappy" From 3147bfd984706f8c21b2571e440066b9f0f01f07 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 3 May 2022 00:42:52 -0700 Subject: [PATCH 5/8] Explicitly require python-snappy 0.5.3+ Ideally we would test `__version__`; however, it is not in a release atm (though it was added recently). In any event it can't reliably be checked for current versions. Instead just test some fast code that reliably fails for versions prior to `0.5.3`. If the code fails, simply raise an `ImportError` noting this requirement. As this is in a `suppress` block, the `ImportError` will be caught. Though the net effect will be disabling python-snappy use if it is too old. --- distributed/protocol/compression.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 47a61772bbb..33e7fb1838f 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -41,6 +41,17 @@ with suppress(ImportError): import snappy + # In python-snappy 0.5.3, support for the Python Buffer Protocol was added. + # This is needed to handle other objects (like `memoryview`s) without + # copying to `bytes` first. + # + # Note: `snappy.__version__` doesn't exist in a release yet. 
+ # So do a little test that will fail if snappy is not 0.5.3 or later. + try: + snappy.compress(memoryview(b"")) + except TypeError: + raise ImportError("Need snappy >= 0.5.3") + compressions["snappy"] = { "compress": snappy.compress, "decompress": snappy.decompress, From 0b263f66e31c15d9071cc70a75eddac97b939348 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 3 May 2022 00:44:24 -0700 Subject: [PATCH 6/8] Add `python-snappy` & `lz4` requirements to CI These should already be satisfied when installed. Though this makes it clearer that these are the required versions (particularly for any users poking around trying to understand things). --- continuous_integration/environment-3.9.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/continuous_integration/environment-3.9.yaml b/continuous_integration/environment-3.9.yaml index a79d010b278..da6b3478c17 100644 --- a/continuous_integration/environment-3.9.yaml +++ b/continuous_integration/environment-3.9.yaml @@ -19,7 +19,7 @@ dependencies: - ipywidgets - jinja2 - locket >=1.0 - - lz4 # Only tested here + - lz4 >=0.23.1 # Only tested here - msgpack-python - netcdf4 - paramiko @@ -32,7 +32,7 @@ dependencies: - pytest-repeat - pytest-rerunfailures - pytest-timeout - - python-snappy # Only tested here + - python-snappy >=0.5.3 # Only tested here - pytorch # Only tested here - requests - s3fs From d2a18adc34c0977288a458e94944c3beb069b116 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 3 May 2022 00:50:48 -0700 Subject: [PATCH 7/8] Make linter happy --- distributed/protocol/compression.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 33e7fb1838f..5e6c6b271e0 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -12,7 +12,6 @@ from typing import Literal from packaging.version import parse as parse_version - from tlz import identity import dask @@ 
-65,7 +64,8 @@ if parse_version(lz4.__version__) < parse_version("0.23.1"): raise ImportError("Need lz4 >= 0.23.1") - from lz4.block import compress as lz4_compress, decompress as lz4_decompress + from lz4.block import compress as lz4_compress + from lz4.block import decompress as lz4_decompress compressions["lz4"] = { "compress": lz4_compress, From 3aa8e2fefb9cf3c4d88aac677461dcfbf715a0e4 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 3 May 2022 01:23:48 -0700 Subject: [PATCH 8/8] Require `zstandard` version `0.9.0+` In `zstandard`'s `0.9.0` release, support for the Python Buffer Protocol was added. As we are supporting this functionality with other compressors, go ahead and support it here too. --- continuous_integration/environment-3.10.yaml | 2 +- continuous_integration/environment-3.8.yaml | 2 +- continuous_integration/environment-3.9.yaml | 2 +- distributed/protocol/compression.py | 4 ++++ 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml index 7279c6b84d8..d92f3fb902c 100644 --- a/continuous_integration/environment-3.10.yaml +++ b/continuous_integration/environment-3.10.yaml @@ -39,7 +39,7 @@ dependencies: - toolz - tornado - zict # overridden by git tip below - - zstandard + - zstandard >=0.9.0 - pip: - git+https://github.com/dask/dask - git+https://github.com/dask/s3fs diff --git a/continuous_integration/environment-3.8.yaml b/continuous_integration/environment-3.8.yaml index 0dbe6377f1b..7268d049531 100644 --- a/continuous_integration/environment-3.8.yaml +++ b/continuous_integration/environment-3.8.yaml @@ -41,7 +41,7 @@ dependencies: - toolz - tornado - zict - - zstandard + - zstandard >=0.9.0 - pip: - git+https://github.com/dask/dask - git+https://github.com/jcrist/crick # Only tested here diff --git a/continuous_integration/environment-3.9.yaml b/continuous_integration/environment-3.9.yaml index da6b3478c17..7eabb56ac03 100644 --- 
a/continuous_integration/environment-3.9.yaml +++ b/continuous_integration/environment-3.9.yaml @@ -44,7 +44,7 @@ dependencies: - torchvision # Only tested here - tornado - zict - - zstandard + - zstandard >=0.9.0 - pip: - git+https://github.com/dask/dask - keras diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 5e6c6b271e0..33e504137ff 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -77,6 +77,10 @@ with suppress(ImportError): import zstandard + # Required for Python Buffer Protocol support. + if parse_version(zstandard.__version__) < parse_version("0.9.0"): + raise ImportError("Need zstandard >= 0.9.0") + zstd_compressor = zstandard.ZstdCompressor( level=dask.config.get("distributed.comm.zstd.level"), threads=dask.config.get("distributed.comm.zstd.threads"),