Skip to content

Commit

Permalink
[Autoscaler] New output log format (ray-project#12772)
Browse files Browse the repository at this point in the history
  • Loading branch information
wuisawesome authored and cathrinS committed Dec 29, 2020
1 parent 6792446 commit f0f286d
Show file tree
Hide file tree
Showing 11 changed files with 907 additions and 154 deletions.
12 changes: 10 additions & 2 deletions dashboard/modules/reporter/reporter_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import ray._private.services
import ray.utils
from ray.autoscaler._private.util import (DEBUG_AUTOSCALING_STATUS,
DEBUG_AUTOSCALING_STATUS_LEGACY,
DEBUG_AUTOSCALING_ERROR)
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
Expand Down Expand Up @@ -113,13 +114,20 @@ async def get_cluster_status(self, req):
"""

aioredis_client = self._dashboard_head.aioredis_client
status = await aioredis_client.hget(DEBUG_AUTOSCALING_STATUS, "value")
legacy_status = await aioredis_client.hget(
DEBUG_AUTOSCALING_STATUS_LEGACY, "value")
formatted_status_string = await aioredis_client.hget(
DEBUG_AUTOSCALING_STATUS, "value")
formatted_status = json.loads(formatted_status_string.decode()
) if formatted_status_string else {}
error = await aioredis_client.hget(DEBUG_AUTOSCALING_ERROR, "value")
return dashboard_utils.rest_response(
success=True,
message="Got cluster status.",
autoscaling_status=status.decode() if status else None,
autoscaling_status=legacy_status.decode()
if legacy_status else None,
autoscaling_error=error.decode() if error else None,
cluster_status=formatted_status if formatted_status else None,
)

async def run(self, server):
Expand Down
9 changes: 7 additions & 2 deletions dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ray.test_utils import (format_web_url, wait_for_condition,
wait_until_server_available, run_string_as_driver,
wait_until_succeeded_without_exception)
from ray.autoscaler._private.util import (DEBUG_AUTOSCALING_STATUS,
from ray.autoscaler._private.util import (DEBUG_AUTOSCALING_STATUS_LEGACY,
DEBUG_AUTOSCALING_ERROR)
import ray.new_dashboard.consts as dashboard_consts
import ray.new_dashboard.utils as dashboard_utils
Expand Down Expand Up @@ -458,11 +458,14 @@ def test_get_cluster_status(ray_start_with_dashboard):
def get_cluster_status():
    """Fetch /api/cluster_status and assert the expected payload shape.

    Before any autoscaler state is written, the legacy status/error
    fields must be present but None, while the new-style cluster status
    must already carry a load metrics report.
    """
    response = requests.get(f"{webui_url}/api/cluster_status")
    response.raise_for_status()
    payload = response.json()
    print(payload)
    assert payload["result"]
    data = payload["data"]
    assert "autoscalingStatus" in data
    assert data["autoscalingStatus"] is None
    assert "autoscalingError" in data
    assert data["autoscalingError"] is None
    assert "clusterStatus" in data
    assert "loadMetricsReport" in data["clusterStatus"]

wait_until_succeeded_without_exception(get_cluster_status,
(requests.RequestException, ))
Expand All @@ -478,7 +481,7 @@ def get_cluster_status():
port=int(address[1]),
password=ray_constants.REDIS_DEFAULT_PASSWORD)

client.hset(DEBUG_AUTOSCALING_STATUS, "value", "hello")
client.hset(DEBUG_AUTOSCALING_STATUS_LEGACY, "value", "hello")
client.hset(DEBUG_AUTOSCALING_ERROR, "value", "world")

response = requests.get(f"{webui_url}/api/cluster_status")
Expand All @@ -488,6 +491,8 @@ def get_cluster_status():
assert response.json()["data"]["autoscalingStatus"] == "hello"
assert "autoscalingError" in response.json()["data"]
assert response.json()["data"]["autoscalingError"] == "world"
assert "clusterStatus" in response.json()["data"]
assert "loadMetricsReport" in response.json()["data"]["clusterStatus"]


def test_immutable_types():
Expand Down
168 changes: 98 additions & 70 deletions python/ray/autoscaler/_private/autoscaler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import defaultdict, namedtuple
from collections import defaultdict, namedtuple, Counter
from typing import Any, Optional, Dict, List
from urllib3.exceptions import MaxRetryError
import copy
Expand All @@ -16,17 +16,19 @@
from ray.autoscaler.tags import (
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER,
NODE_KIND_UNMANAGED, NODE_KIND_HEAD)
TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED, STATUS_WAITING_FOR_SSH,
STATUS_SYNCING_FILES, STATUS_SETTING_UP, STATUS_UP_TO_DATE,
NODE_KIND_WORKER, NODE_KIND_UNMANAGED, NODE_KIND_HEAD)
from ray.autoscaler._private.legacy_info_string import legacy_log_info_string
from ray.autoscaler._private.providers import _get_node_provider
from ray.autoscaler._private.updater import NodeUpdaterThread
from ray.autoscaler._private.node_launcher import NodeLauncher
from ray.autoscaler._private.resource_demand_scheduler import \
get_bin_pack_residual, ResourceDemandScheduler, NodeType, NodeID, NodeIP, \
ResourceDict
from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf, add_prefix, \
DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
DEBUG_AUTOSCALING_ERROR, format_info_string
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \
AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \
Expand All @@ -41,20 +43,23 @@
"UpdateInstructions",
["node_id", "init_commands", "start_ray_commands", "docker_config"])

# Snapshot of the autoscaler's view of the cluster, consumed by the
# status formatter. Fields:
#   active_nodes     -- per-node-type counts of heartbeating nodes
#   pending_nodes    -- (ip, node_type) pairs still starting up
#   pending_launches -- per-node-type counts of in-flight launches
#   failed_nodes     -- (ip, node_type) pairs that failed to start
AutoscalerSummary = namedtuple(
    "AutoscalerSummary",
    [
        "active_nodes",
        "pending_nodes",
        "pending_launches",
        "failed_nodes",
    ])


class StandardAutoscaler:
"""The autoscaling control loop for a Ray cluster.
There are two ways to start an autoscaling cluster: manually by running
`ray start --head --autoscaling-config=/path/to/config.yaml` on a
instance that has permission to launch other instances, or you can also use
`ray up /path/to/config.yaml` from your laptop, which will
configure the right AWS/Cloud roles automatically.
StandardAutoscaler's `update` method is periodically called by `monitor.py`
to add and remove nodes as necessary. Currently, load-based autoscaling is
not implemented, so all this class does is try to maintain a constant
cluster size.
`ray start --head --autoscaling-config=/path/to/config.yaml` on a instance
that has permission to launch other instances, or you can also use `ray up
/path/to/config.yaml` from your laptop, which will configure the right
AWS/Cloud roles automatically. See the documentation for a full definition
of autoscaling behavior:
https://docs.ray.io/en/master/cluster/autoscaling.html
StandardAutoscaler's `update` method is periodically called in
`monitor.py`'s monitoring loop.
StandardAutoscaler is also used to bootstrap clusters (by adding workers
until the cluster size that can handle the resource demand is met).
Expand Down Expand Up @@ -120,9 +125,6 @@ def __init__(self,
for local_path in self.config["file_mounts"].values():
assert os.path.exists(local_path)

# List of resource bundles the user is requesting of the cluster.
self.resource_demand_vector = []

logger.info("StandardAutoscaler: {}".format(self.config))

def update(self):
Expand Down Expand Up @@ -161,7 +163,6 @@ def _update(self):
self.provider.internal_ip(node_id)
for node_id in self.all_workers()
])
self.log_info_string(nodes)

