Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/docs/concepts/fleets.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ Once the status of instances changes to `idle`, they can be used by dev environm

### Configuration options

#### Nodes { #nodes }

The `nodes` property controls how many instances to provision and maintain in the fleet:

<div editor-title=".dstack.yml">

```yaml
type: fleet

name: my-fleet

nodes:
min: 1 # Always maintain at least 1 instance
target: 2 # Provision 2 instances initially
max: 3 # Do not allow more than 3 instances
```

</div>

`dstack` ensures the fleet always has at least `nodes.min` instances, creating new instances in the background if necessary. If you don't need to keep instances in the fleet forever, you can set `nodes.min` to `0`. By default, `dstack apply` also provisions `nodes.min` instances. The `nodes.target` property allows provisioning more instances initially than need to be maintained.

#### Placement { #cloud-placement }

To ensure instances are interconnected (e.g., for
Expand Down
5 changes: 5 additions & 0 deletions src/dstack/_internal/core/compatibility/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def get_fleet_spec_excludes(fleet_spec: FleetSpec) -> Optional[IncludeExcludeDic
profile_excludes.add("stop_criteria")
if profile.schedule is None:
profile_excludes.add("schedule")
if (
fleet_spec.configuration.nodes
and fleet_spec.configuration.nodes.min == fleet_spec.configuration.nodes.target
):
configuration_excludes["nodes"] = {"target"}
if configuration_excludes:
spec_excludes["configuration"] = configuration_excludes
if profile_excludes:
Expand Down
68 changes: 66 additions & 2 deletions src/dstack/_internal/core/models/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
TerminationPolicy,
parse_idle_duration,
)
from dstack._internal.core.models.resources import Range, ResourcesSpec
from dstack._internal.core.models.resources import ResourcesSpec
from dstack._internal.utils.common import list_enum_values_for_annotation
from dstack._internal.utils.json_schema import add_extra_schema_types
from dstack._internal.utils.tags import tags_validator
Expand Down Expand Up @@ -141,6 +141,58 @@ def validate_network(cls, value):
return value


class FleetNodesSpec(CoreModel):
    # Declarative node counts for a cloud fleet, constrained to min <= target <= max.
    # `min` is maintained continuously; `target` is provisioned on apply;
    # `max` (optional) caps the fleet size.
    min: Annotated[
        int, Field(description=("The minimum number of instances to maintain in the fleet"))
    ]
    target: Annotated[
        int,
        Field(
            description=(
                "The number of instances to provision on fleet apply."
                " `min` <= `target` <= `max`. Defaults to `min`"
            )
        ),
    ]
    max: Annotated[
        Optional[int],
        Field(
            description=(
                "The maximum number of instances allowed in the fleet. Unlimited if not specified"
            )
        ),
    ] = None

    @root_validator(pre=True)
    def set_min_and_target_defaults(cls, values):
        # `min` defaults to 0; `target` defaults to the (possibly defaulted) `min`,
        # so the order of these two assignments matters.
        min_ = values.get("min")
        target = values.get("target")
        if min_ is None:
            values["min"] = 0
        if target is None:
            values["target"] = values["min"]
        return values

    @validator("min")
    def validate_min(cls, v: int) -> int:
        # Negative counts are meaningless; 0 is allowed (fleet may scale to empty).
        if v < 0:
            raise ValueError("min cannot be negative")
        return v

    @root_validator(skip_on_failure=True)
    def _post_validate_ranges(cls, values):
        # Enforce min <= target <= max; an absent `max` means unbounded.
        min_ = values["min"]
        target = values["target"]
        max_ = values.get("max")
        if target < min_:
            raise ValueError("target must not be less than min")
        if max_ is not None and max_ < min_:
            raise ValueError("max must not be less than min")
        if max_ is not None and max_ < target:
            raise ValueError("max must not be less than target")
        return values


class InstanceGroupParams(CoreModel):
env: Annotated[
Env,
Expand All @@ -151,7 +203,9 @@ class InstanceGroupParams(CoreModel):
Field(description="The parameters for adding instances via SSH"),
] = None

nodes: Annotated[Optional[Range[int]], Field(description="The number of instances")] = None
nodes: Annotated[
Optional[FleetNodesSpec], Field(description="The number of instances in cloud fleet")
] = None
placement: Annotated[
Optional[InstanceGroupPlacement],
Field(description="The placement of instances: `any` or `cluster`"),
Expand Down Expand Up @@ -248,6 +302,16 @@ def schema_extra(schema: Dict[str, Any], model: Type):
extra_types=[{"type": "string"}],
)

@validator("nodes", pre=True)
def parse_nodes(cls, v: Optional[Union[dict, str]]) -> Optional[dict]:
if isinstance(v, str) and ".." in v:
v = v.replace(" ", "")
min, max = v.split("..")
return dict(min=min or None, max=max or None)
elif isinstance(v, str) or isinstance(v, int):
return dict(min=v, max=v)
return v

_validate_idle_duration = validator("idle_duration", pre=True, allow_reuse=True)(
parse_idle_duration
)
Expand Down
120 changes: 107 additions & 13 deletions src/dstack/_internal/server/background/tasks/process_fleets.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from datetime import timedelta
from typing import List
from uuid import UUID

from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, load_only

from dstack._internal.core.models.fleets import FleetStatus
from dstack._internal.core.models.fleets import FleetSpec, FleetStatus
from dstack._internal.core.models.instances import InstanceStatus
from dstack._internal.server.db import get_db, get_session_ctx
from dstack._internal.server.models import (
FleetModel,
Expand All @@ -15,7 +17,9 @@
RunModel,
)
from dstack._internal.server.services.fleets import (
create_fleet_instance_model,
get_fleet_spec,
get_next_instance_num,
is_fleet_empty,
is_fleet_in_use,
)
Expand Down Expand Up @@ -65,31 +69,111 @@ async def _process_fleets(session: AsyncSession, fleet_models: List[FleetModel])
res = await session.execute(
select(FleetModel)
.where(FleetModel.id.in_(fleet_ids))
.options(joinedload(FleetModel.instances).load_only(InstanceModel.deleted))
.options(
joinedload(FleetModel.instances).joinedload(InstanceModel.jobs).load_only(JobModel.id)
joinedload(FleetModel.instances).joinedload(InstanceModel.jobs).load_only(JobModel.id),
joinedload(FleetModel.project),
)
.options(joinedload(FleetModel.runs).load_only(RunModel.status))
.execution_options(populate_existing=True)
)
fleet_models = list(res.unique().scalars().all())

# TODO: Drop fleets auto-deletion after dropping fleets auto-creation.
deleted_fleets_ids = []
now = get_current_datetime()
for fleet_model in fleet_models:
_consolidate_fleet_state_with_spec(session, fleet_model)
deleted = _autodelete_fleet(fleet_model)
if deleted:
deleted_fleets_ids.append(fleet_model.id)
fleet_model.last_processed_at = now
fleet_model.last_processed_at = get_current_datetime()
await _update_deleted_fleets_placement_groups(session, deleted_fleets_ids)
await session.commit()

await session.execute(
update(PlacementGroupModel)
.where(
PlacementGroupModel.fleet_id.in_(deleted_fleets_ids),

def _consolidate_fleet_state_with_spec(session: AsyncSession, fleet_model: FleetModel):
    # Bring the fleet's instances back in line with its spec (currently:
    # replenishing up to `nodes.min`), with exponential backoff between attempts.
    if fleet_model.status == FleetStatus.TERMINATING:
        return
    spec = get_fleet_spec(fleet_model)
    # Only explicitly created cloud fleets are consolidated.
    if spec.configuration.nodes is None or spec.autocreated:
        return
    if not _is_fleet_ready_for_consolidation(fleet_model):
        return
    if _maintain_fleet_nodes_min(session, fleet_model, spec):
        fleet_model.consolidation_attempt += 1
    else:
        # The fleet is already consolidated or consolidation is in progress.
        # We reset consolidation_attempt in both cases for simplicity.
        # The second case does not need reset but is ok to do since
        # it means consolidation is longer than delay, so it won't happen too often.
        # TODO: Reset consolidation_attempt on fleet in-place update.
        fleet_model.consolidation_attempt = 0
        fleet_model.last_consolidated_at = get_current_datetime()


def _is_fleet_ready_for_consolidation(fleet_model: FleetModel) -> bool:
    # True once the backoff delay for the current attempt has elapsed since the
    # last consolidation (falling back to last_processed_at before the first one).
    delay = _get_consolidation_retry_delay(fleet_model.consolidation_attempt)
    reference = fleet_model.last_consolidated_at or fleet_model.last_processed_at
    return get_current_datetime() - reference >= delay


# We use exponentially increasing consolidation retry delays so that
# consolidation does not happen too often. In particular, this prevents
# retrying instance provisioning constantly in case of no offers.
# TODO: Adjust delays.
_CONSOLIDATION_RETRY_DELAYS = [
timedelta(seconds=30),
timedelta(minutes=1),
timedelta(minutes=2),
timedelta(minutes=5),
timedelta(minutes=10),
]


def _get_consolidation_retry_delay(consolidation_attempt: int) -> timedelta:
if consolidation_attempt < len(_CONSOLIDATION_RETRY_DELAYS):
return _CONSOLIDATION_RETRY_DELAYS[consolidation_attempt]
return _CONSOLIDATION_RETRY_DELAYS[-1]


def _maintain_fleet_nodes_min(
    session: AsyncSession,
    fleet_model: FleetModel,
    fleet_spec: FleetSpec,
) -> bool:
    """
    Ensures the fleet has at least `nodes.min` instances.
    Returns `True` if retried or added new instances and `False` otherwise.
    """
    assert fleet_spec.configuration.nodes is not None
    for instance in fleet_model.instances:
        # Delete terminated but not deleted instances since
        # they are going to be replaced with new pending instances.
        if instance.status == InstanceStatus.TERMINATED and not instance.deleted:
            # It's safe to modify instances without instance lock since
            # no other task modifies already terminated instances.
            instance.deleted = True
            instance.deleted_at = get_current_datetime()
    active_instances = [i for i in fleet_model.instances if not i.deleted]
    active_instances_num = len(active_instances)
    if active_instances_num >= fleet_spec.configuration.nodes.min:
        return False
    nodes_missing = fleet_spec.configuration.nodes.min - active_instances_num
    # NOTE: `_` avoids shadowing the `i` used in the instance_num computation below.
    for _ in range(nodes_missing):
        instance_model = create_fleet_instance_model(
            session=session,
            project=fleet_model.project,
            # TODO: Store fleet.user and pass it instead of the project owner.
            username=fleet_model.project.owner.name,
            spec=fleet_spec,
            instance_num=get_next_instance_num({i.instance_num for i in active_instances}),
        )
        # Track the new instance so the next instance_num is computed correctly.
        active_instances.append(instance_model)
        fleet_model.instances.append(instance_model)
    logger.info("Added %s instances to fleet %s", nodes_missing, fleet_model.name)
    return True


def _autodelete_fleet(fleet_model: FleetModel) -> bool:
Expand All @@ -100,7 +184,7 @@ def _autodelete_fleet(fleet_model: FleetModel) -> bool:
if (
fleet_model.status != FleetStatus.TERMINATING
and fleet_spec.configuration.nodes is not None
and (fleet_spec.configuration.nodes.min is None or fleet_spec.configuration.nodes.min == 0)
and fleet_spec.configuration.nodes.min == 0
):
# Empty fleets that allow 0 nodes should not be auto-deleted
return False
Expand All @@ -110,3 +194,13 @@ def _autodelete_fleet(fleet_model: FleetModel) -> bool:
fleet_model.deleted = True
logger.info("Fleet %s deleted", fleet_model.name)
return True


async def _update_deleted_fleets_placement_groups(session: AsyncSession, fleets_ids: list[UUID]):
    # Flag placement groups owned by the given fleets as belonging to deleted
    # fleets so they can be cleaned up by the placement-group processing task.
    stmt = (
        update(PlacementGroupModel)
        .where(PlacementGroupModel.fleet_id.in_(fleets_ids))
        .values(fleet_deleted=True)
    )
    await session.execute(stmt)
Loading
Loading