Skip to content

Commit

Permalink
feat: Support running (kubeflow#894)
Browse files: browse the repository at this point in the history
* feat: Support running

Signed-off-by: Ce Gao <gaoce@caicloud.io>

* fix: Do not mark trial running when the job is created

Signed-off-by: Ce Gao <gaoce@caicloud.io>

* fix: Add nil pointer check

Signed-off-by: Ce Gao <gaoce@caicloud.io>

* fix: Avoid nil pointer

Signed-off-by: Ce Gao <gaoce@caicloud.io>

* fix: Update

Signed-off-by: Ce Gao <gaoce@caicloud.io>
  • Loading branch information
gaocegege authored and k8s-ci-robot committed Dec 6, 2019
1 parent 83eba02 commit 5d46799
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 7 deletions.
2 changes: 0 additions & 2 deletions pkg/controller.v1alpha3/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha3.Trial, desiredJob
}
eventMsg := fmt.Sprintf("Job %s has been created", desiredJob.GetName())
r.recorder.Eventf(instance, corev1.EventTypeNormal, JobCreatedReason, eventMsg)
msg := "Trial is running"
instance.MarkTrialStatusRunning(TrialRunningReason, msg)
} else {
logger.Error(err, "Trial Get error")
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/controller.v1alpha3/trial/trial_controller_consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ const (
JobSucceededReason = "JobSucceeded"
JobMetricsUnavailableReason = "MetricsUnavailable"
JobFailedReason = "JobFailed"
JobRunningReason = "JobRunning"
ReconcileFailedReason = "ReconcileFailed"
)
23 changes: 22 additions & 1 deletion pkg/controller.v1alpha3/trial/trial_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const (
)

func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1alpha3.Trial, deployedJob *unstructured.Unstructured, jobCondition *commonv1.JobCondition) {
if jobCondition == nil || instance == nil || deployedJob == nil {
return
}
now := metav1.Now()
jobConditionType := (*jobCondition).Type
if jobConditionType == commonv1.JobSucceeded {
Expand Down Expand Up @@ -63,8 +66,17 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1alpha3.Tri
eventMsg := fmt.Sprintf("Job %s has failed: %s", deployedJob.GetName(), jobConditionMessage)
r.recorder.Eventf(instance, corev1.EventTypeNormal, JobFailedReason, eventMsg)
IncreaseTrialsFailedCount()
} else if jobConditionType == commonv1.JobRunning {
msg := "Trial is running"
instance.MarkTrialStatusRunning(TrialRunningReason, msg)
jobConditionMessage := (*jobCondition).Message
eventMsg := fmt.Sprintf("Job %s is running: %s",
deployedJob.GetName(), jobConditionMessage)
r.recorder.Eventf(instance, corev1.EventTypeNormal,
JobRunningReason, eventMsg)
// TODO(gaocegege): Should we maintain a TrialsRunningCount?
}
//else nothing to do
// else nothing to do
return
}

Expand Down Expand Up @@ -118,6 +130,9 @@ func (r *ReconcileTrial) updateFinalizers(instance *trialsv1alpha3.Trial, finali
}

func isTrialObservationAvailable(instance *trialsv1alpha3.Trial) bool {
if instance == nil {
return false
}
objectiveMetricName := instance.Spec.Objective.ObjectiveMetricName
if instance.Status.Observation != nil && instance.Status.Observation.Metrics != nil {
for _, metric := range instance.Status.Observation.Metrics {
Expand All @@ -130,6 +145,9 @@ func isTrialObservationAvailable(instance *trialsv1alpha3.Trial) bool {
}

func isTrialComplete(instance *trialsv1alpha3.Trial, jobCondition *commonv1.JobCondition) bool {
if jobCondition == nil || instance == nil {
return false
}
jobConditionType := (*jobCondition).Type
if jobConditionType == commonv1.JobSucceeded && isTrialObservationAvailable(instance) {
return true
Expand All @@ -142,6 +160,9 @@ func isTrialComplete(instance *trialsv1alpha3.Trial, jobCondition *commonv1.JobC
}

func isJobSucceeded(jobCondition *commonv1.JobCondition) bool {
if jobCondition == nil {
return false
}
jobConditionType := (*jobCondition).Type
if jobConditionType == commonv1.JobSucceeded {
return true
Expand Down
7 changes: 4 additions & 3 deletions pkg/job/v1alpha3/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ func (j Job) GetDeployedJobStatus(
log.Error(unerr, "NestedFieldCopy unstructured to status error")
return nil, unerr
}
log.Info("NestedFieldCopy unstructured to status error",
"err", "Status is not found in job")
return &jobCondition, nil
// Job does not have the running condition in status, thus we think
// the job is running when it is created.
log.Info("NestedFieldCopy", "err", "status cannot be found in job")
return nil, nil
}

statusMap := status.(map[string]interface{})
Expand Down
5 changes: 4 additions & 1 deletion pkg/job/v1alpha3/kubeflow/kubeflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (k Kubeflow) GetDeployedJobStatus(
}
log.Info("NestedFieldCopy unstructured to status error",
"err", "Status is not found in job")
return &jobCondition, nil
return nil, nil
}

statusMap := status.(map[string]interface{})
Expand All @@ -46,10 +46,13 @@ func (k Kubeflow) GetDeployedJobStatus(
log.Error(err, "Convert unstructured to status error")
return nil, err
}
// Get the latest condition and set it to jobCondition.
if len(jobStatus.Conditions) > 0 {
lc := jobStatus.Conditions[len(jobStatus.Conditions)-1]
jobCondition.Type = lc.Type
jobCondition.Message = lc.Message
jobCondition.Status = lc.Status
jobCondition.Reason = lc.Reason
}

return &jobCondition, nil
Expand Down

0 comments on commit 5d46799

Please sign in to comment.