Skip to content

Commit

Permalink
Improve worker pool scaling logic and update README
Browse files Browse the repository at this point in the history
- Update the worker pool scaling logic to consider the size of the workerStack and the length of the taskQueue.
- Adjust the scaling logic to double the workerStack when the taskQueue length exceeds 75% of the workerStack size, but not exceeding maxWorkers.
- Adjust the scaling logic to halve the workerStack when the taskQueue is empty, but not less than minWorkers.
- Update the README to reflect these changes and improve the feature list format.
- Add comments to the code for better readability.

Signed-off-by: Daniel Hu <tao.hu@merico.dev>
  • Loading branch information
daniel-hutao committed Jul 25, 2023
1 parent 4974182 commit c219d43
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 15 deletions.
20 changes: 10 additions & 10 deletions README.md
Expand Up @@ -6,25 +6,25 @@ GoPool is a high-performance, feature-rich, and easy-to-use worker pool library

## Features

- **Task Queue**: GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue.
- [x] **Task Queue**: GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue.

- **Dynamic Worker Adjustment**: GoPool can dynamically adjust the number of workers based on the number of tasks and system load.
- [x] **Concurrency Control**: GoPool can control the number of concurrent tasks to prevent system overload.

- **Graceful Shutdown**: GoPool can shut down gracefully. It stops accepting new tasks and waits for all ongoing tasks to complete before shutting down when there are no more tasks or a shutdown signal is received.
- [x] **Dynamic Worker Adjustment**: GoPool can dynamically adjust the number of workers based on the number of tasks and system load.

- **Task Error Handling**: GoPool can handle errors that occur during task execution.
- [x] **Graceful Shutdown**: GoPool can shut down gracefully. It stops accepting new tasks and waits for all ongoing tasks to complete before shutting down when there are no more tasks or a shutdown signal is received.

- **Task Timeout Handling**: GoPool can handle task execution timeouts. If a task is not completed within the specified timeout period, the task is considered failed and a timeout error is returned.
- [x] **Task Error Handling**: GoPool can handle errors that occur during task execution.

- **Task Priority**: GoPool supports task priority. Tasks with higher priority are processed first.
- [x] **Task Timeout Handling**: GoPool can handle task execution timeouts. If a task is not completed within the specified timeout period, the task is considered failed and a timeout error is returned.

- **Task Result Retrieval**: GoPool provides a way to retrieve task results.
- [x] **Task Result Retrieval**: GoPool provides a way to retrieve task results.

- **Task Retry**: GoPool provides a retry mechanism for failed tasks.
- [x] **Task Retry**: GoPool provides a retry mechanism for failed tasks.

- **Concurrency Control**: GoPool can control the number of concurrent tasks to prevent system overload.
- [x] **Lock Customization**: GoPool supports different types of locks. You can use the built-in `sync.Mutex` or a custom lock such as `spinlock.SpinLock`.

- **Lock Customization**: GoPool supports different types of locks. You can use the built-in `sync.Mutex` or a custom lock such as `spinlock.SpinLock`.
- [ ] **Task Priority**: GoPool supports task priority. Tasks with higher priority are processed first.

## Installation

Expand Down
22 changes: 17 additions & 5 deletions gopool.go
Expand Up @@ -12,24 +12,32 @@ type task func() (interface{}, error)
// goPool represents a pool of workers.
type goPool struct {
workers []*worker
workerStack []int
maxWorkers int
// Set by WithMinWorkers(), used to adjust the number of workers. Default equals to maxWorkers.
minWorkers int
workerStack []int
// tasks are added to this channel first, then dispatched to workers. Default buffer size is 1 million.
taskQueue chan task
// Set by WithRetryCount(), used to retry a task when it fails. Default is 0.
retryCount int
lock sync.Locker
cond *sync.Cond
// Set by WithTimeout(), used to set a timeout for a task. Default is 0, which means no timeout.
timeout time.Duration
// Set by WithResultCallback(), used to handle the result of a task. Default is nil.
resultCallback func(interface{})
// Set by WithErrorCallback(), used to handle the error of a task. Default is nil.
errorCallback func(error)
// adjustInterval is the interval to adjust the number of workers. Default is 1 second.
adjustInterval time.Duration
}

