Skip to content

Commit

Permalink
Refactor GoPool and update README.md
Browse files Browse the repository at this point in the history
- Added comments to all public methods.
- Changed public methods and attributes to private where necessary.
- Moved Option related code to a new source file.
- Updated README.md to include information about Dynamic Worker Adjustment.

Signed-off-by: Daniel Hu <tao.hu@merico.dev>
  • Loading branch information
daniel-hutao committed Jul 24, 2023
1 parent 0aa3f68 commit 758b86d
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 56 deletions.
28 changes: 28 additions & 0 deletions README.md
Expand Up @@ -84,6 +84,34 @@ func main() {
}
```

## Dynamic Worker Adjustment

GoPool supports dynamic worker adjustment. This means that the number of workers in the pool can increase or decrease based on the number of tasks in the queue. This feature can be enabled by setting the MinWorkers option when creating the pool.

Here is an example of how to use GoPool with dynamic worker adjustment:

```go
package main

import (
"time"

"github.com/devchat-ai/gopool"
)

func main() {
pool := gopool.NewGoPool(100, gopool.WithMinWorkers(50))
for i := 0; i < 1000; i++ {
pool.AddTask(func() {
time.Sleep(10 * time.Millisecond)
})
}
pool.Release()
}
```

In this example, the pool starts with 50 workers. If the number of tasks in the queue exceeds (MaxWorkers - MinWorkers) / 2 + MinWorkers, the pool will add more workers. If the number of tasks in the queue is less than MinWorkers, the pool will remove some workers.

## Performance Testing

We have conducted several performance tests to evaluate the efficiency and performance of GoPool. Here are the results:
Expand Down
84 changes: 37 additions & 47 deletions gopool.go
Expand Up @@ -4,40 +4,28 @@ import (
"sync"
)

type Task func()
// Task represents a function that will be executed by a worker.
type task func()

type GoPool struct {
Workers []*Worker
MaxWorkers int
MinWorkers int
// goPool represents a pool of workers.
type goPool struct {
workers []*worker
maxWorkers int
minWorkers int
workerStack []int
taskQueue chan Task
taskQueue chan task
lock sync.Locker
cond *sync.Cond
}

type Option func(*GoPool)

func WithLock(lock sync.Locker) Option {
return func(p *GoPool) {
p.lock = lock
p.cond = sync.NewCond(p.lock)
}
}

func WithMinWorkers(minWorkers int) Option {
return func(p *GoPool) {
p.MinWorkers = minWorkers
}
}

func NewGoPool(maxWorkers int, opts ...Option) *GoPool {
pool := &GoPool{
MaxWorkers: maxWorkers,
MinWorkers: maxWorkers, // Set MinWorkers to MaxWorkers by default
Workers: make([]*Worker, maxWorkers),
// NewGoPool creates a new pool of workers.
func NewGoPool(maxWorkers int, opts ...Option) *goPool {
pool := &goPool{
maxWorkers: maxWorkers,
minWorkers: maxWorkers, // Set minWorkers to maxWorkers by default
workers: make([]*worker, maxWorkers),
workerStack: make([]int, maxWorkers),
taskQueue: make(chan Task, 1e6),
taskQueue: make(chan task, 1e6),
lock: new(sync.Mutex),
}
for _, opt := range opts {
Expand All @@ -46,65 +34,67 @@ func NewGoPool(maxWorkers int, opts ...Option) *GoPool {
if pool.cond == nil {
pool.cond = sync.NewCond(pool.lock)
}
for i := 0; i < pool.MinWorkers; i++ {
for i := 0; i < pool.minWorkers; i++ {
worker := newWorker()
pool.Workers[i] = worker
pool.workers[i] = worker
pool.workerStack[i] = i
worker.start(pool, i)
}
go pool.dispatch()
return pool
}

func (p *GoPool) AddTask(task Task) {
p.taskQueue <- task
// AddTask adds a task to the pool.
func (p *goPool) AddTask(t task) {
p.taskQueue <- t
}

func (p *GoPool) Release() {
// Release stops all workers and releases resources.
func (p *goPool) Release() {
close(p.taskQueue)
p.cond.L.Lock()
for len(p.workerStack) != p.MaxWorkers {
for len(p.workerStack) != p.maxWorkers {
p.cond.Wait()
}
p.cond.L.Unlock()
for _, worker := range p.Workers {
close(worker.TaskQueue)
for _, worker := range p.workers {
close(worker.taskQueue)
}
p.Workers = nil
p.workers = nil
p.workerStack = nil
}

func (p *GoPool) popWorker() int {
func (p *goPool) popWorker() int {
p.lock.Lock()
workerIndex := p.workerStack[len(p.workerStack)-1]
p.workerStack = p.workerStack[:len(p.workerStack)-1]
p.lock.Unlock()
return workerIndex
}

func (p *GoPool) pushWorker(workerIndex int) {
func (p *goPool) pushWorker(workerIndex int) {
p.lock.Lock()
p.workerStack = append(p.workerStack, workerIndex)
p.lock.Unlock()
p.cond.Signal()
}

func (p *GoPool) dispatch() {
for task := range p.taskQueue {
func (p *goPool) dispatch() {
for t := range p.taskQueue {
p.cond.L.Lock()
for len(p.workerStack) == 0 {
p.cond.Wait()
}
p.cond.L.Unlock()
workerIndex := p.popWorker()
p.Workers[workerIndex].TaskQueue <- task
if len(p.taskQueue) > (p.MaxWorkers-p.MinWorkers)/2+p.MinWorkers && len(p.workerStack) < p.MaxWorkers {
p.workers[workerIndex].taskQueue <- t
if len(p.taskQueue) > (p.maxWorkers-p.minWorkers)/2+p.minWorkers && len(p.workerStack) < p.maxWorkers {
worker := newWorker()
p.Workers = append(p.Workers, worker)
p.workerStack = append(p.workerStack, len(p.Workers)-1)
worker.start(p, len(p.Workers)-1)
} else if len(p.taskQueue) < p.MinWorkers && len(p.workerStack) > p.MinWorkers {
p.Workers = p.Workers[:len(p.Workers)-1]
p.workers = append(p.workers, worker)
p.workerStack = append(p.workerStack, len(p.workers)-1)
worker.start(p, len(p.workers)-1)
} else if len(p.taskQueue) < p.minWorkers && len(p.workerStack) > p.minWorkers {
p.workers = p.workers[:len(p.workers)-1]
p.workerStack = p.workerStack[:len(p.workerStack)-1]
}
}
Expand Down
21 changes: 21 additions & 0 deletions option.go
@@ -0,0 +1,21 @@
package gopool

import "sync"

// Option represents an option for the pool.
type Option func(*goPool)

// WithLock sets the lock for the pool.
func WithLock(lock sync.Locker) Option {
return func(p *goPool) {
p.lock = lock
p.cond = sync.NewCond(p.lock)
}
}

// WithMinWorkers sets the minimum number of workers for the pool.
func WithMinWorkers(minWorkers int) Option {
return func(p *goPool) {
p.minWorkers = minWorkers
}
}
19 changes: 10 additions & 9 deletions worker.go
@@ -1,20 +1,21 @@
package gopool

type Worker struct {
TaskQueue chan Task
// worker represents a worker in the pool.
type worker struct {
taskQueue chan task
}

func newWorker() *Worker {
return &Worker{
TaskQueue: make(chan Task, 1),
func newWorker() *worker {
return &worker{
taskQueue: make(chan task, 1),
}
}

func (w *Worker) start(pool *GoPool, workerIndex int) {
func (w *worker) start(pool *goPool, workerIndex int) {
go func() {
for task := range w.TaskQueue {
if task != nil {
task()
for t := range w.taskQueue {
if t != nil {
t()
}
pool.pushWorker(workerIndex)
}
Expand Down

0 comments on commit 758b86d

Please sign in to comment.