Commit

Fix distributed locking not taking effect when max concurrent jobs was set (#471)

git-hulk committed May 7, 2023
1 parent 02bb002 commit c4e641b
Showing 2 changed files with 58 additions and 52 deletions.
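
Why this mattered: when SetMaxConcurrentJobs is configured, jobs are dispatched through the executor's limit-mode runner, which (as the first hunk below shows) called the package-level runJob directly and skipped the distributed-lock handling that run() performed inline, so the locker was silently ignored. The commit moves that handling into a new executor method, e.runJob, and routes both dispatch paths through it. A minimal sketch of the affected configuration follows; it assumes this is the go-co-op/gocron v1 scheduler (the API the diff matches), and newLockedScheduler is an illustrative helper, not part of the commit:

package example

import (
	"fmt"
	"time"

	"github.com/go-co-op/gocron"
)

// newLockedScheduler combines the two options involved in this fix:
// a concurrency limit, which routes jobs through the limit-mode runner,
// and a shared distributed locker, which must still be honored on that path.
func newLockedScheduler(locker gocron.Locker, instance int) (*gocron.Scheduler, error) {
	s := gocron.NewScheduler(time.UTC)
	s.SetMaxConcurrentJobs(10, gocron.WaitMode)
	s.WithDistributedLocker(locker)
	_, err := s.Every("500ms").Do(func() {
		fmt.Println("job ran on instance", instance)
	})
	if err != nil {
		return nil, err
	}
	s.StartAsync()
	return s, nil
}

Running several such schedulers against one shared locker is exactly what the updated test exercises; with the bug present, each instance could run the job on every tick because the limit-mode runner never consulted the locker.
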
62 changes: 33 additions & 29 deletions executor.go
@@ -105,7 +105,7 @@ func (e *executor) limitModeRunner() {
 			return
 		case jf := <-e.limitModeQueue:
 			if !e.stopped.Load() {
-				runJob(jf)
+				e.runJob(jf)
 			}
 		}
 	}
@@ -125,6 +125,37 @@ func (e *executor) start() {
 	go e.run()
 }
 
+func (e *executor) runJob(f jobFunction) {
+	switch f.runConfig.mode {
+	case defaultMode:
+		if e.distributedLocker != nil {
+			l, err := e.distributedLocker.Lock(f.ctx, f.name)
+			if err != nil || l == nil {
+				return
+			}
+			defer func() {
+				durationToNextRun := time.Until(f.jobFuncNextRun)
+				if durationToNextRun > time.Second*5 {
+					durationToNextRun = time.Second * 5
+				}
+				if durationToNextRun > time.Millisecond*100 {
+					timeToSleep := time.Duration(float64(durationToNextRun) * 0.9)
+					time.Sleep(timeToSleep)
+				}
+				_ = l.Unlock(f.ctx)
+			}()
+		}
+		runJob(f)
+	case singletonMode:
+		e.singletonWgs.Store(f.singletonWg, struct{}{})
+
+		if !f.singletonRunnerOn.Load() {
+			go f.singletonRunner()
+		}
+		f.singletonQueue <- struct{}{}
+	}
+}
+
 func (e *executor) run() {
 	for {
 		select {
@@ -171,34 +202,7 @@ func (e *executor) run() {
 					return
 				}
 
-				switch f.runConfig.mode {
-				case defaultMode:
-					if e.distributedLocker != nil {
-						l, err := e.distributedLocker.Lock(f.ctx, f.name)
-						if err != nil || l == nil {
-							return
-						}
-						defer func() {
-							durationToNextRun := time.Until(f.jobFuncNextRun)
-							if durationToNextRun > time.Second*5 {
-								durationToNextRun = time.Second * 5
-							}
-							if durationToNextRun > time.Millisecond*100 {
-								timeToSleep := time.Duration(float64(durationToNextRun) * 0.9)
-								time.Sleep(timeToSleep)
-							}
-							_ = l.Unlock(f.ctx)
-						}()
-					}
-					runJob(f)
-				case singletonMode:
-					e.singletonWgs.Store(f.singletonWg, struct{}{})
-
-					if !f.singletonRunnerOn.Load() {
-						go f.singletonRunner()
-					}
-					f.singletonQueue <- struct{}{}
-				}
+				e.runJob(f)
 			}()
 		case <-e.ctx.Done():
 			e.jobsWg.Wait()
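
For reference, the locking contract the new e.runJob relies on, in the shape implied by the calls above (e.distributedLocker.Lock(f.ctx, f.name) and l.Unlock(f.ctx)). This is only a sketch; the actual interface definitions live elsewhere in the repository:

package gocron // sketch only, not the repository's actual file

import "context"

// Locker hands out a per-key lock; the executor uses the job name as the key.
type Locker interface {
	// Lock should fail while another scheduler instance holds the key.
	Lock(ctx context.Context, key string) (Lock, error)
}

// Lock is the handle the executor unlocks once the job is done.
type Lock interface {
	Unlock(ctx context.Context) error
}

Note the deferred block in the new runJob: after the job finishes, the lock is held for roughly 90% of the time remaining until the next run, capped at 5 seconds and skipped entirely when less than 100ms remain, before Unlock is called. The apparent intent is to keep sibling instances whose timers fire slightly early from re-acquiring the lock and running the same job twice in one interval.
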
48 changes: 25 additions & 23 deletions scheduler_test.go
@@ -2581,6 +2581,14 @@ func (l *lock) Unlock(_ context.Context) error {
 }
 
 func TestScheduler_EnableDistributedLocking(t *testing.T) {
+	runTestWithDistributedLocking(t, 0)
+}
+
+func TestScheduler_EnableMaxConcurrentJobs(t *testing.T) {
+	runTestWithDistributedLocking(t, 10)
+}
+
+func runTestWithDistributedLocking(t *testing.T, maxConcurrentJobs int) {
 	resultChan := make(chan int, 10)
 	f := func(schedulerInstance int) {
 		resultChan <- schedulerInstance
@@ -2591,30 +2599,24 @@ func TestScheduler_EnableDistributedLocking(t *testing.T) {
 		store: make(map[string]struct{}, 0),
 	}
 
-	s1 := NewScheduler(time.UTC)
-	s1.WithDistributedLocker(l)
-	_, err := s1.Every("500ms").Do(f, 1)
-	require.NoError(t, err)
-
-	s2 := NewScheduler(time.UTC)
-	s2.WithDistributedLocker(l)
-	_, err = s2.Every("500ms").Do(f, 2)
-	require.NoError(t, err)
-
-	s3 := NewScheduler(time.UTC)
-	s3.WithDistributedLocker(l)
-	_, err = s3.Every("500ms").Do(f, 3)
-	require.NoError(t, err)
-
-	s1.StartAsync()
-	s2.StartAsync()
-	s3.StartAsync()
-
+	schedulers := make([]*Scheduler, 0)
+	for i := 0; i < 3; i++ {
+		s := NewScheduler(time.UTC)
+		s.WithDistributedLocker(l)
+		if maxConcurrentJobs > 0 {
+			s.SetMaxConcurrentJobs(maxConcurrentJobs, WaitMode)
+		}
+		_, err := s.Every("500ms").Do(f, 1)
+		require.NoError(t, err)
+		schedulers = append(schedulers, s)
+	}
+	for i := range schedulers {
+		schedulers[i].StartAsync()
+	}
 	time.Sleep(1700 * time.Millisecond)
 
-	s1.Stop()
-	s2.Stop()
-	s3.Stop()
+	for i := range schedulers {
+		schedulers[i].Stop()
+	}
 	close(resultChan)
 
 	var results []int
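
The test's lock type only appears in fragments in this diff (its Unlock method in the hunk context and the map-backed store field above). A minimal in-memory stand-in along those lines could look like the sketch below; the names memoryLocker and memoryLock are illustrative, not the identifiers used in scheduler_test.go, and a real multi-process deployment would back this with Redis, etcd, or similar:

package example

import (
	"context"
	"errors"
	"sync"

	"github.com/go-co-op/gocron"
)

// memoryLocker is a process-local lock store: Lock fails while a key is
// held, and Unlock releases it so another scheduler instance can acquire it.
type memoryLocker struct {
	mu    sync.Mutex
	store map[string]struct{}
}

func newMemoryLocker() *memoryLocker {
	return &memoryLocker{store: make(map[string]struct{})}
}

func (m *memoryLocker) Lock(_ context.Context, key string) (gocron.Lock, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, held := m.store[key]; held {
		return nil, errors.New("lock already held for key: " + key)
	}
	m.store[key] = struct{}{}
	return &memoryLock{key: key, locker: m}, nil
}

// memoryLock is the handle the executor unlocks after the job (and the
// post-run hold-off in runJob) completes.
type memoryLock struct {
	key    string
	locker *memoryLocker
}

func (l *memoryLock) Unlock(_ context.Context) error {
	l.locker.mu.Lock()
	defer l.locker.mu.Unlock()
	delete(l.locker.store, l.key)
	return nil
}

With three schedulers sharing one such locker, only one instance should execute the job on each 500ms tick regardless of whether SetMaxConcurrentJobs is set, which is what runTestWithDistributedLocking(t, 0) and runTestWithDistributedLocking(t, 10) cover.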
