Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 58 additions & 59 deletions src/dstack/_internal/cli/services/configurators/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Dict, List, Optional, Set, Tuple

import gpuhunt
from rich.live import Live

import dstack._internal.core.models.resources as resources
from dstack._internal.cli.services.args import disk_spec, gpu_spec, port_mapping
Expand All @@ -14,7 +15,12 @@
BaseApplyConfigurator,
)
from dstack._internal.cli.services.profile import apply_profile_args, register_profile_args
from dstack._internal.cli.utils.common import confirm_ask, console
from dstack._internal.cli.utils.common import (
LIVE_TABLE_PROVISION_INTERVAL_SECS,
LIVE_TABLE_REFRESH_RATE_PER_SEC,
confirm_ask,
console,
)
from dstack._internal.cli.utils.run import print_run_plan
from dstack._internal.core.errors import (
CLIError,
Expand Down Expand Up @@ -165,65 +171,16 @@ def apply_configuration(
abort_at_exit = False
try:
# We can attach to run multiple times if it goes from running to pending (retried).
while True:
with console.status(f"Launching [code]{run.name}[/]") as status:
while run.status in (
RunStatus.SUBMITTED,
RunStatus.PENDING,
RunStatus.PROVISIONING,
):
job_statuses = "\n".join(
f" - {job.job_spec.job_name} [secondary]({job.job_submissions[-1].status.value})[/]"
for job in run._run.jobs
)
status.update(
f"Launching [code]{run.name}[/] [secondary]({run.status.value})[/]\n{job_statuses}"
)
time.sleep(5)
run.refresh()
console.print(
f"[code]{run.name}[/] provisioning completed [secondary]({run.status.value})[/]"
)
Comment on lines -168 to -186
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is deleted but without it you will always get Lost run connection. since you later try to check if run is still active but it's not even started yet.


current_job_submission = run._run.latest_job_submission
if run.status in (RunStatus.RUNNING, RunStatus.DONE):
if run._run.run_spec.configuration.type == RunConfigurationType.SERVICE.value:
console.print(
f"Service is published at [link={run.service_url}]{run.service_url}[/]\n"
)
bind_address: Optional[str] = getattr(
configurator_args, _BIND_ADDRESS_ARG, None
)
try:
if run.attach(bind_address=bind_address):
for entry in run.logs():
sys.stdout.buffer.write(entry)
sys.stdout.buffer.flush()
else:
console.print("[error]Failed to attach, exiting...[/]")
exit(1)
finally:
run.detach()

# After reading the logs, the run may not be marked as finished immediately.
# Give the run some time to transit into a finished state before exiting.
reattach = False
for _ in range(30):
run.refresh()
if _run_resubmitted(run, current_job_submission):
# The run was resubmitted
reattach = True
# Live table setup
with Live(console=console, refresh_per_second=LIVE_TABLE_REFRESH_RATE_PER_SEC) as live:
while True:
live.update(self.get_runs_table([run], verbose=True))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot access attribute "get_runs_table" for class "BaseRunConfigurator*"

Ensure get_runs_table is properly imported and used.

# if run.status.is_finished():
if _finished_provisioning(run, configurator_args):
break
if run.status.is_finished():
_print_finished_message(run)
exit(_get_run_exit_code(run))
time.sleep(1)
if not reattach:
console.print(
"[error]Lost run connection. Timed out waiting for run final status."
" Check `dstack ps` to see if it's done or failed."
)
exit(1)
time.sleep(LIVE_TABLE_PROVISION_INTERVAL_SECS)
run.refresh()
run = self.api.client.runs.get(self.api.project, run.name)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run = self.api.client.runs.get(self.api.project, run.name) is not needed.

except KeyboardInterrupt:
try:
if not confirm_ask(f"\nStop the run [code]{run.name}[/] before detaching?"):
Expand Down Expand Up @@ -530,3 +487,45 @@ def _run_resubmitted(run: Run, current_job_submission: Optional[JobSubmission])
not run.status.is_finished()
and run._run.latest_job_submission.submitted_at > current_job_submission.submitted_at
)


def _finished_provisioning(run: Run, configurator_args) -> bool:
current_job_submission = run._run.latest_job_submission
if run.status in (RunStatus.RUNNING, RunStatus.DONE):
if run._run.run_spec.configuration.type == RunConfigurationType.SERVICE.value:
console.print(
f"Service is published at [link={run.service_url}]{run.service_url}[/]\n"
)
bind_address: Optional[str] = getattr(configurator_args, _BIND_ADDRESS_ARG, None)
try:
if run.attach(bind_address=bind_address):
for entry in run.logs():
sys.stdout.buffer.write(entry)
sys.stdout.buffer.flush()
else:
console.print("[error]Failed to attach, exiting...[/]")
exit(1)
finally:
run.detach()

# After reading the logs, the run may not be marked as finished immediately.
# Give the run some time to transit into a finished state before exiting.
reattach = False
for _ in range(30):
run.refresh()
if _run_resubmitted(run, current_job_submission):
# The run was resubmitted
reattach = True
break
if run.status.is_finished():
_print_finished_message(run)
exit(_get_run_exit_code(run))
time.sleep(1)
if not reattach:
console.print(
"[error]Lost run connection. Timed out waiting for run final status."
" Check `dstack ps` to see if it's done or failed."
)
exit(1)
return True
return False