Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add NewWithContext #44

Merged
merged 1 commit into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,20 @@ type WorkerPool struct {
// New creates a new pool of workers where at most n workers process submitted
// tasks concurrently. New panics if n ≤ 0.
func New(n int) *WorkerPool {
return NewWithContext(context.Background(), n)
}

// NewWithContext creates a new pool of workers where at most n workers process submitted
// tasks concurrently. New panics if n ≤ 0. The context is used as the parent context to the context of the task func passed to Submit.
func NewWithContext(ctx context.Context, n int) *WorkerPool {
if n <= 0 {
panic(fmt.Sprintf("workerpool.New: n must be > 0, got %d", n))
}
wp := &WorkerPool{
workers: make(chan struct{}, n),
tasks: make(chan *task),
}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
wp.cancel = cancel
go wp.run(ctx)
return wp
Expand Down
57 changes: 57 additions & 0 deletions workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,60 @@ func TestWorkerPoolClose(t *testing.T) {
}
wg.Wait() // all routines should have returned
}

func TestWorkerPoolNewWithContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
n := runtime.NumCPU()
wp := workerpool.NewWithContext(ctx, n)

// working is written to by each task as soon as possible.
working := make(chan struct{})
var wg sync.WaitGroup
// Create n tasks waiting on the context to be cancelled.
wg.Add(n)
for i := 0; i < n; i++ {
id := fmt.Sprintf("task #%2d", i)
err := wp.Submit(id, func(ctx context.Context) error {
working <- struct{}{}
<-ctx.Done()
wg.Done()
return ctx.Err()
})
if err != nil {
t.Errorf("failed to submit task '%s': %v", id, err)
}
}

// ensure n workers are busy
for i := 0; i < n; i++ {
<-working
}

// cancel the parent context, which should complete all tasks.
cancel()
wg.Wait()

// Submitting a task once the parent context has been cancelled should
// succeed and give a cancelled context to the task. This is not ideal and
// might change in the future.
Comment on lines +438 to +440
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the problem with this? Why is it not ideal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO ideally Submit should return an error when the parent context is cancelled, because what would do the task do when given an already cancelled context?

To do that, Submit would have to check the context status but I didn't feel like keeping a context reference into the workerpool struct in this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree. Submit should submit tasks for processing and not deal with the context. If the context has been canceled by calling Close(), then Submit already returns an error. In other cases, then it's the parent context that is provided by the user that has been canceled and it should be the responsibility of the caller to correctly handle this case. Notably, how to handle the context in the task func is the responsibility of the caller.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel bad for the task to be given a cancelled context before it could do anything, but ok.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the context, that is provided by the caller, is cancelled, don't you think the caller should stop submitting tasks?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree. If the caller stop submitting tasks as soon as the context is cancelled, then what happen on Submit doesn't matter. That's why I think this PR implementation is an acceptable behavior.

But if — for whatever reason — the caller still Submit when the context is cancelled, it would be ideal to shortcut and return an error. I think debugging would be easier if it was the case.

wg.Add(1)
id := "last"
err := wp.Submit(id, func(ctx context.Context) error {
defer wg.Done()
select {
case <-ctx.Done():
default:
t.Errorf("last task expected context to be cancelled")
}
return nil
})
if err != nil {
t.Errorf("failed to submit task '%s': %v", id, err)
}

wg.Wait()

if err := wp.Close(); err != nil {
t.Errorf("close: got '%v', want no error", err)
}
}