Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 39 additions & 38 deletions src/dstack/_internal/cli/services/configurators/fleet.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def apply_configuration(
confirm_message += "Create the fleet?"
else:
action_message += f"Found fleet [code]{plan.spec.configuration.name}[/]."
if plan.current_resource.spec == plan.spec:
if plan.current_resource.spec.configuration == plan.spec.configuration:
if command_args.yes and not command_args.force:
# --force is required only with --yes,
# otherwise we may ask for force apply interactively.
Expand Down Expand Up @@ -208,6 +208,42 @@ def _resolve_ssh_key(ssh_key_path: Optional[str]) -> Optional[SSHKey]:
exit()


def _get_plan(api: Client, spec: FleetSpec) -> FleetPlan:
try:
return api.client.fleets.get_plan(
project_name=api.project,
spec=spec,
)
except requests.exceptions.HTTPError as e:
# Handle older server versions that do not have /get_plan for fleets
# TODO: Can be removed in 0.19
if e.response.status_code == 405:
logger.warning(
"Fleet apply plan is not fully supported before 0.18.17. "
"Update the server to view full-featured apply plan."
)
user = api.client.users.get_my_user()
spec.configuration_path = None
current_resource = None
if spec.configuration.name is not None:
try:
current_resource = api.client.fleets.get(
project_name=api.project, name=spec.configuration.name
)
except ResourceNotExistsError:
pass
return FleetPlan(
project_name=api.project,
user=user.username,
spec=spec,
current_resource=current_resource,
offers=[],
total_offers=0,
max_offer_price=0,
)
raise e


def _print_plan_header(plan: FleetPlan):
def th(s: str) -> str:
return f"[bold]{s}[/bold]"
Expand All @@ -218,8 +254,8 @@ def th(s: str) -> str:

configuration_table.add_row(th("Project"), plan.project_name)
configuration_table.add_row(th("User"), plan.user)
configuration_table.add_row(th("Configuration"), plan.spec.configuration_path)
configuration_table.add_row(th("Type"), "fleet")
configuration_table.add_row(th("Configuration"), plan.spec.configuration_path or "?")
configuration_table.add_row(th("Type"), plan.spec.configuration.type)

fleet_type = "cloud"
nodes = plan.spec.configuration.nodes or "-"
Expand Down Expand Up @@ -301,41 +337,6 @@ def th(s: str) -> str:
console.print()


def _get_plan(api: Client, spec: FleetSpec) -> FleetPlan:
try:
return api.client.fleets.get_plan(
project_name=api.project,
spec=spec,
)
except requests.exceptions.HTTPError as e:
# Handle older server versions that do not have /get_plan for fleets
# TODO: Can be removed in 0.19
if e.response.status_code == 405:
logger.warning(
"Fleet apply plan is not fully supported before 0.18.17. "
"Update the server to view full-featured apply plan."
)
spec.configuration_path = None
current_fleet = None
if spec.configuration.name is not None:
try:
current_fleet = api.client.fleets.get(
project_name=api.project, name=spec.configuration.name
)
except ResourceNotExistsError:
pass
return FleetPlan(
project_name=api.project,
user="?",
spec=spec,
current_resource=current_fleet,
offers=[],
total_offers=0,
max_offer_price=0,
)
raise e


def _finished_provisioning(fleet: Fleet) -> bool:
for instance in fleet.instances:
if instance.status in [InstanceStatus.PENDING, InstanceStatus.PROVISIONING]:
Expand Down
185 changes: 141 additions & 44 deletions src/dstack/_internal/cli/services/configurators/gateway.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
import argparse
import time
from typing import List

from rich.live import Live
from rich.table import Table

from dstack._internal.cli.services.configurators.base import BaseApplyConfigurator
from dstack._internal.cli.utils.common import confirm_ask, console
from dstack._internal.cli.utils.gateway import print_gateways_table
from dstack._internal.cli.utils.common import (
LIVE_TABLE_PROVISION_INTERVAL_SECS,
LIVE_TABLE_REFRESH_RATE_PER_SEC,
confirm_ask,
console,
)
from dstack._internal.cli.utils.gateway import get_gateways_table
from dstack._internal.core.errors import ResourceNotExistsError
from dstack._internal.core.models.configurations import ApplyConfigurationType
from dstack._internal.core.models.gateways import GatewayConfiguration
from dstack._internal.core.models.gateways import (
Gateway,
GatewayConfiguration,
GatewayPlan,
GatewaySpec,
GatewayStatus,
)
from dstack.api._public import Client


class GatewayConfigurator(BaseApplyConfigurator):
Expand All @@ -20,55 +36,87 @@ def apply_configuration(
configurator_args: argparse.Namespace,
unknown_args: List[str],
):
# TODO: Show apply plan
# TODO: Update gateway in-place when domain/default change
confirmed = False
if conf.name is not None:
try:
gateway = self.api.client.gateways.get(
project_name=self.api.project, gateway_name=conf.name
spec = GatewaySpec(
configuration=conf,
configuration_path=configuration_path,
)
with console.status("Getting apply plan..."):
plan = _get_plan(api=self.api, spec=spec)
_print_plan_header(plan)

action_message = ""
confirm_message = ""
if plan.current_resource is None:
if plan.spec.configuration.name is not None:
action_message += (
f"Gateway [code]{plan.spec.configuration.name}[/] does not exist yet."
)
except ResourceNotExistsError:
pass
else:
if gateway.configuration == conf:
if not command_args.force:
console.print(
"Gateway configuration has not changed. Use --force to recreate the gateway."
)
return
if not command_args.yes and not confirm_ask(
"Gateway configuration has not changed. Re-create the gateway?"
):
console.print("\nExiting...")
return
elif not command_args.yes and not confirm_ask(
f"Gateway [code]{conf.name}[/] already exists. Re-create the gateway?"
):
console.print("\nExiting...")
return
confirmed = True
with console.status("Deleting gateway..."):
self.api.client.gateways.delete(
project_name=self.api.project, gateways_names=[conf.name]
confirm_message += "Create the gateway?"
else:
action_message += f"Found gateway [code]{plan.spec.configuration.name}[/]."
if plan.current_resource.configuration == plan.spec.configuration:
if command_args.yes and not command_args.force:
# --force is required only with --yes,
# otherwise we may ask for force apply interactively.
console.print(
"No configuration changes detected. Use --force to apply anyway."
)
if not confirmed and not command_args.yes:
confirm_message = (
"Configuration does not specify the gateway name. Create a new gateway?"
)
if conf.name is not None:
confirm_message = (
f"Gateway [code]{conf.name}[/] does not exist yet. Create the gateway?"
return
action_message += " No configuration changes detected."
confirm_message += "Re-create the gateway?"
else:
action_message += " Configuration changes detected."
confirm_message += "Re-create the gateway?"

console.print(action_message)
if not command_args.yes and not confirm_ask(confirm_message):
console.print("\nExiting...")
return

if plan.current_resource is not None:
with console.status("Deleting existing gateway..."):
self.api.client.gateways.delete(
project_name=self.api.project,
gateways_names=[plan.current_resource.name],
)
if not confirm_ask(confirm_message):
console.print("\nExiting...")
return
# Gateway deletion is async. Wait for gateway to be deleted.
while True:
try:
self.api.client.gateways.get(
project_name=self.api.project,
gateway_name=plan.current_resource.name,
)
except ResourceNotExistsError:
break
else:
time.sleep(1)

with console.status("Creating gateway..."):
gateway = self.api.client.gateways.create(
project_name=self.api.project,
configuration=conf,
)
print_gateways_table([gateway])
if command_args.detach:
console.print("Gateway configuration submitted. Exiting...")
return
console.print()
try:
with Live(console=console, refresh_per_second=LIVE_TABLE_REFRESH_RATE_PER_SEC) as live:
while True:
live.update(get_gateways_table([gateway], verbose=True))
if _finished_provisioning(gateway):
break
time.sleep(LIVE_TABLE_PROVISION_INTERVAL_SECS)
gateway = self.api.client.gateways.get(self.api.project, gateway.name)
except KeyboardInterrupt:
if confirm_ask("Delete the gateway before exiting?"):
with console.status("Deleting gateway..."):
self.api.client.gateways.delete(
project_name=self.api.project,
gateways_names=[gateway.name],
)
else:
console.print("Exiting... Gateway provisioning will continue in the background.")

def delete_configuration(
self,
Expand Down Expand Up @@ -96,3 +144,52 @@ def delete_configuration(
)

console.print(f"Gateway [code]{conf.name}[/] deleted")


def _get_plan(api: Client, spec: GatewaySpec) -> GatewayPlan:
# TODO: Implement server-side /get_plan with an offer included
user = api.client.users.get_my_user()
current_resource = None
if spec.configuration.name is not None:
try:
current_resource = api.client.gateways.get(
project_name=api.project,
gateway_name=spec.configuration.name,
)
except ResourceNotExistsError:
pass
return GatewayPlan(
project_name=api.project,
user=user.username,
spec=spec,
current_resource=current_resource,
)


def _print_plan_header(plan: GatewayPlan):
def th(s: str) -> str:
return f"[bold]{s}[/bold]"

configuration_table = Table(box=None, show_header=False)
configuration_table.add_column(no_wrap=True) # key
configuration_table.add_column() # value

configuration_table.add_row(th("Project"), plan.project_name)
configuration_table.add_row(th("User"), plan.user)
configuration_table.add_row(th("Configuration"), plan.spec.configuration_path)
configuration_table.add_row(th("Type"), plan.spec.configuration.type)

domain = "-"
if plan.spec.configuration.domain is not None:
domain = plan.spec.configuration.domain

configuration_table.add_row(th("Backend"), plan.spec.configuration.backend.value)
configuration_table.add_row(th("Region"), plan.spec.configuration.region)
configuration_table.add_row(th("Domain"), domain)

console.print(configuration_table)
console.print()


def _finished_provisioning(gateway: Gateway) -> bool:
return gateway.status in [GatewayStatus.RUNNING, GatewayStatus.FAILED]
Loading