Skip to content

Commit

Permalink
Refactor goroutine pool to use context for graceful exit
Browse files Browse the repository at this point in the history
- Replace `exitChan` with `context.Context` and `context.CancelFunc` in `goPool` struct.
- Create a cancellable context in `NewGoPool` function and store the context and cancel function in `goPool` struct.
- Call the cancel function in `Release` method to notify all goroutines to exit.
- Listen to the Done channel of the context in `adjustWorkers` method to exit the goroutine when the context is cancelled.

Signed-off-by: Daniel Hu <tao.hu@merico.dev>
  • Loading branch information
daniel-hutao committed Aug 9, 2023
1 parent 1d136d7 commit 2ea0ed3
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions gopool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gopool

import (
"context"
"sync"
"time"
)
Expand Down Expand Up @@ -30,12 +31,14 @@ type goPool struct {
errorCallback func(error)
// adjustInterval is the interval to adjust the number of workers. Default is 1 second.
adjustInterval time.Duration
// exitChan is used to notify the adjustWorkers goroutine to exit.
exitChan chan struct{}
ctx context.Context
// cancel is used to cancel the context. It is called when Release() is called.
cancel context.CancelFunc
}

// NewGoPool creates a new pool of workers.
func NewGoPool(maxWorkers int, opts ...Option) *goPool {
ctx, cancel := context.WithCancel(context.Background())
pool := &goPool{
maxWorkers: maxWorkers,
// Set minWorkers to maxWorkers by default
Expand All @@ -47,7 +50,8 @@ func NewGoPool(maxWorkers int, opts ...Option) *goPool {
lock: new(sync.Mutex),
timeout: 0,
adjustInterval: 1 * time.Second,
exitChan: make(chan struct{}),
ctx: ctx,
cancel: cancel,
}
// Apply options
for _, opt := range opts {
Expand Down Expand Up @@ -83,7 +87,7 @@ func (p *goPool) Wait() {
// Release stops all workers and releases resources.
func (p *goPool) Release() {
close(p.taskQueue)
close(p.exitChan)
p.cancel()
p.cond.L.Lock()
for len(p.workerStack) != p.minWorkers {
p.cond.Wait()
Expand Down Expand Up @@ -136,7 +140,7 @@ func (p *goPool) adjustWorkers() {
p.workerStack = p.workerStack[:len(p.workerStack)-removeWorkers]
}
p.cond.L.Unlock()
case <-p.exitChan:
case <-p.ctx.Done():
return
}
}
Expand Down

0 comments on commit 2ea0ed3

Please sign in to comment.