From 0793cbad3aba0f22ea459a7e284f9aa6d40517f8 Mon Sep 17 00:00:00 2001 From: longyue0521 Date: Tue, 20 Sep 2022 00:35:23 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E9=87=8D=E6=9E=84TaskPool=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=20#80=20=E4=B8=AD=E8=AE=A8=E8=AE=BA=E7=9A=84=E6=96=B0?= =?UTF-8?q?=E9=9C=80=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: longyue0521 --- pool/task_pool.go | 325 ++++++++++++++++++++---- pool/task_pool_test.go | 549 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 795 insertions(+), 79 deletions(-) diff --git a/pool/task_pool.go b/pool/task_pool.go index 1b54982e..ca0e4e17 100644 --- a/pool/task_pool.go +++ b/pool/task_pool.go @@ -21,6 +21,9 @@ import ( "runtime" "sync" "sync/atomic" + "time" + + "github.com/gotomicro/ekit/bean/option" ) var ( @@ -41,6 +44,8 @@ var ( _ TaskPool = &OnDemandBlockTaskPool{} panicBuffLen = 2048 + + defaultMaxIdleTime = 10 * time.Second ) // TaskPool 任务池 @@ -98,6 +103,43 @@ func (tw *taskWrapper) Run(ctx context.Context) (err error) { return tw.t.Run(ctx) } +type group struct { + mp map[int]int + n int32 + mu sync.RWMutex +} + +func (g *group) isIn(id int) bool { + g.mu.RLock() + defer g.mu.RUnlock() + _, ok := g.mp[id] + return ok +} + +func (g *group) add(id int) { + g.mu.Lock() + defer g.mu.Unlock() + if _, ok := g.mp[id]; !ok { + g.mp[id] = 1 + g.n++ + } +} + +func (g *group) delete(id int) { + g.mu.Lock() + defer g.mu.Unlock() + if _, ok := g.mp[id]; ok { + g.n-- + } + delete(g.mp, id) +} + +func (g *group) size() int32 { + g.mu.RLock() + defer g.mu.RUnlock() + return g.n +} + // OnDemandBlockTaskPool 按需创建goroutine的并发阻塞的任务池 // 任务池使用的 goroutine 是按需创建,并且可以确保不会超过 concurrency 所规定的数量 // 每一个任务都会使用新的 goroutine 来处理,并且任务池本身处理了 panic 的场景 @@ -106,22 +148,37 @@ type OnDemandBlockTaskPool struct { // TaskPool内部状态 state int32 - queue chan Task - token chan struct{} - num int32 - wg sync.WaitGroup + queue chan Task + numGoRunningTasks int32 + + totalNumOfGo int32 + mutex sync.RWMutex + + // 初始协程数 + initGo int32 + // 核心协程数 + coreGo int32 + // 最大协程数 + maxGo int32 + // 超时组 + timeoutGroup *group + // 最大空闲时间 + maxIdleTime time.Duration + // 队列积压率 + queueBacklogRate float64 + shutdownOnce sync.Once // 外部信号 - done chan struct{} + shutdownDone chan struct{} // 内部中断信号 - ctx context.Context - cancelFunc context.CancelFunc + shutdownNowCtx context.Context + shutdownNowCancel context.CancelFunc } // NewOnDemandBlockTaskPool 创建一个新的 OnDemandBlockTaskPool // concurrency 是并发数 // queueSize 是队列大小,即最多有多少个任务在等待调度 -func NewOnDemandBlockTaskPool(concurrency int, queueSize int) (*OnDemandBlockTaskPool, error) { +func NewOnDemandBlockTaskPool(concurrency int, queueSize int, opts ...option.Option[OnDemandBlockTaskPool]) (*OnDemandBlockTaskPool, error) { if concurrency < 1 { return nil, fmt.Errorf("%w:concurrency应该大于0", errInvalidArgument) } @@ -129,15 +186,60 @@ func NewOnDemandBlockTaskPool(concurrency int, queueSize int) (*OnDemandBlockTas return nil, fmt.Errorf("%w:queueSize应该大于等于0", errInvalidArgument) } b := &OnDemandBlockTaskPool{ - queue: make(chan Task, queueSize), - token: make(chan struct{}, concurrency), - done: make(chan struct{}), + queue: make(chan Task, queueSize), + shutdownDone: make(chan struct{}, 1), + initGo: int32(concurrency), + coreGo: int32(concurrency), + maxGo: int32(concurrency), + maxIdleTime: defaultMaxIdleTime, } - b.ctx, b.cancelFunc = context.WithCancel(context.Background()) + + b.shutdownNowCtx, b.shutdownNowCancel = context.WithCancel(context.Background()) atomic.StoreInt32(&b.state, stateCreated) + + option.Apply(b, opts...) + + if b.coreGo != int32(concurrency) && b.maxGo == int32(concurrency) { + b.maxGo = b.coreGo + } else if b.coreGo == int32(concurrency) && b.maxGo != int32(concurrency) { + b.coreGo = b.maxGo + } + if !(b.initGo <= b.coreGo && b.coreGo <= b.maxGo) { + return nil, fmt.Errorf("%w : 需要满足concurrency <= coreGo <= maxGo条件", errInvalidArgument) + } + + b.timeoutGroup = &group{mp: make(map[int]int)} + + if b.queueBacklogRate < float64(0) || float64(1) < b.queueBacklogRate { + return nil, fmt.Errorf("%w :queueBacklogRate合法范围为[0,1.0]", errInvalidArgument) + } return b, nil } +func WithQueueBacklogRate(rate float64) option.Option[OnDemandBlockTaskPool] { + return func(pool *OnDemandBlockTaskPool) { + pool.queueBacklogRate = rate + } +} + +func WithCoreGo(n int32) option.Option[OnDemandBlockTaskPool] { + return func(pool *OnDemandBlockTaskPool) { + pool.coreGo = n + } +} + +func WithMaxGo(n int32) option.Option[OnDemandBlockTaskPool] { + return func(pool *OnDemandBlockTaskPool) { + pool.maxGo = n + } +} + +func WithMaxIdleTime(d time.Duration) option.Option[OnDemandBlockTaskPool] { + return func(pool *OnDemandBlockTaskPool) { + pool.maxIdleTime = d + } +} + // Submit 提交一个任务 // 如果此时队列已满,那么将会阻塞调用者。 // 如果因为 ctx 的原因返回,那么将会返回 ctx.Err() @@ -211,54 +313,187 @@ func (b *OnDemandBlockTaskPool) Start() error { } if atomic.CompareAndSwapInt32(&b.state, stateCreated, stateRunning) { - go b.schedulingTasks() + go b.scheduling() return nil } } } -// Schedule tasks -func (b *OnDemandBlockTaskPool) schedulingTasks() { - defer close(b.token) +func (b *OnDemandBlockTaskPool) scheduling() { + + id := 0 + + b.increaseTotalNumOfGo(b.initGo) + for i := int32(0); i < b.initGo; i++ { + go b.goroutine(id) + id++ + } for { + select { - case <-b.ctx.Done(): + case <-b.shutdownNowCtx.Done(): + // log.Println("Loop ShudownNow") return - case b.token <- struct{}{}: + case <-b.shutdownDone: + // log.Println("Loop Shudown") + return + default: - task, ok := <-b.queue + b.mutex.RLock() + + if b.totalNumOfGo == b.maxGo { + b.mutex.RUnlock() + continue + } + + allGoShouldBeBusy := atomic.LoadInt32(&b.numGoRunningTasks) == b.totalNumOfGo + if !allGoShouldBeBusy { + b.mutex.RUnlock() + continue + } + + rate := float64(len(b.queue)) / float64(cap(b.queue)) + if rate == 0 || rate < b.queueBacklogRate { + // log.Println("rate == 0", rate == 0, "rate", rate, " < ", b.queueBacklogRate) + b.mutex.RUnlock() + continue + } + + // time.Sleep(time.Second) + // log.Println("totalNumOfGo", b.totalNumOfGo) + + var n int32 + + // b.queueBacklogRate合法范围[0,1] + // 当b.queueBacklogRate = 0时,直接开n个 + // 当b.queueBacklogRate在(0, 1]区间时,每次开一个 + if 0 < b.queueBacklogRate && b.queueBacklogRate <= 1 { + n = 1 + } else if b.initGo <= b.totalNumOfGo && b.totalNumOfGo < b.coreGo { + n = b.coreGo - b.totalNumOfGo + } else if b.coreGo <= b.totalNumOfGo && b.totalNumOfGo < b.maxGo { + n = b.maxGo - b.totalNumOfGo + } + + // log.Println("开协程", n, "max-id", id+int(n-1), "totalNumOfGo", b.totalNumOfGo+n) + b.mutex.RUnlock() + + b.increaseTotalNumOfGo(n) + for i := int32(0); i < n; i++ { + go b.goroutine(id) + id++ + } + + } + } +} + +func (b *OnDemandBlockTaskPool) goroutine(id int) { + + // 刚启动的协程除非恰巧赶上Shutdown/ShutdownNow被调用,否则应该至少执行一个task + idleTimer := time.NewTimer(0) + if !idleTimer.Stop() { + <-idleTimer.C + } + + for { + // log.Println("id", id, "working for loop") + select { + case <-b.shutdownNowCtx.Done(): + // log.Printf("id %d shutdownNow, timeoutGroup.Size=%d left\n", id, b.timeoutGroup.size()) + b.decreaseTotalNumOfGo(1) + return + case <-idleTimer.C: + b.mutex.Lock() + b.totalNumOfGo-- + b.timeoutGroup.delete(id) + // log.Printf("id %d timeout, timeoutGroup.Size=%d left\n", id, b.timeoutGroup.size()) + b.mutex.Unlock() + return + case task, ok := <-b.queue: + + // log.Println("id", id, "running tasks") + if b.timeoutGroup.isIn(id) { + // timer只保证至少在等待X时间后才发送信号而不是在X时间内发送信号 + b.timeoutGroup.delete(id) + // timer的Stop方法不保证一定成功 + // 不加判断并将信号清除可能会导致协程下次在case<-idleTimer.C处退出 + if !idleTimer.Stop() { + <-idleTimer.C + } + // log.Println("id", id, "out timeoutGroup") + } + + atomic.AddInt32(&b.numGoRunningTasks, 1) if !ok { - // 调用Shutdown后,TaskPool处于Closing状态 - if atomic.CompareAndSwapInt32(&b.state, stateClosing, stateStopped) { - // 等待运行中的Task自然结束 - b.wg.Wait() - // 通知外部调用者 - close(b.done) + // b.numGoRunningTasks > 1表示虽然当前协程监听到了b.queue关闭但还有其他协程运行task,当前协程自己退出就好 + // b.numGoRunningTasks == 1表示只有当前协程"运行task"中,其他协程在一定在"拿到b.queue到已关闭",这一信号的路上 + // 绝不会处于运行task中 + if atomic.CompareAndSwapInt32(&b.numGoRunningTasks, 1, 0) && atomic.LoadInt32(&b.state) == stateClosing { + // 在b.queue关闭后,第一个检测到全部task已经自然结束的协程 + b.shutdownOnce.Do(func() { + // 状态迁移 + atomic.CompareAndSwapInt32(&b.state, stateClosing, stateStopped) + // 显示通知外部调用者 + b.shutdownDone <- struct{}{} + close(b.shutdownDone) + }) + + b.decreaseTotalNumOfGo(1) + return } + + // 有其他协程运行task中,自己退出就好。 + atomic.AddInt32(&b.numGoRunningTasks, -1) + b.decreaseTotalNumOfGo(1) return } - b.wg.Add(1) - atomic.AddInt32(&b.num, 1) + // todo handle error + _ = task.Run(b.shutdownNowCtx) + atomic.AddInt32(&b.numGoRunningTasks, -1) + + b.mutex.Lock() + // log.Println("id", id, "totalNumOfGo-mem", b.totalNumOfGo-b.timeoutGroup.size(), "totalNumOfGo", b.totalNumOfGo, "mem", b.timeoutGroup.size()) + if b.coreGo < b.totalNumOfGo && (len(b.queue) == 0 || int32(len(b.queue)) < b.totalNumOfGo) { + // 协程在(核心,最大]区间 + // 如果没有任务可以执行,或者被判定为可能抢不到任务的协程直接退出 + // 一定要在此处减1才能让此刻等待在mutex上的其他协程被正确地分区 + b.totalNumOfGo-- + // log.Println("id", id, "exits....") + b.mutex.Unlock() + return + } - go func() { - defer func() { - atomic.AddInt32(&b.num, -1) - b.wg.Done() - <-b.token - }() + if b.initGo < b.totalNumOfGo-b.timeoutGroup.size() /* && len(b.queue) == 0 */ { + // log.Println("id", id, "initGo", b.initGo, "totalNumOfGo-mem", b.totalNumOfGo-b.timeoutGroup.size(), "totalNumOfGo", b.totalNumOfGo) + // 协程在(初始,核心]区间,如果没有任务可以执行,重置计时器 + // 当len(b.queue) != 0时,即便协程属于(核心,最大]区间,其实也要给一个定时器 + // 因为现在看队列中有任务,等真去拿的时候可能恰好没任务,那么此时常驻协程(初始协程数/initGo)就会暂时增加 + // 直到队列再次有任务时才可能将协程数降至初始协程数,因为注释掉了len(b.queue) == 0判断条件 + idleTimer = time.NewTimer(b.maxIdleTime) + b.timeoutGroup.add(id) + // log.Println("id", id, "add timeoutGroup", "size", b.timeoutGroup.size()) + } - // todo: handle err - err := task.Run(b.ctx) - if err != nil { - return - } - }() + b.mutex.Unlock() } } } +func (b *OnDemandBlockTaskPool) increaseTotalNumOfGo(n int32) { + b.mutex.Lock() + b.totalNumOfGo += n + b.mutex.Unlock() +} + +func (b *OnDemandBlockTaskPool) decreaseTotalNumOfGo(n int32) { + b.mutex.Lock() + b.totalNumOfGo -= n + b.mutex.Unlock() +} + // Shutdown 将会拒绝提交新的任务,但是会继续执行已提交任务 // 当执行完毕后,会往返回的 chan 中丢入信号 // Shutdown 会负责关闭返回的 chan @@ -286,7 +521,7 @@ func (b *OnDemandBlockTaskPool) Shutdown() (<-chan struct{}, error) { // 先关闭等待队列不再允许提交 // 同时任务调度循环能够通过b.queue是否被关闭来终止循环 close(b.queue) - return b.done, nil + return b.shutdownDone, nil } } @@ -316,7 +551,7 @@ func (b *OnDemandBlockTaskPool) ShutdownNow() ([]Task, error) { close(b.queue) // 发送中断信号,中断任务启动循环 - b.cancelFunc() + b.shutdownNowCancel() // 清空队列并保存 tasks := make([]Task, 0, len(b.queue)) @@ -338,6 +573,10 @@ func (b *OnDemandBlockTaskPool) internalState() int32 { } } -func (b *OnDemandBlockTaskPool) NumGo() int32 { - return atomic.LoadInt32(&b.num) +func (b *OnDemandBlockTaskPool) numOfGo() int32 { + var n int32 + b.mutex.RLock() + n = b.totalNumOfGo + b.mutex.RUnlock() + return n } diff --git a/pool/task_pool_test.go b/pool/task_pool_test.go index 8f4a643d..ca2fdbdd 100644 --- a/pool/task_pool_test.go +++ b/pool/task_pool_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/gotomicro/ekit/bean/option" "github.com/stretchr/testify/assert" "golang.org/x/sync/errgroup" ) @@ -66,6 +67,84 @@ func TestOnDemandBlockTaskPool_In_Created_State(t *testing.T) { pool, err = NewOnDemandBlockTaskPool(1, 1) assert.NoError(t, err) assert.NotNil(t, pool) + + t.Run("With Options", func(t *testing.T) { + t.Parallel() + + concurrency := 10 + pool, err := NewOnDemandBlockTaskPool(concurrency, 10) + assert.NoError(t, err) + + assert.Equal(t, int32(concurrency), pool.initGo) + assert.Equal(t, int32(concurrency), pool.coreGo) + assert.Equal(t, int32(concurrency), pool.maxGo) + assert.Equal(t, defaultMaxIdleTime, pool.maxIdleTime) + + coreGo, maxGo, maxIdleTime := int32(20), int32(30), 10*time.Second + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) + assert.NoError(t, err) + + assert.Equal(t, int32(concurrency), pool.initGo) + assert.Equal(t, coreGo, pool.coreGo) + assert.Equal(t, maxGo, pool.maxGo) + assert.Equal(t, maxIdleTime, pool.maxIdleTime) + + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo)) + assert.NoError(t, err) + assert.Equal(t, pool.coreGo, pool.maxGo) + + concurrency, coreGo = 30, 20 + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo)) + assert.Nil(t, pool) + assert.ErrorIs(t, err, errInvalidArgument) + + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithMaxGo(maxGo)) + assert.NoError(t, err) + assert.Equal(t, pool.maxGo, pool.coreGo) + + concurrency, maxGo = 30, 10 + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithMaxGo(maxGo)) + assert.Nil(t, pool) + assert.ErrorIs(t, err, errInvalidArgument) + + concurrency, coreGo, maxGo = 30, 20, 10 + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) + assert.Nil(t, pool) + assert.ErrorIs(t, err, errInvalidArgument) + + concurrency, coreGo, maxGo = 30, 10, 20 + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) + assert.Nil(t, pool) + assert.ErrorIs(t, err, errInvalidArgument) + + concurrency, coreGo, maxGo = 20, 10, 30 + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) + assert.Nil(t, pool) + assert.ErrorIs(t, err, errInvalidArgument) + + concurrency, coreGo, maxGo = 20, 30, 10 + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) + assert.Nil(t, pool) + assert.ErrorIs(t, err, errInvalidArgument) + + concurrency, coreGo, maxGo = 10, 30, 20 + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) + assert.Nil(t, pool) + assert.ErrorIs(t, err, errInvalidArgument) + + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithQueueBacklogRate(-0.1)) + assert.Nil(t, pool) + assert.ErrorIs(t, err, errInvalidArgument) + + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithQueueBacklogRate(1.0)) + assert.NotNil(t, pool) + assert.NoError(t, err) + + pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithQueueBacklogRate(1.1)) + assert.Nil(t, pool) + assert.ErrorIs(t, err, errInvalidArgument) + + }) }) // Start()导致TaskPool状态迁移,测试见TestTaskPool_In_Running_State/Start @@ -186,6 +265,294 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { // Shutdown()导致TaskPool状态迁移,TestTaskPool_In_Closing_State/Shutdown // ShutdownNow()导致TaskPool状态迁移,TestTestPool_In_Stopped_State/ShutdownNow + + t.Run("工作协程", func(t *testing.T) { + t.Parallel() + + t.Run("保持在初始数不变", func(t *testing.T) { + t.Parallel() + + concurrency, queueSize := 1, 3 + pool := testNewRunningStateTaskPool(t, concurrency, queueSize) + + n := queueSize + done1 := make(chan struct{}, n) + wait := make(chan struct{}, n) + + // 队列中有积压任务 + for i := 0; i < n; i++ { + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-done1 + return nil + })) + assert.NoError(t, err) + } + + // concurrency个tasks在运行中 + for i := 0; i < concurrency; i++ { + <-wait + } + + assert.Equal(t, int32(concurrency), pool.numOfGo()) + + // 使运行中的tasks结束 + for i := 0; i < concurrency; i++ { + done1 <- struct{}{} + } + + // 积压在队列中的任务开始运行 + for i := 0; i < n-concurrency; i++ { + <-wait + assert.Equal(t, int32(concurrency), pool.numOfGo()) + done1 <- struct{}{} + } + + }) + + t.Run("从初始数达到核心数", func(t *testing.T) { + t.Parallel() + + t.Run("一次性全开", func(t *testing.T) { + t.Parallel() + + t.Run("核心数比初始数多1个", func(t *testing.T) { + t.Parallel() + + concurrency, coreGo, maxIdleTime := int32(1), int32(2), 3*time.Millisecond + queueSize := int(coreGo) + testExtendNumGoFromInitGoToCoreGoAtOnce(t, concurrency, queueSize, coreGo, maxIdleTime) + }) + + t.Run("核心数比初始数多n个", func(t *testing.T) { + t.Parallel() + + concurrency, coreGo, maxIdleTime := int32(1), int32(3), 3*time.Millisecond + queueSize := int(coreGo) + testExtendNumGoFromInitGoToCoreGoAtOnce(t, concurrency, queueSize, coreGo, maxIdleTime) + }) + }) + + t.Run("一次一个开", func(t *testing.T) { + t.Parallel() + + t.Run("核心数比初始数多1个", func(t *testing.T) { + concurrency, coreGo, maxIdleTime, queueBacklogRate := int32(2), int32(3), 3*time.Millisecond, 0.1 + queueSize := int(coreGo) + testExtendNumGoFromInitGoToCoreGoStepByStep(t, concurrency, queueSize, coreGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) + }) + + t.Run("核心数比初始数多n个", func(t *testing.T) { + t.Parallel() + + concurrency, coreGo, maxIdleTime, queueBacklogRate := int32(2), int32(5), 3*time.Millisecond, 0.1 + queueSize := int(coreGo) + testExtendNumGoFromInitGoToCoreGoStepByStep(t, concurrency, queueSize, coreGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) + + }) + }) + + t.Run("在(初始数,核心数]区间的协程运行完任务后,在等待退出期间再次抢到任务", func(t *testing.T) { + t.Parallel() + + concurrency, coreGo, maxIdleTime := int32(1), int32(6), 100*time.Millisecond + queueSize := int(coreGo) + + pool := testNewRunningStateTaskPool(t, int(concurrency), queueSize, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) + done := make(chan struct{}, queueSize) + wait := make(chan struct{}, queueSize) + + for i := 0; i < queueSize; i++ { + // i := i + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-done + // t.Log("task done", i) + return nil + })) + assert.NoError(t, err) + } + + for i := 0; i < queueSize; i++ { + <-wait + } + assert.Equal(t, coreGo, pool.numOfGo()) + + close(done) + + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + <-done + // t.Log("task done [x]") + return nil + })) + assert.NoError(t, err) + + // <-time.After(maxIdleTime * 100) + for pool.numOfGo() > concurrency { + // t.Log("loop", "numOfGo", pool.numOfGo(), "timeoutGroup", pool.timeoutGroup.size()) + time.Sleep(maxIdleTime) + } + assert.Equal(t, concurrency, pool.numOfGo()) + }) + }) + + t.Run("从核心数到达最大数", func(t *testing.T) { + t.Parallel() + + t.Run("一次性全开", func(t *testing.T) { + t.Parallel() + + t.Run("最大数比核心数多1个", func(t *testing.T) { + t.Parallel() + + concurrency, coreGo, maxGo, maxIdleTime := int32(2), int32(4), int32(5), 3*time.Millisecond + queueSize := int(maxGo) + testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime) + }) + + t.Run("最大数比核心数多n个", func(t *testing.T) { + t.Parallel() + + concurrency, coreGo, maxGo, maxIdleTime := int32(2), int32(3), int32(5), 3*time.Millisecond + queueSize := int(maxGo) + testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime) + }) + + }) + + t.Run("一次一个开", func(t *testing.T) { + t.Parallel() + + t.Run("最大数比核心数多1个", func(t *testing.T) { + t.Parallel() + + concurrency, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(2), int32(4), int32(5), 3*time.Millisecond, 0.1 + queueSize := int(maxGo) + testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) + }) + + t.Run("最大数比核心数多n个", func(t *testing.T) { + t.Parallel() + + concurrency, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(6), 3*time.Millisecond, 0.1 + queueSize := int(maxGo) + testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) + }) + }) + }) + }) + +} + +type extendStrategyCheckFunc func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) + +func testExtendNumGoFromInitGoToCoreGoAtOnce(t *testing.T, concurrency int32, queueSize int, coreGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { + extendToCoreGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { + assert.Equal(t, coreGo, pool.numOfGo()) + } + opts = append(opts, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, coreGo, maxIdleTime, extendToCoreGoAtOnce, nil, opts...) +} + +func testExtendNumGoFromInitGoToCoreGoStepByStep(t *testing.T, concurrency int32, queueSize int, coreGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { + extendToCoreGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { + assert.Equal(t, i, pool.numOfGo()) + } + opts = append(opts, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, coreGo, maxIdleTime, extendToCoreGoAtOnce, nil, opts...) +} + +func testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t *testing.T, concurrency int32, queueSize int, coreGo int32, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { + extendToCoreGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { + assert.Equal(t, coreGo, pool.numOfGo()) + } + extendToMaxGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { + assert.Equal(t, maxGo, pool.numOfGo()) + } + opts = append(opts, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime, extendToCoreGoAtOnce, extendToMaxGoAtOnce, opts...) +} + +func testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t *testing.T, concurrency int32, queueSize int, coreGo, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { + extendStepByStep := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { + assert.Equal(t, i, pool.numOfGo()) + } + opts = append(opts, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime, extendStepByStep, extendStepByStep, opts...) +} + +func testExtendNumGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queueSize int, coreGo, maxGo int32, maxIdleTime time.Duration, duringExtendToCoreGo extendStrategyCheckFunc, duringExtendToMaxGo extendStrategyCheckFunc, opts ...option.Option[OnDemandBlockTaskPool]) { + + pool := testNewRunningStateTaskPool(t, int(initGo), queueSize, opts...) + // waitTime := (maxIdleTime + 1) * 330 + + assert.LessOrEqual(t, initGo, coreGo) + assert.LessOrEqual(t, coreGo, maxGo) + + done := make(chan struct{}) + wait := make(chan struct{}, maxGo) + + // 稳定在concurrency + for i := int32(0); i < initGo; i++ { + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-done + return nil + })) + assert.NoError(t, err) + } + // t.Log("AA") + for i := int32(0); i < initGo; i++ { + <-wait + } + assert.Equal(t, initGo, pool.numOfGo()) + // t.Log("BB") + + // 逐步添加任务 + for i := int32(1); i <= coreGo-initGo; i++ { + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-done + return nil + })) + assert.NoError(t, err) + // t.Log("before wait", "m", m, "i", i, "m-n", m-n, "len(wait)", len(wait), "len(queue)", len(pool.queue), "numGO", pool.numOfGo(), "nextnumGO", pool.expectedNumGo) + <-wait + // t.Log("after wait coreGo", coreGo, i, pool.numOfGo()) + + duringExtendToCoreGo(t, i+initGo, pool) + // assert.Equal(t, i+n, pool.numOfGo()) + } + + // t.Log("CC") + + assert.Equal(t, coreGo, pool.numOfGo()) + + for i := int32(1); i <= maxGo-coreGo; i++ { + + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-done + return nil + })) + assert.NoError(t, err) + // t.Log("before wait", "m", m, "i", i, "m-n", m-n, "len(wait)", len(wait), "len(queue)", len(pool.queue), "numGO", pool.numOfGo(), "nextnumGO", pool.expectedNumGo) + <-wait + // t.Log("after wait maxGo", maxGo, i, pool.numOfGo()) + + duringExtendToMaxGo(t, i+coreGo, pool) + } + + // t.Log("DD") + + assert.Equal(t, maxGo, pool.numOfGo()) + close(done) + + // 等待最大空闲时间后,稳定在n + // <-time.After(waitTime) + for pool.numOfGo() > initGo { + } + assert.Equal(t, initGo, pool.numOfGo()) } func TestOnDemandBlockTaskPool_In_Closing_State(t *testing.T) { @@ -200,58 +567,90 @@ func TestOnDemandBlockTaskPool_In_Closing_State(t *testing.T) { // 模拟阻塞提交 n := concurrency + queueSize*2 eg := new(errgroup.Group) - waitChan := make(chan struct{}) + waitChan := make(chan struct{}, n) + taskDone := make(chan struct{}) for i := 0; i < n; i++ { eg.Go(func() error { return pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { - <-waitChan + waitChan <- struct{}{} + <-taskDone return nil })) }) } - - // 调用Shutdown使TaskPool状态发生迁移 - type ShutdownResult struct { - done <-chan struct{} - err error + for i := 0; i < concurrency; i++ { + <-waitChan } - resultChan := make(chan ShutdownResult) - go func() { - time.Sleep(100 * time.Millisecond) - done, err := pool.Shutdown() - resultChan <- ShutdownResult{done: done, err: err} - }() - r := <-resultChan - + done, err := pool.Shutdown() + assert.NoError(t, err) // Closing过程中Submit会报错间接证明TaskPool处于StateClosing状态 assert.ErrorIs(t, eg.Wait(), errTaskPoolIsClosing) - // Shutdown调用成功 - assert.NoError(t, r.err) - select { - case <-r.done: - break - default: - // 第二次调用 - done2, err2 := pool.Shutdown() - assert.Nil(t, done2) - assert.ErrorIs(t, err2, errTaskPoolIsClosing) - assert.Equal(t, stateClosing, pool.internalState()) - } + // 第二次调用 + done2, err2 := pool.Shutdown() + assert.Nil(t, done2) + assert.ErrorIs(t, err2, errTaskPoolIsClosing) + assert.Equal(t, stateClosing, pool.internalState()) - assert.Equal(t, int32(concurrency), pool.NumGo()) + assert.Equal(t, int32(concurrency), pool.numOfGo()) - close(waitChan) - <-r.done + close(taskDone) + <-done assert.Equal(t, stateStopped, pool.internalState()) // 第一个Shutdown将状态迁移至StateStopped // 第三次调用 - done, err := pool.Shutdown() - assert.Nil(t, done) + done3, err := pool.Shutdown() + assert.Nil(t, done3) assert.ErrorIs(t, err, errTaskPoolIsStopped) }) + t.Run("Shutdown —— 协程数仍能按需扩展,调度循环也能自然退出", func(t *testing.T) { + t.Parallel() + + concurrency, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(5), 10*time.Millisecond, 0.1 + queueSize := int(maxGo) + pool := testNewRunningStateTaskPool(t, int(concurrency), queueSize, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime), WithQueueBacklogRate(queueBacklogRate)) + + assert.LessOrEqual(t, concurrency, coreGo) + assert.LessOrEqual(t, coreGo, maxGo) + + taskDone := make(chan struct{}) + wait := make(chan struct{}) + + for i := int32(0); i < maxGo; i++ { + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-taskDone + return nil + })) + assert.NoError(t, err) + } + + // 提交任务后立即Shutdown + shutdownDone, err := pool.Shutdown() + assert.NoError(t, err) + + // 等待close(b.queue)信号传递到各个协程 + time.Sleep(1 * time.Second) + + // 调度循环应该正常工作,一直按需开协程直到maxGo + for i := int32(0); i < maxGo; i++ { + <-wait + } + assert.Equal(t, maxGo, pool.numOfGo()) + + // 让所有任务结束 + close(taskDone) + <-shutdownDone + + // 用循环取代time.After/time.Sleep + for pool.numOfGo() != 0 { + + } + assert.Equal(t, int32(0), pool.numOfGo()) + }) + t.Run("Start", func(t *testing.T) { t.Parallel() @@ -338,6 +737,56 @@ func TestOnDemandBlockTaskPool_In_Stopped_State(t *testing.T) { assert.Equal(t, stateStopped, pool.internalState()) }) + t.Run("ShutdownNow —— 工作协程数不再扩展,调度循环立即退出", func(t *testing.T) { + t.Parallel() + + concurrency, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(5), 10*time.Millisecond, 0.1 + queueSize := int(maxGo) + pool := testNewRunningStateTaskPool(t, int(concurrency), queueSize, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime), WithQueueBacklogRate(queueBacklogRate)) + + assert.LessOrEqual(t, concurrency, coreGo) + assert.LessOrEqual(t, coreGo, maxGo) + + taskDone := make(chan struct{}) + wait := make(chan struct{}, queueSize) + + for i := 0; i < queueSize; i++ { + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-taskDone + return nil + })) + assert.NoError(t, err) + } + + // 使调度循环进入default分支 + for i := int32(0); i < coreGo; i++ { + <-wait + } + + tasks, err := pool.ShutdownNow() + assert.NoError(t, err) + // 见下方双重检查 + assert.GreaterOrEqual(t, len(tasks)+int(pool.numOfGo()), queueSize) + + // 让所有任务结束 + close(taskDone) + + // 用循环取代time.After/time.Sleep + // 特殊场景需要双重检查 + // 协程1工作中,调度循环处于default分支准备扩展协程(新增一个),此时调用ShutdownNow() + // 协程1完成工作接收到ShutdownNow()信号退出,而协程2还未开启可以使pool.numOfGo()短暂为0 + // 协程2启动后直接收到ShutdownNow()信号退出 + for pool.numOfGo() != 0 { + + } + for pool.numOfGo() != 0 { + + } + + assert.Equal(t, int32(0), pool.numOfGo()) + }) + t.Run("Start", func(t *testing.T) { t.Parallel() @@ -408,8 +857,8 @@ type ShutdownNowResult struct { err error } -func testNewRunningStateTaskPool(t *testing.T, concurrency int, queueSize int) *OnDemandBlockTaskPool { - pool, _ := NewOnDemandBlockTaskPool(concurrency, queueSize) +func testNewRunningStateTaskPool(t *testing.T, concurrency int, queueSize int, opts ...option.Option[OnDemandBlockTaskPool]) *OnDemandBlockTaskPool { + pool, _ := NewOnDemandBlockTaskPool(concurrency, queueSize, opts...) assert.Equal(t, stateCreated, pool.internalState()) assert.NoError(t, pool.Start()) assert.Equal(t, stateRunning, pool.internalState()) @@ -442,6 +891,34 @@ func testNewRunningStateTaskPoolWithQueueFullFilled(t *testing.T, concurrency in return pool, wait } -type FakeTask struct{} +func TestGroup(t *testing.T) { + n := 10 -func (f *FakeTask) Run(_ context.Context) error { return nil } + // g := &sliceGroup{members: make([]int, n, n)} + g := &group{mp: make(map[int]int)} + + for i := 0; i < n; i++ { + assert.False(t, g.isIn(i)) + g.add(i) + assert.True(t, g.isIn(i)) + assert.Equal(t, int32(i+1), g.size()) + } + + assert.Equal(t, int32(n), g.size()) + + for i := 0; i < n; i++ { + g.delete(i) + assert.Equal(t, int32(n-i-1), g.size()) + } + + assert.Equal(t, int32(0), g.size()) + + assert.False(t, g.isIn(n+1)) + + id := 100 + g.add(id) + assert.Equal(t, int32(1), g.size()) + assert.True(t, g.isIn(id)) + g.delete(id) + assert.Equal(t, int32(0), g.size()) +} From f22e81f44e33651401b7c2889a1894ecb4200893 Mon Sep 17 00:00:00 2001 From: longyue0521 Date: Tue, 20 Sep 2022 01:41:57 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E4=B8=AD=E6=9C=AA=E4=BD=BF=E7=94=A8=E5=8F=82=E6=95=B0=E5=B9=B6?= =?UTF-8?q?=E4=BF=AE=E6=94=B9CHANGELOG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: longyue0521 --- .CHANGELOG.md | 3 ++- pool/task_pool_test.go | 10 +++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/.CHANGELOG.md b/.CHANGELOG.md index e1b3df33..4a6540e2 100644 --- a/.CHANGELOG.md +++ b/.CHANGELOG.md @@ -28,4 +28,5 @@ - [syncx: 使用泛型封装 sync.Map](https://github.com/gotomicro/ekit/pull/79) - [slice: 支持 Diff*, Intersection*, Union*, Index* 类方法](https://github.com/gotomicro/ekit/pull/83) - [slice: 聚合函数 Max, Min 和 Sum](https://github.com/gotomicro/ekit/pull/82) -- [slice: FilterMap 和 Delete 方法](https://github.com/gotomicro/ekit/pull/91) \ No newline at end of file +- [slice: FilterMap 和 Delete 方法](https://github.com/gotomicro/ekit/pull/91) +- [pool: 重构TaskPool](https://github.com/gotomicro/ekit/pull/93) \ No newline at end of file diff --git a/pool/task_pool_test.go b/pool/task_pool_test.go index ca2fdbdd..d5f8d5a7 100644 --- a/pool/task_pool_test.go +++ b/pool/task_pool_test.go @@ -451,7 +451,7 @@ func testExtendNumGoFromInitGoToCoreGoAtOnce(t *testing.T, concurrency int32, qu assert.Equal(t, coreGo, pool.numOfGo()) } opts = append(opts, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, coreGo, maxIdleTime, extendToCoreGoAtOnce, nil, opts...) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, coreGo, extendToCoreGoAtOnce, nil, opts...) } func testExtendNumGoFromInitGoToCoreGoStepByStep(t *testing.T, concurrency int32, queueSize int, coreGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { @@ -459,7 +459,7 @@ func testExtendNumGoFromInitGoToCoreGoStepByStep(t *testing.T, concurrency int32 assert.Equal(t, i, pool.numOfGo()) } opts = append(opts, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, coreGo, maxIdleTime, extendToCoreGoAtOnce, nil, opts...) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, coreGo, extendToCoreGoAtOnce, nil, opts...) } func testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t *testing.T, concurrency int32, queueSize int, coreGo int32, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { @@ -470,7 +470,7 @@ func testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t *testing.T, concurrency i assert.Equal(t, maxGo, pool.numOfGo()) } opts = append(opts, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime, extendToCoreGoAtOnce, extendToMaxGoAtOnce, opts...) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, maxGo, extendToCoreGoAtOnce, extendToMaxGoAtOnce, opts...) } func testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t *testing.T, concurrency int32, queueSize int, coreGo, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { @@ -478,10 +478,10 @@ func testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t *testing.T, concurren assert.Equal(t, i, pool.numOfGo()) } opts = append(opts, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime, extendStepByStep, extendStepByStep, opts...) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, maxGo, extendStepByStep, extendStepByStep, opts...) } -func testExtendNumGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queueSize int, coreGo, maxGo int32, maxIdleTime time.Duration, duringExtendToCoreGo extendStrategyCheckFunc, duringExtendToMaxGo extendStrategyCheckFunc, opts ...option.Option[OnDemandBlockTaskPool]) { +func testExtendNumGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queueSize int, coreGo, maxGo int32, duringExtendToCoreGo extendStrategyCheckFunc, duringExtendToMaxGo extendStrategyCheckFunc, opts ...option.Option[OnDemandBlockTaskPool]) { pool := testNewRunningStateTaskPool(t, int(initGo), queueSize, opts...) // waitTime := (maxIdleTime + 1) * 330 From 3a0306c4778f0d478abd45f7eae9dea8cbe2ef20 Mon Sep 17 00:00:00 2001 From: longyue0521 Date: Tue, 20 Sep 2022 17:25:19 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E5=B0=86concurrency=E9=87=8D=E5=91=BD?= =?UTF-8?q?=E5=90=8D=E4=B8=BAinitGo=EF=BC=8CtotalNumOfGo=E9=87=8D=E5=91=BD?= =?UTF-8?q?=E5=90=8D=E4=B8=BAtotalGo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pool/task_pool.go | 72 +++++++++--------- pool/task_pool_test.go | 167 ++++++++++++++++++++--------------------- 2 files changed, 119 insertions(+), 120 deletions(-) diff --git a/pool/task_pool.go b/pool/task_pool.go index ca0e4e17..f98c2909 100644 --- a/pool/task_pool.go +++ b/pool/task_pool.go @@ -151,8 +151,8 @@ type OnDemandBlockTaskPool struct { queue chan Task numGoRunningTasks int32 - totalNumOfGo int32 - mutex sync.RWMutex + totalGo int32 + mutex sync.RWMutex // 初始协程数 initGo int32 @@ -176,11 +176,11 @@ type OnDemandBlockTaskPool struct { } // NewOnDemandBlockTaskPool 创建一个新的 OnDemandBlockTaskPool -// concurrency 是并发数 +// initGo 是初始协程数 // queueSize 是队列大小,即最多有多少个任务在等待调度 -func NewOnDemandBlockTaskPool(concurrency int, queueSize int, opts ...option.Option[OnDemandBlockTaskPool]) (*OnDemandBlockTaskPool, error) { - if concurrency < 1 { - return nil, fmt.Errorf("%w:concurrency应该大于0", errInvalidArgument) +func NewOnDemandBlockTaskPool(initGo int, queueSize int, opts ...option.Option[OnDemandBlockTaskPool]) (*OnDemandBlockTaskPool, error) { + if initGo < 1 { + return nil, fmt.Errorf("%w:initGo应该大于0", errInvalidArgument) } if queueSize < 0 { return nil, fmt.Errorf("%w:queueSize应该大于等于0", errInvalidArgument) @@ -188,9 +188,9 @@ func NewOnDemandBlockTaskPool(concurrency int, queueSize int, opts ...option.Opt b := &OnDemandBlockTaskPool{ queue: make(chan Task, queueSize), shutdownDone: make(chan struct{}, 1), - initGo: int32(concurrency), - coreGo: int32(concurrency), - maxGo: int32(concurrency), + initGo: int32(initGo), + coreGo: int32(initGo), + maxGo: int32(initGo), maxIdleTime: defaultMaxIdleTime, } @@ -199,13 +199,13 @@ func NewOnDemandBlockTaskPool(concurrency int, queueSize int, opts ...option.Opt option.Apply(b, opts...) - if b.coreGo != int32(concurrency) && b.maxGo == int32(concurrency) { + if b.coreGo != b.initGo && b.maxGo == b.initGo { b.maxGo = b.coreGo - } else if b.coreGo == int32(concurrency) && b.maxGo != int32(concurrency) { + } else if b.coreGo == b.initGo && b.maxGo != b.initGo { b.coreGo = b.maxGo } if !(b.initGo <= b.coreGo && b.coreGo <= b.maxGo) { - return nil, fmt.Errorf("%w : 需要满足concurrency <= coreGo <= maxGo条件", errInvalidArgument) + return nil, fmt.Errorf("%w : 需要满足initGo <= coreGo <= maxGo条件", errInvalidArgument) } b.timeoutGroup = &group{mp: make(map[int]int)} @@ -323,7 +323,7 @@ func (b *OnDemandBlockTaskPool) scheduling() { id := 0 - b.increaseTotalNumOfGo(b.initGo) + b.increaseTotalGo(b.initGo) for i := int32(0); i < b.initGo; i++ { go b.goroutine(id) id++ @@ -342,12 +342,12 @@ func (b *OnDemandBlockTaskPool) scheduling() { b.mutex.RLock() - if b.totalNumOfGo == b.maxGo { + if b.totalGo == b.maxGo { b.mutex.RUnlock() continue } - allGoShouldBeBusy := atomic.LoadInt32(&b.numGoRunningTasks) == b.totalNumOfGo + allGoShouldBeBusy := atomic.LoadInt32(&b.numGoRunningTasks) == b.totalGo if !allGoShouldBeBusy { b.mutex.RUnlock() continue @@ -361,7 +361,7 @@ func (b *OnDemandBlockTaskPool) scheduling() { } // time.Sleep(time.Second) - // log.Println("totalNumOfGo", b.totalNumOfGo) + // log.Println("totalGo", b.totalGo) var n int32 @@ -370,16 +370,16 @@ func (b *OnDemandBlockTaskPool) scheduling() { // 当b.queueBacklogRate在(0, 1]区间时,每次开一个 if 0 < b.queueBacklogRate && b.queueBacklogRate <= 1 { n = 1 - } else if b.initGo <= b.totalNumOfGo && b.totalNumOfGo < b.coreGo { - n = b.coreGo - b.totalNumOfGo - } else if b.coreGo <= b.totalNumOfGo && b.totalNumOfGo < b.maxGo { - n = b.maxGo - b.totalNumOfGo + } else if b.initGo <= b.totalGo && b.totalGo < b.coreGo { + n = b.coreGo - b.totalGo + } else if b.coreGo <= b.totalGo && b.totalGo < b.maxGo { + n = b.maxGo - b.totalGo } - // log.Println("开协程", n, "max-id", id+int(n-1), "totalNumOfGo", b.totalNumOfGo+n) + // log.Println("开协程", n, "max-id", id+int(n-1), "totalGo", b.totalGo+n) b.mutex.RUnlock() - b.increaseTotalNumOfGo(n) + b.increaseTotalGo(n) for i := int32(0); i < n; i++ { go b.goroutine(id) id++ @@ -402,11 +402,11 @@ func (b *OnDemandBlockTaskPool) goroutine(id int) { select { case <-b.shutdownNowCtx.Done(): // log.Printf("id %d shutdownNow, timeoutGroup.Size=%d left\n", id, b.timeoutGroup.size()) - b.decreaseTotalNumOfGo(1) + b.decreaseTotalGo(1) return case <-idleTimer.C: b.mutex.Lock() - b.totalNumOfGo-- + b.totalGo-- b.timeoutGroup.delete(id) // log.Printf("id %d timeout, timeoutGroup.Size=%d left\n", id, b.timeoutGroup.size()) b.mutex.Unlock() @@ -440,13 +440,13 @@ func (b *OnDemandBlockTaskPool) goroutine(id int) { close(b.shutdownDone) }) - b.decreaseTotalNumOfGo(1) + b.decreaseTotalGo(1) return } // 有其他协程运行task中,自己退出就好。 atomic.AddInt32(&b.numGoRunningTasks, -1) - b.decreaseTotalNumOfGo(1) + b.decreaseTotalGo(1) return } @@ -455,19 +455,19 @@ func (b *OnDemandBlockTaskPool) goroutine(id int) { atomic.AddInt32(&b.numGoRunningTasks, -1) b.mutex.Lock() - // log.Println("id", id, "totalNumOfGo-mem", b.totalNumOfGo-b.timeoutGroup.size(), "totalNumOfGo", b.totalNumOfGo, "mem", b.timeoutGroup.size()) - if b.coreGo < b.totalNumOfGo && (len(b.queue) == 0 || int32(len(b.queue)) < b.totalNumOfGo) { + // log.Println("id", id, "totalGo-mem", b.totalGo-b.timeoutGroup.size(), "totalGo", b.totalGo, "mem", b.timeoutGroup.size()) + if b.coreGo < b.totalGo && (len(b.queue) == 0 || int32(len(b.queue)) < b.totalGo) { // 协程在(核心,最大]区间 // 如果没有任务可以执行,或者被判定为可能抢不到任务的协程直接退出 // 一定要在此处减1才能让此刻等待在mutex上的其他协程被正确地分区 - b.totalNumOfGo-- + b.totalGo-- // log.Println("id", id, "exits....") b.mutex.Unlock() return } - if b.initGo < b.totalNumOfGo-b.timeoutGroup.size() /* && len(b.queue) == 0 */ { - // log.Println("id", id, "initGo", b.initGo, "totalNumOfGo-mem", b.totalNumOfGo-b.timeoutGroup.size(), "totalNumOfGo", b.totalNumOfGo) + if b.initGo < b.totalGo-b.timeoutGroup.size() /* && len(b.queue) == 0 */ { + // log.Println("id", id, "initGo", b.initGo, "totalGo-mem", b.totalGo-b.timeoutGroup.size(), "totalGo", b.totalGo) // 协程在(初始,核心]区间,如果没有任务可以执行,重置计时器 // 当len(b.queue) != 0时,即便协程属于(核心,最大]区间,其实也要给一个定时器 // 因为现在看队列中有任务,等真去拿的时候可能恰好没任务,那么此时常驻协程(初始协程数/initGo)就会暂时增加 @@ -482,15 +482,15 @@ func (b *OnDemandBlockTaskPool) goroutine(id int) { } } -func (b *OnDemandBlockTaskPool) increaseTotalNumOfGo(n int32) { +func (b *OnDemandBlockTaskPool) increaseTotalGo(n int32) { b.mutex.Lock() - b.totalNumOfGo += n + b.totalGo += n b.mutex.Unlock() } -func (b *OnDemandBlockTaskPool) decreaseTotalNumOfGo(n int32) { +func (b *OnDemandBlockTaskPool) decreaseTotalGo(n int32) { b.mutex.Lock() - b.totalNumOfGo -= n + b.totalGo -= n b.mutex.Unlock() } @@ -576,7 +576,7 @@ func (b *OnDemandBlockTaskPool) internalState() int32 { func (b *OnDemandBlockTaskPool) numOfGo() int32 { var n int32 b.mutex.RLock() - n = b.totalNumOfGo + n = b.totalGo b.mutex.RUnlock() return n } diff --git a/pool/task_pool_test.go b/pool/task_pool_test.go index d5f8d5a7..ffef8073 100644 --- a/pool/task_pool_test.go +++ b/pool/task_pool_test.go @@ -71,76 +71,76 @@ func TestOnDemandBlockTaskPool_In_Created_State(t *testing.T) { t.Run("With Options", func(t *testing.T) { t.Parallel() - concurrency := 10 - pool, err := NewOnDemandBlockTaskPool(concurrency, 10) + initGo := 10 + pool, err := NewOnDemandBlockTaskPool(initGo, 10) assert.NoError(t, err) - assert.Equal(t, int32(concurrency), pool.initGo) - assert.Equal(t, int32(concurrency), pool.coreGo) - assert.Equal(t, int32(concurrency), pool.maxGo) + assert.Equal(t, int32(initGo), pool.initGo) + assert.Equal(t, int32(initGo), pool.coreGo) + assert.Equal(t, int32(initGo), pool.maxGo) assert.Equal(t, defaultMaxIdleTime, pool.maxIdleTime) coreGo, maxGo, maxIdleTime := int32(20), int32(30), 10*time.Second - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) assert.NoError(t, err) - assert.Equal(t, int32(concurrency), pool.initGo) + assert.Equal(t, int32(initGo), pool.initGo) assert.Equal(t, coreGo, pool.coreGo) assert.Equal(t, maxGo, pool.maxGo) assert.Equal(t, maxIdleTime, pool.maxIdleTime) - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo)) + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithCoreGo(coreGo)) assert.NoError(t, err) assert.Equal(t, pool.coreGo, pool.maxGo) - concurrency, coreGo = 30, 20 - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo)) + initGo, coreGo = 30, 20 + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithCoreGo(coreGo)) assert.Nil(t, pool) assert.ErrorIs(t, err, errInvalidArgument) - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithMaxGo(maxGo)) + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithMaxGo(maxGo)) assert.NoError(t, err) assert.Equal(t, pool.maxGo, pool.coreGo) - concurrency, maxGo = 30, 10 - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithMaxGo(maxGo)) + initGo, maxGo = 30, 10 + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithMaxGo(maxGo)) assert.Nil(t, pool) assert.ErrorIs(t, err, errInvalidArgument) - concurrency, coreGo, maxGo = 30, 20, 10 - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) + initGo, coreGo, maxGo = 30, 20, 10 + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) assert.Nil(t, pool) assert.ErrorIs(t, err, errInvalidArgument) - concurrency, coreGo, maxGo = 30, 10, 20 - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) + initGo, coreGo, maxGo = 30, 10, 20 + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) assert.Nil(t, pool) assert.ErrorIs(t, err, errInvalidArgument) - concurrency, coreGo, maxGo = 20, 10, 30 - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) + initGo, coreGo, maxGo = 20, 10, 30 + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) assert.Nil(t, pool) assert.ErrorIs(t, err, errInvalidArgument) - concurrency, coreGo, maxGo = 20, 30, 10 - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) + initGo, coreGo, maxGo = 20, 30, 10 + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) assert.Nil(t, pool) assert.ErrorIs(t, err, errInvalidArgument) - concurrency, coreGo, maxGo = 10, 30, 20 - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) + initGo, coreGo, maxGo = 10, 30, 20 + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithCoreGo(coreGo), WithMaxGo(maxGo)) assert.Nil(t, pool) assert.ErrorIs(t, err, errInvalidArgument) - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithQueueBacklogRate(-0.1)) + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithQueueBacklogRate(-0.1)) assert.Nil(t, pool) assert.ErrorIs(t, err, errInvalidArgument) - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithQueueBacklogRate(1.0)) + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithQueueBacklogRate(1.0)) assert.NotNil(t, pool) assert.NoError(t, err) - pool, err = NewOnDemandBlockTaskPool(concurrency, 10, WithQueueBacklogRate(1.1)) + pool, err = NewOnDemandBlockTaskPool(initGo, 10, WithQueueBacklogRate(1.1)) assert.Nil(t, pool) assert.ErrorIs(t, err, errInvalidArgument) @@ -272,8 +272,8 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { t.Run("保持在初始数不变", func(t *testing.T) { t.Parallel() - concurrency, queueSize := 1, 3 - pool := testNewRunningStateTaskPool(t, concurrency, queueSize) + initGo, queueSize := 1, 3 + pool := testNewRunningStateTaskPool(t, initGo, queueSize) n := queueSize done1 := make(chan struct{}, n) @@ -289,22 +289,22 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { assert.NoError(t, err) } - // concurrency个tasks在运行中 - for i := 0; i < concurrency; i++ { + // initGo个tasks在运行中 + for i := 0; i < initGo; i++ { <-wait } - assert.Equal(t, int32(concurrency), pool.numOfGo()) + assert.Equal(t, int32(initGo), pool.numOfGo()) // 使运行中的tasks结束 - for i := 0; i < concurrency; i++ { + for i := 0; i < initGo; i++ { done1 <- struct{}{} } // 积压在队列中的任务开始运行 - for i := 0; i < n-concurrency; i++ { + for i := 0; i < n-initGo; i++ { <-wait - assert.Equal(t, int32(concurrency), pool.numOfGo()) + assert.Equal(t, int32(initGo), pool.numOfGo()) done1 <- struct{}{} } @@ -319,17 +319,17 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { t.Run("核心数比初始数多1个", func(t *testing.T) { t.Parallel() - concurrency, coreGo, maxIdleTime := int32(1), int32(2), 3*time.Millisecond + initGo, coreGo, maxIdleTime := int32(1), int32(2), 3*time.Millisecond queueSize := int(coreGo) - testExtendNumGoFromInitGoToCoreGoAtOnce(t, concurrency, queueSize, coreGo, maxIdleTime) + testExtendNumGoFromInitGoToCoreGoAtOnce(t, initGo, queueSize, coreGo, maxIdleTime) }) t.Run("核心数比初始数多n个", func(t *testing.T) { t.Parallel() - concurrency, coreGo, maxIdleTime := int32(1), int32(3), 3*time.Millisecond + initGo, coreGo, maxIdleTime := int32(1), int32(3), 3*time.Millisecond queueSize := int(coreGo) - testExtendNumGoFromInitGoToCoreGoAtOnce(t, concurrency, queueSize, coreGo, maxIdleTime) + testExtendNumGoFromInitGoToCoreGoAtOnce(t, initGo, queueSize, coreGo, maxIdleTime) }) }) @@ -337,17 +337,17 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { t.Parallel() t.Run("核心数比初始数多1个", func(t *testing.T) { - concurrency, coreGo, maxIdleTime, queueBacklogRate := int32(2), int32(3), 3*time.Millisecond, 0.1 + initGo, coreGo, maxIdleTime, queueBacklogRate := int32(2), int32(3), 3*time.Millisecond, 0.1 queueSize := int(coreGo) - testExtendNumGoFromInitGoToCoreGoStepByStep(t, concurrency, queueSize, coreGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) + testExtendNumGoFromInitGoToCoreGoStepByStep(t, initGo, queueSize, coreGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) }) t.Run("核心数比初始数多n个", func(t *testing.T) { t.Parallel() - concurrency, coreGo, maxIdleTime, queueBacklogRate := int32(2), int32(5), 3*time.Millisecond, 0.1 + initGo, coreGo, maxIdleTime, queueBacklogRate := int32(2), int32(5), 3*time.Millisecond, 0.1 queueSize := int(coreGo) - testExtendNumGoFromInitGoToCoreGoStepByStep(t, concurrency, queueSize, coreGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) + testExtendNumGoFromInitGoToCoreGoStepByStep(t, initGo, queueSize, coreGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) }) }) @@ -355,10 +355,10 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { t.Run("在(初始数,核心数]区间的协程运行完任务后,在等待退出期间再次抢到任务", func(t *testing.T) { t.Parallel() - concurrency, coreGo, maxIdleTime := int32(1), int32(6), 100*time.Millisecond + initGo, coreGo, maxIdleTime := int32(1), int32(6), 100*time.Millisecond queueSize := int(coreGo) - pool := testNewRunningStateTaskPool(t, int(concurrency), queueSize, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) + pool := testNewRunningStateTaskPool(t, int(initGo), queueSize, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) done := make(chan struct{}, queueSize) wait := make(chan struct{}, queueSize) @@ -388,11 +388,11 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { assert.NoError(t, err) // <-time.After(maxIdleTime * 100) - for pool.numOfGo() > concurrency { + for pool.numOfGo() > initGo { // t.Log("loop", "numOfGo", pool.numOfGo(), "timeoutGroup", pool.timeoutGroup.size()) time.Sleep(maxIdleTime) } - assert.Equal(t, concurrency, pool.numOfGo()) + assert.Equal(t, initGo, pool.numOfGo()) }) }) @@ -405,17 +405,17 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { t.Run("最大数比核心数多1个", func(t *testing.T) { t.Parallel() - concurrency, coreGo, maxGo, maxIdleTime := int32(2), int32(4), int32(5), 3*time.Millisecond + initGo, coreGo, maxGo, maxIdleTime := int32(2), int32(4), int32(5), 3*time.Millisecond queueSize := int(maxGo) - testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime) + testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t, initGo, queueSize, coreGo, maxGo, maxIdleTime) }) t.Run("最大数比核心数多n个", func(t *testing.T) { t.Parallel() - concurrency, coreGo, maxGo, maxIdleTime := int32(2), int32(3), int32(5), 3*time.Millisecond + initGo, coreGo, maxGo, maxIdleTime := int32(2), int32(3), int32(5), 3*time.Millisecond queueSize := int(maxGo) - testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime) + testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t, initGo, queueSize, coreGo, maxGo, maxIdleTime) }) }) @@ -426,17 +426,17 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { t.Run("最大数比核心数多1个", func(t *testing.T) { t.Parallel() - concurrency, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(2), int32(4), int32(5), 3*time.Millisecond, 0.1 + initGo, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(2), int32(4), int32(5), 3*time.Millisecond, 0.1 queueSize := int(maxGo) - testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) + testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t, initGo, queueSize, coreGo, maxGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) }) t.Run("最大数比核心数多n个", func(t *testing.T) { t.Parallel() - concurrency, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(6), 3*time.Millisecond, 0.1 + initGo, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(6), 3*time.Millisecond, 0.1 queueSize := int(maxGo) - testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t, concurrency, queueSize, coreGo, maxGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) + testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t, initGo, queueSize, coreGo, maxGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) }) }) }) @@ -446,23 +446,23 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { type extendStrategyCheckFunc func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) -func testExtendNumGoFromInitGoToCoreGoAtOnce(t *testing.T, concurrency int32, queueSize int, coreGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { +func testExtendNumGoFromInitGoToCoreGoAtOnce(t *testing.T, initGo int32, queueSize int, coreGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { extendToCoreGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { assert.Equal(t, coreGo, pool.numOfGo()) } opts = append(opts, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, coreGo, extendToCoreGoAtOnce, nil, opts...) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, initGo, queueSize, coreGo, coreGo, extendToCoreGoAtOnce, nil, opts...) } -func testExtendNumGoFromInitGoToCoreGoStepByStep(t *testing.T, concurrency int32, queueSize int, coreGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { +func testExtendNumGoFromInitGoToCoreGoStepByStep(t *testing.T, initGo int32, queueSize int, coreGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { extendToCoreGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { assert.Equal(t, i, pool.numOfGo()) } opts = append(opts, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, coreGo, extendToCoreGoAtOnce, nil, opts...) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, initGo, queueSize, coreGo, coreGo, extendToCoreGoAtOnce, nil, opts...) } -func testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t *testing.T, concurrency int32, queueSize int, coreGo int32, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { +func testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t *testing.T, initGo int32, queueSize int, coreGo int32, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { extendToCoreGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { assert.Equal(t, coreGo, pool.numOfGo()) } @@ -470,21 +470,20 @@ func testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t *testing.T, concurrency i assert.Equal(t, maxGo, pool.numOfGo()) } opts = append(opts, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, maxGo, extendToCoreGoAtOnce, extendToMaxGoAtOnce, opts...) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, initGo, queueSize, coreGo, maxGo, extendToCoreGoAtOnce, extendToMaxGoAtOnce, opts...) } -func testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t *testing.T, concurrency int32, queueSize int, coreGo, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { +func testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t *testing.T, initGo int32, queueSize int, coreGo, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { extendStepByStep := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { assert.Equal(t, i, pool.numOfGo()) } opts = append(opts, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, concurrency, queueSize, coreGo, maxGo, extendStepByStep, extendStepByStep, opts...) + testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, initGo, queueSize, coreGo, maxGo, extendStepByStep, extendStepByStep, opts...) } func testExtendNumGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queueSize int, coreGo, maxGo int32, duringExtendToCoreGo extendStrategyCheckFunc, duringExtendToMaxGo extendStrategyCheckFunc, opts ...option.Option[OnDemandBlockTaskPool]) { pool := testNewRunningStateTaskPool(t, int(initGo), queueSize, opts...) - // waitTime := (maxIdleTime + 1) * 330 assert.LessOrEqual(t, initGo, coreGo) assert.LessOrEqual(t, coreGo, maxGo) @@ -492,7 +491,7 @@ func testExtendNumGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queue done := make(chan struct{}) wait := make(chan struct{}, maxGo) - // 稳定在concurrency + // 稳定在initGo for i := int32(0); i < initGo; i++ { err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { wait <- struct{}{} @@ -561,11 +560,11 @@ func TestOnDemandBlockTaskPool_In_Closing_State(t *testing.T) { t.Run("Shutdown —— 使TaskPool状态由Running变为Closing", func(t *testing.T) { t.Parallel() - concurrency, queueSize := 2, 4 - pool := testNewRunningStateTaskPool(t, concurrency, queueSize) + initGo, queueSize := 2, 4 + pool := testNewRunningStateTaskPool(t, initGo, queueSize) // 模拟阻塞提交 - n := concurrency + queueSize*2 + n := initGo + queueSize*2 eg := new(errgroup.Group) waitChan := make(chan struct{}, n) taskDone := make(chan struct{}) @@ -578,7 +577,7 @@ func TestOnDemandBlockTaskPool_In_Closing_State(t *testing.T) { })) }) } - for i := 0; i < concurrency; i++ { + for i := 0; i < initGo; i++ { <-waitChan } done, err := pool.Shutdown() @@ -592,7 +591,7 @@ func TestOnDemandBlockTaskPool_In_Closing_State(t *testing.T) { assert.ErrorIs(t, err2, errTaskPoolIsClosing) assert.Equal(t, stateClosing, pool.internalState()) - assert.Equal(t, int32(concurrency), pool.numOfGo()) + assert.Equal(t, int32(initGo), pool.numOfGo()) close(taskDone) <-done @@ -608,11 +607,11 @@ func TestOnDemandBlockTaskPool_In_Closing_State(t *testing.T) { t.Run("Shutdown —— 协程数仍能按需扩展,调度循环也能自然退出", func(t *testing.T) { t.Parallel() - concurrency, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(5), 10*time.Millisecond, 0.1 + initGo, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(5), 10*time.Millisecond, 0.1 queueSize := int(maxGo) - pool := testNewRunningStateTaskPool(t, int(concurrency), queueSize, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime), WithQueueBacklogRate(queueBacklogRate)) + pool := testNewRunningStateTaskPool(t, int(initGo), queueSize, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime), WithQueueBacklogRate(queueBacklogRate)) - assert.LessOrEqual(t, concurrency, coreGo) + assert.LessOrEqual(t, initGo, coreGo) assert.LessOrEqual(t, coreGo, maxGo) taskDone := make(chan struct{}) @@ -700,8 +699,8 @@ func TestOnDemandBlockTaskPool_In_Stopped_State(t *testing.T) { t.Run("ShutdownNow —— 使TaskPool状态由Running变为Stopped", func(t *testing.T) { t.Parallel() - concurrency, queueSize := 2, 4 - pool, wait := testNewRunningStateTaskPoolWithQueueFullFilled(t, concurrency, queueSize) + initGo, queueSize := 2, 4 + pool, wait := testNewRunningStateTaskPoolWithQueueFullFilled(t, initGo, queueSize) // 模拟阻塞提交 eg := new(errgroup.Group) @@ -740,11 +739,11 @@ func TestOnDemandBlockTaskPool_In_Stopped_State(t *testing.T) { t.Run("ShutdownNow —— 工作协程数不再扩展,调度循环立即退出", func(t *testing.T) { t.Parallel() - concurrency, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(5), 10*time.Millisecond, 0.1 + initGo, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(5), 10*time.Millisecond, 0.1 queueSize := int(maxGo) - pool := testNewRunningStateTaskPool(t, int(concurrency), queueSize, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime), WithQueueBacklogRate(queueBacklogRate)) + pool := testNewRunningStateTaskPool(t, int(initGo), queueSize, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime), WithQueueBacklogRate(queueBacklogRate)) - assert.LessOrEqual(t, concurrency, coreGo) + assert.LessOrEqual(t, initGo, coreGo) assert.LessOrEqual(t, coreGo, maxGo) taskDone := make(chan struct{}) @@ -857,16 +856,16 @@ type ShutdownNowResult struct { err error } -func testNewRunningStateTaskPool(t *testing.T, concurrency int, queueSize int, opts ...option.Option[OnDemandBlockTaskPool]) *OnDemandBlockTaskPool { - pool, _ := NewOnDemandBlockTaskPool(concurrency, queueSize, opts...) +func testNewRunningStateTaskPool(t *testing.T, initGo int, queueSize int, opts ...option.Option[OnDemandBlockTaskPool]) *OnDemandBlockTaskPool { + pool, _ := NewOnDemandBlockTaskPool(initGo, queueSize, opts...) assert.Equal(t, stateCreated, pool.internalState()) assert.NoError(t, pool.Start()) assert.Equal(t, stateRunning, pool.internalState()) return pool } -func testNewStoppedStateTaskPool(t *testing.T, concurrency int, queueSize int) *OnDemandBlockTaskPool { - pool := testNewRunningStateTaskPool(t, concurrency, queueSize) +func testNewStoppedStateTaskPool(t *testing.T, initGo int, queueSize int) *OnDemandBlockTaskPool { + pool := testNewRunningStateTaskPool(t, initGo, queueSize) tasks, err := pool.ShutdownNow() assert.NoError(t, err) assert.Equal(t, 0, len(tasks)) @@ -874,10 +873,10 @@ func testNewStoppedStateTaskPool(t *testing.T, concurrency int, queueSize int) * return pool } -func testNewRunningStateTaskPoolWithQueueFullFilled(t *testing.T, concurrency int, queueSize int) (*OnDemandBlockTaskPool, chan struct{}) { - pool := testNewRunningStateTaskPool(t, concurrency, queueSize) +func testNewRunningStateTaskPoolWithQueueFullFilled(t *testing.T, initGo int, queueSize int) (*OnDemandBlockTaskPool, chan struct{}) { + pool := testNewRunningStateTaskPool(t, initGo, queueSize) wait := make(chan struct{}) - for i := 0; i < concurrency+queueSize; i++ { + for i := 0; i < initGo+queueSize; i++ { func() { err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { <-wait From 4fbc02abf06f0616fb7f49222b2e5a8ab246e707 Mon Sep 17 00:00:00 2001 From: longyue0521 Date: Wed, 21 Sep 2022 10:01:40 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=8E=BB=E6=8E=89=E8=B0=83=E5=BA=A6?= =?UTF-8?q?=E5=BE=AA=E7=8E=AF=EF=BC=8C=E5=88=9B=E5=BB=BA=E5=8D=8F=E7=A8=8B?= =?UTF-8?q?=E7=9A=84=E6=97=B6=E6=9C=BA=E6=94=B9=E4=B8=BASubmit=E5=8F=8ASta?= =?UTF-8?q?rt=E6=96=B9=E6=B3=95=E5=86=85=EF=BC=8C=E8=A7=A3=E5=86=B3CPU?= =?UTF-8?q?=E7=A9=BA=E8=BD=AC=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pool/task_pool.go | 114 ++++++------- pool/task_pool_test.go | 363 ++++++++++++++++++++++++----------------- 2 files changed, 265 insertions(+), 212 deletions(-) diff --git a/pool/task_pool.go b/pool/task_pool.go index f98c2909..afba7717 100644 --- a/pool/task_pool.go +++ b/pool/task_pool.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "log" "runtime" "sync" "sync/atomic" @@ -168,6 +169,9 @@ type OnDemandBlockTaskPool struct { queueBacklogRate float64 shutdownOnce sync.Once + // 协程id方便调试程序 + id int32 + // 外部信号 shutdownDone chan struct{} // 内部中断信号 @@ -285,6 +289,12 @@ func (b *OnDemandBlockTaskPool) trySubmit(ctx context.Context, task Task, state case <-ctx.Done(): return false, fmt.Errorf("%w", ctx.Err()) case b.queue <- task: + if state == stateRunning && b.allowToCreateGoroutine() { + b.increaseTotalGo(1) + go b.goroutine(int(atomic.LoadInt32(&b.id))) + newId := atomic.AddInt32(&b.id, 1) + log.Println("create go ", newId-1) + } return true, nil default: // 不能阻塞在临界区,要给Shutdown和ShutdownNow机会 @@ -294,6 +304,30 @@ func (b *OnDemandBlockTaskPool) trySubmit(ctx context.Context, task Task, state return false, nil } +func (b *OnDemandBlockTaskPool) allowToCreateGoroutine() bool { + b.mutex.RLock() + defer b.mutex.RUnlock() + + if b.totalGo == b.maxGo { + return false + } + + // 这个判断可能太苛刻了,经常导致开协程失败,先注释掉 + // allGoShouldBeBusy := atomic.LoadInt32(&b.numGoRunningTasks) == b.totalGo + // if !allGoShouldBeBusy { + // return false + // } + + rate := float64(len(b.queue)) / float64(cap(b.queue)) + if rate == 0 || rate < b.queueBacklogRate { + log.Println("rate == 0", rate == 0, "rate", rate, " < ", b.queueBacklogRate) + return false + } + + // b.totalGo < b.maxGo && rate != 0 && rate >= b.queueBacklogRate + return true +} + // Start 开始调度任务执行 // Start 之后,调用者可以继续使用 Submit 提交任务 func (b *OnDemandBlockTaskPool) Start() error { @@ -312,79 +346,27 @@ func (b *OnDemandBlockTaskPool) Start() error { return fmt.Errorf("%w", errTaskPoolIsStarted) } - if atomic.CompareAndSwapInt32(&b.state, stateCreated, stateRunning) { - go b.scheduling() - return nil - } - } -} - -func (b *OnDemandBlockTaskPool) scheduling() { - - id := 0 + if atomic.CompareAndSwapInt32(&b.state, stateCreated, stateLocked) { - b.increaseTotalGo(b.initGo) - for i := int32(0); i < b.initGo; i++ { - go b.goroutine(id) - id++ - } - - for { + n := b.initGo - select { - case <-b.shutdownNowCtx.Done(): - // log.Println("Loop ShudownNow") - return - case <-b.shutdownDone: - // log.Println("Loop Shudown") - return - default: - - b.mutex.RLock() - - if b.totalGo == b.maxGo { - b.mutex.RUnlock() - continue - } - - allGoShouldBeBusy := atomic.LoadInt32(&b.numGoRunningTasks) == b.totalGo - if !allGoShouldBeBusy { - b.mutex.RUnlock() - continue - } - - rate := float64(len(b.queue)) / float64(cap(b.queue)) - if rate == 0 || rate < b.queueBacklogRate { - // log.Println("rate == 0", rate == 0, "rate", rate, " < ", b.queueBacklogRate) - b.mutex.RUnlock() - continue - } - - // time.Sleep(time.Second) - // log.Println("totalGo", b.totalGo) - - var n int32 - - // b.queueBacklogRate合法范围[0,1] - // 当b.queueBacklogRate = 0时,直接开n个 - // 当b.queueBacklogRate在(0, 1]区间时,每次开一个 - if 0 < b.queueBacklogRate && b.queueBacklogRate <= 1 { - n = 1 - } else if b.initGo <= b.totalGo && b.totalGo < b.coreGo { - n = b.coreGo - b.totalGo - } else if b.coreGo <= b.totalGo && b.totalGo < b.maxGo { - n = b.maxGo - b.totalGo + allowGo := b.maxGo - b.initGo + needGo := int32(len(b.queue)) - b.initGo + if needGo > 0 { + if needGo <= allowGo { + n += needGo + } else { + n += allowGo + } } - // log.Println("开协程", n, "max-id", id+int(n-1), "totalGo", b.totalGo+n) - b.mutex.RUnlock() - b.increaseTotalGo(n) for i := int32(0); i < n; i++ { - go b.goroutine(id) - id++ + go b.goroutine(int(atomic.LoadInt32(&b.id))) + atomic.AddInt32(&b.id, 1) } - + atomic.CompareAndSwapInt32(&b.state, stateLocked, stateRunning) + return nil } } } diff --git a/pool/task_pool_test.go b/pool/task_pool_test.go index ffef8073..dcb402b9 100644 --- a/pool/task_pool_test.go +++ b/pool/task_pool_test.go @@ -234,6 +234,120 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { assert.Equal(t, stateRunning, pool.internalState()) }) + t.Run("Start —— 在TaskPool启动前队列中已有任务,启动后不再Submit", func(t *testing.T) { + + t.Run("WithCoreGo,WithMaxIdleTime,所需要协程数 <= 允许创建的协程数", func(t *testing.T) { + + initGo, coreGo, maxIdleTime := 1, 3, 3*time.Millisecond + queueSize := coreGo + + needGo, allowGo := queueSize-initGo, coreGo-initGo + assert.LessOrEqual(t, needGo, allowGo) + + pool, err := NewOnDemandBlockTaskPool(initGo, queueSize, WithCoreGo(int32(coreGo)), WithMaxIdleTime(maxIdleTime)) + assert.NoError(t, err) + + assert.Equal(t, int32(0), pool.numOfGo()) + + done := make(chan struct{}, coreGo) + wait := make(chan struct{}, coreGo) + + for i := 0; i < coreGo; i++ { + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-done + return nil + })) + assert.NoError(t, err) + } + + assert.Equal(t, int32(0), pool.numOfGo()) + + assert.NoError(t, pool.Start()) + + for i := 0; i < coreGo; i++ { + <-wait + } + assert.Equal(t, int32(coreGo), pool.numOfGo()) + }) + + t.Run("WithMaxGo, 所需要协程数 > 允许创建的协程数", func(t *testing.T) { + initGo, maxGo := 3, 5 + queueSize := maxGo + 1 + + needGo, allowGo := queueSize-initGo, maxGo-initGo + assert.Greater(t, needGo, allowGo) + + pool, err := NewOnDemandBlockTaskPool(initGo, queueSize, WithMaxGo(int32(maxGo))) + assert.NoError(t, err) + + assert.Equal(t, int32(0), pool.numOfGo()) + + done := make(chan struct{}, queueSize) + wait := make(chan struct{}, queueSize) + + for i := 0; i < queueSize; i++ { + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-done + return nil + })) + assert.NoError(t, err) + } + + assert.Equal(t, int32(0), pool.numOfGo()) + + assert.NoError(t, pool.Start()) + + for i := 0; i < maxGo; i++ { + <-wait + } + assert.Equal(t, int32(maxGo), pool.numOfGo()) + }) + }) + + t.Run("Start —— 与Submit并发调用,WithCoreGo,WithMaxIdleTime,WithMaxGo,所需要协程数 < 允许创建的协程数", func(t *testing.T) { + + initGo, coreGo, maxGo, maxIdleTime := 2, 4, 6, 3*time.Millisecond + queueSize := coreGo + + needGo, allowGo := queueSize-initGo, maxGo-initGo + assert.Less(t, needGo, allowGo) + + pool, err := NewOnDemandBlockTaskPool(initGo, queueSize, WithCoreGo(int32(coreGo)), WithMaxGo(int32(maxGo)), WithMaxIdleTime(maxIdleTime)) + assert.NoError(t, err) + + assert.Equal(t, int32(0), pool.numOfGo()) + + done := make(chan struct{}, queueSize) + wait := make(chan struct{}, queueSize) + + // 与下方阻塞提交并发调用 + errChan := make(chan error) + go func() { + time.Sleep(10 * time.Millisecond) + errChan <- pool.Start() + }() + + // 模拟阻塞提交 + for i := 0; i < maxGo; i++ { + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-done + return nil + })) + assert.NoError(t, err) + } + + assert.NoError(t, <-errChan) + + for i := 0; i < maxGo; i++ { + <-wait + } + + assert.Equal(t, int32(maxGo), pool.numOfGo()) + }) + t.Run("Submit", func(t *testing.T) { t.Parallel() @@ -313,43 +427,20 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { t.Run("从初始数达到核心数", func(t *testing.T) { t.Parallel() - t.Run("一次性全开", func(t *testing.T) { + t.Run("核心数比初始数多1个", func(t *testing.T) { t.Parallel() - t.Run("核心数比初始数多1个", func(t *testing.T) { - t.Parallel() - - initGo, coreGo, maxIdleTime := int32(1), int32(2), 3*time.Millisecond - queueSize := int(coreGo) - testExtendNumGoFromInitGoToCoreGoAtOnce(t, initGo, queueSize, coreGo, maxIdleTime) - }) - - t.Run("核心数比初始数多n个", func(t *testing.T) { - t.Parallel() - - initGo, coreGo, maxIdleTime := int32(1), int32(3), 3*time.Millisecond - queueSize := int(coreGo) - testExtendNumGoFromInitGoToCoreGoAtOnce(t, initGo, queueSize, coreGo, maxIdleTime) - }) + initGo, coreGo, maxIdleTime, queueBacklogRate := int32(2), int32(3), 3*time.Millisecond, 0.1 + queueSize := int(coreGo) + testExtendGoFromInitGoToCoreGo(t, initGo, queueSize, coreGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) }) - t.Run("一次一个开", func(t *testing.T) { + t.Run("核心数比初始数多n个", func(t *testing.T) { t.Parallel() - t.Run("核心数比初始数多1个", func(t *testing.T) { - initGo, coreGo, maxIdleTime, queueBacklogRate := int32(2), int32(3), 3*time.Millisecond, 0.1 - queueSize := int(coreGo) - testExtendNumGoFromInitGoToCoreGoStepByStep(t, initGo, queueSize, coreGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) - }) - - t.Run("核心数比初始数多n个", func(t *testing.T) { - t.Parallel() - - initGo, coreGo, maxIdleTime, queueBacklogRate := int32(2), int32(5), 3*time.Millisecond, 0.1 - queueSize := int(coreGo) - testExtendNumGoFromInitGoToCoreGoStepByStep(t, initGo, queueSize, coreGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) - - }) + initGo, coreGo, maxIdleTime, queueBacklogRate := int32(2), int32(5), 3*time.Millisecond, 0.1 + queueSize := int(coreGo) + testExtendGoFromInitGoToCoreGo(t, initGo, queueSize, coreGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) }) t.Run("在(初始数,核心数]区间的协程运行完任务后,在等待退出期间再次抢到任务", func(t *testing.T) { @@ -359,37 +450,41 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { queueSize := int(coreGo) pool := testNewRunningStateTaskPool(t, int(initGo), queueSize, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) + + assert.Equal(t, initGo, pool.numOfGo()) + t.Log("1") done := make(chan struct{}, queueSize) wait := make(chan struct{}, queueSize) for i := 0; i < queueSize; i++ { - // i := i + i := i err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { wait <- struct{}{} <-done - // t.Log("task done", i) + t.Log("task done", i) return nil })) assert.NoError(t, err) } - + t.Log("2") for i := 0; i < queueSize; i++ { + t.Log("wait ", i) <-wait } assert.Equal(t, coreGo, pool.numOfGo()) close(done) - + t.Log("3") err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { <-done - // t.Log("task done [x]") + t.Log("task done [x]") return nil })) assert.NoError(t, err) - + t.Log("4") // <-time.After(maxIdleTime * 100) for pool.numOfGo() > initGo { - // t.Log("loop", "numOfGo", pool.numOfGo(), "timeoutGroup", pool.timeoutGroup.size()) + t.Log("loop", "numOfGo", pool.numOfGo(), "timeoutGroup", pool.timeoutGroup.size()) time.Sleep(maxIdleTime) } assert.Equal(t, initGo, pool.numOfGo()) @@ -399,92 +494,91 @@ func TestOnDemandBlockTaskPool_In_Running_State(t *testing.T) { t.Run("从核心数到达最大数", func(t *testing.T) { t.Parallel() - t.Run("一次性全开", func(t *testing.T) { + t.Run("最大数比核心数多1个", func(t *testing.T) { t.Parallel() - t.Run("最大数比核心数多1个", func(t *testing.T) { - t.Parallel() - - initGo, coreGo, maxGo, maxIdleTime := int32(2), int32(4), int32(5), 3*time.Millisecond - queueSize := int(maxGo) - testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t, initGo, queueSize, coreGo, maxGo, maxIdleTime) - }) - - t.Run("最大数比核心数多n个", func(t *testing.T) { - t.Parallel() - - initGo, coreGo, maxGo, maxIdleTime := int32(2), int32(3), int32(5), 3*time.Millisecond - queueSize := int(maxGo) - testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t, initGo, queueSize, coreGo, maxGo, maxIdleTime) - }) - + initGo, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(2), int32(4), int32(5), 3*time.Millisecond, 0.1 + queueSize := int(maxGo) + testExtendGoFromInitGoToCoreGoAndMaxGo(t, initGo, queueSize, coreGo, maxGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) }) - t.Run("一次一个开", func(t *testing.T) { + t.Run("最大数比核心数多n个", func(t *testing.T) { t.Parallel() - t.Run("最大数比核心数多1个", func(t *testing.T) { - t.Parallel() - - initGo, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(2), int32(4), int32(5), 3*time.Millisecond, 0.1 - queueSize := int(maxGo) - testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t, initGo, queueSize, coreGo, maxGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) - }) - - t.Run("最大数比核心数多n个", func(t *testing.T) { - t.Parallel() - - initGo, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(6), 3*time.Millisecond, 0.1 - queueSize := int(maxGo) - testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t, initGo, queueSize, coreGo, maxGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) - }) + initGo, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(6), 3*time.Millisecond, 0.1 + queueSize := int(maxGo) + testExtendGoFromInitGoToCoreGoAndMaxGo(t, initGo, queueSize, coreGo, maxGo, maxIdleTime, WithQueueBacklogRate(queueBacklogRate)) }) }) }) } -type extendStrategyCheckFunc func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) +func testExtendGoFromInitGoToCoreGo(t *testing.T, initGo int32, queueSize int, coreGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { -func testExtendNumGoFromInitGoToCoreGoAtOnce(t *testing.T, initGo int32, queueSize int, coreGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { - extendToCoreGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { - assert.Equal(t, coreGo, pool.numOfGo()) - } opts = append(opts, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, initGo, queueSize, coreGo, coreGo, extendToCoreGoAtOnce, nil, opts...) -} + pool := testNewRunningStateTaskPool(t, int(initGo), queueSize, opts...) + + assert.Equal(t, initGo, pool.numOfGo()) -func testExtendNumGoFromInitGoToCoreGoStepByStep(t *testing.T, initGo int32, queueSize int, coreGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { - extendToCoreGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { - assert.Equal(t, i, pool.numOfGo()) + assert.LessOrEqual(t, initGo, coreGo) + + done := make(chan struct{}) + wait := make(chan struct{}, coreGo) + + // 稳定在initGo + t.Log("XX") + for i := int32(0); i < initGo; i++ { + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-done + return nil + })) + assert.NoError(t, err) + t.Log("submit ", i) } - opts = append(opts, WithCoreGo(coreGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, initGo, queueSize, coreGo, coreGo, extendToCoreGoAtOnce, nil, opts...) -} -func testExtendNumGoFromInitGoToCoreGoAndMaxGoAtOnce(t *testing.T, initGo int32, queueSize int, coreGo int32, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { - extendToCoreGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { - assert.Equal(t, coreGo, pool.numOfGo()) + t.Log("YY") + for i := int32(0); i < initGo; i++ { + <-wait } - extendToMaxGoAtOnce := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { - assert.Equal(t, maxGo, pool.numOfGo()) + + // 至少initGo个协程 + assert.GreaterOrEqual(t, pool.numOfGo(), initGo) + + t.Log("ZZ") + + // 逐步添加任务 + for i := int32(1); i <= coreGo-initGo; i++ { + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + wait <- struct{}{} + <-done + return nil + })) + assert.NoError(t, err) + <-wait + t.Log("after wait coreGo", coreGo, i, pool.numOfGo()) } - opts = append(opts, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, initGo, queueSize, coreGo, maxGo, extendToCoreGoAtOnce, extendToMaxGoAtOnce, opts...) -} -func testExtendNumGoFromInitGoToCoreGoAndMaxGoStepByStep(t *testing.T, initGo int32, queueSize int, coreGo, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { - extendStepByStep := func(t *testing.T, i int32, pool *OnDemandBlockTaskPool) { - assert.Equal(t, i, pool.numOfGo()) + t.Log("UU") + + assert.Equal(t, pool.numOfGo(), coreGo) + close(done) + + // 等待最大空闲时间后稳定在initGo + for pool.numOfGo() > initGo { } - opts = append(opts, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) - testExtendNumGoFromInitGoToCoreGoAndMaxGo(t, initGo, queueSize, coreGo, maxGo, extendStepByStep, extendStepByStep, opts...) + + assert.Equal(t, initGo, pool.numOfGo()) } -func testExtendNumGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queueSize int, coreGo, maxGo int32, duringExtendToCoreGo extendStrategyCheckFunc, duringExtendToMaxGo extendStrategyCheckFunc, opts ...option.Option[OnDemandBlockTaskPool]) { +func testExtendGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queueSize int, coreGo, maxGo int32, maxIdleTime time.Duration, opts ...option.Option[OnDemandBlockTaskPool]) { + opts = append(opts, WithCoreGo(coreGo), WithMaxGo(maxGo), WithMaxIdleTime(maxIdleTime)) pool := testNewRunningStateTaskPool(t, int(initGo), queueSize, opts...) + assert.Equal(t, initGo, pool.numOfGo()) + assert.LessOrEqual(t, initGo, coreGo) assert.LessOrEqual(t, coreGo, maxGo) @@ -492,6 +586,7 @@ func testExtendNumGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queue wait := make(chan struct{}, maxGo) // 稳定在initGo + t.Log("00") for i := int32(0); i < initGo; i++ { err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { wait <- struct{}{} @@ -499,13 +594,16 @@ func testExtendNumGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queue return nil })) assert.NoError(t, err) + t.Log("submit ", i) } - // t.Log("AA") + t.Log("AA") for i := int32(0); i < initGo; i++ { <-wait } - assert.Equal(t, initGo, pool.numOfGo()) - // t.Log("BB") + + assert.GreaterOrEqual(t, pool.numOfGo(), initGo) + + t.Log("BB") // 逐步添加任务 for i := int32(1); i <= coreGo-initGo; i++ { @@ -515,17 +613,13 @@ func testExtendNumGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queue return nil })) assert.NoError(t, err) - // t.Log("before wait", "m", m, "i", i, "m-n", m-n, "len(wait)", len(wait), "len(queue)", len(pool.queue), "numGO", pool.numOfGo(), "nextnumGO", pool.expectedNumGo) <-wait - // t.Log("after wait coreGo", coreGo, i, pool.numOfGo()) - - duringExtendToCoreGo(t, i+initGo, pool) - // assert.Equal(t, i+n, pool.numOfGo()) + t.Log("after wait coreGo", coreGo, i, pool.numOfGo()) } - // t.Log("CC") + t.Log("CC") - assert.Equal(t, coreGo, pool.numOfGo()) + assert.GreaterOrEqual(t, pool.numOfGo(), coreGo) for i := int32(1); i <= maxGo-coreGo; i++ { @@ -535,20 +629,16 @@ func testExtendNumGoFromInitGoToCoreGoAndMaxGo(t *testing.T, initGo int32, queue return nil })) assert.NoError(t, err) - // t.Log("before wait", "m", m, "i", i, "m-n", m-n, "len(wait)", len(wait), "len(queue)", len(pool.queue), "numGO", pool.numOfGo(), "nextnumGO", pool.expectedNumGo) <-wait - // t.Log("after wait maxGo", maxGo, i, pool.numOfGo()) - - duringExtendToMaxGo(t, i+coreGo, pool) + t.Log("after wait maxGo", maxGo, i, pool.numOfGo()) } - // t.Log("DD") + t.Log("DD") - assert.Equal(t, maxGo, pool.numOfGo()) + assert.Equal(t, pool.numOfGo(), maxGo) close(done) - // 等待最大空闲时间后,稳定在n - // <-time.After(waitTime) + // 等待最大空闲时间后稳定在initGo for pool.numOfGo() > initGo { } assert.Equal(t, initGo, pool.numOfGo()) @@ -604,7 +694,7 @@ func TestOnDemandBlockTaskPool_In_Closing_State(t *testing.T) { assert.ErrorIs(t, err, errTaskPoolIsStopped) }) - t.Run("Shutdown —— 协程数仍能按需扩展,调度循环也能自然退出", func(t *testing.T) { + t.Run("Shutdown —— 协程数按需扩展至maxGo,调用Shutdown成功后,所有协程运行完任务后可以自动退出", func(t *testing.T) { t.Parallel() initGo, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(5), 10*time.Millisecond, 0.1 @@ -630,10 +720,7 @@ func TestOnDemandBlockTaskPool_In_Closing_State(t *testing.T) { shutdownDone, err := pool.Shutdown() assert.NoError(t, err) - // 等待close(b.queue)信号传递到各个协程 - time.Sleep(1 * time.Second) - - // 调度循环应该正常工作,一直按需开协程直到maxGo + // 已提交的任务应该正常运行并能扩展至maxGo for i := int32(0); i < maxGo; i++ { <-wait } @@ -647,6 +734,8 @@ func TestOnDemandBlockTaskPool_In_Closing_State(t *testing.T) { for pool.numOfGo() != 0 { } + + // 最终全部退出 assert.Equal(t, int32(0), pool.numOfGo()) }) @@ -736,7 +825,7 @@ func TestOnDemandBlockTaskPool_In_Stopped_State(t *testing.T) { assert.Equal(t, stateStopped, pool.internalState()) }) - t.Run("ShutdownNow —— 工作协程数不再扩展,调度循环立即退出", func(t *testing.T) { + t.Run("ShutdownNow —— 工作协程数扩展至maxGo后,调用ShutdownNow成功,所有协程能够接收到信号", func(t *testing.T) { t.Parallel() initGo, coreGo, maxGo, maxIdleTime, queueBacklogRate := int32(1), int32(3), int32(5), 10*time.Millisecond, 0.1 @@ -758,29 +847,15 @@ func TestOnDemandBlockTaskPool_In_Stopped_State(t *testing.T) { assert.NoError(t, err) } - // 使调度循环进入default分支 - for i := int32(0); i < coreGo; i++ { - <-wait - } - tasks, err := pool.ShutdownNow() assert.NoError(t, err) - // 见下方双重检查 - assert.GreaterOrEqual(t, len(tasks)+int(pool.numOfGo()), queueSize) + assert.GreaterOrEqual(t, len(tasks), 0) // 让所有任务结束 close(taskDone) // 用循环取代time.After/time.Sleep - // 特殊场景需要双重检查 - // 协程1工作中,调度循环处于default分支准备扩展协程(新增一个),此时调用ShutdownNow() - // 协程1完成工作接收到ShutdownNow()信号退出,而协程2还未开启可以使pool.numOfGo()短暂为0 - // 协程2启动后直接收到ShutdownNow()信号退出 for pool.numOfGo() != 0 { - - } - for pool.numOfGo() != 0 { - } assert.Equal(t, int32(0), pool.numOfGo()) @@ -877,15 +952,11 @@ func testNewRunningStateTaskPoolWithQueueFullFilled(t *testing.T, initGo int, qu pool := testNewRunningStateTaskPool(t, initGo, queueSize) wait := make(chan struct{}) for i := 0; i < initGo+queueSize; i++ { - func() { - err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { - <-wait - return nil - })) - if err != nil { - return - } - }() + err := pool.Submit(context.Background(), TaskFunc(func(ctx context.Context) error { + <-wait + return nil + })) + assert.NoError(t, err) } return pool, wait }