79 changes: 49 additions & 30 deletions src/dstack/_internal/cli/utils/run.py
@@ -5,16 +5,19 @@
from rich.table import Table

from dstack._internal.cli.utils.common import NO_OFFERS_WARNING, add_row_from_dict, console
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.configurations import DevEnvironmentConfiguration
from dstack._internal.core.models.instances import InstanceAvailability, Resources
from dstack._internal.core.models.instances import (
InstanceAvailability,
InstanceOfferWithAvailability,
InstanceType,
)
from dstack._internal.core.models.profiles import (
DEFAULT_RUN_TERMINATION_IDLE_TIME,
TerminationPolicy,
)
from dstack._internal.core.models.runs import (
Job,
JobProvisioningData,
JobRuntimeData,
JobStatus,
JobSubmission,
Probe,
@@ -294,27 +297,24 @@ def _format_price(price: float, is_spot: bool) -> str:
return price_str


def _format_backend(backend: Any, region: str) -> str:
backend_str = getattr(backend, "value", backend)
backend_str = backend_str.replace("remote", "ssh")
def _format_backend(backend_type: BackendType, region: str) -> str:
backend_str = backend_type.value
if backend_type == BackendType.REMOTE:
backend_str = "ssh"
return f"{backend_str} ({region})"


def _format_instance_type(jpd: JobProvisioningData, jrd: Optional[JobRuntimeData]) -> str:
instance_type = jpd.instance_type.name
if jrd is not None and getattr(jrd, "offer", None) is not None:
if jrd.offer.total_blocks > 1:
instance_type += f" ({jrd.offer.blocks}/{jrd.offer.total_blocks})"
if jpd.reservation:
instance_type += f" ({jpd.reservation})"
return instance_type


def _get_resources(jpd: JobProvisioningData, jrd: Optional[JobRuntimeData]) -> Resources:
resources: Resources = jpd.instance_type.resources
if jrd is not None and getattr(jrd, "offer", None) is not None:
resources = jrd.offer.instance.resources
return resources
def _format_instance_type(
instance_type: InstanceType,
shared_offer: Optional[InstanceOfferWithAvailability],
reservation: Optional[str],
) -> str:
instance_type_str = instance_type.name
if shared_offer is not None:
instance_type_str += f" ({shared_offer.blocks}/{shared_offer.total_blocks})"
if reservation is not None:
instance_type_str += f" ({reservation})"
return instance_type_str
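
A minimal, self-contained sketch of what the new signature renders, using plain dataclass stand-ins for dstack's models (the stub names and all values are made up for illustration; only the fields the formatter reads are modeled):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class StubInstanceType:  # stand-in for InstanceType; only .name is read
    name: str

@dataclass
class StubSharedOffer:  # stand-in for InstanceOfferWithAvailability
    blocks: int  # blocks allocated to this job
    total_blocks: int  # blocks the whole instance is split into

def format_instance_type(
    instance_type: StubInstanceType,
    shared_offer: Optional[StubSharedOffer],
    reservation: Optional[str],
) -> str:
    s = instance_type.name
    if shared_offer is not None:
        s += f" ({shared_offer.blocks}/{shared_offer.total_blocks})"
    if reservation is not None:
        s += f" ({reservation})"
    return s

# A job occupying 1 of 4 blocks on a shared node:
assert format_instance_type(StubInstanceType("gpu-node"), StubSharedOffer(1, 4), None) == "gpu-node (1/4)"
# A whole instance with a capacity reservation (hypothetical id):
assert format_instance_type(StubInstanceType("p4d.24xlarge"), None, "res-0abc") == "p4d.24xlarge (res-0abc)"
```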


def _format_run_name(run: CoreRun, show_deployment_num: bool) -> str:
@@ -387,16 +387,35 @@ def get_runs_table(
}
jpd = latest_job_submission.job_provisioning_data
if jpd is not None:
shared_offer: Optional[InstanceOfferWithAvailability] = None
instance_type = jpd.instance_type
price = jpd.price
jrd = latest_job_submission.job_runtime_data
resources = _get_resources(jpd, jrd)
update_dict: Dict[Union[str, int], Any] = {
"BACKEND": _format_backend(jpd.backend, jpd.region),
"RESOURCES": resources.pretty_format(include_spot=False),
"GPU": resources.pretty_format(gpu_only=True, include_spot=False),
"INSTANCE TYPE": _format_instance_type(jpd, jrd),
"PRICE": _format_price(jpd.price, resources.spot),
}
job_row.update(update_dict)
if jrd is not None and jrd.offer is not None and jrd.offer.total_blocks > 1:
# We only use offer data from jrd if the job is/was running on a shared
# instance (the instance blocks feature). In that case, jpd contains the full
# instance offer data, while jrd contains the shared offer (a fraction of
# the full offer). Although jrd always contains the offer, we don't use it in
# other cases because, unlike jpd offer data, the jrd offer is not updated by
# the Compute.update_provisioning_data() call, and some backends, namely
# Kubernetes, may update offer data via that method.
# As long as we don't have a backend that both supports the blocks feature
# and may update offer data in update_provisioning_data(), this logic is fine.
shared_offer = jrd.offer
instance_type = shared_offer.instance
price = shared_offer.price
resources = instance_type.resources
job_row.update(
{
"BACKEND": _format_backend(jpd.backend, jpd.region),
"RESOURCES": resources.pretty_format(include_spot=False),
"GPU": resources.pretty_format(gpu_only=True, include_spot=False),
"INSTANCE TYPE": _format_instance_type(
instance_type, shared_offer, jpd.reservation
),
"PRICE": _format_price(price, resources.spot),
}
)
if merge_job_rows:
_status = job_row["STATUS"]
_resources = job_row["RESOURCES"]
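
The inline comment above describes when the table uses the jrd offer instead of the jpd one. A condensed sketch of that selection rule, with plain dicts standing in for JobProvisioningData/JobRuntimeData (values made up):

```python
jpd = {"instance_type": "gpu-node", "price": 4.00}  # full instance offer; kept current
jrd_offer = {"instance": "gpu-node", "price": 1.00, "total_blocks": 4}  # fractional offer

if jrd_offer is not None and jrd_offer["total_blocks"] > 1:
    # Shared instance (blocks feature): show the fraction the job occupies.
    instance, price = jrd_offer["instance"], jrd_offer["price"]
else:
    # Whole instance: prefer jpd, which some backends (e.g., Kubernetes)
    # refresh via Compute.update_provisioning_data(); jrd would go stale.
    instance, price = jpd["instance_type"], jpd["price"]
```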
186 changes: 117 additions & 69 deletions src/dstack/_internal/core/backends/kubernetes/compute.py
@@ -50,7 +50,7 @@
Resources,
SSHConnectionParams,
)
from dstack._internal.core.models.resources import CPUSpec, Memory
from dstack._internal.core.models.resources import CPUSpec, GPUSpec, Memory
from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
from dstack._internal.core.models.volumes import Volume
from dstack._internal.utils.common import parse_memory
@@ -123,38 +123,10 @@ def get_offers_by_requirements(
)
nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
for node in nodes:
try:
name = get_value(node, ".metadata.name", str, required=True)
cpu_arch = normalize_arch(
get_value(node, ".status.node_info.architecture", str)
).to_cpu_architecture()
allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True)
cpus = _parse_cpu(allocatable["cpu"])
memory_mib = _parse_memory(allocatable["memory"])
disk_size_mib = _parse_memory(allocatable["ephemeral-storage"])
gpus = _get_node_gpus(node)
except (AttributeError, KeyError, ValueError) as e:
logger.exception("Failed to process node: %s: %s", type(e).__name__, e)
continue
instance_offer = InstanceOfferWithAvailability(
backend=BackendType.KUBERNETES,
instance=InstanceType(
name=name,
resources=Resources(
cpus=cpus,
cpu_arch=cpu_arch,
memory_mib=memory_mib,
gpus=gpus,
spot=False,
disk=Disk(size_mib=disk_size_mib),
),
),
price=0,
region=DUMMY_REGION,
availability=InstanceAvailability.AVAILABLE,
instance_runtime=InstanceRuntime.RUNNER,
)
instance_offers.extend(filter_offers_by_requirements([instance_offer], requirements))
if (instance_offer := _get_instance_offer_from_node(node)) is not None:
instance_offers.extend(
filter_offers_by_requirements([instance_offer], requirements)
)
return instance_offers

def run_job(
@@ -216,18 +188,17 @@ def run_job(
assert isinstance(resources_spec.cpu, CPUSpec)
if (cpu_min := resources_spec.cpu.count.min) is not None:
resources_requests["cpu"] = str(cpu_min)
if (cpu_max := resources_spec.cpu.count.max) is not None:
resources_limits["cpu"] = str(cpu_max)
if (gpu_spec := resources_spec.gpu) is not None:
gpu_min = gpu_spec.count.min
if gpu_min is not None and gpu_min > 0:
if not (offer_gpus := instance_offer.instance.resources.gpus):
raise ComputeError(
"GPU is requested but the offer has no GPUs:"
f" {gpu_spec=} {instance_offer=}",
)
gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu(
self.api, offer_gpus[0]
self.api, gpu_spec
)
logger.debug("Requesting GPU resource: %s=%d", gpu_resource, gpu_min)
# Limit must be set (GPU resources cannot be overcommitted)
# and must be equal to the request.
resources_requests[gpu_resource] = resources_limits[gpu_resource] = str(gpu_min)
# It should be NoSchedule, but we also add NoExecute toleration just in case.
for effect in [TaintEffect.NO_SCHEDULE, TaintEffect.NO_EXECUTE]:
@@ -238,11 +209,13 @@
)
if (memory_min := resources_spec.memory.min) is not None:
resources_requests["memory"] = _render_memory(memory_min)
if (
resources_spec.disk is not None
and (disk_min := resources_spec.disk.size.min) is not None
):
resources_requests["ephemeral-storage"] = _render_memory(disk_min)
if (memory_max := resources_spec.memory.max) is not None:
resources_limits["memory"] = _render_memory(memory_max)
if (disk_spec := resources_spec.disk) is not None:
if (disk_min := disk_spec.size.min) is not None:
resources_requests["ephemeral-storage"] = _render_memory(disk_min)
if (disk_max := disk_spec.size.max) is not None:
resources_limits["ephemeral-storage"] = _render_memory(disk_max)
if (shm_size := resources_spec.shm_size) is not None:
shm_volume_name = "dev-shm"
volumes_.append(
@@ -328,8 +301,9 @@ def run_job(
instance_type=instance_offer.instance,
instance_id=instance_name,
# Although we can already get Service's ClusterIP from the `V1Service` object returned
# by the `create_namespaced_service` method, we still need PodIP for multinode runs.
# We'll update both hostname and internal_ip once the pod is assigned to the node.
# by the `create_namespaced_service` method, we still need 1) the updated
# instance offer and 2) the PodIP for multinode runs.
# We'll update all these fields once the pod is assigned to the node.
hostname=None,
internal_ip=None,
region=instance_offer.region,
@@ -368,6 +342,15 @@ def update_provisioning_data(
namespace=self.config.namespace,
)
provisioning_data.hostname = get_value(service, ".spec.cluster_ip", str, required=True)
node = call_api_method(
self.api.read_node,
client.V1Node,
name=get_value(pod, ".spec.node_name", str, required=True),
)
if (instance_offer := _get_instance_offer_from_node(node)) is not None:
provisioning_data.instance_type = instance_offer.instance
provisioning_data.region = instance_offer.region
provisioning_data.price = instance_offer.price

def terminate_instance(
self, instance_id: str, region: str, backend_data: Optional[str] = None
@@ -500,6 +483,40 @@ def terminate_gateway(
)


def _get_instance_offer_from_node(node: client.V1Node) -> Optional[InstanceOfferWithAvailability]:
try:
name = get_value(node, ".metadata.name", str, required=True)
cpu_arch = normalize_arch(
get_value(node, ".status.node_info.architecture", str)
).to_cpu_architecture()
allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True)
cpus = _parse_cpu(allocatable["cpu"])
memory_mib = _parse_memory(allocatable["memory"])
disk_size_mib = _parse_memory(allocatable["ephemeral-storage"])
gpus = _get_node_gpus(node)
except (AttributeError, KeyError, ValueError) as e:
logger.exception("Failed to process node: %s: %s", type(e).__name__, e)
return None
return InstanceOfferWithAvailability(
backend=BackendType.KUBERNETES,
instance=InstanceType(
name=name,
resources=Resources(
cpus=cpus,
cpu_arch=cpu_arch,
memory_mib=memory_mib,
gpus=gpus,
spot=False,
disk=Disk(size_mib=disk_size_mib),
),
),
price=0,
region=DUMMY_REGION,
availability=InstanceAvailability.AVAILABLE,
instance_runtime=InstanceRuntime.RUNNER,
)
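
For intuition on the quantity formats this helper reads out of `.status.allocatable`, here is a standalone sketch of Kubernetes-style quantity parsing. It is not dstack's `_parse_cpu`/`_parse_memory` (whose bodies are elided in this diff); it assumes millicore CPU strings and binary memory suffixes, with bare integers treated as plain bytes:

```python
def parse_cpu_quantity(cpu: str) -> int:
    """Whole CPUs from a Kubernetes CPU quantity, e.g. "7900m" (millicores) -> 7."""
    if cpu.endswith("m"):
        return int(cpu[:-1]) // 1000
    return int(cpu)

_BINARY_SUFFIXES = {"Ki": 1 / 1024, "Mi": 1, "Gi": 1024, "Ti": 1024 * 1024}

def parse_memory_quantity_mib(quantity: str) -> int:
    """MiB from a binary-suffixed quantity, e.g. "64Gi" -> 65536."""
    for suffix, factor in _BINARY_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(float(quantity[: -len(suffix)]) * factor)
    return int(quantity) // (1024 * 1024)  # a bare integer is plain bytes

assert parse_cpu_quantity("7900m") == 7
assert parse_memory_quantity_mib("64Gi") == 65536
assert parse_memory_quantity_mib("32594404Ki") == 31830
```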


def _parse_cpu(cpu: str) -> int:
if cpu.endswith("m"):
# "m" means millicpu (1/1000 CPU), e.g., 7900m -> 7.9 -> 7
Expand Down Expand Up @@ -590,36 +607,39 @@ def _get_amd_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:


def _get_pod_spec_parameters_for_gpu(
api: client.CoreV1Api, gpu: Gpu
api: client.CoreV1Api, gpu_spec: GPUSpec
) -> tuple[str, client.V1NodeAffinity, str]:
gpu_vendor = gpu.vendor
assert gpu_vendor is not None
if gpu_vendor == AcceleratorVendor.NVIDIA:
node_affinity = _get_nvidia_gpu_node_affinity(api, gpu)
node_list = call_api_method(api.list_node, client.V1NodeList)
nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
gpu_vendor = gpu_spec.vendor
# If no vendor is specified, we assume NVIDIA. Technically, it's possible to request
# either NVIDIA or AMD in the run configuration using only GPU names
# (e.g., `gpu: H100,MI300X:8`), but we ignore such configurations as it's hard to
# translate them to a K8s request.
if gpu_vendor is None or gpu_vendor == AcceleratorVendor.NVIDIA:
node_affinity = _get_nvidia_gpu_node_affinity(gpu_spec, nodes)
return NVIDIA_GPU_RESOURCE, node_affinity, NVIDIA_GPU_NODE_TAINT
if gpu_vendor == AcceleratorVendor.AMD:
node_affinity = _get_amd_gpu_node_affinity(gpu)
node_affinity = _get_amd_gpu_node_affinity(gpu_spec, nodes)
return AMD_GPU_RESOURCE, node_affinity, AMD_GPU_NODE_TAINT
raise ComputeError(f"Unsupported GPU vendor: {gpu_vendor}")


def _get_nvidia_gpu_node_affinity(api: client.CoreV1Api, gpu: Gpu) -> client.V1NodeAffinity:
def _get_nvidia_gpu_node_affinity(
gpu_spec: GPUSpec, nodes: list[client.V1Node]
) -> client.V1NodeAffinity:
matching_gpu_label_values: set[str] = set()
# We cannot generate an expected GPU label value from the Gpu model instance
# as the actual values may have additional components (socket, memory type, etc.)
# that we don't preserve in the Gpu model, e.g., "NVIDIA-H100-80GB-HBM3".
# Moreover, a single Gpu may match multiple label values.
# As a workaround, we iterate and process all node labels once again (we already
# processed them in `get_offers_by_requirements()`).
node_list = call_api_method(api.list_node, client.V1NodeList)
nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
for node in nodes:
labels = get_value(node, ".metadata.labels", dict[str, str]) or {}
if _get_nvidia_gpu_from_node_labels(labels) == gpu:
gpu = _get_nvidia_gpu_from_node_labels(labels)
if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec):
matching_gpu_label_values.add(labels[NVIDIA_GPU_PRODUCT_LABEL])
if not matching_gpu_label_values:
raise ComputeError(f"NVIDIA GPU is requested but no matching GPU labels found: {gpu=}")
logger.debug("Selecting nodes by labels %s for NVIDIA %s", matching_gpu_label_values, gpu.name)
raise ComputeError(
f"NVIDIA GPU is requested but no matching GPU labels found: {gpu_spec=}"
)
logger.debug(
"Selecting nodes by labels %s for NVIDIA %s", matching_gpu_label_values, gpu_spec.name
)
return client.V1NodeAffinity(
required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
node_selector_terms=[
@@ -637,10 +657,15 @@ def _get_nvidia_gpu_node_affinity(api: client.CoreV1Api, gpu: Gpu) -> client.V1NodeAffinity:
)
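
Since the `V1NodeAffinity` construction is truncated in the hunk above, here is roughly the shape of the object these helpers return, built with the official `kubernetes` Python client. The label key mirrors NVIDIA GPU feature discovery's product label (assumed to be the value of `NVIDIA_GPU_PRODUCT_LABEL`), and the values list is illustrative:

```python
from kubernetes import client

affinity = client.V1NodeAffinity(
    required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
        node_selector_terms=[
            client.V1NodeSelectorTerm(
                match_expressions=[
                    client.V1NodeSelectorRequirement(
                        key="nvidia.com/gpu.product",  # assumed NVIDIA_GPU_PRODUCT_LABEL
                        operator="In",
                        values=["NVIDIA-H100-80GB-HBM3"],  # one entry per matching label value
                    )
                ]
            )
        ]
    )
)
```

A pod carrying this affinity (plus the vendor taint toleration) can only be scheduled onto nodes whose GPU product label matches one of the collected values.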


def _get_amd_gpu_node_affinity(gpu: Gpu) -> client.V1NodeAffinity:
device_ids = AMD_GPU_NAME_TO_DEVICE_IDS.get(gpu.name)
if device_ids is None:
raise ComputeError(f"AMD GPU is requested but no matching device ids found: {gpu=}")
def _get_amd_gpu_node_affinity(
gpu_spec: GPUSpec, nodes: list[client.V1Node]
) -> client.V1NodeAffinity:
matching_device_ids: set[int] = set()
for node in nodes:
labels = get_value(node, ".metadata.labels", dict[str, str]) or {}
gpu = _get_amd_gpu_from_node_labels(labels)
if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec):
matching_device_ids.update(AMD_GPU_NAME_TO_DEVICE_IDS[gpu.name])
return client.V1NodeAffinity(
required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
node_selector_terms=[
@@ -652,12 +677,35 @@ def _get_amd_gpu_node_affinity(gpu: Gpu) -> client.V1NodeAffinity:
),
],
)
for device_id in device_ids
for device_id in matching_device_ids
],
),
)


def _gpu_matches_gpu_spec(gpu: Gpu, gpu_spec: GPUSpec) -> bool:
if gpu_spec.vendor is not None and gpu.vendor != gpu_spec.vendor:
return False
if gpu_spec.name is not None and gpu.name.lower() not in map(str.lower, gpu_spec.name):
return False
if gpu_spec.memory is not None:
min_memory_gib = gpu_spec.memory.min
if min_memory_gib is not None and gpu.memory_mib < min_memory_gib * 1024:
return False
max_memory_gib = gpu_spec.memory.max
if max_memory_gib is not None and gpu.memory_mib > max_memory_gib * 1024:
return False
if gpu_spec.compute_capability is not None:
if gpu.vendor != AcceleratorVendor.NVIDIA:
return False
gpu_info = NVIDIA_GPU_NAME_TO_GPU_INFO.get(gpu.name)
if gpu_info is None:
return False
if gpu_info.compute_capability < gpu_spec.compute_capability:
return False
return True
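
Two details in the matcher above are worth calling out, shown here as runnable one-liners: name comparison is case-insensitive, and the memory bounds convert GiB (GPUSpec) to MiB (Gpu.memory_mib) before comparing. The compute-capability branch is NVIDIA-only: any non-NVIDIA GPU, or an NVIDIA GPU missing from NVIDIA_GPU_NAME_TO_GPU_INFO, is rejected.

```python
# Case-insensitive name matching, as in the function above:
assert "h100" in map(str.lower, ["H100", "H200"])

# GPUSpec memory bounds are GiB while Gpu.memory_mib is MiB, hence the *1024:
min_memory_gib, gpu_memory_mib = 80, 81920
assert gpu_memory_mib >= min_memory_gib * 1024  # 80 GiB == 81920 MiB -> within bounds
```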


def _continue_setup_jump_pod(
api: client.CoreV1Api,
namespace: str,