Skip to content

Commit

Permalink
add locker for distributed scheduling (#463)
Browse files Browse the repository at this point in the history
* add locker for distributed scheduling

* Update locker_test.go

* expand documentation

* unlock after 90% of the time till next run to prevent fast executions from locking

* time.Until vs. time.Sub

* add beta notes
  • Loading branch information
JohnRoesler committed May 5, 2023
1 parent 254a953 commit 019f6e1
Show file tree
Hide file tree
Showing 13 changed files with 668 additions and 35 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ fmt:
@go list -f {{.Dir}} ./... | xargs -I{} gofmt -w -s {}

lint:
@grep "^func " example_test.go | sort -c
@golangci-lint run

test:
Expand Down
20 changes: 13 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ If you want to chat, you can find us at Slack!
```golang
s := gocron.NewScheduler(time.UTC)

s.Every(5).Seconds().Do(func(){ ... })
job, err := s.Every(5).Seconds().Do(func(){ ... })
if err != nil {
// handle the error related to setting up the job
}

// strings parse to duration
s.Every("5m").Do(func(){ ... })
Expand Down Expand Up @@ -82,11 +85,12 @@ For more examples, take a look in our [go docs](https://pkg.go.dev/github.com/go

There are several options available to restrict how jobs run:

| Mode | Function | Behavior |
| --------------- | ------------------------ | ------------------------------------------------------------------------------- |
| Default | | jobs are rescheduled at every interval |
| Job singleton | `SingletonMode()` | a long running job will not be rescheduled until the current run is completed |
| Scheduler limit | `SetMaxConcurrentJobs()` | set a collective maximum number of concurrent jobs running across the scheduler |
| Mode | Function | Behavior |
|----------------------------|---------------------------|------------------------------------------------------------------------------------------------------|
| Default | | jobs are rescheduled at every interval |
| Job singleton | `SingletonMode()` | a long running job will not be rescheduled until the current run is completed |
| Scheduler limit | `SetMaxConcurrentJobs()` | set a collective maximum number of concurrent jobs running across the scheduler |
| Distributed locking (BETA) | `WithDistributedLocker()` | prevents the same job from being run more than once when running multiple instances of the scheduler |

## Tags

Expand All @@ -113,7 +117,9 @@ s.RunByTag("tag")
## FAQ

- Q: I'm running multiple pods on a distributed environment. How can I make a job not run once per pod causing duplication?
- A: We recommend using your own lock solution within the jobs themselves (you could use [Redis](https://redis.io/topics/distlock), for example)
- We recommend using your own lock solution within the jobs themselves (you could use [Redis](https://redis.io/topics/distlock), for example)
- A2: Currently in BETA (please provide feedback): Use the scheduler option `WithDistributedLocker` and either use an implemented backend
or implement your own and contribute it back in a PR (we hope)!

- Q: I've removed my job from the scheduler, but how can I stop a long-running job that has already been triggered?
- A: We recommend using a means of canceling your job, e.g. a `context.WithCancel()`.
Expand Down
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
version: "3.8"

services:
redis:
image: redis:6.2-alpine
25 changes: 22 additions & 3 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-co-op/gocron"
"github.com/redis/go-redis/v9"
)

var task = func() {}
Expand All @@ -20,7 +21,7 @@ func ExampleJob_Error() {
j := s.Jobs()[0]
fmt.Println(j.Error())
// Output:
// the given time format is not supported
// gocron: the given time format is not supported
}

func ExampleJob_FinishedRunCount() {
Expand Down Expand Up @@ -185,7 +186,7 @@ func ExampleJob_Weekday() {
fmt.Println(err)
// Output:
// Monday
// job not scheduled weekly on a weekday
// gocron: job not scheduled weekly on a weekday
}

func ExampleJob_Weekdays() {
Expand Down Expand Up @@ -823,7 +824,7 @@ func ExampleScheduler_TagsUnique() {

fmt.Println(err)
// Output:
// a non-unique tag was set on the job: foo
// gocron: a non-unique tag was set on the job: foo
}

func ExampleScheduler_TaskPresent() {
Expand Down Expand Up @@ -923,6 +924,24 @@ func ExampleScheduler_Weeks() {
_, _ = s.Every(2).Weeks().Monday().Wednesday().Friday().Do(task)
}

func ExampleScheduler_WithDistributedLocker() {
redisOptions := &redis.Options{
Addr: "localhost:6379",
}
redisClient := redis.NewClient(redisOptions)
locker, err := gocron.NewRedisLocker(redisClient)
if err != nil {
// handle the error
}

s := gocron.NewScheduler(time.UTC)
s.WithDistributedLocker(locker)
_, err = s.Every("500ms").Do(task)
if err != nil {
// handle the error
}
}

// ---------------------------------------------------------------------
// ---------------------OTHER-FUNCTIONS---------------------------------
// ---------------------------------------------------------------------
Expand Down
20 changes: 20 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"
"sync/atomic"
"time"
)

const (
Expand Down Expand Up @@ -49,6 +50,8 @@ type executor struct {
limitModeQueue chan jobFunction // pass job functions to the limit mode workers
limitModeRunningJobs *atomic.Int64 // tracks the count of running jobs to check against the max
stopped *atomic.Bool // allow workers to drain the buffered limitModeQueue

distributedLocker Locker // support running jobs across multiple instances
}

func newExecutor() executor {
Expand Down Expand Up @@ -170,6 +173,23 @@ func (e *executor) run() {

switch f.runConfig.mode {
case defaultMode:
if e.distributedLocker != nil {
l, err := e.distributedLocker.Lock(f.ctx, f.name)
if err != nil || l == nil {
return
}
defer func() {
durationToNextRun := time.Until(f.jobFuncNextRun)
if durationToNextRun > time.Second*5 {
durationToNextRun = time.Second * 5
}
if durationToNextRun > time.Millisecond*100 {
timeToSleep := time.Duration(float64(durationToNextRun) * 0.9)
time.Sleep(timeToSleep)
}
_ = l.Unlock(f.ctx)
}()
}
runJob(f)
case singletonMode:
e.singletonWgs.Store(f.singletonWg, struct{}{})
Expand Down
41 changes: 39 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,52 @@ module github.com/go-co-op/gocron
go 1.20

require (
github.com/go-redsync/redsync/v4 v4.8.1
github.com/redis/go-redis/v9 v9.0.2
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.8.2
github.com/testcontainers/testcontainers-go/modules/redis v0.0.0-20230424150504-5185956fa1de
)

replace github.com/testcontainers/testcontainers-go v0.19.0 => github.com/testcontainers/testcontainers-go v0.0.0-20230424150504-5185956fa1de

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.6.19 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v23.0.3+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/klauspost/compress v1.11.13 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/term v0.0.0-20221128092401-c43b287e0e0f // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/opencontainers/runc v1.1.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/testcontainers/testcontainers-go v0.19.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect
google.golang.org/grpc v1.47.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 019f6e1

Please sign in to comment.