From 678a72b321924305249eb98d9d68bd460fa5b64f Mon Sep 17 00:00:00 2001 From: Daniel Hu Date: Fri, 14 Jul 2023 15:25:19 +0800 Subject: [PATCH] Implement condition variable in worker pool - Added a condition variable to the Pool struct to manage the pool. - Created a NewPool function to initialize the pool with the condition variable. - Modified the Submit method to wait if the pool is at capacity before adding a new worker. - Added a signal to the worker's run method to notify the condition variable when a worker is done. - Added a broadcast in the Pool's Release method to notify all waiting goroutines when the pool is released. Signed-off-by: Daniel Hu --- pool.go | 36 ++++++++++++++++++++++++------------ worker.go | 1 + 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/pool.go b/pool.go index 02721cf..14887a4 100644 --- a/pool.go +++ b/pool.go @@ -13,26 +13,37 @@ type Pool struct { workers WorkerStack // lock represents a mutex to protect the workers slice lock sync.Mutex + // cond represents a condition variable to manage the pool + cond *sync.Cond +} + +func NewPool(capacity int) *Pool { + p := &Pool{ + capacity: capacity, + workers: new(workerStack), + } + p.cond = sync.NewCond(&p.lock) + return p } func (p *Pool) Submit(task Task) { p.lock.Lock() defer p.lock.Unlock() - if p.running < p.capacity { - worker := p.workers.Pop() - if worker == nil { - worker = &Worker{ - pool: p, - task: make(chan Task), - } + for p.running == p.capacity { + p.cond.Wait() + } + + worker := p.workers.Pop() + if worker == nil { + worker = &Worker{ + pool: p, + task: make(chan Task), } - worker.task <- task - worker.run() - p.running++ - } else { - // TODO: handle the case when the pool is full } + worker.task <- task + worker.run() + p.running++ } func (p *Pool) Running() int { @@ -52,4 +63,5 @@ func (p *Pool) Release() { close(worker.task) } p.running = 0 + p.cond.Broadcast() } diff --git a/worker.go b/worker.go index 76cce6a..c7503b3 100644 --- a/worker.go +++ b/worker.go @@ -25,6 +25,7 @@ func (w *Worker) run() { w.pool.lock.Lock() w.pool.workers.Push(w) w.pool.running-- + w.pool.cond.Signal() w.pool.lock.Unlock() } }()