Skip to content

Commit

Permalink
add new features, OneTimeJob and Job.RunNow() (#646)
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Dec 19, 2023
1 parent f7cd2bc commit 6e15f16
Show file tree
Hide file tree
Showing 8 changed files with 483 additions and 55 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ Jobs can be run every x days at specific times.
Jobs can be run every x weeks on specific days of the week and at specific times.
- [**Monthly**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#MonthlyJob):
Jobs can be run every x months on specific days of the month and at specific times.
- [**One time**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#OneTimeJob):
Jobs can be run once at a specific time. These are non-recurring jobs.

### Concurrency Limits
Jobs can be limited individually or across the entire scheduler.
Expand Down
2 changes: 2 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var (
ErrDurationRandomJobMinMax = fmt.Errorf("gocron: DurationRandomJob: minimum duration must be less than maximum duration")
ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil")
ErrJobNotFound = fmt.Errorf("gocron: job not found")
ErrJobRunNowFailed = fmt.Errorf("gocron: Job: RunNow: scheduler unreachable")
ErrMonthlyJobDays = fmt.Errorf("gocron: MonthlyJob: daysOfTheMonth must be between 31 and -31 inclusive, and not 0")
ErrMonthlyJobAtTimeNil = fmt.Errorf("gocron: MonthlyJob: atTime within atTimes must not be nil")
ErrMonthlyJobAtTimesNil = fmt.Errorf("gocron: MonthlyJob: atTimes must not be nil")
Expand All @@ -22,6 +23,7 @@ var (
ErrNewJobTaskNotFunc = fmt.Errorf("gocron: NewJob: Task.Function must be of kind reflect.Func")
ErrNewJobWrongNumberOfParameters = fmt.Errorf("gocron: NewJob: Number of provided parameters does not match expected")
ErrNewJobWrongTypeOfParameters = fmt.Errorf("gocron: NewJob: Type of provided parameters does not match expected")
ErrOneTimeJobStartDateTimePast = fmt.Errorf("gocron: OneTimeJob: start must not be in the past")
ErrStopExecutorTimedOut = fmt.Errorf("gocron: timed out waiting for executor to stop")
ErrStopJobsTimedOut = fmt.Errorf("gocron: timed out waiting for jobs to finish")
ErrStopSchedulerTimedOut = fmt.Errorf("gocron: timed out waiting for scheduler to stop")
Expand Down
48 changes: 48 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,28 @@ func ExampleJob_NextRun() {
fmt.Println(j.NextRun())
}

func ExampleJob_RunNow() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()

j, _ := s.NewJob(
MonthlyJob(
1,
NewDaysOfTheMonth(3, -5, -1),
NewAtTimes(
NewAtTime(10, 30, 0),
NewAtTime(11, 15, 0),
),
),
NewTask(
func() {},
),
)
s.Start()
// Runs the job one time now, without impacting the schedule
_ = j.RunNow()
}

func ExampleMonthlyJob() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
Expand All @@ -222,6 +244,32 @@ func ExampleNewScheduler() {
fmt.Println(s.Jobs())
}

func ExampleOneTimeJob() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()

// run a job once, immediately
_, _ = s.NewJob(
OneTimeJob(
OneTimeJobStartImmediately(),
),
NewTask(
func() {},
),
)
// run a job once in 10 seconds
_, _ = s.NewJob(
OneTimeJob(
OneTimeJobStartDateTime(time.Now().Add(10*time.Second)),
),
NewTask(
func() {},
),
)

s.Start()
}

func ExampleScheduler_NewJob() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
Expand Down
111 changes: 63 additions & 48 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type executor struct {
cancel context.CancelFunc
logger Logger
stopCh chan struct{}
jobsIDsIn chan uuid.UUID
jobsIn chan jobIn
jobIDsOut chan uuid.UUID
jobOutRequest chan jobOutRequest
stopTimeout time.Duration
Expand All @@ -25,8 +25,13 @@ type executor struct {
locker Locker
}

type jobIn struct {
id uuid.UUID
shouldSendOut bool
}

type singletonRunner struct {
in chan uuid.UUID
in chan jobIn
rescheduleLimiter chan struct{}
}

Expand All @@ -35,7 +40,7 @@ type limitModeConfig struct {
mode LimitMode
limit uint
rescheduleLimiter chan struct{}
in chan uuid.UUID
in chan jobIn
// singletonJobs is used to track singleton jobs that are running
// in the limit mode runner. This is used to prevent the same job
// from running multiple times across limit mode runners when both
Expand Down Expand Up @@ -72,7 +77,7 @@ func (e *executor) start() {
// are run immediately.
// 2. sent from time.AfterFuncs in which job schedules
// are spun up by the scheduler
case id := <-e.jobsIDsIn:
case jIn := <-e.jobsIn:
select {
case <-e.stopCh:
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
Expand Down Expand Up @@ -111,14 +116,16 @@ func (e *executor) start() {
// the executor from building up a waiting queue
// and forces rescheduling
case e.limitMode.rescheduleLimiter <- struct{}{}:
e.limitMode.in <- id
e.limitMode.in <- jIn
default:
// all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
select {
case e.jobIDsOut <- id:
default:
if jIn.shouldSendOut {
select {
case e.jobIDsOut <- jIn.id:
default:
}
}
}
} else {
Expand All @@ -127,51 +134,53 @@ func (e *executor) start() {
// to work through the channel backlog. A hard limit of 1000 is in place
// at which point this call would block.
// TODO when metrics are added, this should increment a wait metric
e.limitMode.in <- id
e.limitMode.in <- jIn
}
} else {
// no limit mode, so we're either running a regular job or
// a job with a singleton mode
//
// get the job, so we can figure out what kind it is and how
// to execute it
j := requestJobCtx(ctx, id, e.jobOutRequest)
j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
if j == nil {
// safety check as it'd be strange bug if this occurred
return
}
if j.singletonMode {
// for singleton mode, get the existing runner for the job
// or spin up a new one
runner, ok := e.singletonRunners[id]
runner, ok := e.singletonRunners[jIn.id]
if !ok {
runner.in = make(chan uuid.UUID, 1000)
runner.in = make(chan jobIn, 1000)
if j.singletonLimitMode == LimitModeReschedule {
runner.rescheduleLimiter = make(chan struct{}, 1)
}
e.singletonRunners[id] = runner
e.singletonRunners[jIn.id] = runner
singletonJobsWg.Add(1)
go e.singletonModeRunner("singleton-"+id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
}

if j.singletonLimitMode == LimitModeReschedule {
// reschedule mode uses the limiter channel to check
// for a running job and reschedules if the channel is full.
select {
case runner.rescheduleLimiter <- struct{}{}:
runner.in <- id
runner.in <- jIn
default:
// runner is busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
select {
case e.jobIDsOut <- id:
default:
if jIn.shouldSendOut {
select {
case e.jobIDsOut <- jIn.id:
default:
}
}
}
} else {
// wait mode, fill up that queue (buffered channel, so it's ok)
runner.in <- id
runner.in <- jIn
}
} else {
select {
Expand All @@ -187,7 +196,7 @@ func (e *executor) start() {
// complete.
standardJobsWg.Add(1)
go func(j internalJob) {
e.runJob(j)
e.runJob(j, jIn.shouldSendOut)
standardJobsWg.Done()
}(*j)
}
Expand All @@ -200,11 +209,11 @@ func (e *executor) start() {
}
}

func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
for {
select {
case id := <-in:
case jIn := <-in:
select {
case <-e.ctx.Done():
e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
Expand All @@ -214,24 +223,28 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
}

ctx, cancel := context.WithCancel(e.ctx)
j := requestJobCtx(ctx, id, e.jobOutRequest)
j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
cancel()
if j != nil {
if j.singletonMode {
e.limitMode.singletonJobsMu.Lock()
_, ok := e.limitMode.singletonJobs[id]
_, ok := e.limitMode.singletonJobs[jIn.id]
if ok {
// this job is already running, so don't run it
// but instead reschedule it
e.limitMode.singletonJobsMu.Unlock()
select {
case <-e.ctx.Done():
return
case <-j.ctx.Done():
return
case e.jobIDsOut <- j.id:
if jIn.shouldSendOut {
select {
case <-e.ctx.Done():
return
case <-j.ctx.Done():
return
case e.jobIDsOut <- j.id:
}
}
// remove the limiter block to allow another job to be scheduled
// remove the limiter block, as this particular job
// was a singleton already running, and we want to
// allow another job to be scheduled
if limitMode == LimitModeReschedule {
select {
case <-rescheduleLimiter:
Expand All @@ -240,14 +253,14 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
}
continue
}
e.limitMode.singletonJobs[id] = struct{}{}
e.limitMode.singletonJobs[jIn.id] = struct{}{}
e.limitMode.singletonJobsMu.Unlock()
}
e.runJob(*j)
e.runJob(*j, jIn.shouldSendOut)

if j.singletonMode {
e.limitMode.singletonJobsMu.Lock()
delete(e.limitMode.singletonJobs, id)
delete(e.limitMode.singletonJobs, jIn.id)
e.limitMode.singletonJobsMu.Unlock()
}
}
Expand All @@ -267,24 +280,24 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
}
}

func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: singletonModeRunner starting", "name", name)
for {
select {
case id := <-in:
case jIn := <-in:
select {
case <-e.ctx.Done():
e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
e.logger.Debug("gocron: singletonModeRunner shutting down", "name", name)
wg.Done()
return
default:
}

ctx, cancel := context.WithCancel(e.ctx)
j := requestJobCtx(ctx, id, e.jobOutRequest)
j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
cancel()
if j != nil {
e.runJob(*j)
e.runJob(*j, jIn.shouldSendOut)
}

// remove the limiter block to allow another job to be scheduled
Expand All @@ -295,14 +308,14 @@ func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitG
}
}
case <-e.ctx.Done():
e.logger.Debug("limitModeRunner shutting down", "name", name)
e.logger.Debug("singletonModeRunner shutting down", "name", name)
wg.Done()
return
}
}
}

func (e *executor) runJob(j internalJob) {
func (e *executor) runJob(j internalJob, shouldSendOut bool) {
if j.ctx == nil {
return
}
Expand All @@ -327,12 +340,14 @@ func (e *executor) runJob(j internalJob) {
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

select {
case <-e.ctx.Done():
return
case <-j.ctx.Done():
return
case e.jobIDsOut <- j.id:
if shouldSendOut {
select {
case <-e.ctx.Done():
return
case <-j.ctx.Done():
return
case e.jobIDsOut <- j.id:
}
}

err := callJobFuncWithParams(j.function, j.parameters...)
Expand Down
Loading

0 comments on commit 6e15f16

Please sign in to comment.