Skip to content

Commit

Permalink
[Core][Label Scheduling 1/n] Add NodeLabelSchedulingStrategy API in python (ray-project#36418)
Browse files Browse the repository at this point in the history

Signed-off-by: LarryLian <554538252@qq.com>
Signed-off-by: NripeshN <nn2012@hw.ac.uk>
  • Loading branch information
larrylian authored and NripeshN committed Aug 15, 2023
1 parent ca46557 commit a643466
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 1 deletion.
2 changes: 2 additions & 0 deletions python/ray/_private/ray_option_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ray.util.scheduling_strategies import (
NodeAffinitySchedulingStrategy,
PlacementGroupSchedulingStrategy,
NodeLabelSchedulingStrategy,
)


Expand Down Expand Up @@ -129,6 +130,7 @@ def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]:
str,
PlacementGroupSchedulingStrategy,
NodeAffinitySchedulingStrategy,
NodeLabelSchedulingStrategy,
)
),
"_metadata": Option((dict, type(None))),
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ py_test_module_list(
"test_healthcheck.py",
"test_kill_raylet_signal_log.py",
"test_memstat.py",
"test_node_label_scheduling_strategy.py",
"test_protobuf_compatibility.py"
],
size = "medium",
Expand Down
106 changes: 106 additions & 0 deletions python/ray/tests/test_node_label_scheduling_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import os
import sys
import pytest

import ray
from ray.util.scheduling_strategies import (
In,
NotIn,
Exists,
DoesNotExist,
NodeLabelSchedulingStrategy,
)


@ray.remote
class MyActor:
    """Minimal actor used only to exercise ``.options(scheduling_strategy=...)``."""

    def __init__(self):
        pass

    def double(self, x):
        # Return twice the input value.
        doubled = x * 2
        return doubled


@pytest.mark.parametrize(
    "call_ray_start",
    ['ray start --head --labels={"gpu_type":"A100","region":"us"}'],
    indirect=True,
)
def test_node_label_scheduling_basic(call_ray_start):
    """Well-formed label expressions are accepted by ``.options()``."""
    ray.init(address=call_ray_start)

    # Each strategy below must construct and be accepted without raising.
    valid_strategies = [
        NodeLabelSchedulingStrategy(
            {"gpu_type": In("A100", "T100"), "region": Exists()}
        ),
        NodeLabelSchedulingStrategy(
            {"gpu_type": NotIn("A100", "T100"), "other_key": DoesNotExist()}
        ),
        NodeLabelSchedulingStrategy(
            hard={"gpu_type": Exists()},
            soft={"gpu_type": In("A100")},
        ),
    ]
    for strategy in valid_strategies:
        MyActor.options(scheduling_strategy=strategy)


def test_node_label_scheduling_invalid_parameter(call_ray_start):
    """Each malformed strategy input must raise ValueError with a clear message.

    NOTE(review): renamed from ``test_node_label_scheduling_invalid_paramter``
    (typo). Pytest discovers tests by the ``test_`` prefix, so nothing breaks.
    """
    ray.init(address=call_ray_start)
    # In()/NotIn() only accept string values.
    with pytest.raises(
        ValueError, match="Type of value in position 0 for the In operator must be str"
    ):
        MyActor.options(
            scheduling_strategy=NodeLabelSchedulingStrategy({"gpu_type": In(123)})
        )

    with pytest.raises(
        ValueError,
        match="Type of value in position 0 for the NotIn operator must be str",
    ):
        MyActor.options(
            scheduling_strategy=NodeLabelSchedulingStrategy({"gpu_type": NotIn(123)})
        )

    # In()/NotIn() require at least one value.
    with pytest.raises(
        ValueError,
        match="The variadic parameter of the In operator must be a non-empty tuple",
    ):
        MyActor.options(
            scheduling_strategy=NodeLabelSchedulingStrategy({"gpu_type": In()})
        )

    with pytest.raises(
        ValueError,
        match="The variadic parameter of the NotIn operator must be a non-empty tuple",
    ):
        MyActor.options(
            scheduling_strategy=NodeLabelSchedulingStrategy({"gpu_type": NotIn()})
        )

    # hard/soft must be dicts with str keys and operator values.
    with pytest.raises(ValueError, match="The soft parameter must be a map"):
        MyActor.options(
            scheduling_strategy=NodeLabelSchedulingStrategy(hard=None, soft=["1"])
        )

    with pytest.raises(
        ValueError, match="The map key of the hard parameter must be of type str"
    ):
        MyActor.options(scheduling_strategy=NodeLabelSchedulingStrategy({111: "1111"}))

    with pytest.raises(
        ValueError, match="must be one of the `In`, `NotIn`, `Exists` or `DoesNotExist`"
    ):
        MyActor.options(
            scheduling_strategy=NodeLabelSchedulingStrategy({"gpu_type": "1111"})
        )


if __name__ == "__main__":
    # PARALLEL_CI runs the file under pytest-xdist; otherwise run serially.
    if os.environ.get("PARALLEL_CI"):
        pytest_args = ["-n", "auto", "--boxed", "-vs", __file__]
    else:
        pytest_args = ["-sv", __file__]
    sys.exit(pytest.main(pytest_args))
