Skip to content

Commit

Permalink
[scheduler][autoscaler] Report placement resources for actor creation…
Browse files Browse the repository at this point in the history
… tasks (ray-project#26813)

This change makes us report placement resources for actor creation tasks. Essentially, the resource model here treats an actor creation task as a task that holds its placement resources only very briefly (since a default actor releases its CPU once placed).

Closes ray-project#26806

Co-authored-by: Alex <alex@anyscale.com>
Signed-off-by: Scott Graham <scgraham@microsoft.com>
  • Loading branch information
2 people authored and Scott Graham committed Aug 15, 2022
1 parent 0ab9202 commit 4331617
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 4 deletions.
31 changes: 31 additions & 0 deletions python/ray/tests/test_autoscaler_fake_multinode.py
Expand Up @@ -55,6 +55,37 @@ def g():
# __example_end__


def test_zero_cpu_default_actor():
    """A default actor must trigger autoscaling when the head node has no CPUs.

    Starts a fake autoscaling cluster whose head node advertises zero CPUs,
    then creates a plain (1-CPU-for-placement) actor and verifies it becomes
    reachable — i.e. a CPU worker node was scaled up to place it.
    """
    worker_types = {
        "cpu_node": {
            "resources": {"CPU": 1},
            "node_config": {},
            "min_workers": 0,
            "max_workers": 1,
        },
    }
    cluster = AutoscalingCluster(
        head_resources={"CPU": 0},
        worker_node_types=worker_types,
    )

    try:
        cluster.start()
        ray.init("auto")

        @ray.remote
        class Actor:
            def ping(self):
                pass

        actor = Actor.remote()
        # Blocks until the actor is placed, which requires the autoscaler to
        # bring up the 1-CPU worker node.
        ray.get(actor.ping.remote())
        ray.shutdown()
    finally:
        # Always tear the fake cluster down, even if scale-up never happened.
        cluster.shutdown()


if __name__ == "__main__":
import os
import sys
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_failure_2.py
Expand Up @@ -31,7 +31,7 @@ def test_warning_for_too_many_actors(shutdown_only):

p = init_error_pubsub()

@ray.remote
@ray.remote(num_cpus=0)
class Foo:
def __init__(self):
time.sleep(1000)
Expand Down
48 changes: 48 additions & 0 deletions python/ray/tests/test_global_state.py
Expand Up @@ -378,6 +378,54 @@ def backlog_size_set():
global_state_accessor.disconnect()


def test_default_load_reports(shutdown_only):
    """Despite the fact that default actors release their cpu after being
    placed, they should still require 1 CPU for load reporting purposes.
    https://github.com/ray-project/ray/issues/26806
    """
    # Zero CPUs on the local node so both the task and the actor below are
    # guaranteed to be infeasible and therefore show up in the load report.
    cluster = ray.init(
        num_cpus=0,
    )

    global_state_accessor = make_global_state_accessor(cluster)

    @ray.remote
    def foo():
        return None

    @ray.remote
    class Foo:
        pass

    def actor_and_task_queued_together():
        # Poll the GCS resource-usage batch; returns False until both demands
        # are visible so it can be retried by wait_for_condition below.
        message = global_state_accessor.get_all_resource_usage()
        if message is None:
            return False

        resource_usage = gcs_utils.ResourceUsageBatchData.FromString(message)
        aggregate_resource_load = resource_usage.resource_load_by_shape.resource_demands
        print(f"Num shapes {len(aggregate_resource_load)}")
        # Exactly one shape means the task and the actor creation were
        # aggregated under the same resource demand ({CPU: 1}), which is the
        # behavior under test.
        if len(aggregate_resource_load) == 1:
            num_infeasible = aggregate_resource_load[0].num_infeasible_requests_queued
            print(f"num in shape {num_infeasible}")
            # Ideally we'd also want to assert which demand arrived first, but
            # guaranteeing the order that submissions will occur is too
            # hard/flaky.
            return num_infeasible == 2
        return False

    # Assign to variables to keep the ref counter happy.
    handle = Foo.remote()
    ref = foo.remote()

    wait_for_condition(actor_and_task_queued_together, timeout=2)
    global_state_accessor.disconnect()

    # Do something with the variables so lint is happy.
    del handle
    del ref


def test_heartbeat_ip(shutdown_only):
cluster = ray.init(num_cpus=1)
global_state_accessor = make_global_state_accessor(cluster)
Expand Down
7 changes: 7 additions & 0 deletions src/ray/common/ray_config_def.h
Expand Up @@ -92,6 +92,13 @@ RAY_CONFIG(uint64_t, raylet_get_agent_info_interval_ms, 1)
/// handler is drifting.
RAY_CONFIG(uint64_t, num_resource_report_periods_warning, 5)

/// Whether to report placement resources or regular resources for an actor
/// creation task. Reporting placement resources may cause the autoscaler to
/// overestimate the resources required by the cluster, but reporting regular
/// resources may lead to no autoscaling when an actor can't be placed.
/// https://github.com/ray-project/ray/issues/26806
RAY_CONFIG(bool, report_actor_placement_resources, true)

/// Whether to record the creation sites of object references. This adds more
/// information to `ray memory`, but introduces a little extra overhead when
/// creating object references (e.g. 5~10 microsec per call in Python).
Expand Down
8 changes: 7 additions & 1 deletion src/ray/common/task/task_spec.cc
Expand Up @@ -113,7 +113,13 @@ void TaskSpecification::ComputeResources() {
if (!IsActorTask()) {
// There is no need to compute `SchedulingClass` for actor tasks since
// the actor tasks need not be scheduled.
const auto &resource_set = GetRequiredResources();
const bool is_actor_creation_task = IsActorCreationTask();
const bool should_report_placement_resources =
RayConfig::instance().report_actor_placement_resources();
const auto &resource_set =
(is_actor_creation_task && should_report_placement_resources)
? GetRequiredPlacementResources()
: GetRequiredResources();
const auto &function_descriptor = FunctionDescriptor();
auto depth = GetDepth();
auto sched_cls_desc = SchedulingClassDescriptor(
Expand Down
25 changes: 24 additions & 1 deletion src/ray/common/test/task_spec_test.cc
Expand Up @@ -100,6 +100,29 @@ TEST(TaskSpecTest, TestSchedulingClassDescriptor) {
TaskSpecification::GetSchedulingClass(descriptor9));
}

TEST(TaskSpecTest, TestActorSchedulingClass) {
  // An actor creation task's scheduling class must be derived from its
  // placement resources, not its regular resources. We verify this by
  // building an actor creation spec whose *placement* resources are
  // {CPU: 1} and a normal task spec whose *regular* resources are
  // {CPU: 1}, and asserting they land in the same scheduling class.
  const std::unordered_map<std::string, double> single_cpu = {{"CPU", 1}};

  rpc::TaskSpec actor_proto;
  actor_proto.set_type(TaskType::ACTOR_CREATION_TASK);
  actor_proto.mutable_required_placement_resources()->insert(single_cpu.begin(),
                                                             single_cpu.end());
  const TaskSpecification actor_task(actor_proto);

  rpc::TaskSpec normal_proto;
  normal_proto.set_type(TaskType::NORMAL_TASK);
  normal_proto.mutable_required_resources()->insert(single_cpu.begin(),
                                                    single_cpu.end());
  const TaskSpecification regular_task(normal_proto);

  ASSERT_EQ(regular_task.GetSchedulingClass(), actor_task.GetSchedulingClass());
}

TEST(TaskSpecTest, TestTaskSpecification) {
rpc::SchedulingStrategy scheduling_strategy;
NodeID node_id = NodeID::FromRandom();
Expand All @@ -118,4 +141,4 @@ TEST(TaskSpecTest, TestTaskSpecification) {
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_resource_manager.h
Expand Up @@ -73,7 +73,7 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
rpc::GetAllAvailableResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle report resource usage rpc come from raylet.
/// Handle report resource usage rpc from a raylet.
void HandleReportResourceUsage(const rpc::ReportResourceUsageRequest &request,
rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
Expand Down

0 comments on commit 4331617

Please sign in to comment.