Skip to content

Commit

Permalink
only remove past if >1, sort next scheduled
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Mar 29, 2024
1 parent 4dd4129 commit 700e016
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
15 changes: 6 additions & 9 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,12 @@ func TestJob_NextRun(t *testing.T) {
// run a job every 10 milliseconds that starts 10 milliseconds after the current time
j, err := s.NewJob(
DurationJob(
10*time.Millisecond,
100*time.Millisecond,
),
NewTask(
func() {},
),
WithStartAt(WithStartDateTime(testTime.Add(10*time.Millisecond))),
WithStartAt(WithStartDateTime(testTime.Add(100*time.Millisecond))),
WithSingletonMode(LimitModeReschedule),
)
require.NoError(t, err)
Expand All @@ -515,18 +515,15 @@ func TestJob_NextRun(t *testing.T) {
nextRun, err := j.NextRun()
require.NoError(t, err)

// `NextRun` should report `testTime.Add(10*time.Millisecond)`
assert.Equal(t, testTime.Add(10*time.Millisecond), nextRun)
assert.Equal(t, testTime.Add(100*time.Millisecond), nextRun)

// sleep for 11ms to wait for the next job
time.Sleep(11 * time.Millisecond)
time.Sleep(150 * time.Millisecond)

nextRun, err = j.NextRun()
assert.NoError(t, err)

// `NextRun` should report a time 20 milliseconds after `testTime`, but instead reports a value that is `30ms` after
assert.Equal(t, testTime.Add(20*time.Millisecond), nextRun)
assert.Equal(t, 20*time.Millisecond, nextRun.Sub(testTime))
assert.Equal(t, testTime.Add(200*time.Millisecond), nextRun)
assert.Equal(t, 200*time.Millisecond, nextRun.Sub(testTime))

err = s.Shutdown()
require.NoError(t, err)
Expand Down
19 changes: 13 additions & 6 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
// always grab the last element in the slice as that is the furthest
// out in the future and the time from which we want to calculate
// the subsequent next run time.
slices.SortStableFunc(j.nextScheduled, func(a, b time.Time) int {
return a.Compare(b)
})
j.lastScheduledRun = j.nextScheduled[len(j.nextScheduled)-1]
}

Expand Down Expand Up @@ -345,14 +348,18 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
return
}

var newNextScheduled []time.Time
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
// if the job has more than one nextScheduled time,
// we need to remove any that are in the past.
if len(j.nextScheduled) > 1 {
var newNextScheduled []time.Time
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
}
newNextScheduled = append(newNextScheduled, t)
}
newNextScheduled = append(newNextScheduled, t)
j.nextScheduled = newNextScheduled
}
j.nextScheduled = newNextScheduled

// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
Expand Down
2 changes: 1 addition & 1 deletion scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,7 +1622,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
WithSingletonMode(LimitModeReschedule),
},
func() time.Duration {
return 20 * time.Second
return 10 * time.Second
},
1,
},
Expand Down

0 comments on commit 700e016

Please sign in to comment.