Skip to content

Commit

Permalink
rework the exectuor stopping to use context with cancel and wait grou…
Browse files Browse the repository at this point in the history
…ps instead of stop chans (#455)
  • Loading branch information
JohnRoesler committed Apr 17, 2023
1 parent f9acde1 commit b77fa0d
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 49 deletions.
57 changes: 38 additions & 19 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,24 @@ const (
)

type executor struct {
jobFunctions chan jobFunction
stopCh chan struct{}
stoppedCh chan struct{}
limitMode limitMode
jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler
ctx context.Context // used to tell the executor to stop
cancel context.CancelFunc // used to tell the executor to stop
wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop
jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish
singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete
limitMode limitMode // when SetMaxConcurrentJobs() is set upon the scheduler
maxRunningJobs *semaphore.Weighted
}

func newExecutor() executor {
return executor{
e := executor{
jobFunctions: make(chan jobFunction, 1),
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
singletonWgs: &sync.Map{},
wg: &sync.WaitGroup{},
}
e.wg.Add(1)
return e
}

func runJob(f jobFunction) {
Expand All @@ -51,9 +56,13 @@ func runJob(f jobFunction) {
}

func (jf *jobFunction) singletonRunner() {
jf.singletonRunnerOn.Store(true)
jf.singletonWg.Add(1)
for {
select {
case <-jf.ctx.Done():
jf.singletonWg.Done()
jf.singletonRunnerOn.Store(false)
return
default:
if jf.singletonQueue.Load() != 0 {
Expand All @@ -65,15 +74,12 @@ func (jf *jobFunction) singletonRunner() {
}

func (e *executor) start() {
stopCtx, cancel := context.WithCancel(context.Background())
runningJobsWg := sync.WaitGroup{}

for {
select {
case f := <-e.jobFunctions:
runningJobsWg.Add(1)
e.jobsWg.Add(1)
go func() {
defer runningJobsWg.Done()
defer e.jobsWg.Done()

panicHandlerMutex.RLock()
defer panicHandlerMutex.RUnlock()
Expand All @@ -94,7 +100,7 @@ func (e *executor) start() {
return
case WaitMode:
select {
case <-stopCtx.Done():
case <-e.ctx.Done():
return
case <-f.ctx.Done():
return
Expand All @@ -114,19 +120,32 @@ func (e *executor) start() {
case defaultMode:
runJob(f)
case singletonMode:
e.singletonWgs.Store(f.singletonWg, struct{}{})

if !f.singletonRunnerOn.Load() {
go f.singletonRunner()
}

f.singletonQueue.Add(1)
}
}()
case <-e.stopCh:
cancel()
runningJobsWg.Wait()
close(e.stoppedCh)
case <-e.ctx.Done():
e.jobsWg.Wait()
e.wg.Done()
return
}
}
}

func (e *executor) stop() {
close(e.stopCh)
<-e.stoppedCh
e.cancel()
e.wg.Wait()
if e.singletonWgs != nil {
e.singletonWgs.Range(func(key, value any) bool {
if wg, ok := key.(*sync.WaitGroup); ok {
wg.Wait()
}
return true
})
}
}
9 changes: 9 additions & 0 deletions executor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gocron

import (
"context"
"sync"
"sync/atomic"
"testing"
Expand All @@ -10,6 +11,10 @@ import (

func Test_ExecutorExecute(t *testing.T) {
e := newExecutor()
stopCtx, cancel := context.WithCancel(context.Background())
e.ctx = stopCtx
e.cancel = cancel
e.jobsWg = &sync.WaitGroup{}

wg := &sync.WaitGroup{}
wg.Add(1)
Expand Down Expand Up @@ -41,6 +46,10 @@ func Test_ExecutorPanicHandling(t *testing.T) {
SetPanicHandler(handler)

e := newExecutor()
stopCtx, cancel := context.WithCancel(context.Background())
e.ctx = stopCtx
e.cancel = cancel
e.jobsWg = &sync.WaitGroup{}

wg := &sync.WaitGroup{}
wg.Add(1)
Expand Down
65 changes: 35 additions & 30 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,20 @@ type random struct {
}

type jobFunction struct {
eventListeners // additional functions to allow run 'em during job performing
function any // task's function
parameters []any // task's function parameters
parametersLen int // length of the passed parameters
name string // nolint the function name to run
runConfig runConfig // configuration for how many times to run the job
singletonQueue *atomic.Int64 // limits inflight runs of a job to one
ctx context.Context // for cancellation
cancel context.CancelFunc // for cancellation
isRunning *atomic.Bool // whether the job func is currently being run
runStartCount *atomic.Int64 // number of times the job was started
runFinishCount *atomic.Int64 // number of times the job was finished
eventListeners // additional functions to allow run 'em during job performing
function any // task's function
parameters []any // task's function parameters
parametersLen int // length of the passed parameters
name string // nolint the function name to run
runConfig runConfig // configuration for how many times to run the job
singletonQueue *atomic.Int64 // limits inflight runs of a job to one
singletonRunnerOn *atomic.Bool // whether the runner function for singleton is running
ctx context.Context // for cancellation
cancel context.CancelFunc // for cancellation
isRunning *atomic.Bool // whether the job func is currently being run
runStartCount *atomic.Int64 // number of times the job was started
runFinishCount *atomic.Int64 // number of times the job was finished
singletonWg *sync.WaitGroup // used by singleton runner
}

type eventListeners struct {
Expand All @@ -66,18 +68,20 @@ type jobMutex struct {

func (jf *jobFunction) copy() jobFunction {
cp := jobFunction{
eventListeners: jf.eventListeners,
function: jf.function,
parameters: nil,
parametersLen: jf.parametersLen,
name: jf.name,
runConfig: jf.runConfig,
singletonQueue: jf.singletonQueue,
ctx: jf.ctx,
cancel: jf.cancel,
isRunning: jf.isRunning,
runStartCount: jf.runStartCount,
runFinishCount: jf.runFinishCount,
eventListeners: jf.eventListeners,
function: jf.function,
parameters: nil,
parametersLen: jf.parametersLen,
name: jf.name,
runConfig: jf.runConfig,
singletonQueue: jf.singletonQueue,
ctx: jf.ctx,
cancel: jf.cancel,
isRunning: jf.isRunning,
runStartCount: jf.runStartCount,
runFinishCount: jf.runFinishCount,
singletonWg: jf.singletonWg,
singletonRunnerOn: jf.singletonRunnerOn,
}
cp.parameters = append(cp.parameters, jf.parameters...)
return cp
Expand Down Expand Up @@ -110,11 +114,12 @@ func newJob(interval int, startImmediately bool, singletonMode bool) *Job {
lastRun: time.Time{},
nextRun: time.Time{},
jobFunction: jobFunction{
ctx: ctx,
cancel: cancel,
isRunning: &atomic.Bool{},
runStartCount: &atomic.Int64{},
runFinishCount: &atomic.Int64{},
ctx: ctx,
cancel: cancel,
isRunning: &atomic.Bool{},
runStartCount: &atomic.Int64{},
runFinishCount: &atomic.Int64{},
singletonRunnerOn: &atomic.Bool{},
},
tags: []string{},
startsImmediately: startImmediately,
Expand Down Expand Up @@ -389,7 +394,7 @@ func (j *Job) SingletonMode() {
defer j.mu.Unlock()
j.runConfig.mode = singletonMode
j.jobFunction.singletonQueue = &atomic.Int64{}
go j.jobFunction.singletonRunner()
j.jobFunction.singletonWg = &sync.WaitGroup{}
}

// shouldRun evaluates if this job should run again
Expand Down
10 changes: 10 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,23 @@ func (s *Scheduler) StartAsync() {

// start starts the scheduler, scheduling and running jobs
func (s *Scheduler) start() {
stopCtx, cancel := context.WithCancel(context.Background())
s.executor.ctx = stopCtx
s.executor.cancel = cancel
s.executor.jobsWg = &sync.WaitGroup{}

go s.executor.start()
s.setRunning(true)
s.runJobs(s.Jobs())
}

func (s *Scheduler) runJobs(jobs []*Job) {
for _, job := range jobs {
ctx, cancel := context.WithCancel(context.Background())
job.mu.Lock()
job.ctx = ctx
job.cancel = cancel
job.mu.Unlock()
s.runContinuous(job)
}
}
Expand Down

0 comments on commit b77fa0d

Please sign in to comment.