diff --git a/src/dstack/_internal/cli/services/configurators/run.py b/src/dstack/_internal/cli/services/configurators/run.py index 7ce6f72b6a..b5db81e6ba 100644 --- a/src/dstack/_internal/cli/services/configurators/run.py +++ b/src/dstack/_internal/cli/services/configurators/run.py @@ -6,6 +6,7 @@ from typing import Dict, List, Optional, Set, Tuple import gpuhunt +from rich.live import Live import dstack._internal.core.models.resources as resources from dstack._internal.cli.services.args import disk_spec, gpu_spec, port_mapping @@ -14,7 +15,12 @@ BaseApplyConfigurator, ) from dstack._internal.cli.services.profile import apply_profile_args, register_profile_args -from dstack._internal.cli.utils.common import confirm_ask, console +from dstack._internal.cli.utils.common import ( + LIVE_TABLE_PROVISION_INTERVAL_SECS, + LIVE_TABLE_REFRESH_RATE_PER_SEC, + confirm_ask, + console, +) from dstack._internal.cli.utils.run import print_run_plan from dstack._internal.core.errors import ( CLIError, @@ -165,65 +171,16 @@ def apply_configuration( abort_at_exit = False try: # We can attach to run multiple times if it goes from running to pending (retried). - while True: - with console.status(f"Launching [code]{run.name}[/]") as status: - while run.status in ( - RunStatus.SUBMITTED, - RunStatus.PENDING, - RunStatus.PROVISIONING, - ): - job_statuses = "\n".join( - f" - {job.job_spec.job_name} [secondary]({job.job_submissions[-1].status.value})[/]" - for job in run._run.jobs - ) - status.update( - f"Launching [code]{run.name}[/] [secondary]({run.status.value})[/]\n{job_statuses}" - ) - time.sleep(5) - run.refresh() - console.print( - f"[code]{run.name}[/] provisioning completed [secondary]({run.status.value})[/]" - ) - - current_job_submission = run._run.latest_job_submission - if run.status in (RunStatus.RUNNING, RunStatus.DONE): - if run._run.run_spec.configuration.type == RunConfigurationType.SERVICE.value: - console.print( - f"Service is published at [link={run.service_url}]{run.service_url}[/]\n" - ) - bind_address: Optional[str] = getattr( - configurator_args, _BIND_ADDRESS_ARG, None - ) - try: - if run.attach(bind_address=bind_address): - for entry in run.logs(): - sys.stdout.buffer.write(entry) - sys.stdout.buffer.flush() - else: - console.print("[error]Failed to attach, exiting...[/]") - exit(1) - finally: - run.detach() - - # After reading the logs, the run may not be marked as finished immediately. - # Give the run some time to transit into a finished state before exiting. - reattach = False - for _ in range(30): - run.refresh() - if _run_resubmitted(run, current_job_submission): - # The run was resubmitted - reattach = True + # Live table setup + with Live(console=console, refresh_per_second=LIVE_TABLE_REFRESH_RATE_PER_SEC) as live: + while True: + live.update(self.get_runs_table([run], verbose=True)) + # if run.status.is_finished(): + if _finished_provisioning(run, configurator_args): break - if run.status.is_finished(): - _print_finished_message(run) - exit(_get_run_exit_code(run)) - time.sleep(1) - if not reattach: - console.print( - "[error]Lost run connection. Timed out waiting for run final status." - " Check `dstack ps` to see if it's done or failed." - ) - exit(1) + time.sleep(LIVE_TABLE_PROVISION_INTERVAL_SECS) + run.refresh() + run = self.api.client.runs.get(self.api.project, run.name) except KeyboardInterrupt: try: if not confirm_ask(f"\nStop the run [code]{run.name}[/] before detaching?"): @@ -530,3 +487,45 @@ def _run_resubmitted(run: Run, current_job_submission: Optional[JobSubmission]) not run.status.is_finished() and run._run.latest_job_submission.submitted_at > current_job_submission.submitted_at ) + + +def _finished_provisioning(run: Run, configurator_args) -> bool: + current_job_submission = run._run.latest_job_submission + if run.status in (RunStatus.RUNNING, RunStatus.DONE): + if run._run.run_spec.configuration.type == RunConfigurationType.SERVICE.value: + console.print( + f"Service is published at [link={run.service_url}]{run.service_url}[/]\n" + ) + bind_address: Optional[str] = getattr(configurator_args, _BIND_ADDRESS_ARG, None) + try: + if run.attach(bind_address=bind_address): + for entry in run.logs(): + sys.stdout.buffer.write(entry) + sys.stdout.buffer.flush() + else: + console.print("[error]Failed to attach, exiting...[/]") + exit(1) + finally: + run.detach() + + # After reading the logs, the run may not be marked as finished immediately. + # Give the run some time to transit into a finished state before exiting. + reattach = False + for _ in range(30): + run.refresh() + if _run_resubmitted(run, current_job_submission): + # The run was resubmitted + reattach = True + break + if run.status.is_finished(): + _print_finished_message(run) + exit(_get_run_exit_code(run)) + time.sleep(1) + if not reattach: + console.print( + "[error]Lost run connection. Timed out waiting for run final status." + " Check `dstack ps` to see if it's done or failed." + ) + exit(1) + return True + return False