Skip to content

Commit

Permalink
add NewWithContext
Browse files Browse the repository at this point in the history
Before this patch, forwarding context cancellation to the workerpool
tasks was difficult and required to spawn a goroutine to call wp.Close
when the parent context was done.

This patch introduce NewWithContext allowing callers to provide a parent
context from which is derived the tasks context, removing the need to
manually forward context cancellation.

Signed-off-by: Alexandre Perrin <alex@isovalent.com>
  • Loading branch information
kaworu committed Feb 8, 2023
1 parent 113f474 commit c57e545
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
8 changes: 7 additions & 1 deletion workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,20 @@ type WorkerPool struct {
// New creates a new pool of workers where at most n workers process submitted
// tasks concurrently. New panics if n ≤ 0.
func New(n int) *WorkerPool {
return NewWithContext(context.Background(), n)
}

// NewWithContext creates a new pool of workers where at most n workers process submitted
// tasks concurrently. New panics if n ≤ 0. The context is used as the parent context to the context of the task func passed to Submit.
func NewWithContext(ctx context.Context, n int) *WorkerPool {
if n <= 0 {
panic(fmt.Sprintf("workerpool.New: n must be > 0, got %d", n))
}
wp := &WorkerPool{
workers: make(chan struct{}, n),
tasks: make(chan *task),
}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
wp.cancel = cancel
go wp.run(ctx)
return wp
Expand Down
57 changes: 57 additions & 0 deletions workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,60 @@ func TestWorkerPoolClose(t *testing.T) {
}
wg.Wait() // all routines should have returned
}

func TestWorkerPoolNewWithContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
n := runtime.NumCPU()
wp := workerpool.NewWithContext(ctx, n)

// working is written to by each task as soon as possible.
working := make(chan struct{})
var wg sync.WaitGroup
// Create n tasks waiting on the context to be cancelled.
wg.Add(n)
for i := 0; i < n; i++ {
id := fmt.Sprintf("task #%2d", i)
err := wp.Submit(id, func(ctx context.Context) error {
working <- struct{}{}
<-ctx.Done()
wg.Done()
return ctx.Err()
})
if err != nil {
t.Errorf("failed to submit task '%s': %v", id, err)
}
}

// ensure n workers are busy
for i := 0; i < n; i++ {
<-working
}

// cancel the parent context, which should complete all tasks.
cancel()
wg.Wait()

// Submitting a task once the parent context has been cancelled should
// succeed and give a cancelled context to the task. This is not ideal and
// might change in the future.
wg.Add(1)
id := "last"
err := wp.Submit(id, func(ctx context.Context) error {
defer wg.Done()
select {
case <-ctx.Done():
default:
t.Errorf("last task expected context to be cancelled")
}
return nil
})
if err != nil {
t.Errorf("failed to submit task '%s': %v", id, err)
}

wg.Wait()

if err := wp.Close(); err != nil {
t.Errorf("close: got '%v', want no error", err)
}
}

0 comments on commit c57e545

Please sign in to comment.