Skip to content

Commit

Permalink
Merge pull request #28 from donutloop/opt/fifo
Browse files Browse the repository at this point in the history
schedule: optimized spawning of goroutines
  • Loading branch information
donutloop committed Oct 7, 2017
2 parents de9cf5b + d2f33e3 commit 9e1642a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 19 deletions.
63 changes: 45 additions & 18 deletions schedule/schedule.go
Expand Up @@ -110,6 +110,8 @@ func (f *Fifo) run() {
close(f.resume)
}()

async := newAsync(f.ctx, f.PanicHandler)

for {
var job Job
f.mu.Lock()
Expand All @@ -128,15 +130,13 @@ func (f *Fifo) run() {
f.mu.Unlock()
// clean up pending jobs
for _, job := range pendings {
done := asyncDo(f.ctx, f.PanicHandler, job)
<-done
async.Do(job)
}
async.Close()
return
}
} else {
done := asyncDo(f.ctx, f.PanicHandler, job)
<-done

async.Do(job)
f.finishCond.L.Lock()
f.finished++
f.pendings = f.pendings[1:]
Expand All @@ -146,18 +146,45 @@ func (f *Fifo) run() {
}
}

// AsyncDo is a basic promise implementation: it wraps calls a function in a goroutine
func asyncDo(ctx context.Context, panicHandler func(stack DebugStack), f func(ctx context.Context)) <-chan struct{} {
ch := make(chan struct{}, 1)
go func(ch chan struct{}, ctx context.Context, panicHandler func(stack DebugStack)) {
defer func() {
if err := recover(); err != nil {
panicHandler(DebugStack(debug.Stack()))
}
}()
func newAsync(ctx context.Context, panicHandler func(stack DebugStack)) *async {
a := &async{
Ctx: ctx,
PanicHandler: panicHandler,
Jobs: make(chan func(ctx context.Context)),
}
a.init()
return a
}

type async struct {
Ctx context.Context
Jobs chan func(ctx context.Context)
PanicHandler func(stack DebugStack)
}

func (a *async) init() {
go func(jobs chan func(ctx context.Context), ctx context.Context, panicHandler func(stack DebugStack)) {
for job := range jobs {
do(job, ctx, panicHandler)
}
}(a.Jobs, a.Ctx, a.PanicHandler)
}

func (a *async) Do(f func(ctx context.Context)) {
a.Jobs <- f
}

func (a *async) Close() {
close(a.Jobs)
}

func do(job func(ctx context.Context), ctx context.Context, panicHandler func(stack DebugStack)) {

defer func() {
if err := recover(); err != nil {
panicHandler(DebugStack(debug.Stack()))
}
}()

f(ctx)
ch <- struct{}{}
}(ch, ctx, panicHandler)
return ch
job(ctx)
}
26 changes: 25 additions & 1 deletion schedule/schedule_test.go
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"github.com/donutloop/toolkit/schedule"
"testing"
"time"
)

func TestFIFOSchedule(t *testing.T) {
Expand Down Expand Up @@ -40,6 +41,30 @@ func TestFIFOSchedule(t *testing.T) {
}
}

func TestFIFOScheduleCtx(t *testing.T) {
s := schedule.NewFIFOScheduler()

s.Schedule(func(ctx context.Context) {
<-time.After(250 * time.Millisecond)
err := ctx.Err()
if err == nil {
t.Fatal("unexpected nil error")
}

expectedErrorMessage := "context canceled"
if err.Error() != expectedErrorMessage {
t.Fatal(err)
}
})

s.Stop()

expectedJobCount := 1
if s.Scheduled() != expectedJobCount {
t.Fatalf("scheduled (actual: %d, expected: %d)", s.Scheduled(), expectedJobCount)
}
}

func BenchmarkFIFOSchedule(b *testing.B) {
for n := 0; n < b.N; n++ {
s := schedule.NewFIFOScheduler()
Expand Down Expand Up @@ -69,5 +94,4 @@ func BenchmarkFIFOSchedule(b *testing.B) {
}
s.Stop()
}

}

0 comments on commit 9e1642a

Please sign in to comment.