From cc5437d7ac641109b2148777f64d1cb4ac284952 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sun, 28 Jul 2019 11:39:26 -0700 Subject: [PATCH 1/5] Use cluster.scheduler_info rather than cluster.scheduler Supersedes https://github.com/dask/dask-labextension/pull/72 This depends on https://github.com/dask/distributed/pull/2902, which adds a `Cluster.scheduler_info` attribute to clusters which holds necessary scheduler information. We prefer this over querying a Scheduler object directly in case that scheduler is not local, as is increasingly becoming the case. --- dask_labextension/manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_labextension/manager.py b/dask_labextension/manager.py index 753442a..a774b97 100644 --- a/dask_labextension/manager.py +++ b/dask_labextension/manager.py @@ -269,11 +269,11 @@ def make_cluster_model( name=cluster_name, scheduler_address=cluster.scheduler_address, dashboard_link=cluster.dashboard_link or "", - workers=len(cluster.scheduler.workers), + workers=len(cluster.scheduler_info["workers"]), memory=utils.format_bytes( - sum(ws.memory_limit for ws in cluster.scheduler.workers.values()) + sum(d["memory_limit"] for d in cluster.scheduler_info["workers"].values()) ), - cores=sum(ws.ncores for ws in cluster.scheduler.workers.values()), + cores=sum(d["nthreads"] for d in cluster.scheduler_info["workers"].values()), ) if adaptive: model["adapt"] = {"minimum": adaptive.minimum, "maximum": adaptive.maximum} From 3fa745475cfc2c8f03686023826fb701878f8a40 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 31 Jul 2019 16:55:35 -0700 Subject: [PATCH 2/5] Make changes robust to old versions --- dask_labextension/manager.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/dask_labextension/manager.py b/dask_labextension/manager.py index a774b97..c9d70d7 100644 --- a/dask_labextension/manager.py +++ b/dask_labextension/manager.py @@ -264,16 +264,21 @@ def 
make_cluster_model( """ # This would be a great target for a dataclass # once python 3.7 is in wider use. + try: + info = cluster.scheduler_info + except AttributeError: + info = cluster.scheduler.identity() + assert isinstance(info, dict) model = dict( id=cluster_id, name=cluster_name, scheduler_address=cluster.scheduler_address, dashboard_link=cluster.dashboard_link or "", - workers=len(cluster.scheduler_info["workers"]), + workers=len(info["workers"]), memory=utils.format_bytes( - sum(d["memory_limit"] for d in cluster.scheduler_info["workers"].values()) + sum(d["memory_limit"] for d in info["workers"].values()) ), - cores=sum(d["nthreads"] for d in cluster.scheduler_info["workers"].values()), + cores=sum(d["nthreads"] for d in info["workers"].values()), ) if adaptive: model["adapt"] = {"minimum": adaptive.minimum, "maximum": adaptive.maximum} From 3ae2bccf727941f9abe76b6e85afdc3e1610dd6e Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 31 Jul 2019 16:55:44 -0700 Subject: [PATCH 3/5] Fixup config in tests Otherwise we didn't seem to be getting the baseline config, which was causing errors. 
--- dask_labextension/tests/test_manager.py | 58 +++++++++++++------------ 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/dask_labextension/tests/test_manager.py b/dask_labextension/tests/test_manager.py index 05ae86f..5d6bb93 100644 --- a/dask_labextension/tests/test_manager.py +++ b/dask_labextension/tests/test_manager.py @@ -8,12 +8,23 @@ from dask_labextension.manager import DaskClusterManager +config = { + 'labextension': { + "initial": [], + "default": {}, + 'factory': { + "module": "dask.distributed", + "class": "LocalCluster", + "kwargs": {"processes": False}, + "args": [] + } + } +} + + @gen_test() async def test_start(): - with dask.config.set({ - 'labextension.defaults.kwargs': {'processes': False}, # for speed - 'labextension.initial': [], - }): + with dask.config.set(config): async with DaskClusterManager() as manager: # add cluster model = await manager.start_cluster() @@ -33,10 +44,7 @@ async def test_start(): @gen_test() async def test_close(): - with dask.config.set({ - 'labextension.defaults.kwargs': {'processes': False}, # for speed - 'labextension.initial': [], - }): + with dask.config.set(config): async with DaskClusterManager() as manager: # start a cluster model = await manager.start_cluster() @@ -50,10 +58,7 @@ async def test_close(): @gen_test() async def test_get(): - with dask.config.set({ - 'labextension.defaults.kwargs': {'processes': False}, # for speed - 'labextension.initial': [], - }): + with dask.config.set(config): async with DaskClusterManager() as manager: # start a cluster model = await manager.start_cluster() @@ -67,10 +72,7 @@ async def test_get(): @pytest.mark.filterwarnings('ignore') @gen_test() async def test_list(): - with dask.config.set({ - 'labextension.defaults.kwargs': {'processes': False}, # for speed - 'labextension.initial': [], - }): + with dask.config.set(config): async with DaskClusterManager() as manager: # start with an empty list assert not manager.list_clusters() @@ -85,10 +87,7 @@ async def 
test_list(): @gen_test() async def test_scale(): - with dask.config.set({ - 'labextension.defaults.kwargs': {'processes': False}, # for speed - 'labextension.initial': [], - }): + with dask.config.set(config): async with DaskClusterManager() as manager: # add cluster with number of workers configuration model = await manager.start_cluster(configuration={'workers': 3}) @@ -110,10 +109,7 @@ async def test_scale(): @gen_test() async def test_adapt(): - with dask.config.set({ - 'labextension.defaults.kwargs': {'processes': False}, # for speed - 'labextension.initial': [], - }): + with dask.config.set(config): async with DaskClusterManager() as manager: # add a new cluster model = await manager.start_cluster() @@ -127,9 +123,17 @@ async def test_adapt(): @gen_test() async def test_initial(): with dask.config.set({ - 'labextension.defaults.kwargs': {'processes': False}, # for speed - 'labextension.initial': [{'name': 'foo'}], - }): + 'labextension': { + "initial": [{"name": "foo"}], + "default": {}, + 'factory': { + "module": "dask.distributed", + "class": "LocalCluster", + "kwargs": {"processes": False}, + "args": [] + } + } + }): # Test asynchronous starting of clusters via a context async with DaskClusterManager() as manager: clusters = manager.list_clusters() From 3ab9d8fa5c760db944e0168bfd987ccb7973f614 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 31 Jul 2019 17:41:10 -0700 Subject: [PATCH 4/5] use dask=2.1 in CI --- ci/install.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ci/install.sh b/ci/install.sh index 21ba9ba..a7290d5 100644 --- a/ci/install.sh +++ b/ci/install.sh @@ -8,12 +8,11 @@ conda update -q conda conda create -q -n test-environment python=$PYTHON source activate test-environment conda install -q \ + dask=2.1 \ pytest=3.7 \ notebook # Install unreleased versions of dask and distributed for now # in order to get a patched config system. 
-pip install git+https://github.com/dask/dask.git@677d62a35bae0fb964472b604bc52ef91b46ea22 -pip install git+https://github.com/dask/distributed.git@538767b4977d1bd14679ae555b7705088a7e5a16 pip install -e . From 22be0ea68c476b60cde146f140f3959aa3047b6b Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 31 Jul 2019 18:30:55 -0700 Subject: [PATCH 5/5] Support older versions of dask/distributed with ncores keyword --- dask_labextension/manager.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dask_labextension/manager.py b/dask_labextension/manager.py index c9d70d7..8092902 100644 --- a/dask_labextension/manager.py +++ b/dask_labextension/manager.py @@ -268,6 +268,10 @@ def make_cluster_model( info = cluster.scheduler_info except AttributeError: info = cluster.scheduler.identity() + try: + cores = sum(d["nthreads"] for d in info["workers"].values()) + except KeyError: # dask.__version__ < 2.0 + cores = sum(d["ncores"] for d in info["workers"].values()) assert isinstance(info, dict) model = dict( id=cluster_id, @@ -278,7 +282,7 @@ def make_cluster_model( memory=utils.format_bytes( sum(d["memory_limit"] for d in info["workers"].values()) ), - cores=sum(d["nthreads"] for d in info["workers"].values()), + cores=cores, ) if adaptive: model["adapt"] = {"minimum": adaptive.minimum, "maximum": adaptive.maximum}