[Jobs] Fix race condition in supervisor actor creation and add timeout for pending jobs (ray-project#34223)

@rkooo567 and @sihanwang41 found a race condition when submitting a job that causes the job to fail. The failure happens when the following sequence of events occurs:

1. A job is submitted. Its job_info is put to the internal KV. This happens before the JobSupervisor is actually created.
2. In the constructor of JobManager, we call await self._recover_running_jobs(), which finds the job_info in the internal KV and starts to monitor that job. Because the JobSupervisor actor doesn't exist yet, the JobManager job monitoring loop fails to ping it and marks the job's status as FAILED in the internal KV.
3. The JobSupervisor is created. JobSupervisor.run() checks that the status is PENDING, but it isn't, so it raises the error "run should only be called once", which is not helpful to the user.

If step 2 happens before step 1, there's no issue. But both are async, so the order isn't guaranteed.
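
To make the interleaving concrete, here is a minimal, self-contained asyncio sketch (illustrative only — the dictionaries and function names are made up and are not the actual Ray internals) of how the monitor can observe the job_info before the supervisor exists:

```python
import asyncio

kv = {}           # stands in for the internal KV
supervisors = {}  # stands in for named JobSupervisor actors


async def submit_job(job_id: str) -> None:
    kv[job_id] = {"status": "PENDING"}  # job_info is written first...
    await asyncio.sleep(0.01)           # ...but supervisor creation is also async
    supervisors[job_id] = object()      # the JobSupervisor finally exists


async def recover_and_monitor() -> None:
    for job_id in list(kv):
        if job_id not in supervisors:
            # Pre-fix behavior: nothing to ping, so the job was marked FAILED
            # even though it was merely still PENDING.
            print(f"{job_id}: no supervisor yet, status={kv[job_id]['status']}")


async def main() -> None:
    submit = asyncio.create_task(submit_job("job_1"))
    await asyncio.sleep(0)       # let job_info land in the KV
    await recover_and_monitor()  # monitoring runs before the supervisor is created
    await submit


asyncio.run(main())
```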

The solution in this PR is to make the JobManager monitoring loop handle the PENDING case by skipping the ping to the JobSupervisor actor for that iteration of the loop.
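
As a minimal, self-contained sketch of that decision logic (assumed names, not the actual JobManager code):

```python
import asyncio
from enum import Enum


class JobStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"


async def monitor_once(status: JobStatus, supervisor) -> str:
    """One iteration of a simplified monitoring loop."""
    if supervisor is None:
        if status == JobStatus.PENDING:
            return "skip ping; supervisor not created yet"
        return "mark job FAILED"  # supervisor missing for a non-PENDING job
    return "ping supervisor"


print(asyncio.run(monitor_once(JobStatus.PENDING, None)))      # skip ping; ...
print(asyncio.run(monitor_once(JobStatus.RUNNING, None)))      # mark job FAILED
print(asyncio.run(monitor_once(JobStatus.RUNNING, object())))  # ping supervisor
```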

This PR adds a unit test that fails when combined with ray-project#34190 (which forces the race condition).

This PR also adds a timeout that fails jobs that have been pending for 15 minutes, configurable via the RAY_JOB_START_TIMEOUT_SECONDS environment variable.
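
For reference, a sketch of how the timeout is resolved, based on the constants added in this PR (the override presumably needs to be set in the environment of the process running the JobManager, e.g. on the head node, rather than in the job itself):

```python
import os

# Default added to dashboard/consts.py in this PR: 15 minutes.
DEFAULT_JOB_START_TIMEOUT_SECONDS = 60 * 15

# The RAY_JOB_START_TIMEOUT_SECONDS env var overrides the default.
timeout = float(
    os.environ.get("RAY_JOB_START_TIMEOUT_SECONDS", DEFAULT_JOB_START_TIMEOUT_SECONDS)
)
print(f"Jobs still PENDING after {timeout} seconds will be marked FAILED")
```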

Some questions are still open:

- Why did this only start to fail recently? The only recent change is "[Jobs] Fix race condition on submitting multiple jobs with the same id" (ray-project#33259), but it's not clear how this would matter in the case of a single job.
- What is a reasonable default timeout for pending jobs, and should we even have one? It should be larger than the existing runtime_env setup timeout (10 minutes) in order to distinguish runtime_env setup timeouts from other timeouts. Not sure if there are other existing timeouts that we should consider.
architkulkarni committed Apr 12, 2023
1 parent 35fdf67 commit 9a725ae
Showing 3 changed files with 164 additions and 16 deletions.
4 changes: 4 additions & 0 deletions dashboard/consts.py
@@ -68,6 +68,10 @@
 WAIT_AVAILABLE_AGENT_TIMEOUT = 10
 TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS = 0.1
 RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR = "RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES"
+
+# The max time to wait for the JobSupervisor to start before failing the job.
+DEFAULT_JOB_START_TIMEOUT_SECONDS = 60 * 15
+RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR = "RAY_JOB_START_TIMEOUT_SECONDS"
 # Port that dashboard prometheus metrics will be exported to
 DASHBOARD_METRIC_PORT = env_integer("DASHBOARD_METRIC_PORT", 44227)
 COMPONENT_METRICS_TAG_KEYS = ["ip", "pid", "Component", "SessionName"]
115 changes: 100 additions & 15 deletions dashboard/modules/job/job_manager.py
@@ -24,7 +24,11 @@
 import ray._private.ray_constants as ray_constants
 from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR
 from ray.actor import ActorHandle
-from ray.dashboard.consts import RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR
+from ray.dashboard.consts import (
+    RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR,
+    DEFAULT_JOB_START_TIMEOUT_SECONDS,
+    RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR,
+)
 from ray.dashboard.modules.job.common import (
     JOB_ID_METADATA_KEY,
     JOB_NAME_METADATA_KEY,
@@ -351,16 +355,29 @@ async def run(
         resources_specified: bool = False,
     ):
         """
-        Stop and start both happen asynchrously, coordinated by asyncio event
+        Stop and start both happen asynchronously, coordinated by asyncio event
         and coroutine, respectively.
 
         1) Sets job status as running
         2) Pass runtime env and metadata to subprocess as serialized env
             variables.
         3) Handle concurrent events of driver execution and
         """
-        curr_status = await self._job_info_client.get_status(self._job_id)
-        assert curr_status == JobStatus.PENDING, "Run should only be called once."
+        curr_info = await self._job_info_client.get_info(self._job_id)
+        if curr_info is None:
+            raise RuntimeError(f"Status could not be retrieved for job {self._job_id}.")
+        curr_status = curr_info.status
+        curr_message = curr_info.message
+        if curr_status == JobStatus.RUNNING:
+            raise RuntimeError(
+                f"Job {self._job_id} is already in RUNNING state. "
+                f"JobSupervisor.run() should only be called once. "
+            )
+        if curr_status != JobStatus.PENDING:
+            raise RuntimeError(
+                f"Job {self._job_id} is not in PENDING state. "
+                f"Current status is {curr_status} with message {curr_message}."
+            )
 
         if _start_signal_actor:
             # Block in PENDING state until start signal received.
@@ -555,22 +572,90 @@ async def _monitor_job(
     async def _monitor_job_internal(
         self, job_id: str, job_supervisor: Optional[ActorHandle] = None
     ):
-        is_alive = True
-        if job_supervisor is None:
-            job_supervisor = self._get_actor_for_job(job_id)
+        timeout = float(
+            os.environ.get(
+                RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR,
+                DEFAULT_JOB_START_TIMEOUT_SECONDS,
+            )
+        )
 
-            if job_supervisor is None:
-                logger.error(f"Failed to get job supervisor for job {job_id}.")
-                await self._job_info_client.put_status(
-                    job_id,
-                    JobStatus.FAILED,
-                    message="Unexpected error occurred: Failed to get job supervisor.",
-                )
-                is_alive = False
+        is_alive = True
 
         while is_alive:
             try:
+                job_status = await self._job_info_client.get_status(job_id)
+                if job_status == JobStatus.PENDING:
+                    # Compare the current time with the job start time.
+                    # If the job is still pending, we will set the status
+                    # to FAILED.
+                    job_info = await self._job_info_client.get_info(job_id)
+
+                    if time.time() - job_info.start_time / 1000 > timeout:
+                        err_msg = (
+                            "Job supervisor actor failed to start within "
+                            f"{timeout} seconds. This timeout can be "
+                            f"configured by setting the environment "
+                            f"variable {RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR}."
+                        )
+                        resources_specified = (
+                            (
+                                job_info.entrypoint_num_cpus is not None
+                                and job_info.entrypoint_num_cpus > 0
+                            )
+                            or (
+                                job_info.entrypoint_num_gpus is not None
+                                and job_info.entrypoint_num_gpus > 0
+                            )
+                            or (
+                                job_info.entrypoint_resources is not None
+                                and len(job_info.entrypoint_resources) > 0
+                            )
+                        )
+                        if resources_specified:
+                            err_msg += (
+                                " This may be because the job entrypoint's specified "
+                                "resources (entrypoint_num_cpus, entrypoint_num_gpus, "
+                                "entrypoint_resources) aren't available on the cluster."
+                                " Try checking the cluster's available resources with "
+                                "`ray status` and specifying fewer resources for the "
+                                "job entrypoint."
+                            )
+                        await self._job_info_client.put_status(
+                            job_id,
+                            JobStatus.FAILED,
+                            message=err_msg,
+                        )
+                        is_alive = False
+                        logger.error(err_msg)
+                        continue
+
+                if job_supervisor is None:
+                    job_supervisor = self._get_actor_for_job(job_id)
+
+                if job_supervisor is None:
+                    if job_status == JobStatus.PENDING:
+                        # Maybe the job supervisor actor is not created yet.
+                        # We will wait for the next loop.
+                        continue
+                    else:
+                        # The job supervisor actor is not created, but the job
+                        # status is not PENDING. This means the job supervisor
+                        # actor is not created due to some unexpected errors.
+                        # We will set the job status to FAILED.
+                        logger.error(f"Failed to get job supervisor for job {job_id}.")
+                        await self._job_info_client.put_status(
+                            job_id,
+                            JobStatus.FAILED,
+                            message=(
+                                "Unexpected error occurred: "
+                                "Failed to get job supervisor."
+                            ),
+                        )
+                        is_alive = False
+                        continue
+
                 await job_supervisor.ping.remote()
+
                 await asyncio.sleep(self.JOB_MONITOR_LOOP_PERIOD_S)
             except Exception as e:
                 is_alive = False
61 changes: 60 additions & 1 deletion dashboard/modules/job/tests/test_job_manager.py
@@ -27,7 +27,10 @@
     JobSupervisor,
     generate_job_id,
 )
-from ray.dashboard.consts import RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR
+from ray.dashboard.consts import (
+    RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR,
+    RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR,
+)
 from ray.job_submission import JobStatus
 from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy  # noqa: F401
 from ray.tests.conftest import call_ray_start  # noqa: F401
@@ -1076,5 +1079,61 @@ async def test_simultaneous_drivers(job_manager):
     assert "done" in job_manager.get_job_logs(job_id)
 
 
+@pytest.mark.asyncio
+async def test_monitor_job_pending(job_manager):
+    """Test that monitor_job does not error when the job is PENDING."""
+
+    # Create a signal actor to keep the job pending.
+    start_signal_actor = SignalActor.remote()
+
+    # Submit a job.
+    job_id = await job_manager.submit_job(
+        entrypoint="echo 'hello world'",
+        _start_signal_actor=start_signal_actor,
+    )
+
+    # Trigger _recover_running_jobs while the job is still pending. This
+    # will pick up the new pending job.
+    await job_manager._recover_running_jobs()
+
+    # Trigger the job to start.
+    ray.get(start_signal_actor.send.remote())
+
+    # Wait for the job to finish.
+    await async_wait_for_condition_async_predicate(
+        check_job_succeeded, job_manager=job_manager, job_id=job_id
+    )
+
+
+@pytest.mark.asyncio
+async def test_job_pending_timeout(job_manager, monkeypatch):
+    """Test the timeout for pending jobs."""
+
+    monkeypatch.setenv(RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR, "0.1")
+
+    # Create a signal actor to keep the job pending.
+    start_signal_actor = SignalActor.remote()
+
+    # Submit a job.
+    job_id = await job_manager.submit_job(
+        entrypoint="echo 'hello world'",
+        _start_signal_actor=start_signal_actor,
+    )
+
+    # Trigger _recover_running_jobs while the job is still pending. This
+    # will pick up the new pending job.
+    await job_manager._recover_running_jobs()
+
+    # Wait for the job to timeout.
+    await async_wait_for_condition_async_predicate(
+        check_job_failed, job_manager=job_manager, job_id=job_id
+    )
+
+    # Check that the job timed out.
+    job_info = await job_manager.get_job_info(job_id)
+    assert job_info.status == JobStatus.FAILED
+    assert "Job supervisor actor failed to start within" in job_info.message
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))
