Use cluster.scheduler_info rather than cluster.scheduler #73

Merged · 5 commits · Aug 1, 2019
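Background for the change: `cluster.scheduler` is a live `Scheduler` object that typically exists only when the scheduler runs in the manager's own process, while `cluster.scheduler_info` is a plain dictionary of identity metadata that remote cluster implementations can also provide. A minimal sketch of the two access patterns (assuming a `LocalCluster`, where both happen to work):

from dask.distributed import LocalCluster

# A throwaway in-process cluster; both access patterns work here,
# but only scheduler_info is available for remote deployments.
cluster = LocalCluster(processes=False)

# Old pattern (removed by this PR): inspect Scheduler state objects.
workers_via_object = len(cluster.scheduler.workers)

# New pattern: scheduler_info is a plain dict, roughly
# {"workers": {"tcp://...": {"nthreads": ..., "memory_limit": ...}, ...}}
workers_via_info = len(cluster.scheduler_info["workers"])

assert workers_via_object == workers_via_info
cluster.close()

Reading the dict also has to smooth over the dask 2.0 rename of ncores to nthreads, which the manager.py change below handles explicitly.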
ci/install.sh (3 changes: 1 addition & 2 deletions)

@@ -8,12 +8,11 @@ conda update -q conda
conda create -q -n test-environment python=$PYTHON
source activate test-environment
conda install -q \
    dask=2.1 \
    pytest=3.7 \
    notebook

# Install unreleased versions of dask and distributed for now
# in order to get a patched config system.
pip install git+https://github.com/dask/dask.git@677d62a35bae0fb964472b604bc52ef91b46ea22
pip install git+https://github.com/dask/distributed.git@538767b4977d1bd14679ae555b7705088a7e5a16

pip install -e .
dask_labextension/manager.py (15 changes: 12 additions & 3 deletions)

@@ -264,16 +264,25 @@ def make_cluster_model(
"""
# This would be a great target for a dataclass
# once python 3.7 is in wider use.
try:
info = cluster.scheduler_info
except AttributeError:
info = cluster.scheduler.identity()
try:
cores = sum(d["nthreads"] for d in info["workers"].values())
except KeyError: # dask.__version__ < 2.0
cores = sum(d["ncores"] for d in info["workers"].values())
assert isinstance(info, dict)
model = dict(
id=cluster_id,
name=cluster_name,
scheduler_address=cluster.scheduler_address,
dashboard_link=cluster.dashboard_link or "",
workers=len(cluster.scheduler.workers),
workers=len(info["workers"]),
memory=utils.format_bytes(
sum(ws.memory_limit for ws in cluster.scheduler.workers.values())
sum(d["memory_limit"] for d in info["workers"].values())
),
cores=sum(ws.ncores for ws in cluster.scheduler.workers.values()),
cores=cores,
)
if adaptive:
model["adapt"] = {"minimum": adaptive.minimum, "maximum": adaptive.maximum}
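The compatibility pattern above can also be read in isolation. A hedged sketch of the same logic (the worker_summary helper below is hypothetical, not part of dask_labextension; it mirrors the fallbacks in make_cluster_model):

def worker_summary(cluster):
    """Summarize workers without assuming a local Scheduler object."""
    try:
        info = cluster.scheduler_info  # distributed >= 2.0 exposes a dict
    except AttributeError:
        info = cluster.scheduler.identity()  # older clusters: query the object
    workers = info["workers"].values()
    try:
        cores = sum(w["nthreads"] for w in workers)  # key renamed in dask 2.0
    except KeyError:
        cores = sum(w["ncores"] for w in workers)  # pre-2.0 spelling
    return {
        "workers": len(info["workers"]),
        "cores": cores,
        "memory_bytes": sum(w["memory_limit"] for w in workers),
    }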
dask_labextension/tests/test_manager.py (58 changes: 31 additions & 27 deletions)

@@ -8,12 +8,23 @@
 from dask_labextension.manager import DaskClusterManager
 
 
+config = {
+    'labextension': {
+        "initial": [],
+        "default": {},
+        'factory': {
+            "module": "dask.distributed",
+            "class": "LocalCluster",
+            "kwargs": {"processes": False},
+            "args": []
+        }
+    }
+}
+
+
 @gen_test()
 async def test_start():
-    with dask.config.set({
-        'labextension.defaults.kwargs': {'processes': False}, # for speed
-        'labextension.initial': [],
-    }):
+    with dask.config.set(config):
         async with DaskClusterManager() as manager:
             # add cluster
             model = await manager.start_cluster()
@@ -33,10 +44,7 @@ async def test_start():

 @gen_test()
 async def test_close():
-    with dask.config.set({
-        'labextension.defaults.kwargs': {'processes': False}, # for speed
-        'labextension.initial': [],
-    }):
+    with dask.config.set(config):
         async with DaskClusterManager() as manager:
             # start a cluster
             model = await manager.start_cluster()
@@ -50,10 +58,7 @@ async def test_close():

 @gen_test()
 async def test_get():
-    with dask.config.set({
-        'labextension.defaults.kwargs': {'processes': False}, # for speed
-        'labextension.initial': [],
-    }):
+    with dask.config.set(config):
         async with DaskClusterManager() as manager:
             # start a cluster
             model = await manager.start_cluster()
@@ -67,10 +72,7 @@ async def test_get():
 @pytest.mark.filterwarnings('ignore')
 @gen_test()
 async def test_list():
-    with dask.config.set({
-        'labextension.defaults.kwargs': {'processes': False}, # for speed
-        'labextension.initial': [],
-    }):
+    with dask.config.set(config):
         async with DaskClusterManager() as manager:
             # start with an empty list
             assert not manager.list_clusters()
@@ -85,10 +87,7 @@ async def test_list():

 @gen_test()
 async def test_scale():
-    with dask.config.set({
-        'labextension.defaults.kwargs': {'processes': False}, # for speed
-        'labextension.initial': [],
-    }):
+    with dask.config.set(config):
         async with DaskClusterManager() as manager:
             # add cluster with number of workers configuration
             model = await manager.start_cluster(configuration={'workers': 3})
@@ -110,10 +109,7 @@ async def test_scale():

 @gen_test()
 async def test_adapt():
-    with dask.config.set({
-        'labextension.defaults.kwargs': {'processes': False}, # for speed
-        'labextension.initial': [],
-    }):
+    with dask.config.set(config):
         async with DaskClusterManager() as manager:
             # add a new cluster
             model = await manager.start_cluster()
@@ -127,9 +123,17 @@ async def test_adapt():
 @gen_test()
 async def test_initial():
     with dask.config.set({
-        'labextension.defaults.kwargs': {'processes': False}, # for speed
-        'labextension.initial': [{'name': 'foo'}],
-    }):
+        'labextension': {
+            "initial": [{"name": "foo"}],
+            "default": {},
+            'factory': {
+                "module": "dask.distributed",
+                "class": "LocalCluster",
+                "kwargs": {"processes": False},
+                "args": []
+            }
+        }
+    }):
         # Test asynchronous starting of clusters via a context
         async with DaskClusterManager() as manager:
             clusters = manager.list_clusters()
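The tests now set the full labextension.factory configuration instead of only labextension.defaults.kwargs. A quick sketch of how such a nested dict behaves under dask.config.set (the dict below mirrors the `config` defined at the top of the test module; the dotted-lookup assertions are illustrative):

import dask

# Mirrors the `config` dict at the top of test_manager.py.
config = {
    "labextension": {
        "initial": [],
        "default": {},
        "factory": {
            "module": "dask.distributed",
            "class": "LocalCluster",
            "kwargs": {"processes": False},
            "args": [],
        },
    }
}

with dask.config.set(config):
    # Nested keys become addressable with dotted names.
    assert dask.config.get("labextension.factory.class") == "LocalCluster"
    assert dask.config.get("labextension.factory.kwargs") == {"processes": False}
    assert dask.config.get("labextension.initial") == []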