Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ repos:
- pytest
- tornado
- pyarrow
- urllib3
- git+https://github.com/dask/dask
- git+https://github.com/dask/zict

Expand Down
1 change: 0 additions & 1 deletion continuous_integration/environment-mindeps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ dependencies:
- tblib=1.6.0
- toolz=0.12.0
- tornado=6.2.0
- urllib3=1.26.5
- zict=3.0.0
# Distributed depends on the latest version of Dask
- pip
Expand Down
1 change: 0 additions & 1 deletion continuous_integration/recipes/distributed/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ requirements:
- tblib >=1.6.0,!=3.2.0,!=3.2.1
- toolz >=0.10.0
- tornado >=6.2.0
- urllib3 >=1.26.5
- zict >=3.0.0
run_constrained:
- openssl !=1.1.1e
Expand Down
32 changes: 18 additions & 14 deletions distributed/preloading.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import os
import shutil
import sys
import time
import urllib.error
import urllib.request
from collections.abc import Iterable, Sequence
from importlib import import_module
from types import ModuleType
Expand Down Expand Up @@ -129,21 +132,22 @@ def _import_module(name: str, file_dir: str | None = None) -> ModuleType:
def _download_module(url: str) -> ModuleType:
logger.info("Downloading preload at %s", url)
assert is_webaddress(url)
# This is the only place where urllib3 is used and it is a relatively heavy
# import. Do lazy import to reduce import time
import urllib3

with urllib3.PoolManager() as http:
response = http.request(
method="GET",
url=url,
retries=urllib3.util.Retry(
status_forcelist=[429, 504, 503, 502],
backoff_factor=0.2,
),
)

source = response.data
retryable_codes = {429, 502, 503, 504}
backoff_factor = 0.2
max_retries = 3

for attempt in range(max_retries + 1):
try:
with urllib.request.urlopen(url) as response:
source = response.read()
break
except urllib.error.HTTPError as e:
if e.code in retryable_codes and attempt < max_retries:
retry_delay_seconds = backoff_factor * (2**attempt)
time.sleep(retry_delay_seconds)
continue
raise

compiled = compile(source, url, "exec")
module = ModuleType(url)
Expand Down
20 changes: 8 additions & 12 deletions distributed/tests/test_preload.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ async def test_preload_import_time():
async def test_web_preload():
with (
mock.patch(
"urllib3.PoolManager.request",
"urllib.request.urlopen",
**{
"return_value.data": b"def dask_setup(dask_server):"
"return_value.__enter__.return_value.read.return_value": b"def dask_setup(dask_server):"
b"\n dask_server.foo = 1"
b"\n"
},
) as request,
) as mock_urlopen,
captured_logger("distributed.preloading") as log,
):
async with Scheduler(
Expand All @@ -200,9 +200,7 @@ async def test_web_preload():
)
is not None
)
assert request.mock_calls == [
mock.call(method="GET", url="http://example.com/preload", retries=mock.ANY)
]
assert mock_urlopen.call_args_list == [mock.call("http://example.com/preload")]


@gen_cluster(nthreads=[])
Expand Down Expand Up @@ -233,15 +231,13 @@ async def test_web_preload_worker():
dask.config.set(scheduler_address="tcp://127.0.0.1:{port}")
""").encode()
with mock.patch(
"urllib3.PoolManager.request",
**{"return_value.data": data},
) as request:
"urllib.request.urlopen",
**{"return_value.__enter__.return_value.read.return_value": data},
) as mock_urlopen:
async with Scheduler(port=port, host="localhost", dashboard_address=":0") as s:
async with Nanny(preload_nanny=["http://example.com/preload"]) as nanny:
assert nanny.scheduler_addr == s.address
assert request.mock_calls == [
mock.call(method="GET", url="http://example.com/preload", retries=mock.ANY)
]
assert mock_urlopen.call_args_list == [mock.call("http://example.com/preload")]


# This test is blocked on https://github.com/dask/distributed/issues/5819
Expand Down
1 change: 0 additions & 1 deletion pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,4 @@ psutil = ">=5.8.0"
sortedcontainers = ">=2.0.5"
tblib = ">=1.6.0,!=3.2.0,!=3.2.1"
tornado = ">=6.2.0"
urllib3 = ">=1.26.5"
zict = ">=3.0.0"
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ dependencies = [
"tblib >= 1.6.0,!=3.2.0,!=3.2.1",
"toolz >= 0.12.0",
"tornado >= 6.2.0",
"urllib3 >= 1.26.5",
"zict >= 3.0.0",
]
dynamic = ["version"]
Expand Down
Loading