Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add another out channel so we can properly report lastRun #700

Merged
merged 4 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 30 additions & 25 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ import (
)

type executor struct {
ctx context.Context
cancel context.CancelFunc
logger Logger
stopCh chan struct{}
jobsIn chan jobIn
jobIDsOut chan uuid.UUID
jobOutRequest chan jobOutRequest
stopTimeout time.Duration
done chan error
singletonRunners *sync.Map // map[uuid.UUID]singletonRunner
limitMode *limitModeConfig
elector Elector
locker Locker
monitor Monitor
ctx context.Context
cancel context.CancelFunc
logger Logger
stopCh chan struct{}
jobsIn chan jobIn
jobsOutForRescheduling chan uuid.UUID
jobsOutCompleted chan uuid.UUID
jobOutRequest chan jobOutRequest
stopTimeout time.Duration
done chan error
singletonRunners *sync.Map // map[uuid.UUID]singletonRunner
limitMode *limitModeConfig
elector Elector
locker Locker
monitor Monitor
}

type jobIn struct {
Expand Down Expand Up @@ -122,7 +123,7 @@ func (e *executor) start() {
// all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
}
} else {
// since we're not using LimitModeReschedule, but instead using LimitModeWait
Expand All @@ -131,7 +132,7 @@ func (e *executor) start() {
// at which point this call would block.
// TODO when metrics are added, this should increment a wait metric
e.limitMode.in <- jIn
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
}
} else {
// no limit mode, so we're either running a regular job or
Expand Down Expand Up @@ -167,17 +168,17 @@ func (e *executor) start() {
select {
case runner.rescheduleLimiter <- struct{}{}:
runner.in <- jIn
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
default:
// runner is busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
}
} else {
// wait mode, fill up that queue (buffered channel, so it's ok)
runner.in <- jIn
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
}
} else {
select {
Expand Down Expand Up @@ -206,10 +207,10 @@ func (e *executor) start() {
}
}

func (e *executor) sendOutToScheduler(jIn *jobIn) {
func (e *executor) sendOutForRescheduling(jIn *jobIn) {
if jIn.shouldSendOut {
select {
case e.jobIDsOut <- jIn.id:
case e.jobsOutForRescheduling <- jIn.id:
case <-e.ctx.Done():
return
}
Expand Down Expand Up @@ -250,7 +251,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith
return
case <-j.ctx.Done():
return
case e.jobIDsOut <- j.id:
case e.jobsOutForRescheduling <- j.id:
}
}
// remove the limiter block, as this particular job
Expand Down Expand Up @@ -331,20 +332,24 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {

if e.elector != nil {
if err := e.elector.IsLeader(j.ctx); err != nil {
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
return
}
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
select {
case e.jobsOutCompleted <- j.id:
case <-e.ctx.Done():
}

startTime := time.Now()
err := callJobFuncWithParams(j.function, j.parameters...)
Expand Down
24 changes: 13 additions & 11 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ type internalJob struct {
name string
tags []string
jobSchedule
lastRun, nextRun time.Time
lastScheduledRun time.Time
nextScheduled time.Time
lastRun time.Time
function any
parameters []any
timer clockwork.Timer
Expand Down Expand Up @@ -681,18 +683,18 @@ func (d dailyJob) next(lastRun time.Time) time.Time {

func (d dailyJob) nextDay(lastRun time.Time, firstPass bool) time.Time {
for _, at := range d.atTimes {
// sub the at time hour/min/sec onto the lastRun's values
// sub the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())

if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than,
// and not greater or equal as our lastRun day/time
// and not greater or equal as our lastScheduledRun day/time
// will be in the loop, and we don't want to select it again
return atDate
} else if !firstPass && !atDate.Before(lastRun) {
// now that we're looking at the next day, it's ok to consider
// the same at time that was last run (as lastRun has been incremented)
// the same at time that was last run (as lastScheduledRun has been incremented)
return atDate
}
}
Expand Down Expand Up @@ -727,18 +729,18 @@ func (w weeklyJob) nextWeekDayAtTime(lastRun time.Time, firstPass bool) time.Tim
// weekDayDiff is used to add the correct amount to the atDate day below
weekDayDiff := wd - lastRun.Weekday()
for _, at := range w.atTimes {
// sub the at time hour/min/sec onto the lastRun's values
// sub the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())

if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than,
// and not greater or equal as our lastRun day/time
// and not greater or equal as our lastScheduledRun day/time
// will be in the loop, and we don't want to select it again
return atDate
} else if !firstPass && !atDate.Before(lastRun) {
// now that we're looking at the next week, it's ok to consider
// the same at time that was last run (as lastRun has been incremented)
// the same at time that was last run (as lastScheduledRun has been incremented)
return atDate
}
}
Expand Down Expand Up @@ -795,7 +797,7 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass
for _, day := range days {
if day >= lastRun.Day() {
for _, at := range m.atTimes {
// sub the day, and the at time hour/min/sec onto the lastRun's values
// sub the day, and the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())

Expand All @@ -807,12 +809,12 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass

if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than,
// and not greater or equal as our lastRun day/time
// and not greater or equal as our lastScheduledRun day/time
// will be in the loop, and we don't want to select it again
return atDate
} else if !firstPass && !atDate.Before(lastRun) {
// now that we're looking at the next month, it's ok to consider
// the same at time that was lastRun (as lastRun has been incremented)
// the same at time that was lastScheduledRun (as lastScheduledRun has been incremented)
return atDate
}
}
Expand Down Expand Up @@ -892,7 +894,7 @@ func (j job) NextRun() (time.Time, error) {
if ij == nil || ij.id == uuid.Nil {
return time.Time{}, ErrJobNotFound
}
return ij.nextRun, nil
return ij.nextScheduled, nil
}

func (j job) Tags() []string {
Expand Down
37 changes: 25 additions & 12 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
singletonRunners: nil,
logger: &noOpLogger{},

jobsIn: make(chan jobIn),
jobIDsOut: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
}

s := &scheduler{
Expand Down Expand Up @@ -147,8 +148,11 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
s.logger.Info("gocron: new scheduler created")
for {
select {
case id := <-s.exec.jobIDsOut:
s.selectExecJobIDsOut(id)
case id := <-s.exec.jobsOutForRescheduling:
s.selectExecJobsOutForRescheduling(id)

case id := <-s.exec.jobsOutCompleted:
s.selectExecJobsOutCompleted(id)

case in := <-s.newJobCh:
s.selectNewJob(in)
Expand Down Expand Up @@ -287,14 +291,14 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) {

// Jobs coming back from the executor to the scheduler that
// need to evaluated for rescheduling.
func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
j, ok := s.jobs[id]
if !ok {
// the job was removed while it was running, and
// so we don't need to reschedule it.
return
}
j.lastRun = j.nextRun
j.lastScheduledRun = j.nextScheduled

// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
Expand All @@ -313,7 +317,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
}
}

next := j.next(j.lastRun)
next := j.next(j.lastScheduledRun)
if next.IsZero() {
// the job's next function will return zero for OneTime jobs.
// since they are one time only, they do not need rescheduling.
Expand All @@ -329,7 +333,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
next = j.next(next)
}
}
j.nextRun = next
j.nextScheduled = next
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
// set the actual timer on the job here and listen for
// shut down events so that the job doesn't attempt to
Expand All @@ -347,6 +351,15 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
s.jobs[id] = j
}

func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
j, ok := s.jobs[id]
if !ok {
return
}
j.lastRun = s.now()
s.jobs[id] = j
}

func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
if j, ok := s.jobs[out.id]; ok {
select {
Expand Down Expand Up @@ -386,7 +399,7 @@ func (s *scheduler) selectNewJob(in newJobIn) {
}
})
}
j.nextRun = next
j.nextScheduled = next
}

s.jobs[j.id] = j
Expand Down Expand Up @@ -437,7 +450,7 @@ func (s *scheduler) selectStart() {
}
})
}
j.nextRun = next
j.nextScheduled = next
s.jobs[id] = j
}
select {
Expand Down
Loading
Loading