@@ -3,7 +3,7 @@
import math
import uuid
from datetime import datetime, timedelta
from typing import List, Optional, Tuple
from typing import List, Optional

from sqlalchemy import and_, func, not_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
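Side note on the typing change in this hunk: with PEP 585 generics (Python 3.9+), the builtin tuple can be parameterized directly, so the Tuple import is no longer needed for the annotations introduced further down. A minimal, self-contained sketch with a hypothetical pick_bounds helper:

```python
from typing import List, Optional


def pick_bounds(values: List[int]) -> Optional[tuple[int, int]]:
    # Hypothetical helper, illustration only: builtin tuple[...] replaces typing.Tuple[...].
    if not values:
        return None
    return min(values), max(values)
```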
@@ -25,6 +25,7 @@
from dstack._internal.core.models.profiles import (
DEFAULT_RUN_TERMINATION_IDLE_TIME,
CreationPolicy,
Profile,
TerminationPolicy,
)
from dstack._internal.core.models.resources import Memory
@@ -34,6 +35,7 @@
JobRuntimeData,
JobStatus,
JobTerminationReason,
Requirements,
Run,
RunSpec,
)
@@ -186,7 +188,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
project = run_model.project
run = run_model_to_run(run_model)
run_spec = run.run_spec
profile = run_spec.merged_profile
run_profile = run_spec.merged_profile
job = find_job(run.jobs, job_model.replica_num, job_model.job_num)
multinode = job.job_spec.jobs_per_replica > 1

@@ -333,7 +335,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
job_model.status = JobStatus.PROVISIONING
else:
# Assigned no instance, create a new one
if profile.creation_policy == CreationPolicy.REUSE:
if run_profile.creation_policy == CreationPolicy.REUSE:
logger.debug("%s: reuse instance failed", fmt(job_model))
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
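The branch above makes the reuse-only policy terminal: when no existing instance could be assigned, the job fails with a no-capacity reason instead of falling through to provisioning. A condensed sketch of that decision, using illustrative enum values rather than dstack's actual ones:

```python
from enum import Enum


class CreationPolicy(str, Enum):
    # Illustrative values; the real enum lives in dstack._internal.core.models.profiles.
    REUSE = "reuse"
    REUSE_OR_CREATE = "reuse-or-create"


def on_no_instance_assigned(policy: CreationPolicy) -> str:
    if policy == CreationPolicy.REUSE:
        # Mirrors the branch above: terminate with FAILED_TO_START_DUE_TO_NO_CAPACITY.
        return "terminate"
    return "provision-new-instance"
```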
@@ -362,7 +364,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
return

logger.info("%s: now is provisioning a new instance", fmt(job_model))
job_provisioning_data, offer = run_job_result
job_provisioning_data, offer, effective_profile, _ = run_job_result
job_model.job_provisioning_data = job_provisioning_data.json()
job_model.status = JobStatus.PROVISIONING
if fleet_model is None:
@@ -382,12 +384,11 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
instance = _create_instance_model_for_job(
project=project,
fleet_model=fleet_model,
run_spec=run_spec,
job_model=job_model,
job=job,
job_provisioning_data=job_provisioning_data,
offer=offer,
instance_num=instance_num,
profile=effective_profile,
)
job_model.job_runtime_data = _prepare_job_runtime_data(offer, multinode).json()
# Both this task and process_fleets can add instances to fleets.
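For orientation, a condensed, self-contained sketch of the caller-side change in this hunk (stand-in types only, not dstack's models): the result of _run_job_on_new_instance now carries the fleet-combined profile, and instance creation consumes that effective profile instead of re-deriving settings from the run spec:

```python
from typing import Optional

# (provisioning_data, offer, effective_profile, requirements) -- stand-in for the real
# tuple of JobProvisioningData, InstanceOfferWithAvailability, Profile, Requirements.
ProvisionResult = tuple[object, object, object, object]


def create_instance_record(run_job_result: Optional[ProvisionResult]) -> Optional[dict]:
    if run_job_result is None:
        return None  # no capacity; handled by the retry/termination logic earlier in the task
    provisioning_data, offer, effective_profile, _ = run_job_result
    # The instance record is built from the fleet-combined (effective) profile,
    # no longer from run_spec.merged_profile.
    return {"provisioning_data": provisioning_data, "offer": offer, "profile": effective_profile}
```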
@@ -546,23 +547,22 @@ async def _find_optimal_fleet_with_offers(
fleet_cheapest_pool_offer = fleet_instances_with_pool_offers[0][1].price

candidate_fleet = fleet_model_to_fleet(candidate_fleet_model)
profile = combine_fleet_and_run_profiles(
candidate_fleet.spec.merged_profile, run_spec.merged_profile
)
fleet_requirements = get_fleet_requirements(candidate_fleet.spec)
requirements = combine_fleet_and_run_requirements(
fleet_requirements, job.job_spec.requirements
)
multinode = (
candidate_fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
or job.job_spec.jobs_per_replica > 1
)
profile = None
requirements = None
try:
profile, requirements = _get_run_profile_and_requirements_in_fleet(
job=job,
run_spec=run_spec,
fleet=candidate_fleet,
)
except ValueError:
pass
fleet_backend_offers = []
if (
_check_can_create_new_instance_in_fleet(candidate_fleet)
and profile is not None
and requirements is not None
):
if profile is not None and requirements is not None:
multinode = (
candidate_fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
or job.job_spec.jobs_per_replica > 1
)
fleet_backend_offers = await get_offers_by_requirements(
project=project,
profile=profile,
@@ -704,29 +704,22 @@ async def _run_job_on_new_instance(
master_job_provisioning_data: Optional[JobProvisioningData] = None,
volumes: Optional[List[List[Volume]]] = None,
fleet_model: Optional[FleetModel] = None,
) -> Optional[Tuple[JobProvisioningData, InstanceOfferWithAvailability]]:
) -> Optional[tuple[JobProvisioningData, InstanceOfferWithAvailability, Profile, Requirements]]:
if volumes is None:
volumes = []
profile = run.run_spec.merged_profile
requirements = job.job_spec.requirements
fleet = None
if fleet_model is not None:
fleet = fleet_model_to_fleet(fleet_model)
if not _check_can_create_new_instance_in_fleet(fleet):
logger.debug(
"%s: cannot fit new instance into fleet %s", fmt(job_model), fleet_model.name
)
return None
profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, profile)
if profile is None:
logger.debug("%s: cannot combine fleet %s profile", fmt(job_model), fleet_model.name)
return None
fleet_requirements = get_fleet_requirements(fleet.spec)
requirements = combine_fleet_and_run_requirements(fleet_requirements, requirements)
if requirements is None:
logger.debug(
"%s: cannot combine fleet %s requirements", fmt(job_model), fleet_model.name
try:
profile, requirements = _get_run_profile_and_requirements_in_fleet(
job=job,
run_spec=run.run_spec,
fleet=fleet,
)
except ValueError as e:
logger.debug("%s: %s", fmt(job_model), e.args[0])
return None
# TODO: Respect fleet provisioning properties such as tags

@@ -766,7 +759,7 @@ async def _run_job_on_new_instance(
project_ssh_private_key,
offer_volumes,
)
return job_provisioning_data, offer
return job_provisioning_data, offer, profile, requirements
except BackendError as e:
logger.warning(
"%s: %s launch in %s/%s failed: %s",
@@ -789,6 +782,25 @@
return None


def _get_run_profile_and_requirements_in_fleet(
job: Job,
run_spec: RunSpec,
fleet: Fleet,
) -> tuple[Profile, Requirements]:
if not _check_can_create_new_instance_in_fleet(fleet):
raise ValueError("Cannot fit new instance into fleet")
profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, run_spec.merged_profile)
if profile is None:
raise ValueError("Cannot combine fleet profile")
fleet_requirements = get_fleet_requirements(fleet.spec)
requirements = combine_fleet_and_run_requirements(
fleet_requirements, job.job_spec.requirements
)
if requirements is None:
raise ValueError("Cannot combine fleet requirements")
return profile, requirements
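Both call sites of the new helper appear in this diff and differ only in how they react to ValueError: the fleet-selection path swallows it and keeps considering the fleet's existing instances, while the provisioning path logs the reason and gives up on the fleet. A condensed, self-contained illustration (the stub below stands in for _get_run_profile_and_requirements_in_fleet):

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def get_profile_and_requirements(fleet_ok: bool) -> tuple[str, str]:
    # Stand-in for _get_run_profile_and_requirements_in_fleet.
    if not fleet_ok:
        raise ValueError("Cannot fit new instance into fleet")
    return "profile", "requirements"


def fleet_selection_style(fleet_ok: bool) -> tuple[Optional[str], Optional[str]]:
    # As in _find_optimal_fleet_with_offers: fall through with None so only the
    # new-instance (backend) offers are skipped for this fleet.
    profile, requirements = None, None
    try:
        profile, requirements = get_profile_and_requirements(fleet_ok)
    except ValueError:
        pass
    return profile, requirements


def provisioning_style(job_name: str, fleet_ok: bool) -> Optional[tuple[str, str]]:
    # As in _run_job_on_new_instance: log the reason at debug level and return None.
    try:
        return get_profile_and_requirements(fleet_ok)
    except ValueError as e:
        logger.debug("%s: %s", job_name, e.args[0])
        return None
```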


def _check_can_create_new_instance_in_fleet(fleet: Fleet) -> bool:
if fleet.spec.configuration.ssh_config is not None:
return False
@@ -857,14 +869,12 @@ async def _get_next_instance_num(session: AsyncSession, fleet_model: FleetModel)
def _create_instance_model_for_job(
project: ProjectModel,
fleet_model: FleetModel,
run_spec: RunSpec,
job_model: JobModel,
job: Job,
job_provisioning_data: JobProvisioningData,
offer: InstanceOfferWithAvailability,
instance_num: int,
profile: Profile,
) -> InstanceModel:
profile = run_spec.merged_profile
if not job_provisioning_data.dockerized:
# terminate vastai/k8s instances immediately
termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE
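The remainder of _create_instance_model_for_job is collapsed in this diff; the visible part shows that non-dockerized (vastai/k8s) instances are terminated immediately regardless of profile settings, while the profile, now passed in explicitly instead of being derived from run_spec, would govern the dockerized case. A hedged sketch of that selection with illustrative stand-in types, not dstack's actual models:

```python
from dataclasses import dataclass
from enum import Enum


class IdlePolicy(str, Enum):
    # Illustrative stand-in for dstack's TerminationPolicy.
    DESTROY_AFTER_IDLE = "destroy-after-idle"
    DONT_DESTROY = "dont-destroy"


@dataclass
class StandInProfile:
    # Illustrative stand-in for the Profile now passed into _create_instance_model_for_job.
    termination_policy: IdlePolicy
    termination_idle_time: int


def select_termination(dockerized: bool, profile: StandInProfile) -> tuple[IdlePolicy, int]:
    if not dockerized:
        # Mirrors the visible branch: terminate vastai/k8s instances immediately.
        return IdlePolicy.DESTROY_AFTER_IDLE, 0
    # Assumption: the collapsed remainder honors the profile's own settings.
    return profile.termination_policy, profile.termination_idle_time
```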