diff --git a/executor.go b/executor.go index 109805e..eae65e6 100644 --- a/executor.go +++ b/executor.go @@ -122,12 +122,7 @@ func (e *executor) start() { // all runners are busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric - if jIn.shouldSendOut { - select { - case e.jobIDsOut <- jIn.id: - default: - } - } + e.sendOutToScheduler(&jIn) } } else { // since we're not using LimitModeReschedule, but instead using LimitModeWait @@ -136,6 +131,7 @@ func (e *executor) start() { // at which point this call would block. // TODO when metrics are added, this should increment a wait metric e.limitMode.in <- jIn + e.sendOutToScheduler(&jIn) } } else { // no limit mode, so we're either running a regular job or @@ -171,20 +167,17 @@ func (e *executor) start() { select { case runner.rescheduleLimiter <- struct{}{}: runner.in <- jIn + e.sendOutToScheduler(&jIn) default: // runner is busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric - if jIn.shouldSendOut { - select { - case e.jobIDsOut <- jIn.id: - default: - } - } + e.sendOutToScheduler(&jIn) } } else { // wait mode, fill up that queue (buffered channel, so it's ok) runner.in <- jIn + e.sendOutToScheduler(&jIn) } } else { select { @@ -213,6 +206,20 @@ func (e *executor) start() { } } +func (e *executor) sendOutToScheduler(jIn *jobIn) { + if jIn.shouldSendOut { + select { + case e.jobIDsOut <- jIn.id: + case <-e.ctx.Done(): + return + } + } + // we need to set this to false now, because to handle + // non-limit jobs, we send out from the e.runJob function + // and in this case we don't want to send out twice. + jIn.shouldSendOut = false +} + func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { e.logger.Debug("gocron: limitModeRunner starting", "name", name) for { @@ -250,10 +257,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith // was a singleton already running, and we want to // allow another job to be scheduled if limitMode == LimitModeReschedule { - select { - case <-rescheduleLimiter: - default: - } + <-rescheduleLimiter } continue } @@ -271,10 +275,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith // remove the limiter block to allow another job to be scheduled if limitMode == LimitModeReschedule { - select { - case <-rescheduleLimiter: - default: - } + <-rescheduleLimiter } case <-e.ctx.Done(): e.logger.Debug("limitModeRunner shutting down", "name", name) @@ -306,10 +307,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup // remove the limiter block to allow another job to be scheduled if limitMode == LimitModeReschedule { - select { - case <-rescheduleLimiter: - default: - } + <-rescheduleLimiter } case <-e.ctx.Done(): e.logger.Debug("singletonModeRunner shutting down", "name", name) diff --git a/job.go b/job.go index 6b0649e..3dd2a1c 100644 --- a/job.go +++ b/job.go @@ -846,7 +846,9 @@ type Job interface { NextRun() (time.Time, error) // RunNow runs the job once, now. This does not alter // the existing run schedule, and will respect all job - // and scheduler limits. + // and scheduler limits. This means that running a job now may + // cause the job's regular interval to be rescheduled due to + // the instance being run by RunNow blocking your run limit. RunNow() error // Tags returns the job's string tags. Tags() []string diff --git a/scheduler_test.go b/scheduler_test.go index b7c05ca..6727408 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/google/uuid" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -1508,7 +1507,7 @@ func TestScheduler_RunJobNow(t *testing.T) { WithSingletonMode(LimitModeReschedule), }, func() time.Duration { - return 10 * time.Second + return 20 * time.Second }, 1, },