Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
guilhem committed Sep 21, 2021
1 parent e396721 commit f5ea411
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 143 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -1,3 +1,3 @@
module github.com/catmullet/go-workers

go 1.15
go 1.16
235 changes: 93 additions & 142 deletions workers.go
Expand Up @@ -16,234 +16,185 @@ var defaultWatchSignals = []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.S
// Return nil if you want the Runner to continue otherwise any error will cause the Runner to shutdown and return the
// error.
type Worker interface {
Work(in interface{}, out chan<- interface{}) error
Work(ctx context.Context, in interface{}, out chan<- interface{}) error
}

// Runner Handles the running the Worker logic.
type Runner interface {
BeforeFunc(func(ctx context.Context) error) Runner
AfterFunc(func(ctx context.Context, err error) error) Runner
SetDeadline(t time.Time) Runner
SetTimeout(duration time.Duration) Runner
SetFollower()
Send(in interface{})
InFrom(w ...Runner) Runner
SetOut(chan interface{})
Start() Runner
Stop() chan error
Wait() error
}
type Runner struct {
ctx context.Context
cancel context.CancelFunc
inChan chan interface{}
outChan chan interface{}
limiter chan struct{}

type runner struct {
ctx context.Context
cancel context.CancelFunc
inChan chan interface{}
outChan chan interface{}
errChan chan error
signalChan chan os.Signal
limiter chan struct{}

afterFunc func(ctx context.Context, err error) error
workFunc func(in interface{}, out chan<- interface{}) error
afterFunc func(ctx context.Context, err error)
workFunc func(ctx context.Context, in interface{}, out chan<- interface{}) error
beforeFunc func(ctx context.Context) error

timeout time.Duration
deadline time.Duration

isLeader bool
stopCalled bool
timeout time.Duration

numWorkers int64
lock *sync.RWMutex
started *sync.Once
wg *sync.WaitGroup
done *sync.Once
once *sync.Once
}

// NewRunner Factory function for a new Runner. The Runner will handle running the workers logic.
func NewRunner(ctx context.Context, w Worker, numWorkers int64) Runner {
var runnerCtx, runnerCancel = context.WithCancel(ctx)
var runner = &runner{
func NewRunner(ctx context.Context, w Worker, numWorkers int64) *Runner {
runnerCtx, runnerCancel := signal.NotifyContext(ctx, defaultWatchSignals...)
runner := &Runner{
ctx: runnerCtx,
cancel: runnerCancel,
inChan: make(chan interface{}, numWorkers),
outChan: nil,
errChan: make(chan error, 1),
signalChan: make(chan os.Signal, 1),
limiter: make(chan struct{}, numWorkers),
afterFunc: func(ctx context.Context, err error) error { return err },
afterFunc: func(ctx context.Context, err error) {},
workFunc: w.Work,
beforeFunc: func(ctx context.Context) error { return nil },
numWorkers: numWorkers,
isLeader: true,
lock: new(sync.RWMutex),
wg: new(sync.WaitGroup),
once: new(sync.Once),
done: new(sync.Once),
started: new(sync.Once),
}
runner.waitForSignal(defaultWatchSignals...)
return runner
}

var ErrWorkerDone = errors.New("worker done")

// Send Send an object to the worker for processing.
func (r *runner) Send(in interface{}) {
func (r *Runner) Send(in interface{}) error {
select {
case <-r.ctx.Done():
return
return ErrWorkerDone
case r.inChan <- in:
}
return nil
}

// InFrom Set a worker to accept output from another worker(s).
func (r *runner) InFrom(w ...Runner) Runner {
r.SetFollower()
func (r *Runner) InFrom(w ...Runner) *Runner {
for _, wr := range w {
wr.SetOut(r.inChan)
// create a chan for producer to close it without impacting other producers
aggChan := make(chan interface{})
go func() {
for {
select {
case <-r.ctx.Done():
// consumer is ending. Close chan to inform producer
close(aggChan)
return
case msg, ok := <-aggChan:
if !ok {
return
}
r.inChan <- msg
}
}
}()
wr.SetOut(aggChan)
}
return r
}

// SetFollower Sets the worker as a follower and does not need to close it's in channel.
func (r *runner) SetFollower() {
r.lock.Lock()
r.isLeader = false
r.lock.Unlock()
}

// Start Starts the worker on processing.
func (r *runner) Start() Runner {
r.startWork()
return r
func (r *Runner) Start() error {
var err error
r.started.Do(func() {
err = r.beforeFunc(r.ctx)
go r.work()
})
return err
}

// BeforeFunc Function to be run before worker starts processing.
func (r *runner) BeforeFunc(f func(ctx context.Context) error) Runner {
func (r *Runner) BeforeFunc(f func(ctx context.Context) error) *Runner {
r.beforeFunc = f
return r
}

// AfterFunc Function to be run after worker has stopped.
func (r *runner) AfterFunc(f func(ctx context.Context, err error) error) Runner {
func (r *Runner) AfterFunc(f func(ctx context.Context, err error)) *Runner {
r.afterFunc = f
return r
}

var ErrOutAlready = errors.New("out already set")

// SetOut Allows the setting of a workers out channel, if not already set.
func (r *runner) SetOut(c chan interface{}) {
func (r *Runner) SetOut(c chan interface{}) error {
if r.outChan != nil {
return
return ErrOutAlready
}
r.outChan = c
return nil
}

// SetDeadline allows a time to be set when the workers should stop.
// Deadline needs to be handled by the IsDone method.
func (r *runner) SetDeadline(t time.Time) Runner {
r.lock.Lock()
defer r.lock.Unlock()
func (r *Runner) SetDeadline(t time.Time) *Runner {
r.ctx, r.cancel = context.WithDeadline(r.ctx, t)
return r
}

// SetTimeout allows a time duration to be set when the workers should stop.
// Timeout needs to be handled by the IsDone method.
func (r *runner) SetTimeout(duration time.Duration) Runner {
r.lock.Lock()
defer r.lock.Unlock()
func (r *Runner) SetTimeout(duration time.Duration) *Runner {
r.timeout = duration
return r
}

// Wait calls stop on workers and waits for the channel to drain.
// !!Should only be called when certain nothing will send to worker.
func (r *runner) Wait() error {
r.waitForDrain()
if err := <-r.Stop(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
func (r *Runner) Wait() {
<-r.ctx.Done()
}

// Stop Stops the processing of a worker and closes it's channel in.
// Returns a blocking channel with type error.
// !!Should only be called when certain nothing will send to worker.
func (r *runner) Stop() chan error {
r.done.Do(func() {
if r.inChan != nil && r.isLeader {
close(r.inChan)
}
})
return r.errChan
}

// IsDone returns a channel signaling the workers context has been canceled.
func (r *runner) IsDone() <-chan struct{} {
return r.ctx.Done()
func (r *Runner) Stop() {
r.cancel()
close(r.inChan)
}

// waitForSignal make sure we wait for a term signal and shutdown correctly
func (r *runner) waitForSignal(signals ...os.Signal) {
go func() {
signal.Notify(r.signalChan, signals...)
<-r.signalChan
if r.cancel != nil {
r.cancel()
}
}()
}
type InputKey struct{}

// waitForDrain Waits for the limiter to be zeroed out and the in channel to be empty.
func (r *runner) waitForDrain() {
for len(r.limiter) > 0 || len(r.inChan) > 0 {
// Wait for the drain.
}
}

// startWork Runs the before function and starts processing until one of three things happen.
// work Runs the before function and starts processing until one of three things happen.
// 1. A term signal is received or cancellation of context.
// 2. Stop function is called.
// 3. Worker returns an error.
func (r *runner) startWork() {
var err error
if err = r.beforeFunc(r.ctx); err != nil {
r.errChan <- err
return
}
if r.timeout > 0 {
r.ctx, r.cancel = context.WithTimeout(r.ctx, r.timeout)
}
r.wg.Add(1)
go func() {
var workerWG = new(sync.WaitGroup)
var closeOnce = new(sync.Once)

// write out error if not nil on exit.
defer func() {
workerWG.Wait()
r.errChan <- err
closeOnce.Do(func() {
if r.outChan != nil {
close(r.outChan)
}
})
r.wg.Done()
}()
for in := range r.inChan {
input := in
func (r *Runner) work() {
defer func() {
// wait for workers to end
r.wg.Wait()
// indicate work is done
r.cancel()
// close outChan to indicate followers
if r.outChan != nil {
close(r.outChan)
}
}()

for {
select {
case <-r.ctx.Done():
return
case input, ok := <-r.inChan:
if !ok {
return
}
r.limiter <- struct{}{}
workerWG.Add(1)
inputCtx, inputCancel := context.WithCancel(r.ctx)
inputCtx = context.WithValue(inputCtx, InputKey{}, input)
if r.timeout > 0 {
inputCtx, inputCancel = context.WithTimeout(inputCtx, r.timeout)
}
r.wg.Add(1)
go func() {
defer func() {
<-r.limiter
workerWG.Done()
r.wg.Done()
inputCancel()
}()
if err := r.afterFunc(r.ctx, r.workFunc(input, r.outChan)); err != nil {
r.once.Do(func() {
r.errChan <- err
r.cancel()
})
}
r.afterFunc(inputCtx, r.workFunc(inputCtx, input, r.outChan))
}()
}
}()
}
}

0 comments on commit f5ea411

Please sign in to comment.