Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExecutionTimeout to fail executions instead of jobs #3974

Merged
merged 12 commits into from
May 13, 2024
3 changes: 3 additions & 0 deletions pkg/jobstore/boltdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ func (s *BoltJobstoreTestSuite) SetupTest() {
s.clock.Add(1 * time.Second)
execution := mock.ExecutionForJob(job)
execution.ComputeState.StateType = models.ExecutionStateNew
// clear out CreateTime and ModifyTime from the mocked execution to let the job store fill those
execution.CreateTime = 0
execution.ModifyTime = 0
err = s.store.CreateExecution(s.ctx, *execution, models.Event{})
s.Require().NoError(err)

Expand Down
85 changes: 81 additions & 4 deletions pkg/models/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/bacalhau-project/bacalhau/pkg/lib/math"
"github.com/bacalhau-project/bacalhau/pkg/util/idgen"
)

const (
Expand All @@ -16,10 +17,11 @@ const (
)

const (
EvalTriggerJobRegister = "job-register"
EvalTriggerJobCancel = "job-cancel"
EvalTriggerRetryFailedExec = "exec-failure"
EvalTriggerExecUpdate = "exec-update"
EvalTriggerJobRegister = "job-register"
EvalTriggerJobCancel = "job-cancel"
EvalTriggerExecFailure = "exec-failure"
EvalTriggerExecUpdate = "exec-update"
EvalTriggerExecTimeout = "exec-timeout"
)

// Evaluation is just to ask the scheduler to reassess if additional job instances must be
Expand Down Expand Up @@ -61,6 +63,81 @@ type Evaluation struct {
ModifyTime int64 `json:"ModifyTime"`
}

// NewEvaluation creates a new Evaluation.
func NewEvaluation() *Evaluation {
return &Evaluation{
ID: idgen.NewEvaluationID(),
Status: EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
}

// WithJobID sets the JobID of the Evaluation.
func (e *Evaluation) WithJobID(jobID string) *Evaluation {
e.JobID = jobID
return e
}

// WithNamespace sets the Namespace of the Evaluation.
func (e *Evaluation) WithNamespace(namespace string) *Evaluation {
e.Namespace = namespace
return e
}

// WithTriggeredBy sets the TriggeredBy of the Evaluation.
func (e *Evaluation) WithTriggeredBy(triggeredBy string) *Evaluation {
e.TriggeredBy = triggeredBy
return e
}

// WithPriority sets the Priority of the Evaluation.
func (e *Evaluation) WithPriority(priority int) *Evaluation {
e.Priority = priority
return e
}

// WithType sets the Type of the Evaluation.
func (e *Evaluation) WithType(jobType string) *Evaluation {
e.Type = jobType
return e
}

// WithStatus sets the Status of the Evaluation.
func (e *Evaluation) WithStatus(status string) *Evaluation {
e.Status = status
return e
}

// WithComment sets the Comment of the Evaluation.
func (e *Evaluation) WithComment(comment string) *Evaluation {
e.Comment = comment
return e
}

// WithWaitUntil sets the WaitUntil of the Evaluation.
func (e *Evaluation) WithWaitUntil(waitUntil time.Time) *Evaluation {
e.WaitUntil = waitUntil
return e
}

// Normalize ensures that the Evaluation is in a valid state.
func (e *Evaluation) Normalize() *Evaluation {
if e.ID == "" {
e.ID = idgen.NewEvaluationID()
}
if e.Status == "" {
e.Status = EvalStatusPending
}
if e.CreateTime == 0 {
e.CreateTime = time.Now().UTC().UnixNano()
}
if e.ModifyTime == 0 {
e.ModifyTime = time.Now().UTC().UnixNano()
}
return e
}

// TerminalStatus returns if the current status is terminal and
// will no longer transition.
func (e *Evaluation) TerminalStatus() bool {
Expand Down
7 changes: 6 additions & 1 deletion pkg/models/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ func (e *Event) WithMessage(message string) *Event {

// WithError returns a new Event with the given error.
func (e *Event) WithError(err error) *Event {
e.Message = err.Error()
return e.WithErrorMessage(err.Error())
}

// WithErrorMessage returns a new Event with the given error.
func (e *Event) WithErrorMessage(message string) *Event {
e.Message = message
return e.WithDetail(DetailsKeyIsError, "true")
}
wdbaruni marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
6 changes: 6 additions & 0 deletions pkg/models/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ func (e *Execution) GetModifyTime() time.Time {
return time.Unix(0, e.ModifyTime).UTC()
}

// HasExecutionExpired returns true if the execution has been running longer than
// the provided timeout duration.
func (e *Execution) HasExecutionExpired(timeout time.Duration) bool {
wdbaruni marked this conversation as resolved.
Show resolved Hide resolved
return e.ComputeState.StateType == ExecutionStateBidAccepted && time.Since(e.GetModifyTime()) > timeout
}

// Normalize Allocation to ensure fields are initialized to the expectations
// of this version of Bacalhau. Should be called when restoring persisted
// Executions or receiving Executions from Bacalhau clients potentially on an
Expand Down
2 changes: 2 additions & 0 deletions pkg/node/config_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var DefaultRequesterConfig = RequesterConfigParams{
},

HousekeepingBackgroundTaskInterval: 30 * time.Second,
HousekeepingTimeoutBuffer: 2 * time.Minute,
NodeRankRandomnessRange: 5,
OverAskForBidsFactor: 3,

Expand Down Expand Up @@ -83,6 +84,7 @@ var TestRequesterConfig = RequesterConfigParams{
ExecutionTimeout: 30 * time.Second,
},
HousekeepingBackgroundTaskInterval: 30 * time.Second,
HousekeepingTimeoutBuffer: 100 * time.Millisecond,
NodeRankRandomnessRange: 5,
OverAskForBidsFactor: 3,

Expand Down
1 change: 1 addition & 0 deletions pkg/node/config_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type RequesterConfigParams struct {
JobDefaults transformer.JobDefaults

HousekeepingBackgroundTaskInterval time.Duration
HousekeepingTimeoutBuffer time.Duration
NodeRankRandomnessRange int
OverAskForBidsFactor uint
JobSelectionPolicy JobSelectionPolicy
Expand Down
16 changes: 10 additions & 6 deletions pkg/node/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,16 @@ func NewRequesterNode(
ResultTransformer: resultTransformers,
})

housekeeping := requester.NewHousekeeping(requester.HousekeepingParams{
Endpoint: endpoint,
JobStore: jobStore,
NodeID: nodeID,
Interval: requesterConfig.HousekeepingBackgroundTaskInterval,
housekeeping, err := orchestrator.NewHousekeeping(orchestrator.HousekeepingParams{
EvaluationBroker: evalBroker,
JobStore: jobStore,
Interval: requesterConfig.HousekeepingBackgroundTaskInterval,
TimeoutBuffer: requesterConfig.HousekeepingTimeoutBuffer,
})
if err != nil {
return nil, err
}
housekeeping.Start(ctx)

// register debug info providers for the /debug endpoint
debugInfoProviders := []model.DebugInfoProvider{
Expand Down Expand Up @@ -301,7 +305,7 @@ func NewRequesterNode(
// A single Cleanup function to make sure the order of closing dependencies is correct
cleanupFunc := func(ctx context.Context) {
// stop the housekeeping background task
housekeeping.Stop()
housekeeping.Stop(ctx)
for _, worker := range workers {
worker.Stop()
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/orchestrator/events.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package orchestrator

import (
"fmt"
"time"

"github.com/bacalhau-project/bacalhau/pkg/models"
)

const (
EventTopicJobSubmission models.EventTopic = "Submission"
EventTopicJobScheduling models.EventTopic = "Scheduling"
EventTopicJobSubmission models.EventTopic = "Submission"
EventTopicJobScheduling models.EventTopic = "Scheduling"
EventTopicExecutionTimeout models.EventTopic = "Exec Timeout"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should avoid including spaces in topic names. Let's opt for Camel Case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Topics main use is improving readability of events and let users understand which component or lifecycle or job orchestration the event is related. Not an internal detail and not intended to be consumed to other applications.

As the goal is improve human readability, allowing spaces can be improve the user experience and will improve how we display them on the CLI and UI without having to truncate them or wrap them at weird palces

)

const (
Expand All @@ -23,6 +25,9 @@ const (
execStoppedByOversubscriptionMessage = "Execution stop requested because there are more executions than needed"
execRejectedByNodeMessage = "Node responded to execution run request"
execFailedMessage = "Execution did not complete successfully"

executionTimeoutMessage = "Execution timed out"
executionTimeoutHint = "Try increasing the task timeout or reducing the task size"
)

func event(topic models.EventTopic, msg string, details map[string]string) models.Event {
Expand Down Expand Up @@ -63,6 +68,14 @@ func ExecStoppedByNodeUnhealthyEvent() models.Event {
return event(EventTopicJobScheduling, execStoppedByNodeUnhealthyMessage, map[string]string{})
}

func ExecStoppedByExecutionTimeoutEvent(timeout time.Duration) models.Event {
e := models.NewEvent(EventTopicExecutionTimeout).
WithErrorMessage(fmt.Sprintf("%s. Execution took longer than %s", executionTimeoutMessage, timeout)).
wdbaruni marked this conversation as resolved.
Show resolved Hide resolved
WithHint(executionTimeoutHint).
WithFailsExecution(true)
return *e
}

func ExecStoppedByNodeRejectedEvent() models.Event {
return event(EventTopicJobScheduling, execStoppedByNodeRejectedMessage, map[string]string{})
}
Expand Down