110 changes: 109 additions & 1 deletion python/ray/util/scheduling_strategies.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union, Optional, TYPE_CHECKING
from typing import Dict, Union, Optional, TYPE_CHECKING
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
Expand Down Expand Up @@ -72,9 +72,117 @@ def __init__(
self._fail_on_unavailable = _fail_on_unavailable


def _validate_label_match_operator_values(values, operator):
if not values:
raise ValueError(
f"The variadic parameter of the {operator} operator"
f' must be a non-empty tuple: e.g. {operator}("value1", "value2").'
)

index = 0
for value in values:
if not isinstance(value, str):
raise ValueError(
f"Type of value in position {index} for the {operator} operator "
f'must be str (e.g. {operator}("value1", "value2")) '
f"but got {str(value)} of type {type(value)}."
)
index = index + 1


@PublicAPI(stability="alpha")
class In:
def __init__(self, *values):
_validate_label_match_operator_values(values, "In")
self.values = list(values)


@PublicAPI(stability="alpha")
class NotIn:
def __init__(self, *values):
_validate_label_match_operator_values(values, "NotIn")
self.values = list(values)


@PublicAPI(stability="alpha")
class Exists:
def __init__(self):
pass


@PublicAPI(stability="alpha")
class DoesNotExist:
def __init__(self):
pass


class _LabelMatchExpression:
    """An expression used to select nodes by their labels.

    Attributes:
        key: The label key to match against.
        operator: One of In, NotIn, Exists, DoesNotExist.
    """

    def __init__(self, key: str, operator: Union[In, NotIn, Exists, DoesNotExist]):
        self.key = key
        self.operator = operator


# Mapping of label key -> match operator, e.g. {"region": In("us")}.
LabelMatchExpressionsT = Dict[str, Union[In, NotIn, Exists, DoesNotExist]]


@PublicAPI(stability="alpha")
class NodeLabelSchedulingStrategy:
"""Label based node affinity scheduling strategy
scheduling_strategy=NodeLabelSchedulingStrategy({
"region": In("us"),
"gpu_type": Exists()
})
"""

def __init__(
self, hard: LabelMatchExpressionsT, *, soft: LabelMatchExpressionsT = None
):
self.hard = _convert_map_to_expressions(hard, "hard")
self.soft = _convert_map_to_expressions(soft, "soft")


def _convert_map_to_expressions(map_expressions: LabelMatchExpressionsT, param: str):
    """Convert a ``{key: operator}`` map into a list of _LabelMatchExpression.

    Args:
        map_expressions: Mapping of label key to match operator, or None.
        param: Parameter name ("hard"/"soft") used in error messages.

    Returns:
        List of _LabelMatchExpression; empty if ``map_expressions`` is None.

    Raises:
        ValueError: If the input is not a dict, a key is not a str, or a
            value is not one of the supported operators.
    """
    expressions = []
    if map_expressions is None:
        return expressions

    # Runtime checks must use the builtin ``dict``, not ``typing.Dict``.
    if not isinstance(map_expressions, dict):
        raise ValueError(
            f'The {param} parameter must be a map (e.g. {{"key1": In("value1")}}) '
            f"but got type {type(map_expressions)}."
        )

    for key, value in map_expressions.items():
        if not isinstance(key, str):
            raise ValueError(
                f"The map key of the {param} parameter must "
                f'be of type str (e.g. {{"key1": In("value1")}}) '
                f"but got {str(key)} of type {type(key)}."
            )

        if not isinstance(value, (In, NotIn, Exists, DoesNotExist)):
            raise ValueError(
                f"The map value for key {key} of the {param} parameter "
                f"must be one of the `In`, `NotIn`, `Exists` or `DoesNotExist` "
                f'operator (e.g. {{"key1": In("value1")}}) '
                f"but got {str(value)} of type {type(value)}."
            )

        expressions.append(_LabelMatchExpression(key, value))
    return expressions


# Union of all values accepted for the ``scheduling_strategy`` option.
SchedulingStrategyT = Union[
    None,
    str,  # Literal["DEFAULT", "SPREAD"]
    PlacementGroupSchedulingStrategy,
    NodeAffinitySchedulingStrategy,
    NodeLabelSchedulingStrategy,
]
38 changes: 38 additions & 0 deletions src/ray/protobuf/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,43 @@ enum TaskType {
DRIVER_TASK = 3;
}

// Label operator: the label value must be one of `values`.
message LabelIn {
repeated string values = 1;
}

// Label operator: the label value must not be any of `values`.
message LabelNotIn {
repeated string values = 1;
}

// Label operator: the label key must be present (no payload).
message LabelExists {}

// Label operator: the label key must be absent (no payload).
message LabelDoesNotExist {}

// Exactly one of the four label operators.
message LabelOperator {
oneof LabelOperator {
LabelIn label_in = 1;
LabelNotIn label_not_in = 2;
LabelExists label_exists = 3;
LabelDoesNotExist label_does_not_exist = 4;
}
}

// A single (label key, operator) match expression.
message LabelMatchExpression {
string key = 1;
LabelOperator operator = 2;
}

// A conjunction of label match expressions.
message LabelMatchExpressions {
repeated LabelMatchExpression expressions = 1;
}

// Mirrors python NodeLabelSchedulingStrategy: hard/soft expression lists.
message NodeLabelSchedulingStrategy {
// Required expressions to be satisfied
LabelMatchExpressions hard = 1;
// Preferred expressions to be satisfied
LabelMatchExpressions soft = 2;
}

message NodeAffinitySchedulingStrategy {
bytes node_id = 1;
bool soft = 2;
Expand Down Expand Up @@ -80,6 +117,7 @@ message SchedulingStrategy {
// Best effort spread scheduling strategy.
SpreadSchedulingStrategy spread_scheduling_strategy = 3;
NodeAffinitySchedulingStrategy node_affinity_scheduling_strategy = 4;
NodeLabelSchedulingStrategy node_label_scheduling_strategy = 5;
}
}

Expand Down

0 comments on commit a643466

Please sign in to comment.