Skip to content

Commit

Permalink
issue-740: ascending time function
Browse files Browse the repository at this point in the history
  • Loading branch information
rbroggi committed Jun 22, 2024
1 parent 64d6e48 commit ebe98f5
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 26 deletions.
21 changes: 3 additions & 18 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"math/rand"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -448,11 +447,9 @@ type oneTimeJobDefinition struct {

func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error {
sortedTimes := o.startAt(j)
sort.Slice(sortedTimes, func(i, j int) bool {
return sortedTimes[i].Before(sortedTimes[j])
})
slices.SortStableFunc(sortedTimes, AscendingTime)
// keep only schedules that are in the future
idx, found := slices.BinarySearchFunc(sortedTimes, now, timeCmp())
idx, found := slices.BinarySearchFunc(sortedTimes, now, AscendingTime)
if found {
idx++
}
Expand Down Expand Up @@ -499,18 +496,6 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition {
}
}

func timeCmp() func(element time.Time, target time.Time) int {
return func(element time.Time, target time.Time) int {
if element.Equal(target) {
return 0
}
if element.Before(target) {
return -1
}
return 1
}
}

// -----------------------------------------------
// -----------------------------------------------
// ----------------- Job Options -----------------
Expand Down Expand Up @@ -917,7 +902,7 @@ type oneTimeJob struct {
// lastRun: 8 => [idx=3,found=found] => next is none
// lastRun: 9 => [idx=3,found=found] => next is none
func (o oneTimeJob) next(lastRun time.Time) time.Time {
idx, found := slices.BinarySearchFunc(o.sortedTimes, lastRun, timeCmp())
idx, found := slices.BinarySearchFunc(o.sortedTimes, lastRun, AscendingTime)
// if found, the next run is the following index
if found {
idx++
Expand Down
4 changes: 1 addition & 3 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
// always grab the last element in the slice as that is the furthest
// out in the future and the time from which we want to calculate
// the subsequent next run time.
slices.SortStableFunc(j.nextScheduled, func(a, b time.Time) int {
return a.Compare(b)
})
slices.SortStableFunc(j.nextScheduled, AscendingTime)
scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1]
}

Expand Down
4 changes: 2 additions & 2 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2190,8 +2190,8 @@ func TestScheduler_AtTimesJob(t *testing.T) {
},
},
{
name: "two runs in the future",
atTimes: []time.Time{n.Add(1 * time.Millisecond), n.Add(3 * time.Millisecond)},
name: "two runs in the future - order is maintained even if times are provided out of order",
atTimes: []time.Time{n.Add(3 * time.Millisecond), n.Add(1 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
Expand Down
8 changes: 5 additions & 3 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ func convertAtTimesToDateTime(atTimes AtTimes, location *time.Location) ([]time.
}
atTimesDate = append(atTimesDate, at.time(location))
}
slices.SortStableFunc(atTimesDate, func(a, b time.Time) int {
return a.Compare(b)
})
slices.SortStableFunc(atTimesDate, AscendingTime)
return atTimesDate, nil
}

func AscendingTime(a, b time.Time) int {

Check failure on line 95 in util.go

View workflow job for this annotation

GitHub Actions / lint and test (1.21)

exported: exported function AscendingTime should have comment or be unexported (revive)

Check failure on line 95 in util.go

View workflow job for this annotation

GitHub Actions / lint and test (1.22)

exported: exported function AscendingTime should have comment or be unexported (revive)
return a.Compare(b)
}

type waitGroupWithMutex struct {
wg sync.WaitGroup
mu sync.Mutex
Expand Down

0 comments on commit ebe98f5

Please sign in to comment.