Skip to content

Commit

Permalink
[BUG] Fix Data race when using RunByTag (#356)
Browse files Browse the repository at this point in the history
* [BUG] Fix Data race when using RunByTag

* Change naming: runJobWithDetails to addJobDetails
  • Loading branch information
seunghyupoh3517 committed Jul 20, 2022
1 parent fa855ed commit cf9e6c3
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 42 deletions.
1 change: 1 addition & 0 deletions gocron.go
Expand Up @@ -45,6 +45,7 @@ var (
ErrInvalidInterval = errors.New(".Every() interval must be greater than 0")
ErrInvalidIntervalType = errors.New(".Every() interval must be int, time.Duration, or string")
ErrInvalidIntervalUnitsSelection = errors.New(".Every(time.Duration) and .Cron() cannot be used with units (e.g. .Seconds())")
ErrInvalidFunctionParameters = errors.New("length of function parameters must match job function parameters")

ErrAtTimeNotSupported = errors.New("the At() method is not supported for this time unit")
ErrWeekdayNotSupported = errors.New("weekday is not supported for time unit")
Expand Down
13 changes: 12 additions & 1 deletion job.go
Expand Up @@ -397,7 +397,6 @@ func (j *Job) SingletonMode() {
defer j.mu.Unlock()
j.runConfig.mode = singletonMode
j.jobFunction.limiter = &singleflight.Group{}

}

// shouldRun evaluates if this job should run again
Expand All @@ -410,10 +409,14 @@ func (j *Job) shouldRun() bool {

// LastRun returns the time the job was run last
func (j *Job) LastRun() time.Time {
j.mu.RLock()
defer j.mu.RUnlock()
return j.lastRun
}

func (j *Job) setLastRun(t time.Time) {
j.mu.Lock()
defer j.mu.Unlock()
j.lastRun = t
}

Expand All @@ -432,9 +435,17 @@ func (j *Job) setNextRun(t time.Time) {

// RunCount returns the number of time the job ran so far
func (j *Job) RunCount() int {
j.mu.RLock()
defer j.mu.RUnlock()
return j.runCount
}

func (j *Job) incrementRunCount() {
j.mu.Lock()
defer j.mu.Unlock()
j.runCount++
}

func (j *Job) stop() {
j.mu.Lock()
defer j.mu.Unlock()
Expand Down
24 changes: 18 additions & 6 deletions scheduler.go
Expand Up @@ -79,7 +79,7 @@ func (s *Scheduler) StartAsync() {
}
}

//start starts the scheduler, scheduling and running jobs
// start starts the scheduler, scheduling and running jobs
func (s *Scheduler) start() {
go s.executor.start()
s.setRunning(true)
Expand Down Expand Up @@ -342,7 +342,6 @@ func (s *Scheduler) calculateTotalDaysDifference(lastRun time.Time, daysToWeekda
}

func (s *Scheduler) calculateDays(job *Job, lastRun time.Time) nextRun {

if job.getInterval() == 1 {
lastRunDayPlusJobAtTime := s.roundToMidnight(lastRun).Add(job.getAtTime(lastRun))

Expand Down Expand Up @@ -533,6 +532,21 @@ func (s *Scheduler) run(job *Job) {
return
}

job = s.addJobDetails(job)
if job.error != nil {
// delete the job from the scheduler as this job
// cannot be executed
s.RemoveByReference(job)
return
// return job.error
}

s.executor.jobFunctions <- job.jobFunction.copy()
job.setLastRun(s.now())
job.incrementRunCount()
}

func (s *Scheduler) addJobDetails(job *Job) *Job {
job.mu.Lock()
defer job.mu.Unlock()

Expand All @@ -544,13 +558,11 @@ func (s *Scheduler) run(job *Job) {
job.parameters[job.parametersLen] = job.copy()
default:
// something is really wrong and we should never get here
return
job.error = wrapOrError(job.error, ErrInvalidFunctionParameters)
}
}

s.executor.jobFunctions <- job.jobFunction.copy()
job.setLastRun(s.now())
job.runCount++
return job
}

func (s *Scheduler) runContinuous(job *Job) {
Expand Down
56 changes: 21 additions & 35 deletions scheduler_test.go
Expand Up @@ -52,7 +52,6 @@ func TestImmediateExecution(t *testing.T) {
case <-semaphore:
// test passed
}

}

func TestScheduler_Every_InvalidInterval(t *testing.T) {
Expand All @@ -75,7 +74,6 @@ func TestScheduler_Every_InvalidInterval(t *testing.T) {
assert.EqualError(t, err, tc.expectedError)
})
}

}

func TestScheduler_EveryRandom(t *testing.T) {
Expand Down Expand Up @@ -172,7 +170,6 @@ func TestScheduler_Every(t *testing.T) {
s.Stop()
assert.Equal(t, 2, counter)
})

}

func TestExecutionSeconds(t *testing.T) {
Expand Down Expand Up @@ -392,7 +389,6 @@ func TestWeekdayAt(t *testing.T) {
}

func TestScheduler_Remove(t *testing.T) {

t.Run("remove from non-running", func(t *testing.T) {
s := NewScheduler(time.UTC)
s.TagsUnique()
Expand Down Expand Up @@ -784,7 +780,6 @@ func TestClearUnique(t *testing.T) {
}

func TestSetUnit(t *testing.T) {

testCases := []struct {
desc string
timeUnit schedulingUnit
Expand Down Expand Up @@ -1036,7 +1031,7 @@ func _getMinutes(i int) time.Duration {
}

func TestScheduler_Do(t *testing.T) {
var testCases = []struct {
testCases := []struct {
description string
evalFunc func(*Scheduler)
}{
Expand Down Expand Up @@ -1239,7 +1234,6 @@ func TestCalculateMonths(t *testing.T) {
}

func TestScheduler_SingletonMode(t *testing.T) {

testCases := []struct {
description string
removeJob bool
Expand All @@ -1250,7 +1244,6 @@ func TestScheduler_SingletonMode(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {

s := NewScheduler(time.UTC)
var trigger int32

Expand All @@ -1273,11 +1266,9 @@ func TestScheduler_SingletonMode(t *testing.T) {
s.Stop()
})
}

}

func TestScheduler_SingletonModeAll(t *testing.T) {

testCases := []struct {
description string
removeJob bool
Expand All @@ -1288,7 +1279,6 @@ func TestScheduler_SingletonModeAll(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {

s := NewScheduler(time.UTC)
s.SingletonModeAll()

Expand All @@ -1313,7 +1303,6 @@ func TestScheduler_SingletonModeAll(t *testing.T) {
s.Stop()
})
}

}

func TestScheduler_LimitRunsTo(t *testing.T) {
Expand Down Expand Up @@ -1404,7 +1393,8 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
// 1s - job 1 hits the limit and is skipped
// 2s - job 1 & 2 run
// 3s - job 1 hits the limit and is skipped
{"reschedule mode", 2, RescheduleMode, 4, false,
{
"reschedule mode", 2, RescheduleMode, 4, false,
func() {
semaphore <- true
time.Sleep(200 * time.Millisecond)
Expand All @@ -1416,15 +1406,17 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
// 1s - job 1 runs twice, the blocked run and the regularly scheduled run
// 2s - jobs 1 & 3 run
// 3s - jobs 2 & 3 run, job 1 hits the limit and waits
{"wait mode", 2, WaitMode, 8, false,
{
"wait mode", 2, WaitMode, 8, false,
func() {
semaphore <- true
time.Sleep(100 * time.Millisecond)
},
},

// Same as above - this confirms the same behavior when jobs are removed rather than the scheduler being stopped
{"wait mode - with job removal", 2, WaitMode, 8, true,
{
"wait mode - with job removal", 2, WaitMode, 8, true,
func() {
semaphore <- true
time.Sleep(100 * time.Millisecond)
Expand All @@ -1434,7 +1426,6 @@ func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {

s := NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(tc.maxConcurrentJobs, tc.mode)

Expand Down Expand Up @@ -1510,7 +1501,6 @@ func TestScheduler_TagsUnique(t *testing.T) {

_, err = s.Every("1s").Tag(bar).Do(func() {})
assert.EqualError(t, err, ErrTagsUnique(bar).Error())

}

func TestScheduler_MultipleTagsChained(t *testing.T) {
Expand Down Expand Up @@ -1671,21 +1661,29 @@ func TestScheduler_Update(t *testing.T) {

func TestScheduler_RunByTag(t *testing.T) {
var (
s = NewScheduler(time.Local)
count = 0
wg sync.WaitGroup
s = NewScheduler(time.Local)
wg sync.WaitGroup
counterMutex sync.RWMutex
count = 0
)

s.Every(1).Day().StartAt(time.Now().Add(time.Hour)).Tag("tag").Do(func() {
counterMutex.Lock()
defer counterMutex.Unlock()
count++
wg.Done()
})
wg.Add(1)
wg.Add(3)
s.StartAsync()

assert.NoError(t, s.RunByTag("tag"))
assert.NoError(t, s.RunByTag("tag"))
assert.NoError(t, s.RunByTag("tag"))

wg.Wait()
assert.Equal(t, 1, count)
counterMutex.RLock()
defer counterMutex.RUnlock()
assert.Equal(t, 3, count)
assert.Error(t, s.RunByTag("wrong-tag"))
}

Expand Down Expand Up @@ -1837,7 +1835,6 @@ func TestScheduler_WaitForSchedules(t *testing.T) {
}

func TestScheduler_LenWeekDays(t *testing.T) {

testCases := []struct {
description string
weekDays []time.Weekday
Expand All @@ -1860,7 +1857,6 @@ func TestScheduler_LenWeekDays(t *testing.T) {
assert.Equal(t, len(j.scheduledWeekdays), tc.finalLen)
})
}

}

func TestScheduler_CallNextWeekDay(t *testing.T) {
Expand All @@ -1869,7 +1865,7 @@ func TestScheduler_CallNextWeekDay(t *testing.T) {
}

const wantTimeUntilNextRun = time.Hour * 24 * 2
var lastRun = januaryFirst2020At(0, 0, 0)
lastRun := januaryFirst2020At(0, 0, 0)

testCases := []struct {
description string
Expand All @@ -1881,7 +1877,6 @@ func TestScheduler_CallNextWeekDay(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {

s := NewScheduler(time.UTC)
s.Every(1)

Expand All @@ -1895,10 +1890,8 @@ func TestScheduler_CallNextWeekDay(t *testing.T) {

got := s.durationToNextRun(lastRun, job).duration
assert.Equal(t, wantTimeUntilNextRun, got)

})
}

}

func TestScheduler_Midday(t *testing.T) {
Expand Down Expand Up @@ -1986,7 +1979,6 @@ func TestScheduler_CheckNextWeekDay(t *testing.T) {
job.lastRun = secondLastRun
gotSecond := s.durationToNextRun(secondLastRun, job).duration
assert.Equal(t, wantTimeUntilNextSecondRun, gotSecond)

})
}

Expand Down Expand Up @@ -2043,18 +2035,14 @@ func TestScheduler_CheckEveryWeekHigherThanOne(t *testing.T) {
} else if tc.caseTest == 3 {
assert.Equal(t, wantTimeUntilNextRunTwoWeeksLessOneDay, got)
}

}
job.runCount++
}

})
}

}

func TestScheduler_StartImmediately(t *testing.T) {

testCases := []struct {
description string
scheduler *Scheduler
Expand Down Expand Up @@ -2110,7 +2098,6 @@ func TestScheduler_CheckCalculateDaysOfMonth(t *testing.T) {
func TestScheduler_CheckSetBehaviourBeforeJobCreated(t *testing.T) {
s := NewScheduler(time.UTC)
s.Month(1, 2).Every(1).Do(func() {})

}

func TestScheduler_MonthLastDayAtTime(t *testing.T) {
Expand All @@ -2124,7 +2111,6 @@ func TestScheduler_MonthLastDayAtTime(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

s := NewScheduler(time.UTC)
got := s.durationToNextRun(tc.job.LastRun(), tc.job).duration
assert.Equalf(t, tc.wantTimeUntilNextRun, got, fmt.Sprintf("expected %s / got %s", tc.wantTimeUntilNextRun.String(), got.String()))
Expand Down

0 comments on commit cf9e6c3

Please sign in to comment.