limit task retry when system error
chengjoey committed Dec 13, 2021
1 parent dd78a2f commit 942443f
Showing 4 changed files with 42 additions and 2 deletions.
15 changes: 14 additions & 1 deletion apistructs/pipeline_task.go
@@ -23,7 +23,9 @@ import (

const (
// TerminusDefineTag add this tag env to container for collecting logs
TerminusDefineTag = "TERMINUS_DEFINE_TAG"
PipelineTaskMaxRetryLimit = 144
PipelineTaskMaxRetryDuration = 24 * time.Hour
)

type PipelineTaskDTO struct {
@@ -267,6 +269,17 @@ func (t *PipelineTaskInspect) ConvertErrors() {
}
}

func (t *PipelineTaskInspect) IsErrorsExceed() (bool, *PipelineTaskErrResponse) {
	now := time.Now()
	for _, g := range t.Errors {
		if (!g.Ctx.StartTime.IsZero() && g.Ctx.StartTime.Add(PipelineTaskMaxRetryDuration).Before(now)) ||
			g.Ctx.Count >= PipelineTaskMaxRetryLimit {
			return true, g
		}
	}
	return false, nil
}

func (l *PipelineTaskLoop) Duplicate() *PipelineTaskLoop {
if l == nil {
return nil
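For orientation, here is a minimal sketch of how the new check is intended to gate retries. It is not part of this commit: retryUntilExceeded, run, and interval are hypothetical names, and the snippet only assumes the apistructs types and constants added above (plus fmt and time imports). Retries stop once any error group has reached PipelineTaskMaxRetryLimit (144) attempts or its first occurrence is older than PipelineTaskMaxRetryDuration (24h); AppendError aggregates repeated messages into one error group, as the test below exercises.

func retryUntilExceeded(inspect *PipelineTaskInspect, run func() error, interval time.Duration) error {
	for {
		err := run()
		if err == nil {
			return nil
		}
		// AppendError groups identical messages and bumps the group's Ctx.Count
		inspect.Errors = inspect.AppendError(&PipelineTaskErrResponse{Msg: err.Error()})
		if exceed, last := inspect.IsErrorsExceed(); exceed {
			// give up: either 144 attempts or more than 24h since the first error
			return fmt.Errorf("retry limit exceeded after %d attempts: %s", last.Ctx.Count, last.Msg)
		}
		time.Sleep(interval)
	}
}
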
21 changes: 21 additions & 0 deletions apistructs/pipeline_task_test.go
@@ -17,6 +17,9 @@ package apistructs
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestPipelineTaskLoop_Duplicate(t *testing.T) {
@@ -95,3 +98,21 @@ func TestPipelineTaskLoop_IsEmpty(t *testing.T) {
})
}
}

func TestIsErrorsExceed(t *testing.T) {
	now := time.Now()
	timeExceedInspect := &PipelineTaskInspect{}
	timeExceedInspect.Errors = timeExceedInspect.AppendError(&PipelineTaskErrResponse{Msg: "xxx", Ctx: PipelineTaskErrCtx{StartTime: now.Add(-25 * time.Hour)}})
	isExceed, _ := timeExceedInspect.IsErrorsExceed()
	assert.Equal(t, true, isExceed)

	countExceedInspect := &PipelineTaskInspect{}
	for i := 0; i < 143; i++ {
		countExceedInspect.Errors = countExceedInspect.AppendError(&PipelineTaskErrResponse{Msg: "xxx"})
		isExceed, _ = countExceedInspect.IsErrorsExceed()
		assert.Equal(t, false, isExceed)
	}
	countExceedInspect.Errors = countExceedInspect.AppendError(&PipelineTaskErrResponse{Msg: "xxx"})
	isExceed, _ = countExceedInspect.IsErrorsExceed()
	assert.Equal(t, true, isExceed)
}
5 changes: 5 additions & 0 deletions modules/pipeline/pipengine/reconciler/task.go
@@ -96,6 +96,11 @@ func reconcileTask(tr *taskrun.TaskRun) error {
rlog.TErrorf(tr.P.ID, tr.Task.ID, "failed to handle taskOp: %s, user abnormalErr: %v, don't need retry", taskOp.Op(), abnormalErr)
return abnormalErr
}
if isExceed, errCtx := tr.Task.Inspect.IsErrorsExceed(); isExceed {
rlog.TErrorf(tr.P.ID, tr.Task.ID, "failed to handle taskOp: %s, errors exceed limit, stop retry, retry times: %d, start itme: %s, pipelineID: %d, taskID: %d",
taskOp.Op(), errCtx.Ctx.Count, errCtx.Ctx.StartTime.Format("2006-01-02 15:04:05"), tr.P.ID, tr.Task.ID)
return abnormalErr
}
// if err doesn't contain a user error, it is a platform error and should always be retried
rlog.TErrorf(tr.P.ID, tr.Task.ID, "failed to handle taskOp: %s, abnormalErr: %v, continue retry, retry times: %d", taskOp.Op(), abnormalErr, platformErrRetryTimes)
resetTaskForAbnormalRetry(tr, platformErrRetryTimes)
3 changes: 2 additions & 1 deletion modules/pipeline/pipengine/reconciler/taskrun/framework.go
@@ -129,7 +129,8 @@ func (tr *TaskRun) waitOp(itr TaskOp, o *Elem) (result error) {
}

// if result only contains a platform error, the task will retry, so don't mark the status as changed
if result != nil && !errorsx.IsContainUserError(result) {
isExceed, _ := tr.Task.Inspect.IsErrorsExceed()
if result != nil && !errorsx.IsContainUserError(result) && !isExceed {
tr.Task.Status = oldStatus
}

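Restated as a hypothetical standalone predicate (a sketch, not code from the repository; errorsx.IsContainUserError is the same helper used in the hunk above): the task keeps its old status, and therefore remains eligible for retry, only while the failure is purely a platform error and the retry budget has not yet been exhausted.

func shouldKeepOldStatus(result error, isExceed bool) bool {
	// keep the old status (so the task will be retried) only for platform-only
	// errors that are still within the retry budget
	return result != nil && !errorsx.IsContainUserError(result) && !isExceed
}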
