Skip to content

Commit

Permalink
Merge pull request #103 from flant/fix_delay_between_tasks
Browse files Browse the repository at this point in the history
fix: delay between tasks
  • Loading branch information
diafour committed Jan 24, 2020
2 parents 16936ec + 91a73b2 commit 10cf159
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions pkg/task/queue/task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,32 +268,31 @@ func (q *TaskQueue) Stop() {

func (q *TaskQueue) Start() {
go func() {
var sleepDelay time.Duration
for {
var sleepDelay time.Duration

log.Debugf("queue %s: wait for task, delay %d", q.Name, sleepDelay)
var t = q.waitForTask(sleepDelay)
log.Debugf("queue %s: get task %s", q.Name, t.GetType())

// dump queue
log.Debugf("queue %s: tasks after wait %s", q.Name, q.String())

if t == nil {
log.Debugf("queue %s: got nil task, stop queue", q.Name)
return
}

// dump task and a whole queue
log.Debugf("queue %s: get task %s", q.Name, t.GetType())
log.Debugf("queue %s: tasks after wait %s", q.Name, q.String())

// Now the task can be handled!
if q.Handler == nil {
continue
}

var nextSleepDelay time.Duration
taskRes := q.Handler(t)
switch taskRes.Status {
case "Fail":
t.IncrementFailureCount()
// delay before retry
sleepDelay = DelayOnFailedTask
nextSleepDelay = DelayOnFailedTask
case "Success":
// add tasks after current task in reverse order
for i := len(taskRes.AfterTasks) - 1; i >= 0; i-- {
Expand All @@ -312,9 +311,11 @@ func (q *TaskQueue) Start() {
})
}
if taskRes.DelayBeforeNextTask != 0 {
sleepDelay = taskRes.DelayBeforeNextTask
nextSleepDelay = taskRes.DelayBeforeNextTask
}

sleepDelay = nextSleepDelay

// dump queue
log.Debugf("queue %s: tasks after handle %s", q.Name, q.String())
}
Expand Down

0 comments on commit 10cf159

Please sign in to comment.