Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added JobsWaitingInQueue in Scheduler #721

Merged
merged 2 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Scheduler interface {
// Update replaces the existing Job's JobDefinition with the provided
// JobDefinition. The Job's Job.ID() remains the same.
Update(uuid.UUID, JobDefinition, Task, ...JobOption) (Job, error)
// JobsWaitingInQueue number of jobs waiting in Queue in case of LimitModeWait
// In case of LimitModeReschedule or no limit it will be always zero
JobsWaitingInQueue() int
}

// -----------------------------------------------
Expand Down Expand Up @@ -678,6 +681,13 @@ func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition, task Task,
return s.addOrUpdateJob(id, jobDefinition, task, options)
}

func (s *scheduler) JobsWaitingInQueue() int {
if s.exec.limitMode != nil && s.exec.limitMode.mode == LimitModeWait {
return len(s.exec.limitMode.in)
}
return 0
}

// -----------------------------------------------
// -----------------------------------------------
// ------------- Scheduler Options ---------------
Expand Down
85 changes: 69 additions & 16 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func TestScheduler_StopTimeout(t *testing.T) {
}

func TestScheduler_Shutdown(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)

t.Run("start, stop, start, shutdown", func(t *testing.T) {
s := newTestScheduler(t,
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestScheduler_Shutdown(t *testing.T) {
}

func TestScheduler_NewJob(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
jd JobDefinition
Expand Down Expand Up @@ -462,7 +462,7 @@ func TestScheduler_NewJob(t *testing.T) {
}

func TestScheduler_NewJobErrors(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
jd JobDefinition
Expand Down Expand Up @@ -762,7 +762,7 @@ func TestScheduler_NewJobErrors(t *testing.T) {
}

func TestScheduler_NewJobTask(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)

testFuncPtr := func() {}
testFuncWithParams := func(one, two string) {}
Expand Down Expand Up @@ -867,7 +867,7 @@ func TestScheduler_NewJobTask(t *testing.T) {
}

func TestScheduler_WithOptionsErrors(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
opt SchedulerOption
Expand Down Expand Up @@ -929,7 +929,7 @@ func TestScheduler_WithOptionsErrors(t *testing.T) {
}

func TestScheduler_Singleton(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
duration time.Duration
Expand Down Expand Up @@ -992,7 +992,7 @@ func TestScheduler_Singleton(t *testing.T) {
}

func TestScheduler_LimitMode(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
numJobs int
Expand Down Expand Up @@ -1064,7 +1064,7 @@ func TestScheduler_LimitMode(t *testing.T) {
}

func TestScheduler_LimitModeAndSingleton(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
numJobs int
Expand Down Expand Up @@ -1205,7 +1205,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
notLocked := make(chan struct{}, 10)
notLeader := make(chan struct{}, 10)

goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
count int
Expand Down Expand Up @@ -1354,7 +1354,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
}

func TestScheduler_RemoveJob(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
addJob bool
Expand Down Expand Up @@ -1392,8 +1392,61 @@ func TestScheduler_RemoveJob(t *testing.T) {
}
}

func TestScheduler_JobsWaitingInQueue(t *testing.T) {
defer goleak.VerifyNone(t)
tests := []struct {
name string
limit uint
mode LimitMode
startAt func() OneTimeJobStartAtOption
expectedInQueue int
}{
{
"with mode wait limit 1",
1,
LimitModeWait,
func() OneTimeJobStartAtOption {
return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond))
},
4,
},
{
"with mode wait limit 10",
10,
LimitModeWait,
func() OneTimeJobStartAtOption {
return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond))
},
0,
},
{
"with mode Reschedule",
1,
LimitModeReschedule,
func() OneTimeJobStartAtOption {
return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond))
},
0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, WithLimitConcurrentJobs(tt.limit, tt.mode))
for i := 0; i <= 4; i++ {
_, err := s.NewJob(OneTimeJob(tt.startAt()), NewTask(func() { time.Sleep(500 * time.Millisecond) }))
require.NoError(t, err)
}
s.Start()
time.Sleep(20 * time.Millisecond)
assert.Equal(t, tt.expectedInQueue, s.JobsWaitingInQueue())
require.NoError(t, s.Shutdown())
})
}
}

func TestScheduler_RemoveLotsOfJobs(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
numJobs int
Expand Down Expand Up @@ -1435,7 +1488,7 @@ func TestScheduler_RemoveLotsOfJobs(t *testing.T) {
}

func TestScheduler_RemoveJob_RemoveSelf(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
s := newTestScheduler(t)
s.Start()

Expand All @@ -1458,7 +1511,7 @@ func TestScheduler_RemoveJob_RemoveSelf(t *testing.T) {
}

func TestScheduler_WithEventListeners(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)

listenerRunCh := make(chan error, 1)
testErr := fmt.Errorf("test error")
Expand Down Expand Up @@ -1733,7 +1786,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
}

func TestScheduler_LastRunSingleton(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)

tests := []struct {
name string
Expand Down Expand Up @@ -1846,7 +1899,7 @@ func TestScheduler_OneTimeJob(t *testing.T) {
}

func TestScheduler_WithLimitedRuns(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)

tests := []struct {
name string
Expand Down Expand Up @@ -1984,7 +2037,7 @@ func (t *testMonitor) RecordJobTiming(startTime, endTime time.Time, _ uuid.UUID,
}

func TestScheduler_WithMonitor(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
jd JobDefinition
Expand Down
Loading