Skip to content

Commit

Permalink
feat: jobs system (#15)
Browse files Browse the repository at this point in the history
* feat: jobs system

* lint
  • Loading branch information
23doors committed Aug 27, 2020
1 parent 96b2575 commit 1415298
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 0 deletions.
24 changes: 24 additions & 0 deletions jobs/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package jobs

import (
"context"
"fmt"
"time"
)

type Runnable interface {
fmt.Stringer
Run(ctx context.Context) (isDone bool, err error)
RunTimeout() time.Duration
}

type LockableRunnable interface {
Runnable
LockKey() string
}

type PeriodicRunnable interface {
LockableRunnable
RunPeriod() time.Duration
RunResolution() time.Duration
}
88 changes: 88 additions & 0 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package jobs

import (
"context"
"fmt"
"time"
)

type OneOffJob struct {
Name string
Func func(ctx context.Context) error
Timeout time.Duration // Passed as context.WithTimeout to Func.
}

func (j *OneOffJob) Run(ctx context.Context) (bool, error) {
return true, j.Func(ctx)
}

func (j *OneOffJob) RunTimeout() time.Duration {
return j.Timeout
}

func (j *OneOffJob) String() string {
return fmt.Sprintf("OneOffJob<Name=%s>", j.Name)
}

var _ Runnable = (*OneOffJob)(nil)

type PeriodicJob struct {
Func func(ctx context.Context) (isDone bool, err error)
Timeout time.Duration // Passed as context.WithTimeout to Func.
Period time.Duration
Resolution time.Duration // Time between attempts to grab a lock, defaults to max(min(1/4 of Perion, 1 Hour), 5 Min)
Name string
}

var _ PeriodicRunnable = (*PeriodicJob)(nil)

func (j *PeriodicJob) Run(ctx context.Context) (bool, error) {
return j.Func(ctx)
}

func (j *PeriodicJob) RunTimeout() time.Duration {
return j.Timeout
}

const (
minDefaultResolution = 5 * time.Minute
maxDefaultResolution = 1 * time.Hour
)

func DefaultResolution(period time.Duration) time.Duration {
res := period / 4

if res < minDefaultResolution {
return minDefaultResolution
}

if res > maxDefaultResolution {
return maxDefaultResolution
}

return res
}

func (j *PeriodicJob) RunResolution() time.Duration {
if j.Resolution == 0 {
return DefaultResolution(j.Period)
}

return j.Resolution
}

func (j *PeriodicJob) RunPeriod() time.Duration {
return j.Period
}

func (j *PeriodicJob) LockKey() string {
return j.Name
}

func (j *PeriodicJob) String() string {
return fmt.Sprintf("PeriodicJob<Name=%s>", j.Name)
}

func partition(t LockableRunnable) string {
return "0"
}
185 changes: 185 additions & 0 deletions jobs/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package jobs

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-redis/redis/v7"
"go.uber.org/zap"
)

type Runner struct {
log *zap.Logger
wg sync.WaitGroup
rc *redis.Client
stop chan struct{}
cfg Config

errorHandlerMu sync.RWMutex
errorHandler func(t Runnable, err error)
}

type Config struct {
ServiceKey string
Partition func(t LockableRunnable) string
}

var DefaultConfig = Config{
ServiceKey: "jobs",
Partition: partition,
}

type Option func(*Config)

func WithServiceKey(val string) Option {
return func(config *Config) {
config.ServiceKey = val
}
}
func WithPartition(f func(LockableRunnable) string) Option {
return func(config *Config) {
config.Partition = f
}
}

func New(log *zap.Logger, rc *redis.Client, opts ...Option) *Runner {
cfg := DefaultConfig

for _, opt := range opts {
opt(&cfg)
}

return &Runner{
log: log,
rc: rc,
stop: make(chan struct{}),
cfg: cfg,
}
}

func (p *Runner) createLockKey(t LockableRunnable) string {
return fmt.Sprintf("%s:%s:lock:%s", p.cfg.Partition(t), p.cfg.ServiceKey, t.LockKey())
}

func (p *Runner) runPeriodic(job PeriodicRunnable) {
initialRun := true

if initialRun {
p.wg.Add(1)
}

lockKey := p.createLockKey(job)
period := job.RunPeriod()

go func() {
ticker := time.NewTicker(job.RunResolution())

for {
if !initialRun {
p.wg.Add(1)
}

initialRun = false

ok, err := p.rc.SetNX(lockKey, 1, period).Result()
if !ok || err != nil {
if err != nil {
p.log.With(zap.Error(err)).Error("Could not obtain redis lock for periodic job")
}

p.wg.Done()

select {
case <-ticker.C:
continue
case <-p.stop:
return
}
}

isDone, _ := p.processJob(job)

if !isDone {
continue
}

select {
case <-ticker.C:
case <-p.stop:
return
}
}
}()
}

func (p *Runner) processJob(job Runnable) (bool, error) {
// Run job with optional timeout.
timeout := job.RunTimeout()

var (
ctx context.Context
cancel context.CancelFunc
)

if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
}

p.log.With(zap.Stringer("job", job)).Info("Running job")

isDone, err := job.Run(ctx)
if err != nil {
p.handleJobError(job, err)
}

cancel()

return isDone, err
}

func (p *Runner) Run(job Runnable) {
if periodic, ok := job.(PeriodicRunnable); ok {
p.runPeriodic(periodic)

return
}

p.wg.Add(1)

_, _ = p.processJob(job)
p.wg.Done()
}

func (p *Runner) Go(f func()) {
p.wg.Add(1)

go f()

p.wg.Done()
}

func (p *Runner) handleJobError(job Runnable, err error) {
p.errorHandlerMu.RLock()
defer p.errorHandlerMu.RUnlock()

if p.errorHandler != nil {
p.errorHandler(job, err)

return
}

p.log.Error("Job failed", zap.Error(err))
}

func (p *Runner) ErrorHandler(errorHandler func(t Runnable, err error)) {
p.errorHandlerMu.Lock()
p.errorHandler = errorHandler
p.errorHandlerMu.Unlock()
}

func (p *Runner) GracefulStop() {
close(p.stop)
p.wg.Wait()
}

0 comments on commit 1415298

Please sign in to comment.