Skip to content

Commit

Permalink
Implement condition variable in worker pool
Browse files Browse the repository at this point in the history
- Added a condition variable to the Pool struct to manage the pool.
- Created a NewPool function to initialize the pool with the condition variable.
- Modified the Submit method to wait if the pool is at capacity before adding a new worker.
- Added a signal to the worker's run method to notify the condition variable when a worker is done.
- Added a broadcast in the Pool's Release method to notify all waiting goroutines when the pool is released.

Signed-off-by: Daniel Hu <tao.hu@merico.dev>
  • Loading branch information
daniel-hutao committed Jul 14, 2023
1 parent aa15268 commit 678a72b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
36 changes: 24 additions & 12 deletions pool.go
Expand Up @@ -13,26 +13,37 @@ type Pool struct {
workers WorkerStack
// lock represents a mutex to protect the workers slice
lock sync.Mutex
// cond represents a condition variable to manage the pool
cond *sync.Cond
}

func NewPool(capacity int) *Pool {
p := &Pool{
capacity: capacity,
workers: new(workerStack),
}
p.cond = sync.NewCond(&p.lock)
return p
}

func (p *Pool) Submit(task Task) {
p.lock.Lock()
defer p.lock.Unlock()

if p.running < p.capacity {
worker := p.workers.Pop()
if worker == nil {
worker = &Worker{
pool: p,
task: make(chan Task),
}
for p.running == p.capacity {
p.cond.Wait()
}

worker := p.workers.Pop()
if worker == nil {
worker = &Worker{
pool: p,
task: make(chan Task),
}
worker.task <- task
worker.run()
p.running++
} else {
// TODO: handle the case when the pool is full
}
worker.task <- task
worker.run()
p.running++
}

func (p *Pool) Running() int {
Expand All @@ -52,4 +63,5 @@ func (p *Pool) Release() {
close(worker.task)
}
p.running = 0
p.cond.Broadcast()
}
1 change: 1 addition & 0 deletions worker.go
Expand Up @@ -25,6 +25,7 @@ func (w *Worker) run() {
w.pool.lock.Lock()
w.pool.workers.Push(w)
w.pool.running--
w.pool.cond.Signal()
w.pool.lock.Unlock()
}
}()
Expand Down

0 comments on commit 678a72b

Please sign in to comment.