# Terminate any idle or out of date nodes
last_used = self.load_metrics.last_used_time_by_ip
Expand All @@ -175,7 +176,7 @@ def _update(self):
sorted_node_ids = self._sort_based_on_last_used(nodes, last_used)
# Don't terminate nodes needed by request_resources()
nodes_allowed_to_terminate: Dict[NodeID, bool] = {}
if self.resource_demand_vector:
if self.load_metrics.get_resource_requests():
nodes_allowed_to_terminate = self._get_nodes_allowed_to_terminate(
sorted_node_ids)

Expand All @@ -201,7 +202,6 @@ def _update(self):
if nodes_to_terminate:
self.provider.terminate_nodes(nodes_to_terminate)
nodes = self.workers()
self.log_info_string(nodes)

# Terminate nodes if there are too many
nodes_to_terminate = []
Expand All @@ -216,16 +216,14 @@ def _update(self):
self.provider.terminate_nodes(nodes_to_terminate)
nodes = self.workers()

self.log_info_string(nodes)

to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
self.load_metrics.get_resource_demand_vector(),
self.load_metrics.get_resource_utilization(),
self.load_metrics.get_pending_placement_groups(),
self.load_metrics.get_static_node_resources_by_ip(),
ensure_min_cluster_size=self.resource_demand_vector)
ensure_min_cluster_size=self.load_metrics.get_resource_requests())
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)

