diff --git a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go index 5ebe1d0075..70d7c7e2f9 100644 --- a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go @@ -234,10 +234,14 @@ func (p Plugin) sendRequest(method string, databricksJob map[string]interface{}, return nil, err } var data map[string]interface{} - err = json.Unmarshal(responseBody, &data) - if err != nil { - return nil, fmt.Errorf("failed to parse response with err: [%v]", err) + + if len(responseBody) != 0 { + err = json.Unmarshal(responseBody, &data) + if err != nil { + return nil, fmt.Errorf("failed to parse response with err: [%v]", err) + } } + if resp.StatusCode != http.StatusOK { message := "" if v, ok := data["message"]; ok { @@ -259,10 +263,16 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase taskInfo := createTaskInfo(exec.RunID, jobID, exec.DatabricksInstance) switch lifeCycleState { // Job response format. https://docs.databricks.com/en/workflows/jobs/jobs-2.0-api.html#runlifecyclestate + case "QUEUED": + return core.PhaseInfoQueued(time.Now(), core.DefaultPhaseVersion, message), nil case "PENDING": return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, message, taskInfo), nil case "RUNNING": fallthrough + case "BLOCKED": + fallthrough + case "WAITING_FOR_RETRY": + fallthrough case "TERMINATING": return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil case "TERMINATED":