Skip to content

Commit

Permalink
Handle all ray job statuses (#4389)
Browse files Browse the repository at this point in the history
Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
  • Loading branch information
EngHabu committed Nov 10, 2023
1 parent 243a8cb commit 7712626
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont
return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil
}

// Kuberay creates a Ray cluster first, and then submits a Ray job to the cluster
// KubeRay creates a Ray cluster first, and then submits a Ray job to the cluster
switch rayJob.Status.JobDeploymentStatus {
case rayv1alpha1.JobDeploymentStatusInitializing:
return pluginsCore.PhaseInfoInitializing(rayJob.CreationTimestamp.Time, pluginsCore.DefaultPhaseVersion, "cluster is creating", info), nil
Expand All @@ -480,7 +480,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont
case rayv1alpha1.JobDeploymentStatusFailedJobDeploy:
reason := fmt.Sprintf("Failed to submit Ray job %s with error: %s", rayJob.Name, rayJob.Status.Message)
return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil
case rayv1alpha1.JobDeploymentStatusWaitForDashboard:
case rayv1alpha1.JobDeploymentStatusWaitForDashboard, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus:
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil
case rayv1alpha1.JobDeploymentStatusRunning, rayv1alpha1.JobDeploymentStatusComplete:
switch rayJob.Status.JobStatus {
Expand All @@ -491,10 +491,19 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont
return pluginsCore.PhaseInfoSuccess(info), nil
case rayv1alpha1.JobStatusPending, rayv1alpha1.JobStatusRunning:
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil
case rayv1alpha1.JobStatusStopped:
// There is no current usage of this job status in KubeRay. It's unclear what it represents
fallthrough
default:
// We already handle all known job status, so this should never happen unless a future version of ray
// introduced a new job status.
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unknown job status: %s", rayJob.Status.JobStatus)
}
default:
// We already handle all known deployment status, so this should never happen unless a future version of ray
// introduced a new job status.
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unknown job deployment status: %s", rayJob.Status.JobDeploymentStatus)
}

return pluginsCore.PhaseInfoUndefined, nil
}

func init() {
Expand Down

0 comments on commit 7712626

Please sign in to comment.