Expand Down Expand Up @@ -255,7 +253,6 @@ def _update(self):
self.provider.terminate_nodes(nodes_to_terminate)

nodes = self.workers()
self.log_info_string(nodes)

# Update nodes with out-of-date files.
# TODO(edoakes): Spawning these threads directly seems to cause
Expand All @@ -281,6 +278,9 @@ def _update(self):
for node_id in nodes:
self.recover_if_needed(node_id, now)

logger.info(self.info_string())
legacy_log_info_string(self, nodes)

def _sort_based_on_last_used(self, nodes: List[NodeID],
last_used: Dict[str, float]) -> List[NodeID]:
"""Sort the nodes based on the last time they were used.
Expand Down Expand Up @@ -361,7 +361,7 @@ def _get_nodes_allowed_to_terminate(
used_resource_requests: List[ResourceDict]
_, used_resource_requests = \
get_bin_pack_residual(max_node_resources,
self.resource_demand_vector)
self.load_metrics.get_resource_requests())
# Remove the first entry (the head node).
max_node_resources.pop(0)
# Remove the first entry (the head node).
Expand Down Expand Up @@ -533,15 +533,17 @@ def recover_if_needed(self, node_id, now):
if not self.can_update(node_id):
return
key = self.provider.internal_ip(node_id)
if key not in self.load_metrics.last_heartbeat_time_by_ip:
self.load_metrics.last_heartbeat_time_by_ip[key] = now
last_heartbeat_time = self.load_metrics.last_heartbeat_time_by_ip[key]
delta = now - last_heartbeat_time
if delta < AUTOSCALER_HEARTBEAT_TIMEOUT_S:
return

if key in self.load_metrics.last_heartbeat_time_by_ip:
last_heartbeat_time = self.load_metrics.last_heartbeat_time_by_ip[
key]
delta = now - last_heartbeat_time
if delta < AUTOSCALER_HEARTBEAT_TIMEOUT_S:
return

logger.warning("StandardAutoscaler: "
"{}: No heartbeat in {}s, "
"restarting Ray to recover...".format(node_id, delta))
"{}: No recent heartbeat, "
"restarting Ray to recover...".format(node_id))
updater = NodeUpdaterThread(
node_id=node_id,
provider_config=self.config["provider"],
Expand Down Expand Up @@ -678,47 +680,73 @@ def unmanaged_workers(self):
return self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_UNMANAGED})

def log_info_string(self, nodes):
    """Build the legacy multi-section cluster status string, persist it
    to the internal KV store, and emit it at debug level.

    Args:
        nodes: List of worker node ids to summarize.
    """
    sections = [
        "Cluster status: " + self.info_string(nodes),
        self.load_metrics.info_string(),
        self.resource_demand_scheduler.debug_string(
            nodes, self.pending_launches.breakdown(),
            self.load_metrics.get_resource_utilization()),
    ]
    status = "\n".join(sections)
    # Persist the unprefixed status so the dashboard can read it back.
    if _internal_kv_initialized():
        _internal_kv_put(DEBUG_AUTOSCALING_STATUS, status, overwrite=True)
    if self.prefix_cluster_info:
        status = add_prefix(status, self.config["cluster_name"])
    logger.debug(status)

def info_string(self, nodes):
    """Return a one-line "<count> nodes" summary.

    Appends parenthesized notes for updates in progress and for nodes
    that failed to update, when either set is non-empty.

    Args:
        nodes: List of worker node ids to count.
    """
    notes = []
    if self.updaters:
        notes.append("({} updating)".format(len(self.updaters)))
    if self.num_failed_updates:
        notes.append("({} failed to update)".format(
            len(self.num_failed_updates)))
    suffix = "".join(" " + note for note in notes)
    return "{} nodes{}".format(len(nodes), suffix)

def request_resources(self, resources: List[dict]):
    """Called by monitor to request resources.

    Args:
        resources: A list of resource bundles. Replaces any previously
            requested bundles; an empty list clears the request.
    """
    if resources:
        msg = "StandardAutoscaler: resource_requests={}".format(resources)
        logger.info(msg)
    assert isinstance(resources, list), resources
    self.resource_demand_vector = resources

def kill_workers(self):
    """Terminate every worker node immediately.

    Logged at error level because this is a destructive, exceptional
    operation (used for fault-injection / emergency teardown).
    """
    logger.error("StandardAutoscaler: kill_workers triggered")
    workers = self.workers()
    if not workers:
        return
    self.provider.terminate_nodes(workers)
    logger.error("StandardAutoscaler: terminated {} node(s)".format(
        len(workers)))

def summary(self):
    """Summarizes the active, pending, and failed node launches.

    An active node is a node whose raylet is actively reporting
    heartbeats. A pending node is a non-active node whose node status
    tag is uninitialized, waiting for ssh, syncing files, or setting
    up. If a node is neither pending nor active, it is failed.

    Returns:
        AutoscalerSummary: The summary.
    """
    # Statuses that mean "still starting up" rather than failed.
    pending_states = frozenset([
        STATUS_UNINITIALIZED, STATUS_WAITING_FOR_SSH,
        STATUS_SYNCING_FILES, STATUS_SETTING_UP
    ])

    active_nodes = Counter()
    pending_nodes = []
    failed_nodes = []

    for node_id in self.provider.non_terminated_nodes(tag_filters={}):
        ip = self.provider.internal_ip(node_id)
        tags = self.provider.node_tags(node_id)
        if tags[TAG_RAY_NODE_KIND] == NODE_KIND_UNMANAGED:
            # Unmanaged nodes are outside the autoscaler's purview.
            continue
        node_type = tags[TAG_RAY_USER_NODE_TYPE]

        # TODO (Alex): If a node's raylet has died, it shouldn't be marked
        # as active.
        if self.load_metrics.is_active(ip):
            active_nodes[node_type] += 1
        elif tags[TAG_RAY_NODE_STATUS] in pending_states:
            pending_nodes.append((ip, node_type))
        else:
            # TODO (Alex): Failed nodes are now immediately killed, so
            # this list will almost always be empty. We should ideally
            # keep a cache of recently failed nodes and their startup
            # logs.
            failed_nodes.append((ip, node_type))

    # The concurrent counter leaves some 0 counts in, so we need to
    # manually filter those out.
    pending_launches = {
        node_type: count
        for node_type, count in self.pending_launches.breakdown().items()
        if count
    }

    return AutoscalerSummary(
        active_nodes=active_nodes,
        pending_nodes=pending_nodes,
        pending_launches=pending_launches,
        failed_nodes=failed_nodes)

def info_string(self):
    """Render the new-style formatted cluster status report.

    Combines the load metrics summary with the autoscaler node summary
    into a single human-readable string (leading newline so the report
    starts on its own line in the log).
    """
    lm_summary = self.load_metrics.summary()
    return "\n" + format_info_string(lm_summary, self.summary())
12 changes: 12 additions & 0 deletions python/ray/autoscaler/_private/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
from ray.util.debug import log_once

import ray.autoscaler._private.subprocess_output_util as cmd_output_util
from ray.autoscaler._private.load_metrics import LoadMetricsSummary
from ray.autoscaler._private.autoscaler import AutoscalerSummary
from ray.autoscaler._private.util import format_info_string, \
format_info_string_no_node_types

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -94,6 +98,14 @@ def debug_status() -> str:
status = "No cluster status."
else:
status = status.decode("utf-8")
as_dict = json.loads(status)
lm_summary = LoadMetricsSummary(**as_dict["load_metrics_report"])
if "autoscaler_report" in as_dict:
autoscaler_summary = AutoscalerSummary(
**as_dict["autoscaler_report"])
status = format_info_string(lm_summary, autoscaler_summary)
else:
status = format_info_string_no_node_types(lm_summary)
if error:
status += "\n"
status += error.decode("utf-8")
Expand Down
Loading

0 comments on commit f0f286d

Please sign in to comment.