From 920a05cca053b8cf70e3827067dde09de574d0c6 Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Thu, 15 Feb 2024 09:08:18 +0300 Subject: [PATCH 01/10] Refactor pool tests --- ruff.toml | 2 +- .../_internal/server/services/test_pools.py | 478 +++++++++--------- 2 files changed, 243 insertions(+), 237 deletions(-) diff --git a/ruff.toml b/ruff.toml index 32e7b000d..26348546e 100644 --- a/ruff.toml +++ b/ruff.toml @@ -10,4 +10,4 @@ ignore =[ [lint.isort] known-first-party = ["dstack"] -known-third-party = ["mkdocs_gen_files"] +known-third-party = ["mkdocs_gen_files", "datacrunch"] diff --git a/src/tests/_internal/server/services/test_pools.py b/src/tests/_internal/server/services/test_pools.py index b47ce8e11..6a371a9fa 100644 --- a/src/tests/_internal/server/services/test_pools.py +++ b/src/tests/_internal/server/services/test_pools.py @@ -27,245 +27,251 @@ from dstack._internal.server.testing.common import create_project, create_user -@pytest.mark.asyncio -async def test_pool(session: AsyncSession, test_db): - user = await create_user(session=session) - project = await create_project(session=session, owner=user) - pool = await services_pools.create_pool_model( - session=session, project=project, name="test_pool" - ) - im = InstanceModel( - name="test_instnce", - project=project, - pool=pool, - status=InstanceStatus.PENDING, - job_provisioning_data="", - offer="", - region="", - price=1, - backend=BackendType.LOCAL, - ) - session.add(im) - await session.commit() - await session.refresh(pool) - - core_model_pool = services_pools.pool_model_to_pool(pool) - assert core_model_pool == Pool( - name="test_pool", - default=True, - created_at=pool.created_at.replace(tzinfo=dt.timezone.utc), # ??? - total_instances=1, - available_instances=0, - ) - - list_pools = await services_pools.list_project_pool(session=session, project=project) - assert list_pools == [services_pools.pool_model_to_pool(pool)] - - list_pool_models = await services_pools.list_project_pool_models( - session=session, project=project - ) - assert len(list_pool_models) == 1 - - pool_intances = await services_pools.get_pool_instances(session, project, "test_pool") - assert pool_intances == [im] - - -def test_convert_instance(): - expected_instance = Instance( - backend=BackendType.LOCAL, - instance_type=InstanceType( - name="instance", resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]) - ), - name="test_instance", - hostname="hostname_test", - status=InstanceStatus.PENDING, - price=1.0, - ) - - im = InstanceModel( - id=str(uuid.uuid4()), - created_at=dt.datetime.now(), - name="test_instance", - status=InstanceStatus.PENDING, - project_id=str(uuid.uuid4()), - pool=None, - job_provisioning_data='{"ssh_proxy":null, "backend":"local","hostname":"hostname_test","region":"eu-west","price":1.0,"username":"user1","ssh_port":12345,"dockerized":false,"instance_id":"test_instance","instance_type": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', - offer='{"price":"LOCAL", "price":1.0, "backend":"local", "region":"eu-west-1", "availability":"available","instance": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', - ) - - instance = services_pools.instance_model_to_instance(im) - assert instance == expected_instance - - -@pytest.mark.asyncio -async def test_delete_pool(session: AsyncSession, test_db): - POOL_NAME = "test_pool" - user = await 
services_users.create_user(session, "test_user", global_role=GlobalRole.ADMIN) - project = await services_projects.create_project(session, user, "test_project") - project_model = await services_projects.get_project_model_by_name_or_error( - session, project.project_name - ) - pool = await services_pools.create_pool_model(session, project_model, POOL_NAME) - - await services_pools.delete_pool(session, project_model, POOL_NAME) - - deleted_pools = await services_pools.list_deleted_pools(session, project_model) - assert len(deleted_pools) == 1 - assert pool.name == deleted_pools[0].name - - -@pytest.mark.asyncio -async def test_show_pool(session: AsyncSession, test_db): - POOL_NAME = "test_pool" - user = await create_user(session=session) - project = await create_project(session=session, owner=user) - pool = await services_pools.create_pool_model(session=session, project=project, name=POOL_NAME) - im = InstanceModel( - name="test_instnce", - project=project, - pool=pool, - status=InstanceStatus.PENDING, - job_provisioning_data='{"ssh_proxy":null, "backend":"local","hostname":"hostname_test","region":"eu-west","price":1.0,"username":"user1","ssh_port":12345,"dockerized":false,"instance_id":"test_instance","instance_type": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', - offer='{"price":"LOCAL", "price":1.0, "backend":"local", "region":"eu-west-1", "availability":"available","instance": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', - region="eu-west", - price=1, - backend=BackendType.LOCAL, - ) - session.add(im) - await session.commit() - - pool_instances = await services_pools.show_pool(session, project, POOL_NAME) - assert len(pool_instances.instances) == 1 - - -@pytest.mark.asyncio -async def test_get_pool_instances(session: AsyncSession, test_db): - POOL_NAME = "test_pool" - user = await create_user(session=session) - project = await create_project(session=session, owner=user) - pool = await services_pools.create_pool_model(session=session, project=project, name=POOL_NAME) - im = InstanceModel( - name="test_instnce", - project=project, - pool=pool, - status=InstanceStatus.PENDING, - job_provisioning_data='{"backend":"local","hostname":"hostname_test","region":"eu-west","price":1.0,"username":"user1","ssh_port":12345,"dockerized":false,"instance_id":"test_instance","instance_type": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', - offer='{"price":"LOCAL", "price":1.0, "backend":"local", "region":"eu-west-1", "availability":"available","instance": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', - region="eu-west", - price=1, - backend=BackendType.LOCAL, - ) - session.add(im) - await session.commit() - - instances = await services_pools.get_pool_instances(session, project, POOL_NAME) - assert len(instances) == 1 - - empty_instances = await services_pools.get_pool_instances(session, project, f"{POOL_NAME}-0") - assert len(empty_instances) == 0 - - -@pytest.mark.asyncio -async def test_generate_instance_name(session: AsyncSession, test_db): - user = await create_user(session=session) - project = await create_project(session=session, owner=user) - pool = await services_pools.create_pool_model( - session=session, 
project=project, name="test_pool" - ) - im = InstanceModel( - name="test_instnce", - project=project, - pool=pool, - status=InstanceStatus.PENDING, - job_provisioning_data="", - offer="", - backend=BackendType.REMOTE, - region="", - price=0, - ) - session.add(im) - await session.commit() - - name = await services_pools.generate_instance_name( - session=session, project=project, pool_name="test_pool" - ) - car, _, cdr = name.partition("-") - assert len(car) > 0 - assert len(cdr) > 0 - - -@pytest.mark.asyncio -async def test_pool_double_name(session: AsyncSession, test_db): - user = await create_user(session=session) - project = await create_project(session=session, owner=user) - await services_pools.create_pool_model(session=session, project=project, name="test_pool") - with pytest.raises(ValueError): - await services_pools.create_pool_model(session=session, project=project, name="test_pool") +class TestPoolService: + @pytest.mark.asyncio + async def test_pool_smoke(self, session: AsyncSession, test_db): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + pool = await services_pools.create_pool_model( + session=session, project=project, name="test_pool" + ) + im = InstanceModel( + name="test_instnce", + project=project, + pool=pool, + status=InstanceStatus.PENDING, + job_provisioning_data="", + offer="", + region="", + price=1, + backend=BackendType.LOCAL, + ) + session.add(im) + await session.commit() + await session.refresh(pool) + + core_model_pool = services_pools.pool_model_to_pool(pool) + assert core_model_pool == Pool( + name="test_pool", + default=True, + created_at=pool.created_at.replace(tzinfo=dt.timezone.utc), # ??? + total_instances=1, + available_instances=0, + ) + + list_pools = await services_pools.list_project_pool(session=session, project=project) + assert list_pools == [services_pools.pool_model_to_pool(pool)] + + list_pool_models = await services_pools.list_project_pool_models( + session=session, project=project + ) + assert len(list_pool_models) == 1 + + pool_intances = await services_pools.get_pool_instances(session, project, "test_pool") + assert pool_intances == [im] + + @pytest.mark.asyncio + async def test_delete_pool(self, session: AsyncSession, test_db): + POOL_NAME = "test_pool" + user = await services_users.create_user(session, "test_user", global_role=GlobalRole.ADMIN) + project = await services_projects.create_project(session, user, "test_project") + project_model = await services_projects.get_project_model_by_name_or_error( + session, project.project_name + ) + pool = await services_pools.create_pool_model(session, project_model, POOL_NAME) + + await services_pools.delete_pool(session, project_model, POOL_NAME) + + deleted_pools = await services_pools.list_deleted_pools(session, project_model) + assert len(deleted_pools) == 1 + assert pool.name == deleted_pools[0].name + + @pytest.mark.asyncio + async def test_show_pool(self, session: AsyncSession, test_db): + POOL_NAME = "test_pool" + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + pool = await services_pools.create_pool_model( + session=session, project=project, name=POOL_NAME + ) + im = InstanceModel( + name="test_instnce", + project=project, + pool=pool, + status=InstanceStatus.PENDING, + job_provisioning_data='{"ssh_proxy":null, 
"backend":"local","hostname":"hostname_test","region":"eu-west","price":1.0,"username":"user1","ssh_port":12345,"dockerized":false,"instance_id":"test_instance","instance_type": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', + offer='{"price":"LOCAL", "price":1.0, "backend":"local", "region":"eu-west-1", "availability":"available","instance": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', + region="eu-west", + price=1, + backend=BackendType.LOCAL, + ) + session.add(im) + await session.commit() + + pool_instances = await services_pools.show_pool(session, project, POOL_NAME) + assert len(pool_instances.instances) == 1 + + @pytest.mark.asyncio + async def test_get_pool_instances(self, session: AsyncSession, test_db): + POOL_NAME = "test_pool" + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + pool = await services_pools.create_pool_model( + session=session, project=project, name=POOL_NAME + ) + im = InstanceModel( + name="test_instnce", + project=project, + pool=pool, + status=InstanceStatus.PENDING, + job_provisioning_data='{"backend":"local","hostname":"hostname_test","region":"eu-west","price":1.0,"username":"user1","ssh_port":12345,"dockerized":false,"instance_id":"test_instance","instance_type": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', + offer='{"price":"LOCAL", "price":1.0, "backend":"local", "region":"eu-west-1", "availability":"available","instance": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', + region="eu-west", + price=1, + backend=BackendType.LOCAL, + ) + session.add(im) + await session.commit() + instances = await services_pools.get_pool_instances(session, project, POOL_NAME) + assert len(instances) == 1 -@pytest.mark.asyncio -async def test_create_cloud_instance(session: AsyncSession, test_db): - user = await create_user(session) - project = await create_project(session, user) - - profile = Profile(name="test_profile") - - requirements = Requirements(resources=resources.ResourcesSpec(cpu=1), spot=True) - - offer = InstanceOfferWithAvailability( - backend=BackendType.DATACRUNCH, - instance=InstanceType( - name="instance", resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]) - ), - region="en", - price=0.1, - availability=InstanceAvailability.AVAILABLE, - ) - - launched_instance = LaunchedInstanceInfo( - instance_id="running_instance.id", - ip_address="running_instance.ip", - region="running_instance.location", - ssh_port=22, - username="root", - dockerized=True, - backend_data=None, - ) - - class DummyBackend: - TYPE = BackendType.DATACRUNCH - - def compute(self): - return self - - def create_instance(self, *args, **kwargs): - return launched_instance - - offers = [(DummyBackend(), offer)] - - with patch("dstack._internal.server.services.runs.get_run_plan_by_requirements") as reqs: - reqs.return_value = offers - await runs.create_instance( - session, - project, - user, - profile=profile, - pool_name="test_pool", - instance_name="test_instance", - requirements=requirements, - ssh_key=SSHKey(public=""), + empty_instances = await services_pools.get_pool_instances( + session, project, f"{POOL_NAME}-0" ) + assert 
len(empty_instances) == 0 - pool = await services_pools.get_pool(session, project, "test_pool") - assert pool is not None - instance = pool.instances[0] - assert instance.name == "test_instance" - assert instance.deleted == False - assert instance.deleted_at is None +class TestPoolUtils: + @pytest.mark.asyncio + async def test_generate_instance_name(self, session: AsyncSession, test_db): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + pool = await services_pools.create_pool_model( + session=session, project=project, name="test_pool" + ) + im = InstanceModel( + name="test_instnce", + project=project, + pool=pool, + status=InstanceStatus.PENDING, + job_provisioning_data="", + offer="", + backend=BackendType.REMOTE, + region="", + price=0, + ) + session.add(im) + await session.commit() + + name = await services_pools.generate_instance_name( + session=session, project=project, pool_name="test_pool" + ) + car, _, cdr = name.partition("-") + assert len(car) > 0 + assert len(cdr) > 0 + + def test_convert_instance(self): + expected_instance = Instance( + backend=BackendType.LOCAL, + instance_type=InstanceType( + name="instance", resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]) + ), + name="test_instance", + hostname="hostname_test", + status=InstanceStatus.PENDING, + price=1.0, + ) + + im = InstanceModel( + id=str(uuid.uuid4()), + created_at=dt.datetime.now(), + name="test_instance", + status=InstanceStatus.PENDING, + project_id=str(uuid.uuid4()), + pool=None, + job_provisioning_data='{"ssh_proxy":null, "backend":"local","hostname":"hostname_test","region":"eu-west","price":1.0,"username":"user1","ssh_port":12345,"dockerized":false,"instance_id":"test_instance","instance_type": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', + offer='{"price":"LOCAL", "price":1.0, "backend":"local", "region":"eu-west-1", "availability":"available","instance": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description":""}}}', + ) + + instance = services_pools.instance_model_to_instance(im) + assert instance == expected_instance - # assert instance.job_provisioning_data == '{"backend": "datacrunch", "instance_type": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description": ""}}, "instance_id": "running_instance.id", "ssh_proxy": null, "hostname": "running_instance.ip", "region": "running_instance.location", "price": 0.1, "username": "root", "ssh_port": 22, "dockerized": true, "backend_data": null}' - assert ( - instance.offer - == '{"backend": "datacrunch", "instance": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description": ""}}, "region": "en", "price": 0.1, "availability": "available"}' - ) + +class TestCreatePool: + @pytest.mark.asyncio + async def test_pool_double_name(self, session: AsyncSession, test_db): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + await services_pools.create_pool_model(session=session, project=project, name="test_pool") + with pytest.raises(ValueError): + await services_pools.create_pool_model( + session=session, project=project, name="test_pool" + ) + + @pytest.mark.asyncio + async def test_create_cloud_instance(self, session: 
AsyncSession, test_db): + user = await create_user(session) + project = await create_project(session, user) + + profile = Profile(name="test_profile") + + requirements = Requirements(resources=resources.ResourcesSpec(cpu=1), spot=True) + + offer = InstanceOfferWithAvailability( + backend=BackendType.DATACRUNCH, + instance=InstanceType( + name="instance", resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]) + ), + region="en", + price=0.1, + availability=InstanceAvailability.AVAILABLE, + ) + + launched_instance = LaunchedInstanceInfo( + instance_id="running_instance.id", + ip_address="running_instance.ip", + region="running_instance.location", + ssh_port=22, + username="root", + dockerized=True, + backend_data=None, + ) + + class DummyBackend: + TYPE = BackendType.DATACRUNCH + + def compute(self): + return self + + def create_instance(self, *args, **kwargs): + return launched_instance + + offers = [(DummyBackend(), offer)] + + with patch("dstack._internal.server.services.runs.get_run_plan_by_requirements") as reqs: + reqs.return_value = offers + await runs.create_instance( + session, + project, + user, + profile=profile, + pool_name="test_pool", + instance_name="test_instance", + requirements=requirements, + ssh_key=SSHKey(public=""), + ) + + pool = await services_pools.get_pool(session, project, "test_pool") + assert pool is not None + instance = pool.instances[0] + + assert instance.name == "test_instance" + assert instance.deleted == False + assert instance.deleted_at is None + + # assert instance.job_provisioning_data == '{"backend": "datacrunch", "instance_type": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description": ""}}, "instance_id": "running_instance.id", "ssh_proxy": null, "hostname": "running_instance.ip", "region": "running_instance.location", "price": 0.1, "username": "root", "ssh_port": 22, "dockerized": true, "backend_data": null}' + assert ( + instance.offer + == '{"backend": "datacrunch", "instance": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description": ""}}, "region": "en", "price": 0.1, "availability": "available"}' + ) From 496e012d820e2cf94c12578bf18d2b79469d230b Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Thu, 15 Feb 2024 09:37:39 +0300 Subject: [PATCH 02/10] Add columns to `dstack pool show` (#898) * Added region and created properties to the pool api * Add columns to the dstack pool show --- src/dstack/_internal/cli/commands/pool.py | 12 ++++++++++++ src/dstack/_internal/core/models/pools.py | 2 ++ src/dstack/_internal/server/services/pools.py | 2 ++ src/dstack/_internal/server/testing/common.py | 1 + src/tests/_internal/server/services/test_pools.py | 6 +++++- 5 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/cli/commands/pool.py b/src/dstack/_internal/cli/commands/pool.py index b815a8a37..552d7a439 100644 --- a/src/dstack/_internal/cli/commands/pool.py +++ b/src/dstack/_internal/cli/commands/pool.py @@ -1,4 +1,5 @@ import argparse +import datetime from pathlib import Path from typing import Sequence @@ -290,18 +291,29 @@ def print_instance_table(instances: Sequence[Instance]) -> None: table = Table(box=None) table.add_column("INSTANCE NAME") table.add_column("BACKEND") + table.add_column("REGION") table.add_column("INSTANCE TYPE") + table.add_column("SPOT") table.add_column("STATUS") table.add_column("PRICE") + table.add_column("CREATED") for 
instance in instances: style = "success" if instance.status.is_available() else "warning" + created = ( + pretty_date(instance.created.replace(tzinfo=datetime.timezone.utc)) + if instance.created is not None + else "" + ) row = [ instance.name, instance.backend, + instance.region, instance.instance_type.resources.pretty_format(), + "yes" if instance.instance_type.resources.spot else "no", f"[{style}]{instance.status.value}[/]", f"${instance.price:.4}", + created, ] table.add_row(*row) diff --git a/src/dstack/_internal/core/models/pools.py b/src/dstack/_internal/core/models/pools.py index beb623a00..33a3edb65 100644 --- a/src/dstack/_internal/core/models/pools.py +++ b/src/dstack/_internal/core/models/pools.py @@ -24,6 +24,8 @@ class Instance(BaseModel): job_status: Optional[JobStatus] = None hostname: str status: InstanceStatus + created: Optional[datetime.datetime] + region: str price: float diff --git a/src/dstack/_internal/server/services/pools.py b/src/dstack/_internal/server/services/pools.py index 4260f0636..67183ec1d 100644 --- a/src/dstack/_internal/server/services/pools.py +++ b/src/dstack/_internal/server/services/pools.py @@ -229,6 +229,8 @@ def instance_model_to_instance(instance_model: InstanceModel) -> Instance: instance_type=jpd.instance_type, hostname=jpd.hostname, status=instance_model.status, + region=offer.region, + created=instance_model.created_at, price=offer.price, ) if instance_model.job is not None: diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 642270e58..6be2f7cce 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -320,6 +320,7 @@ async def create_instance( project=project, status=status, created_at=created_at, + started_at=created_at, finished_at=finished_at, job_provisioning_data='{"backend": "datacrunch", "instance_type": {"name": "instance", "resources": {"cpus": 1, "memory_mib": 512, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description": ""}}, "instance_id": "running_instance.id", "ssh_proxy": null, "hostname": "running_instance.ip", "region": "running_instance.location", "price": 0.1, "username": "root", "ssh_port": 22, "dockerized": true, "backend_data": null}', offer='{"backend": "datacrunch", "instance": {"name": "instance", "resources": {"cpus": 2, "memory_mib": 12000, "gpus": [], "spot": false, "disk": {"size_mib": 102400}, "description": ""}}, "region": "en", "price": 0.1, "availability": "available"}', diff --git a/src/tests/_internal/server/services/test_pools.py b/src/tests/_internal/server/services/test_pools.py index 6a371a9fa..a0fc85594 100644 --- a/src/tests/_internal/server/services/test_pools.py +++ b/src/tests/_internal/server/services/test_pools.py @@ -25,6 +25,7 @@ from dstack._internal.core.models.users import GlobalRole from dstack._internal.server.models import InstanceModel from dstack._internal.server.testing.common import create_project, create_user +from dstack._internal.utils.common import get_current_datetime class TestPoolService: @@ -172,6 +173,7 @@ async def test_generate_instance_name(self, session: AsyncSession, test_db): assert len(cdr) > 0 def test_convert_instance(self): + created = get_current_datetime() expected_instance = Instance( backend=BackendType.LOCAL, instance_type=InstanceType( @@ -180,12 +182,14 @@ def test_convert_instance(self): name="test_instance", hostname="hostname_test", status=InstanceStatus.PENDING, + created=created, + region='eu-west-1', price=1.0, ) im = 
InstanceModel( id=str(uuid.uuid4()), - created_at=dt.datetime.now(), + created_at=created, name="test_instance", status=InstanceStatus.PENDING, project_id=str(uuid.uuid4()), From 856a068e181f584924a9167407f79f8158bf4208 Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Thu, 15 Feb 2024 05:26:51 +0300 Subject: [PATCH 03/10] Remove unused code --- src/dstack/_internal/core/backends/base/offers.py | 2 -- src/dstack/_internal/core/models/instances.py | 9 --------- 2 files changed, 11 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/offers.py b/src/dstack/_internal/core/backends/base/offers.py index 89581e619..1dbb33d79 100644 --- a/src/dstack/_internal/core/backends/base/offers.py +++ b/src/dstack/_internal/core/backends/base/offers.py @@ -29,9 +29,7 @@ def get_catalog_offers( offers = [] catalog = catalog if catalog is not None else gpuhunt.default_catalog() - locs = [] for item in catalog.query(**asdict(q)): - locs.append(item.location) if locations is not None and item.location not in locations: continue offer = catalog_item_to_offer(backend, item, requirements) diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py index 6c319d5fb..d9230684c 100644 --- a/src/dstack/_internal/core/models/instances.py +++ b/src/dstack/_internal/core/models/instances.py @@ -9,15 +9,6 @@ from dstack._internal.utils.common import pretty_resources -class InstanceState(str, Enum): - NOT_FOUND = "not_found" - PROVISIONING = "provisioning" - RUNNING = "running" - STOPPED = "stopped" - STOPPING = "stopping" - TERMINATED = "terminated" - - class Gpu(BaseModel): name: str memory_mib: int From 67b7277faab7bfff68216e9936521444a41142f7 Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Thu, 15 Feb 2024 05:42:38 +0300 Subject: [PATCH 04/10] Rename columns --- src/dstack/_internal/cli/commands/pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/cli/commands/pool.py b/src/dstack/_internal/cli/commands/pool.py index 552d7a439..c0953f9f3 100644 --- a/src/dstack/_internal/cli/commands/pool.py +++ b/src/dstack/_internal/cli/commands/pool.py @@ -289,10 +289,10 @@ def print_pool_table(pools: Sequence[Pool], verbose: bool) -> None: def print_instance_table(instances: Sequence[Instance]) -> None: table = Table(box=None) - table.add_column("INSTANCE NAME") + table.add_column("INSTANCE") table.add_column("BACKEND") table.add_column("REGION") - table.add_column("INSTANCE TYPE") + table.add_column("RESOURCES") table.add_column("SPOT") table.add_column("STATUS") table.add_column("PRICE") From ab681367881300655ae4b2416f08a79490e5a5fb Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Thu, 15 Feb 2024 05:43:11 +0300 Subject: [PATCH 05/10] Added starting timeout --- .../_internal/server/background/tasks/process_pools.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/tasks/process_pools.py b/src/dstack/_internal/server/background/tasks/process_pools.py index edc736632..aa3081175 100644 --- a/src/dstack/_internal/server/background/tasks/process_pools.py +++ b/src/dstack/_internal/server/background/tasks/process_pools.py @@ -94,7 +94,15 @@ async def check_shim(instance_id: UUID) -> None: "instance %s shim is not available, marked as failed", instance.name ) instance.status = InstanceStatus.FAILED - await session.commit() + + STARTING_TIMEOUT = 60 * 3 + expire_starting = ( + instance.created + timedelta(seconds=STARTING_TIMEOUT) < get_current_datetime() 
+ ) + if instance.status == InstanceStatus.STARTING and expire_starting: + instance.status = InstanceStatus.FAILED + + await session.commit() @runner_ssh_tunnel(ports=[client.REMOTE_SHIM_PORT], retries=1) From 6d6dd399eac9c34e4fe18881522d66cbbddcf18b Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Thu, 15 Feb 2024 05:43:27 +0300 Subject: [PATCH 06/10] Fix locks --- .../server/background/tasks/process_pools.py | 4 - .../tasks/process_submitted_jobs.py | 74 +++++++------------ src/dstack/_internal/server/services/pools.py | 21 +++--- 3 files changed, 39 insertions(+), 60 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_pools.py b/src/dstack/_internal/server/background/tasks/process_pools.py index aa3081175..0e5508f48 100644 --- a/src/dstack/_internal/server/background/tasks/process_pools.py +++ b/src/dstack/_internal/server/background/tasks/process_pools.py @@ -124,8 +124,6 @@ async def terminate(instance_id: UUID) -> None: ) ).one() - # TODO: need lock - jpd = parse_raw_as(JobProvisioningData, instance.job_provisioning_data) BACKEND_TYPE = jpd.backend backends = await backends_services.get_project_backends(project=instance.project) @@ -160,8 +158,6 @@ async def terminate_idle_instance() -> None: ) instances = res.scalars().all() - # TODO: need lock - for instance in instances: last_time = instance.created_at.replace(tzinfo=datetime.timezone.utc) if instance.last_job_processed_at is not None: diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 415c4ccb0..93286e49d 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -12,7 +12,7 @@ InstanceOfferWithAvailability, LaunchedInstanceInfo, ) -from dstack._internal.core.models.profiles import DEFAULT_POOL_NAME, CreationPolicy +from dstack._internal.core.models.profiles import CreationPolicy from dstack._internal.core.models.runs import ( InstanceStatus, Job, @@ -23,17 +23,18 @@ RunSpec, ) from dstack._internal.server.db import get_session_ctx -from dstack._internal.server.models import InstanceModel, JobModel, PoolModel, RunModel +from dstack._internal.server.models import InstanceModel, JobModel, RunModel from dstack._internal.server.services import backends as backends_services from dstack._internal.server.services.jobs import ( + PROCESSING_POOL_LOCK, SUBMITTED_PROCESSING_JOBS_IDS, SUBMITTED_PROCESSING_JOBS_LOCK, ) from dstack._internal.server.services.logging import job_log from dstack._internal.server.services.pools import ( filter_pool_instances, + get_or_create_default_pool_by_name, get_pool_instances, - list_project_pool_models, ) from dstack._internal.server.services.runs import run_model_to_run from dstack._internal.server.utils.common import run_async @@ -87,57 +88,38 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): run_model = res.scalar_one() project_model = run_model.project - # check default pool - pool = project_model.default_pool - if pool is None: - # TODO: get_or_create_default_pool... 
- pools = await list_project_pool_models(session, job_model.project) - for pool_item in pools: - if pool_item.id == job_model.project.default_pool_id: - pool = pool_item - if pool_item.name == DEFAULT_POOL_NAME: - pool = pool_item - if pool is None: - pool = PoolModel( - name=DEFAULT_POOL_NAME, - project=project_model, - ) - session.add(pool) - await session.commit() - await session.refresh(pool) - - if pool.id is not None: - project_model.default_pool_id = pool.id - run_spec = parse_raw_as(RunSpec, run_model.run_spec) profile = run_spec.profile - run_pool = profile.pool_name - if run_pool is None: - run_pool = pool.name - - # pool capacity - pool_instances = await get_pool_instances(session, project_model, run_pool) - relevant_instances = filter_pool_instances( - pool_instances, profile, run_spec.configuration.resources, status=InstanceStatus.READY - ) + # check default pool + pool = project_model.default_pool + if pool is None: + pool = await get_or_create_default_pool_by_name( + session, project_model, pool_name=profile.pool_name + ) + project_model.default_pool = pool - if relevant_instances: - sorted_instances = sorted(relevant_instances, key=lambda instance: instance.name) - instance = sorted_instances[0] + async with PROCESSING_POOL_LOCK: + # pool capacity + pool_instances = await get_pool_instances(session, project_model, pool.name) + relevant_instances = filter_pool_instances( + pool_instances, profile, run_spec.configuration.resources, status=InstanceStatus.READY + ) - # need lock - instance.status = InstanceStatus.BUSY - instance.job = job_model + if relevant_instances: + sorted_instances = sorted(relevant_instances, key=lambda instance: instance.name) + instance = sorted_instances[0] + instance.status = InstanceStatus.BUSY + instance.job = job_model - logger.info(*job_log("now is provisioning", job_model)) - job_model.job_provisioning_data = instance.job_provisioning_data - job_model.status = JobStatus.PROVISIONING - job_model.last_processed_at = common_utils.get_current_datetime() + logger.info(*job_log("now is provisioning", job_model)) + job_model.job_provisioning_data = instance.job_provisioning_data + job_model.status = JobStatus.PROVISIONING + job_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() + await session.commit() - return + return run = run_model_to_run(run_model) job = run.jobs[job_model.job_num] diff --git a/src/dstack/_internal/server/services/pools.py b/src/dstack/_internal/server/services/pools.py index 67183ec1d..8d9a8c055 100644 --- a/src/dstack/_internal/server/services/pools.py +++ b/src/dstack/_internal/server/services/pools.py @@ -27,6 +27,7 @@ from dstack._internal.core.models.runs import InstanceStatus, JobProvisioningData, Requirements from dstack._internal.server import settings from dstack._internal.server.models import InstanceModel, PoolModel, ProjectModel +from dstack._internal.server.services.jobs import PROCESSING_POOL_LOCK from dstack._internal.utils import common as common_utils from dstack._internal.utils import random_names from dstack._internal.utils.common import get_current_datetime @@ -165,18 +166,18 @@ async def remove_instance( logger.warning("Couldn't find pool") return - # TODO: need lock - terminated = False - for instance in pool.instances: - if instance.name == instance_name: - if force or instance.job_id is None: - instance.status = InstanceStatus.TERMINATING - terminated = True + async with PROCESSING_POOL_LOCK: + terminated = False + for instance in pool.instances: + if instance.name == 
instance_name: + if force or instance.job_id is None: + instance.status = InstanceStatus.TERMINATING + terminated = True - if not terminated: - logger.warning("Couldn't find instance to terminate") + if not terminated: + logger.warning("Couldn't find instance to terminate") - await session.commit() + await session.commit() async def delete_pool(session: AsyncSession, project: ProjectModel, pool_name: str) -> None: From 685274a2a674ee7d499945cc410132b17f86628c Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Thu, 15 Feb 2024 16:54:02 +0300 Subject: [PATCH 07/10] pool show -> pool ps --- src/dstack/_internal/cli/commands/pool.py | 16 +++++++++++----- .../_internal/server/services/test_pools.py | 2 +- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/dstack/_internal/cli/commands/pool.py b/src/dstack/_internal/cli/commands/pool.py index c0953f9f3..928d39c8a 100644 --- a/src/dstack/_internal/cli/commands/pool.py +++ b/src/dstack/_internal/cli/commands/pool.py @@ -77,18 +77,24 @@ def _register(self) -> None: delete_parser.set_defaults(subfunc=self._delete) # show pool instances - show_parser = subparsers.add_parser( - "show", + ps_parser = subparsers.add_parser( + "ps", help="Show pool instances", description="Show instances in the pool", formatter_class=self._parser.formatter_class, ) - show_parser.add_argument( + ps_parser.add_argument( "--pool", dest="pool_name", help="The name of the pool. If not set, the default pool will be used", ) - show_parser.set_defaults(subfunc=self._show) + ps_parser.add_argument( + "-w", + "--watch", + help="Watch instances in realtime", + action="store_true", + ) + ps_parser.set_defaults(subfunc=self._ps) # add instance add_parser = subparsers.add_parser( @@ -196,7 +202,7 @@ def _set_default(self, args: argparse.Namespace) -> None: if not result: console.print(f"Failed to set default pool {args.pool_name!r}", style="error") - def _show(self, args: argparse.Namespace) -> None: + def _ps(self, args: argparse.Namespace) -> None: resp = self.api.client.pool.show(self.api.project, args.pool_name) console.print(f" [bold]Pool name[/] {resp.name}\n") print_instance_table(resp.instances) diff --git a/src/tests/_internal/server/services/test_pools.py b/src/tests/_internal/server/services/test_pools.py index a0fc85594..45de935b8 100644 --- a/src/tests/_internal/server/services/test_pools.py +++ b/src/tests/_internal/server/services/test_pools.py @@ -183,7 +183,7 @@ def test_convert_instance(self): hostname="hostname_test", status=InstanceStatus.PENDING, created=created, - region='eu-west-1', + region="eu-west-1", price=1.0, ) From 050e542bf72cb83811ea2345aac80ac4e1f60a9b Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Thu, 15 Feb 2024 16:54:40 +0300 Subject: [PATCH 08/10] Added -w for `dstack pool ps` --- src/dstack/_internal/cli/commands/pool.py | 28 ++++++++++++++++++----- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/dstack/_internal/cli/commands/pool.py b/src/dstack/_internal/cli/commands/pool.py index 928d39c8a..cdfe04c2f 100644 --- a/src/dstack/_internal/cli/commands/pool.py +++ b/src/dstack/_internal/cli/commands/pool.py @@ -1,8 +1,10 @@ import argparse import datetime +import time from pathlib import Path from typing import Sequence +from rich.live import Live from rich.table import Table from dstack._internal.cli.commands import APIBaseCommand @@ -32,6 +34,9 @@ from dstack.api._public.resources import Resources from dstack.api.utils import load_profile +REFRESH_RATE_PER_SEC = 5 +LIVE_PROVISION_INTERVAL_SECS = 10 + 
logger = get_logger(__name__) @@ -203,9 +208,21 @@ def _set_default(self, args: argparse.Namespace) -> None: console.print(f"Failed to set default pool {args.pool_name!r}", style="error") def _ps(self, args: argparse.Namespace) -> None: - resp = self.api.client.pool.show(self.api.project, args.pool_name) - console.print(f" [bold]Pool name[/] {resp.name}\n") - print_instance_table(resp.instances) + if not args.watch: + resp = self.api.client.pool.show(self.api.project, args.pool_name) + console.print(f" [bold]Pool name[/] {resp.name}\n") + console.print(print_instance_table(resp.instances)) + console.print() + return + + try: + with Live(console=console, refresh_per_second=REFRESH_RATE_PER_SEC) as live: + while True: + resp = self.api.client.pool.show(self.api.project, args.pool_name) + live.update(print_instance_table(resp.instances)) + time.sleep(LIVE_PROVISION_INTERVAL_SECS) + except KeyboardInterrupt: + pass def _add(self, args: argparse.Namespace) -> None: super()._command(args) @@ -293,7 +310,7 @@ def print_pool_table(pools: Sequence[Pool], verbose: bool) -> None: console.print() -def print_instance_table(instances: Sequence[Instance]) -> None: +def print_instance_table(instances: Sequence[Instance]) -> Table: table = Table(box=None) table.add_column("INSTANCE") table.add_column("BACKEND") @@ -323,8 +340,7 @@ def print_instance_table(instances: Sequence[Instance]) -> None: ] table.add_row(*row) - console.print(table) - console.print() + return table def print_offers_table( From 8a3f12b34a1a99d96280e8b1631a2bee2549436e Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Fri, 16 Feb 2024 14:58:58 +0300 Subject: [PATCH 09/10] Fix ps -w update --- src/dstack/_internal/cli/commands/pool.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/cli/commands/pool.py b/src/dstack/_internal/cli/commands/pool.py index cdfe04c2f..2017cdef0 100644 --- a/src/dstack/_internal/cli/commands/pool.py +++ b/src/dstack/_internal/cli/commands/pool.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Sequence +from rich.console import Group from rich.live import Live from rich.table import Table @@ -208,9 +209,10 @@ def _set_default(self, args: argparse.Namespace) -> None: console.print(f"Failed to set default pool {args.pool_name!r}", style="error") def _ps(self, args: argparse.Namespace) -> None: + pool_name_template = " [bold]Pool name[/] {}\n" if not args.watch: resp = self.api.client.pool.show(self.api.project, args.pool_name) - console.print(f" [bold]Pool name[/] {resp.name}\n") + console.print(pool_name_template.format(resp.name)) console.print(print_instance_table(resp.instances)) console.print() return @@ -219,7 +221,10 @@ def _ps(self, args: argparse.Namespace) -> None: with Live(console=console, refresh_per_second=REFRESH_RATE_PER_SEC) as live: while True: resp = self.api.client.pool.show(self.api.project, args.pool_name) - live.update(print_instance_table(resp.instances)) + group = Group( + pool_name_template.format(resp.name), print_instance_table(resp.instances) + ) + live.update(group) time.sleep(LIVE_PROVISION_INTERVAL_SECS) except KeyboardInterrupt: pass From c61bac574377dee4f40a5f8d82c85bb2ac4a99ec Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Fri, 16 Feb 2024 18:12:15 +0300 Subject: [PATCH 10/10] Improve InstanceModel fail handling --- .../server/background/tasks/process_pools.py | 72 ++++++++++++++----- ...ba5_added_fail_reason_for_instancemodel.py | 26 +++++++ src/dstack/_internal/server/models.py | 13 +++- 
.../server/services/runner/client.py | 4 +- 4 files changed, 95 insertions(+), 20 deletions(-) create mode 100644 src/dstack/_internal/server/migrations/versions/ea4cd670dba5_added_fail_reason_for_instancemodel.py diff --git a/src/dstack/_internal/server/background/tasks/process_pools.py b/src/dstack/_internal/server/background/tasks/process_pools.py index 0e5508f48..88f658bf5 100644 --- a/src/dstack/_internal/server/background/tasks/process_pools.py +++ b/src/dstack/_internal/server/background/tasks/process_pools.py @@ -1,6 +1,7 @@ import datetime +from dataclasses import dataclass from datetime import timedelta -from typing import Dict +from typing import Dict, Optional, Union from uuid import UUID from pydantic import parse_raw_as @@ -25,6 +26,16 @@ PENDING_JOB_RETRY_INTERVAL = timedelta(seconds=60) + +@dataclass +class HealthStatus: + healthy: bool + reason: str + + def __str__(self): + return self.reason + + logger = get_logger(__name__) @@ -78,40 +89,67 @@ async def check_shim(instance_id: UUID) -> None: ssh_private_key = instance.project.ssh_private_key job_provisioning_data = parse_raw_as(JobProvisioningData, instance.job_provisioning_data) - instance_health = instance_healthcheck(ssh_private_key, job_provisioning_data) + instance_health: Union[Optional[HealthStatus], bool] = instance_healthcheck( + ssh_private_key, job_provisioning_data + ) + if isinstance(instance_health, bool) or instance_health is None: + health = HealthStatus(healthy=False, reason="SSH or tunnel error") + else: + health = instance_health - logger.debug("check instance %s status: shim health is %s", instance.name, instance_health) + if health.healthy: + logger.debug("check instance %s status: shim health is OK", instance.name) + instance.fail_count = 0 + instance.fail_reason = None - if instance_health: if instance.status in (InstanceStatus.CREATING, InstanceStatus.STARTING): instance.status = ( InstanceStatus.READY if instance.job_id is None else InstanceStatus.BUSY ) await session.commit() else: + logger.debug("check instance %s status: shim health: %s", instance.name, health) + + instance.fail_count += 1 + instance.fail_reason = health.reason + if instance.status in (InstanceStatus.READY, InstanceStatus.BUSY): logger.warning( "instance %s shim is not available, marked as failed", instance.name ) - instance.status = InstanceStatus.FAILED - - STARTING_TIMEOUT = 60 * 3 - expire_starting = ( - instance.created + timedelta(seconds=STARTING_TIMEOUT) < get_current_datetime() - ) - if instance.status == InstanceStatus.STARTING and expire_starting: - instance.status = InstanceStatus.FAILED + FAIL_THRESHOLD = 10 * 6 * 20 # instance_healthcheck fails 20 minutes constantly + if instance.fail_count > FAIL_THRESHOLD: + instance.status = InstanceStatus.TERMINATING + logger.warning("mark instance %s as TERMINATED", instance.name) + + if instance.status == InstanceStatus.STARTING and instance.started_at is not None: + STARTING_TIMEOUT = 10 * 60 # 10 minutes + starting_time_threshold = instance.started_at + timedelta(seconds=STARTING_TIMEOUT) + expire_starting = starting_time_threshold < get_current_datetime() + if expire_starting: + instance.status = InstanceStatus.TERMINATING await session.commit() @runner_ssh_tunnel(ports=[client.REMOTE_SHIM_PORT], retries=1) -def instance_healthcheck(*, ports: Dict[int, int]) -> bool: +def instance_healthcheck(*, ports: Dict[int, int]) -> HealthStatus: shim_client = client.ShimClient(port=ports[client.REMOTE_SHIM_PORT]) - resp = shim_client.healthcheck() - if resp is None: - return False 
# shim is not available yet
-    return resp.service == "dstack-shim"
+    try:
+        resp = shim_client.healthcheck(unmask_exceptions=True)
+
+        if resp is None:
+            return HealthStatus(healthy=False, reason="Unknown reason")
+
+        if resp.service == "dstack-shim":
+            return HealthStatus(healthy=True, reason="Service is OK")
+        else:
+            return HealthStatus(
+                healthy=False,
+                reason=f"Service name is {resp.service}, service version: {resp.version}",
+            )
+    except Exception as e:
+        return HealthStatus(healthy=False, reason=f"Exception ({e.__class__.__name__}): {e}")
 async def terminate(instance_id: UUID) -> None:
diff --git a/src/dstack/_internal/server/migrations/versions/ea4cd670dba5_added_fail_reason_for_instancemodel.py b/src/dstack/_internal/server/migrations/versions/ea4cd670dba5_added_fail_reason_for_instancemodel.py
new file mode 100644
index 000000000..16e6a7bf5
--- /dev/null
+++ b/src/dstack/_internal/server/migrations/versions/ea4cd670dba5_added_fail_reason_for_instancemodel.py
@@ -0,0 +1,26 @@
+"""Added fail reason for InstanceModel
+
+Revision ID: ea4cd670dba5
+Revises: 29c08c6a8cb3
+Create Date: 2024-02-16 18:10:38.805380
+
+"""
+
+
+# revision identifiers, used by Alembic.
+revision = "ea4cd670dba5"
+down_revision = "29c08c6a8cb3"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    pass
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    pass
+    # ### end Alembic commands ###
diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py
index 42c81c424..885b09dd8 100644
--- a/src/dstack/_internal/server/models.py
+++ b/src/dstack/_internal/server/models.py
@@ -267,6 +267,8 @@ class InstanceModel(BaseModel):
         UUIDType(binary=False), primary_key=True, default=uuid.uuid4
     )
     name: Mapped[str] = mapped_column(String(50))
+
+    # instance
     created_at: Mapped[datetime] = mapped_column(DateTime, default=get_current_datetime)
     deleted: Mapped[bool] = mapped_column(Boolean, default=False)
     deleted_at: Mapped[Optional[datetime]] = mapped_column(DateTime)
     started_at: Mapped[Optional[datetime]] = mapped_column(DateTime, default=get_current_datetime)
     finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime, default=get_current_datetime)
+    # termination policy
     termination_policy: Mapped[Optional[TerminationPolicy]] = mapped_column(String(50))
     termination_idle_time: Mapped[int] = mapped_column(
         Integer, default=DEFAULT_TERMINATION_IDLE_TIME
     )
+    # connection fail handling
+    fail_count: Mapped[int] = mapped_column(Integer, default=0)
+    fail_reason: Mapped[Optional[str]] = mapped_column(String(4000))
+
+    # backend
     backend: Mapped[BackendType] = mapped_column(Enum(BackendType))
     backend_data: Mapped[Optional[str]] = mapped_column(String(4000))
+
+    # offer
+    offer: Mapped[str] = mapped_column(String(4000))
     region: Mapped[str] = mapped_column(String(2000))
     price: Mapped[float] = mapped_column(Float)
     job_provisioning_data: Mapped[str] = mapped_column(String(4000))
-    offer: Mapped[str] = mapped_column(String(4000))
-
     # current job
     job_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("jobs.id"))
     job: Mapped[Optional["JobModel"]] = relationship(back_populates="instance", lazy="immediate")
diff --git a/src/dstack/_internal/server/services/runner/client.py b/src/dstack/_internal/server/services/runner/client.py
index e51360178..738fab2ae 100644
--- a/src/dstack/_internal/server/services/runner/client.py
+++ b/src/dstack/_internal/server/services/runner/client.py
@@ -92,12 +92,14 @@ def __init__(
         self.hostname = hostname
         self.port = port
 
-    def healthcheck(self) -> Optional[HealthcheckResponse]:
+    def healthcheck(self, unmask_exceptions: bool = False) -> Optional[HealthcheckResponse]:
         try:
             resp = requests.get(self._url("/api/healthcheck"))
             resp.raise_for_status()
             return HealthcheckResponse.parse_obj(resp.json())
         except requests.exceptions.RequestException:
+            if unmask_exceptions:
+                raise
             return None
 
     def submit(self, username: str, password: str, image_name: str):
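
A standalone sketch (not part of the patch series) of the failure-counting behaviour that PATCH 10 introduces in check_shim: a single failed health check no longer marks the instance as failed; only failures that persist past FAIL_THRESHOLD move it toward termination. HealthStatus and the threshold value are taken from the diffs above; the simplified process_health helper and the plain "ready"/"terminating" status strings stand in for the real InstanceStatus handling and are illustrative only.

from dataclasses import dataclass
from typing import Tuple


@dataclass
class HealthStatus:
    healthy: bool
    reason: str

    def __str__(self) -> str:
        return self.reason


# Value copied from the diff; per its comment it corresponds to roughly
# 20 minutes of constantly failing health checks.
FAIL_THRESHOLD = 10 * 6 * 20


def process_health(fail_count: int, health: HealthStatus) -> Tuple[int, str]:
    """Return the updated consecutive-failure counter and the resulting status."""
    if health.healthy:
        return 0, "ready"  # counter resets on any successful check
    fail_count += 1
    if fail_count > FAIL_THRESHOLD:
        return fail_count, "terminating"  # persistent failure: give the instance up
    return fail_count, "ready"  # transient failure: keep the instance


# Example: a single SSH hiccup does not terminate the instance.
count, status = process_health(0, HealthStatus(healthy=False, reason="SSH or tunnel error"))
assert status == "ready" and count == 1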