Skip to content

Commit

Permalink
Add Functional Options to GoPool and support for different lock types
Browse files Browse the repository at this point in the history
- Changed the type of GoPool's mutex to sync.Locker to support different lock types.
- Added a new Option type and a WithLock function to configure the lock used by GoPool.
- Updated NewGoPool to accept a variadic slice of Option functions and apply them to the GoPool.
- Updated all methods of GoPool that use the mutex to use the new lock.
- Added a dependency on github.com/daniel-hutao/spinlock for spinlock implementation.
- Updated the test cases to include tests for GoPool with sync.Mutex and spinlock.SpinLock.

Signed-off-by: Daniel Hu <tao.hu@merico.dev>
  • Loading branch information
daniel-hutao committed Jul 21, 2023
1 parent de737e9 commit f5de19a
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 47 deletions.
2 changes: 2 additions & 0 deletions go.mod
@@ -1,3 +1,5 @@
module github.com/devchat-ai/gopool

go 1.20

require github.com/daniel-hutao/spinlock v0.1.0
2 changes: 2 additions & 0 deletions go.sum
@@ -0,0 +1,2 @@
github.com/daniel-hutao/spinlock v0.1.0 h1:qk6v2L6mJLUmxzq1eJ5xUIlCh4q0wM+26Qy/KfH5c3U=
github.com/daniel-hutao/spinlock v0.1.0/go.mod h1:KkIAx91Qk/GDks3LcKXTwOJ7CS03ApkAHU4X4nrlwto=
29 changes: 22 additions & 7 deletions gopool.go
Expand Up @@ -11,18 +11,33 @@ type GoPool struct {
MaxWorkers int
workerStack []int
taskQueue chan Task
mutex sync.Mutex
lock sync.Locker
cond *sync.Cond
}

func NewGoPool(maxWorkers int) *GoPool {
type Option func(*GoPool)

func WithLock(lock sync.Locker) Option {
return func(p *GoPool) {
p.lock = lock
p.cond = sync.NewCond(p.lock)
}
}

func NewGoPool(maxWorkers int, opts ...Option) *GoPool {
pool := &GoPool{
MaxWorkers: maxWorkers,
Workers: make([]*Worker, maxWorkers),
workerStack: make([]int, maxWorkers),
taskQueue: make(chan Task, 1e6),
lock: new(sync.Mutex),
}
for _, opt := range opts {
opt(pool)
}
if pool.cond == nil {
pool.cond = sync.NewCond(pool.lock)
}
pool.cond = sync.NewCond(&pool.mutex)
for i := 0; i < maxWorkers; i++ {
worker := newWorker()
pool.Workers[i] = worker
Expand Down Expand Up @@ -52,17 +67,17 @@ func (p *GoPool) Release() {
}

func (p *GoPool) popWorker() int {
p.mutex.Lock()
p.lock.Lock()
workerIndex := p.workerStack[len(p.workerStack)-1]
p.workerStack = p.workerStack[:len(p.workerStack)-1]
p.mutex.Unlock()
p.lock.Unlock()
return workerIndex
}

func (p *GoPool) pushWorker(workerIndex int) {
p.mutex.Lock()
p.lock.Lock()
p.workerStack = append(p.workerStack, workerIndex)
p.mutex.Unlock()
p.lock.Unlock()
p.cond.Signal()
}

Expand Down
113 changes: 73 additions & 40 deletions gopool_test.go
@@ -1,51 +1,84 @@
package gopool

import (
"sync"
"testing"
"time"
"sync"
"testing"
"time"

"github.com/daniel-hutao/spinlock"
)

func TestGoPool(t *testing.T) {
pool := NewGoPool(100)
for i := 0; i < 1000; i++ {
pool.AddTask(func() {
time.Sleep(10 * time.Millisecond)
})
}
pool.Release()
func TestGoPoolWithMutex(t *testing.T) {
pool := NewGoPool(100, WithLock(new(sync.Mutex)))
for i := 0; i < 1000; i++ {
pool.AddTask(func() {
time.Sleep(10 * time.Millisecond)
})
}
pool.Release()
}

func TestGoPoolWithSpinLock(t *testing.T) {
pool := NewGoPool(100, WithLock(new(spinlock.SpinLock)))
for i := 0; i < 1000; i++ {
pool.AddTask(func() {
time.Sleep(10 * time.Millisecond)
})
}
pool.Release()
}

func BenchmarkGoPoolWithMutex(b *testing.B) {
var wg sync.WaitGroup
var taskNum = int(1e6)
pool := NewGoPool(5e4, WithLock(new(sync.Mutex)))

b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(taskNum)
for num := 0; num < taskNum; num++ {
pool.AddTask(func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
})
}
}
wg.Wait()
b.StopTimer()
pool.Release()
}

func BenchmarkGoPool(b *testing.B) {
var wg sync.WaitGroup
var taskNum = int(1e6)
pool := NewGoPool(1e4)

b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(taskNum)
for num := 0; num < taskNum; num++ {
pool.AddTask(func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
})
}
}
wg.Wait()
// pool.Release()
func BenchmarkGoPoolWithSpinLock(b *testing.B) {
var wg sync.WaitGroup
var taskNum = int(1e6)
pool := NewGoPool(5e4, WithLock(new(spinlock.SpinLock)))

b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(taskNum)
for num := 0; num < taskNum; num++ {
pool.AddTask(func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
})
}
}
wg.Wait()
b.StopTimer()
pool.Release()
}

func BenchmarkGoroutines(b *testing.B) {
var wg sync.WaitGroup
var taskNum = int(1e6)

for i := 0; i < b.N; i++ {
wg.Add(taskNum)
for num := 0; num < taskNum; num++ {
go func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
}()
}
}
var wg sync.WaitGroup
var taskNum = int(1e6)

for i := 0; i < b.N; i++ {
wg.Add(taskNum)
for num := 0; num < taskNum; num++ {
go func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
}()
}
}
}

0 comments on commit f5de19a

Please sign in to comment.