Skip to content

Commit

Permalink
Added JobsWaitingInQueue in Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
0x01F4 committed Apr 30, 2024
1 parent cde2513 commit 52de599
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
10 changes: 10 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Scheduler interface {
// Update replaces the existing Job's JobDefinition with the provided
// JobDefinition. The Job's Job.ID() remains the same.
Update(uuid.UUID, JobDefinition, Task, ...JobOption) (Job, error)
// JobsWaitingInQueue number of jobs waiting in Queue in case of LimitModeWait
// In case of LimitModeReschedule or no limit it will be always zero
JobsWaitingInQueue() int
}

// -----------------------------------------------
Expand Down Expand Up @@ -678,6 +681,13 @@ func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition, task Task,
return s.addOrUpdateJob(id, jobDefinition, task, options)
}

func (s *scheduler) JobsWaitingInQueue() int {
if s.exec.limitMode != nil && s.exec.limitMode.mode == LimitModeWait {
return len(s.exec.limitMode.in)
}
return 0
}

// -----------------------------------------------
// -----------------------------------------------
// ------------- Scheduler Options ---------------
Expand Down
53 changes: 53 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,59 @@ func TestScheduler_RemoveJob(t *testing.T) {
}
}

func TestScheduler_JobsWaitingInQueue(t *testing.T) {
goleak.VerifyNone(t)
tests := []struct {
name string
limit uint
mode LimitMode
startAt func() OneTimeJobStartAtOption
expectedInQueue int
}{
{
"with mode wait limit 1",
1,
LimitModeWait,
func() OneTimeJobStartAtOption {
return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond))
},
4,
},
{
"with mode wait limit 10",
10,
LimitModeWait,
func() OneTimeJobStartAtOption {
return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond))
},
0,
},
{
"with mode Reschedule",
1,
LimitModeReschedule,
func() OneTimeJobStartAtOption {
return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond))
},
0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, WithLimitConcurrentJobs(tt.limit, tt.mode))
for i := 0; i <= 4; i++ {
_, err := s.NewJob(OneTimeJob(tt.startAt()), NewTask(func() { time.Sleep(10 * time.Second) }))
require.NoError(t, err)
}
s.Start()
time.Sleep(100 * time.Millisecond)
assert.Equal(t, tt.expectedInQueue, s.JobsWaitingInQueue())
_ = s.Shutdown()
})
}
}

func TestScheduler_RemoveLotsOfJobs(t *testing.T) {
goleak.VerifyNone(t)
tests := []struct {
Expand Down

0 comments on commit 52de599

Please sign in to comment.