Merge branch 'main' into 7679-perf-metrics-bokeh-dashboard
crusaderky committed Apr 26, 2023
commit 5e7947c (2 parents: bdd3f91 + afe6491)
Showing 38 changed files with 442 additions and 164 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-pre-commit.yml
@@ -11,7 +11,7 @@ jobs:
name: pre-commit hooks
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3.5.0
- uses: actions/checkout@v3.5.2
- uses: actions/setup-python@v4
with:
python-version: '3.9'
2 changes: 1 addition & 1 deletion .github/workflows/conda.yml
@@ -26,7 +26,7 @@ jobs:
name: Build (and upload)
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3.5.0
- uses: actions/checkout@v3.5.2
with:
fetch-depth: 0
- name: Set up Python
2 changes: 1 addition & 1 deletion .github/workflows/test-report.yaml
@@ -19,7 +19,7 @@ jobs:
run:
shell: bash -l {0}
steps:
- uses: actions/checkout@v3.5.0
- uses: actions/checkout@v3.5.2

- name: Setup Conda Environment
uses: conda-incubator/setup-miniconda@v2.2.0
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
@@ -79,7 +79,7 @@ jobs:
shell: bash

- name: Checkout source
uses: actions/checkout@v3.5.0
uses: actions/checkout@v3.5.2
with:
fetch-depth: 0

2 changes: 1 addition & 1 deletion .github/workflows/update-gpuci.yaml
@@ -11,7 +11,7 @@ jobs:
if: github.repository == 'dask/distributed'

steps:
- uses: actions/checkout@v3.5.0
- uses: actions/checkout@v3.5.2

- name: Parse current axis YAML
id: rapids_current
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.10.yaml
@@ -13,6 +13,7 @@ dependencies:
- coverage
- dask # overridden by git tip below
- filesystem-spec # overridden by git tip below
- gilknocker>=0.4.0
- h5py
- ipykernel <6.22.0 # https://github.com/dask/distributed/issues/7688
- ipywidgets <8.0.5 # https://github.com/dask/distributed/issues/7688
@@ -47,4 +48,3 @@ dependencies:
- git+https://github.com/dask/zict
- git+https://github.com/fsspec/filesystem_spec
- keras
- gilknocker>=0.4.0
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.11.yaml
@@ -13,6 +13,7 @@ dependencies:
- coverage
- dask # overridden by git tip below
- filesystem-spec # overridden by git tip below
- gilknocker>=0.4.0
- h5py
- ipykernel <6.22.0 # https://github.com/dask/distributed/issues/7688
- ipywidgets <8.0.5 # https://github.com/dask/distributed/issues/7688
@@ -47,4 +48,3 @@ dependencies:
- git+https://github.com/dask/zict
- git+https://github.com/fsspec/filesystem_spec
- keras
- gilknocker>=0.4.0
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.8.yaml
@@ -14,6 +14,7 @@ dependencies:
- cython # Only tested here; also a dependency of crick
- dask # overridden by git tip below
- filesystem-spec
- gilknocker>=0.4.0
- h5py
- ipykernel <6.22.0 # https://github.com/dask/distributed/issues/7688
- ipywidgets <8.0.5 # https://github.com/dask/distributed/issues/7688
@@ -48,4 +49,3 @@ dependencies:
- git+https://github.com/dask/dask
- git+https://github.com/jcrist/crick # Only tested here
- keras
- gilknocker>=0.4.0
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.9.yaml
@@ -14,6 +14,7 @@ dependencies:
- coverage
- dask # overridden by git tip below
- filesystem-spec
- gilknocker>=0.4.0
- h5py
- ipykernel <6.22.0 # https://github.com/dask/distributed/issues/7688
- ipywidgets <8.0.5 # https://github.com/dask/distributed/issues/7688
@@ -50,4 +51,3 @@ dependencies:
- pip:
- git+https://github.com/dask/dask
- keras
- gilknocker>=0.4.0
6 changes: 5 additions & 1 deletion distributed/client.py
@@ -1458,7 +1458,11 @@ def wait_for_workers(
raise ValueError(
f"`n_workers` must be a positive integer. Instead got {n_workers}."
)
return self.sync(self._wait_for_workers, n_workers, timeout=timeout)

if self.cluster is None:
return self.sync(self._wait_for_workers, n_workers, timeout=timeout)

return self.cluster.wait_for_workers(n_workers, timeout)

def _heartbeat(self):
# Don't send heartbeat if scheduler comm or cluster are already closed
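A minimal usage sketch of the dispatch added above (hedged: LocalCluster and the worker count are illustrative, not part of the diff; a bare Client connected to a remote scheduler keeps the old self.sync polling path):

# Client.wait_for_workers now delegates to the cluster object when one is attached.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2)
client = Client(cluster)
client.wait_for_workers(n_workers=2, timeout=30)  # -> cluster.wait_for_workers(2, 30)
client.close()
cluster.close()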
13 changes: 3 additions & 10 deletions distributed/compatibility.py
@@ -42,17 +42,10 @@ async def to_thread(func, /, *args, **kwargs):
if sys.version_info >= (3, 9):
from random import randbytes
else:
try:
import numpy
from random import getrandbits

def randbytes(size):
return numpy.random.randint(255, size=size, dtype="u8").tobytes()

except ImportError:
import secrets

def randbytes(size):
return secrets.token_bytes(size)
def randbytes(size):
return getrandbits(size * 8).to_bytes(size, "little")


if tornado.version_info >= (6, 2, 0, 0):
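A self-contained sketch of the new pre-3.9 fallback above (hedged: the reproducibility point is an inference about why the NumPy and secrets branches were dropped, not stated in the diff):

# getrandbits draws from the module-level, seedable Mersenne Twister, so the
# fallback yields deterministic bytes under random.seed, matching the
# semantics of random.randbytes on Python >= 3.9.
import random

def randbytes(size):
    return random.getrandbits(size * 8).to_bytes(size, "little")

random.seed(123)
a = randbytes(8)
random.seed(123)
assert a == randbytes(8)  # reproducible, unlike secrets.token_bytes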
55 changes: 32 additions & 23 deletions distributed/config.py
@@ -1,18 +1,16 @@
from __future__ import annotations

import asyncio
import logging
import logging.config
import os
import sys
from typing import Any

import yaml

import dask
from dask.utils import import_required

from distributed.compatibility import WINDOWS, logging_names

config = dask.config.config


@@ -68,7 +66,15 @@
logger = logging.getLogger(__name__)


def _initialize_logging_old_style(config):
if sys.version_info >= (3, 11):
_logging_get_level_names_mapping = logging.getLevelNamesMapping
else:

def _logging_get_level_names_mapping() -> dict[str, int]:
return logging._nameToLevel.copy()


def _initialize_logging_old_style(config: dict[Any, Any]) -> None:
"""
Initialize logging using the "old-style" configuration scheme, e.g.:
{
@@ -79,7 +85,7 @@ def _initialize_logging_old_style(config):
}
}
"""
loggers = { # default values
loggers: dict[str, str | int] = { # default values
"distributed": "info",
"distributed.client": "warning",
"bokeh": "error",
Expand All @@ -95,50 +101,53 @@ def _initialize_logging_old_style(config):
dask.config.get("distributed.admin.log-format", config=config)
)
)
for name, level in loggers.items():
if isinstance(level, str):
level = logging_names[level.upper()]
logging_names = _logging_get_level_names_mapping()
for name, raw_level in sorted(loggers.items()):
level = (
logging_names[raw_level.upper()]
if isinstance(raw_level, str)
else raw_level
)
logger = logging.getLogger(name)
logger.setLevel(level)

# Ensure that we're not registering the logger twice in this hierarchy.
anc = None
anc = logging.getLogger(None)
already_registered = False
for ancestor in name.split("."):
if anc is None:
anc = logging.getLogger(ancestor)
else:
anc.getChild(ancestor)

if handler in anc.handlers:
for ancestor in name.split("."):
if anc.handlers:
already_registered = True
break
anc = anc.getChild(ancestor)

if not already_registered:
logger.addHandler(handler)
logger.propagate = False


def _initialize_logging_new_style(config):
def _initialize_logging_new_style(config: dict[Any, Any]) -> None:
"""
Initialize logging using logging's "Configuration dictionary schema".
(ref.: https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema)
"""
base_config = _find_logging_config(config)
logging.config.dictConfig(base_config.get("logging"))
logging.config.dictConfig(base_config.get("logging")) # type: ignore[arg-type]


def _initialize_logging_file_config(config):
def _initialize_logging_file_config(config: dict[Any, Any]) -> None:
"""
Initialize logging using logging's "Configuration file format".
(ref.: https://docs.python.org/3/howto/logging.html#configuring-logging)
"""
base_config = _find_logging_config(config)
logging.config.fileConfig(
base_config.get("logging-file-config"), disable_existing_loggers=False
base_config.get("logging-file-config"), # type: ignore[arg-type]
disable_existing_loggers=False,
)


def _find_logging_config(config):
def _find_logging_config(config: dict[Any, Any]) -> dict[Any, Any]:
"""
Look for the dictionary containing logging-specific configurations,
starting in the 'distributed' dictionary and then trying the top-level
@@ -150,7 +159,7 @@ def _find_logging_config(config):
return config


def initialize_logging(config):
def initialize_logging(config: dict[Any, Any]) -> None:
base_config = _find_logging_config(config)
if "logging-file-config" in base_config:
if "logging" in base_config:
@@ -168,7 +177,7 @@ def initialize_logging(config):
_initialize_logging_old_style(config)


def initialize_event_loop(config):
def initialize_event_loop(config: dict[Any, Any]) -> None:
event_loop = dask.config.get("distributed.admin.event-loop")
if event_loop == "uvloop":
uvloop = import_required(
@@ -182,7 +191,7 @@ def initialize_event_loop(config):
)
uvloop.install()
elif event_loop in {"asyncio", "tornado"}:
if WINDOWS:
if sys.platform == "win32":
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
# https://github.com/tornadoweb/tornado/issues/2608
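Two short sketches of the version- and platform-gated pieces above (hedged: standalone illustrations, not code from the commit):

# logging.getLevelNamesMapping() exists only on Python >= 3.11; older
# interpreters fall back to a copy of the private logging._nameToLevel table,
# which maps names like "WARNING" to their numeric levels.
import logging
import sys

if sys.version_info >= (3, 11):
    level_names = logging.getLevelNamesMapping()
else:
    level_names = logging._nameToLevel.copy()

assert level_names["WARNING"] == logging.WARNING  # 30

# On Windows, tornado 6 cannot run on asyncio's default Proactor event loop,
# so the Selector policy is installed instead (this branch is a no-op elsewhere).
if sys.platform == "win32":
    import asyncio

    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())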
3 changes: 2 additions & 1 deletion distributed/core.py
@@ -543,7 +543,8 @@ async def _close_on_failure(exc: Exception) -> None:
except Exception as exc:
await _close_on_failure(exc)
raise RuntimeError(f"{type(self).__name__} failed to start.") from exc
self.status = Status.running
if self.status == Status.init:
self.status = Status.running
return self

async def __aenter__(self):
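An illustrative sketch of why the guard above matters (hedged: a toy Status enum standing in for distributed.core.Status):

from enum import Enum

class Status(Enum):
    init = "init"
    running = "running"
    closed = "closed"

status = Status.init
# Suppose a start-up hook already transitioned the server, e.g. closed it:
status = Status.closed

if status == Status.init:  # only promote a server that start() left untouched
    status = Status.running

assert status is Status.closed  # the earlier transition is preserved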
55 changes: 55 additions & 0 deletions distributed/deploy/cluster.py
@@ -19,6 +19,7 @@
from distributed.compatibility import PeriodicCallback
from distributed.core import Status
from distributed.deploy.adaptive import Adaptive
from distributed.metrics import time
from distributed.objects import SchedulerInfo
from distributed.utils import (
Log,
@@ -33,6 +34,9 @@
logger = logging.getLogger(__name__)


no_default = "__no_default__"


class Cluster(SyncMethodMixin):
"""Superclass for cluster objects
@@ -582,6 +586,57 @@ def __eq__(self, other):
def __hash__(self):
return id(self)

async def _wait_for_workers(self, n_workers=0, timeout=None):
self.scheduler_info = SchedulerInfo(await self.scheduler_comm.identity())
if timeout:
deadline = time() + parse_timedelta(timeout)
else:
deadline = None

def running_workers(info):
return len(
[
ws
for ws in info["workers"].values()
if ws["status"] == Status.running.name
]
)

while n_workers and running_workers(self.scheduler_info) < n_workers:
if deadline and time() > deadline:
raise TimeoutError(
"Only %d/%d workers arrived after %s"
% (running_workers(self.scheduler_info), n_workers, timeout)
)
await asyncio.sleep(0.1)

self.scheduler_info = SchedulerInfo(await self.scheduler_comm.identity())

def wait_for_workers(
self, n_workers: int | str = no_default, timeout: float | None = None
) -> None:
"""Blocking call to wait for n workers before continuing
Parameters
----------
n_workers : int
The number of workers
timeout : number, optional
Time in seconds after which to raise a
``dask.distributed.TimeoutError``
"""
if n_workers is no_default:
warnings.warn(
"Please specify the `n_workers` argument when using `Client.wait_for_workers`. Not specifying `n_workers` will no longer be supported in future versions.",
FutureWarning,
)
n_workers = 0
elif not isinstance(n_workers, int) or n_workers < 1:
raise ValueError(
f"`n_workers` must be a positive integer. Instead got {n_workers}."
)
return self.sync(self._wait_for_workers, n_workers, timeout=timeout)


def _exponential_backoff(
attempt: int, multiplier: float, exponential_base: float, max_interval: float
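A hedged usage sketch of the new Cluster.wait_for_workers (LocalCluster is illustrative; any Cluster subclass inherits the method):

from dask.distributed import LocalCluster

with LocalCluster(n_workers=2) as cluster:
    # Polls the scheduler's identity roughly every 0.1s until two workers
    # report status "running", or raises TimeoutError after 60 seconds.
    cluster.wait_for_workers(2, timeout=60)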