// NewGoPool creates a new pool of workers.
func NewGoPool(maxWorkers int, opts ...Option) *goPool {
pool := &goPool{
maxWorkers: maxWorkers,
minWorkers: maxWorkers, // Set minWorkers to maxWorkers by default
// Set minWorkers to maxWorkers by default
minWorkers: maxWorkers,
workers: make([]*worker, maxWorkers),
workerStack: make([]int, maxWorkers),
taskQueue: make(chan task, 1e6),
Expand All @@ -38,12 +46,14 @@ func NewGoPool(maxWorkers int, opts ...Option) *goPool {
timeout: 0,
adjustInterval: 1 * time.Second,
}
// Apply options
for _, opt := range opts {
opt(pool)
}
if pool.cond == nil {
pool.cond = sync.NewCond(pool.lock)
}
// Create workers with the minimum number. Don't use pushWorker() here.
for i := 0; i < pool.minWorkers; i++ {
worker := newWorker()
pool.workers[i] = worker
Expand All @@ -61,7 +71,7 @@ func (p *goPool) AddTask(t task) {
}

// Release stops all workers and releases resources.
func (p *goPool) Release() {
func (p *goPool) Release() {
close(p.taskQueue)
p.cond.L.Lock()
for len(p.workerStack) != p.minWorkers {
Expand Down Expand Up @@ -90,13 +100,14 @@ func (p *goPool) pushWorker(workerIndex int) {
p.cond.Signal()
}

// adjustWorkers adjusts the number of workers according to the number of tasks in the queue.
func (p *goPool) adjustWorkers() {
ticker := time.NewTicker(p.adjustInterval)
defer ticker.Stop()

for range ticker.C {
p.cond.L.Lock()
if len(p.taskQueue) > (p.maxWorkers-p.minWorkers)/2+p.minWorkers && len(p.workerStack) < p.maxWorkers {
if len(p.taskQueue) > len(p.workerStack)*3/4 && len(p.workerStack) < p.maxWorkers {
// Double the number of workers until it reaches the maximum
newWorkers := min(len(p.workerStack)*2, p.maxWorkers) - len(p.workerStack)
for i := 0; i < newWorkers; i++ {
Expand All @@ -105,7 +116,7 @@ func (p *goPool) adjustWorkers() {
p.workerStack = append(p.workerStack, len(p.workers)-1)
worker.start(p, len(p.workers)-1)
}
} else if len(p.taskQueue) < p.minWorkers && len(p.workerStack) > p.minWorkers {
} else if len(p.taskQueue) == 0 && len(p.workerStack) > p.minWorkers {
// Halve the number of workers until it reaches the minimum
removeWorkers := max((len(p.workerStack)-p.minWorkers)/2, p.minWorkers)
p.workers = p.workers[:len(p.workers)-removeWorkers]
Expand All @@ -115,6 +126,7 @@ func (p *goPool) adjustWorkers() {
}
}

// dispatch dispatches tasks to workers.
func (p *goPool) dispatch() {
for t := range p.taskQueue {
p.cond.L.Lock()
Expand Down
8 changes: 8 additions & 0 deletions worker.go
Expand Up @@ -16,6 +16,9 @@ func newWorker() *worker {
}
}

// start starts the worker in a separate goroutine.
// The worker will run tasks from its taskQueue until the taskQueue is closed.
// For the length of the taskQueue is 1, the worker will be pushed back to the pool after executing 1 task.
func (w *worker) start(pool *goPool, workerIndex int) {
go func() {
for t := range w.taskQueue {
Expand All @@ -28,6 +31,8 @@ func (w *worker) start(pool *goPool, workerIndex int) {
}()
}

// executeTask executes a task and returns the result and error.
// If the task fails, it will be retried according to the retryCount of the pool.
func (w *worker) executeTask(t task, pool *goPool) (result interface{}, err error) {
for i := 0; i <= pool.retryCount; i++ {
if pool.timeout > 0 {
Expand All @@ -42,6 +47,7 @@ func (w *worker) executeTask(t task, pool *goPool) (result interface{}, err erro
return
}

// executeTaskWithTimeout executes a task with a timeout and returns the result and error.
func (w *worker) executeTaskWithTimeout(t task, pool *goPool) (result interface{}, err error) {
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), pool.timeout)
Expand Down Expand Up @@ -72,6 +78,8 @@ func (w *worker) executeTaskWithoutTimeout(t task, pool *goPool) (result interfa
return t()
}


// handleResult handles the result of a task.
func (w *worker) handleResult(result interface{}, err error, pool *goPool) {
if err != nil && pool.errorCallback != nil {
pool.errorCallback(err)
Expand Down

0 comments on commit c219d43

Please sign in to comment.