Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

重构TaskPool实现 #80 中讨论的新需求 #93

Merged
merged 4 commits into from
Sep 23, 2022

Conversation

longyue0521
Copy link
Collaborator

根据 #80 中讨论尤其是 讨论1讨论2

最终我没有采取在讨论2提到的按协程分类创建策略.原因如下:

  1. 代码结构上的重复,只有微小差别,还不好抽出方法来复用.
  2. 沿用 WIP 实现复用Go协程的调度模式 #80 中的例子,当协程数达到最大协程数即30时,此时初始协程数的协程们(10个)恰好运行任务结束,它们既没有任务运行也不能退出.当初始数、核心数及最大数的数值很大时更是一个问题.

本次重构采用的是在协程运行任务结束后,根据当前协程总数(totalNumOfGo)与初始数(10, initGo),核心数(20, coreGo)的关系来动态随机划分协程.并保证最终稳定在“初始数”即10个. 算法如下:

  1. 如果当前协程满足b.coreGo < b.totalNumOfGo && (len(b.queue) == 0 || int32(len(b.queue)) < b.totalNumOfGo) 则更新协程总数并直接退出
  2. 如果当前协程满足b.initGo < b.totalNumOfGo-b.timeoutGroup.size()那么当前协程会被设置定时器,在“最大空闲时间”内没接到任务会因为超时而退出.
  3. 其他情况要么因为队列中有任务,要么因为当前协程处于“初始数”组中不处理.

timeoutGroup是超时组,有两个作用:

  1. 通过协程id将其标记为“核心组”/超时组,使b.totalNumOfGo-b.timeoutGroup.size()能够准确反应结果保证动态分类的正确性
  2. timeoutGroup明确表示意图,而不是用idleTimer来表示这个逻辑分组.不再依赖于idleTimer.Stop这个不确定性大的方法

Signed-off-by: longyue0521 <longyueli0521@gmail.com>
@codecov
Copy link

codecov bot commented Sep 19, 2022

Codecov Report

Merging #93 (4fbc02a) into dev (25b0743) will decrease coverage by 0.04%.
The diff coverage is 96.84%.

@@            Coverage Diff             @@
##              dev      #93      +/-   ##
==========================================
- Coverage   96.31%   96.26%   -0.05%     
==========================================
  Files          23       23              
  Lines         895     1044     +149     
==========================================
+ Hits          862     1005     +143     
- Misses         26       30       +4     
- Partials        7        9       +2     
Impacted Files Coverage Δ
pool/task_pool.go 98.12% <96.84%> (-1.88%) ⬇️

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

Signed-off-by: longyue0521 <longyueli0521@gmail.com>
@flycash
Copy link
Contributor

flycash commented Sep 20, 2022

可以画一个流程图吗?我确实有点没太看懂

Copy link
Contributor

@flycash flycash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

按照我的理解:

  • 要不要开 goroutine:应该是在 submit task 的时候。每次 submit 之后看一下队列和goroutine 的数量,然后决定开不开;
  • 要不要关掉 goroutine 是在每个 goroutine 从队列里面拿 task 的时候。相当于最大空闲时间创建一个 context,作为一个 case,和队列里面拿一个 task 作为一个 case,进行 select
  • 在开 goroutine 的时候,每次应该最多开一个。而 initGo 应该是在 Start 的时候就创建起来了
  • 在关 goroutine 的时候,则需要判断有没有下降到 initGo。

Comment on lines 152 to 154
numGoRunningTasks int32

totalNumOfGo int32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这两个可以稍微对齐一下下面的 initGo, coreGo, maxGo?比如说 runningGo, totalGo

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numGoRunningTasks 表示有多少个goroutine处于case task, ok := <-queue分支中, 要不workingGo?
totalNumOfGo已经改为totalGo

pool/task_pool.go Outdated Show resolved Hide resolved
pool/task_pool.go Outdated Show resolved Hide resolved
pool/task_pool.go Outdated Show resolved Hide resolved
pool/task_pool.go Outdated Show resolved Hide resolved
@longyue0521
Copy link
Collaborator Author

longyue0521 commented Sep 20, 2022

@flycash

  • 要不要开 goroutine:应该是在 submit task 的时候。每次 submit 之后看一下队列和goroutine 的数量,然后决定开不开;

submit根据需求是可以在未start的情况下直接调用的即一次性填满队列再调用start若此后不再submit任何任务,那么此时“开新协程”的逻辑在start种也要有.

  • 在开 goroutine 的时候,每次应该最多开一个。而 initGo 应该是在 Start 的时候就创建起来了

为什么每次应该最多开一个? 这种决策处于什么考量?

举例,initGo=10, coreGo=20, maxGo=30,queueSize = 50,用户在未start的情况下将queue填满,再调用start并在此后不再submit任何任务.此时start除了要开initGo个协程外(此时totalGo=initGo=10),还要再开多少个协程?怎么开,一个一个开还是开多个?什么时机开,submit不再调用,start一旦返回不可重复调用?

  1. 根据totalGo,maxGo与len(queue)的关系一次开一个直到关系不满足?
  2. 还是根据三者之间的关系一次开多个?

依照上面的例子,当totalGo < maxGo && len(queue) != 0 成立时,为什么不直接开 min(maxGo-totalGo, len(queue)) = min(30-10,40) = 20个新协程呢? 如果不在Start方法内开到maxGo个,后续将不再有机会开协程,因为用户不再submit任务

  • 要不要关掉 goroutine 是在每个 goroutine 从队列里面拿 task 的时候。相当于最大空闲时间创建一个 context,作为一个 case,和队列里面拿一个 task 作为一个 case,进行 select
func goroutine() {
    ctx, _ := context.WithTimeout(context.Background(), maxIdleTime)
    select {
       case <-ctx.Done():
           // 根据最大空闲时间创建的ctx
           return
       case task, ok := <-b.queue:
          // 执行任务
    }
}

是上面这段伪代码的意思吗? 如果是,你把context当成现有代码中的idleTimer就行了,思路是一样的.

之所以用timer而不是context详见 #65 以及“最大空闲时间”的定义.当前协程如果处在(初始,核心]范围内并且一直没有抢到任务进入case task, ok := <-b.queue:case,则因上面为代码中的case <-ctx.Done():或现有实现中的case <-idleTimer.C而退出.

判断当前goroutine是否要关闭的时机是在task.Run()之后但还没有离开case task, ok := <-b.queue:之前

  • 在关 goroutine 的时候,则需要判断有没有下降到 initGo。

task.Run()之后的代码逻辑,就是判断goroutine的总数是如何变化的

  1. 如果coreGo < totalGo(当前协程总数)且len(queue)==0,那么说明当前协程处于(coreGo,maxGo]区间,根据需求 WIP 实现复用Go协程的调度模式 #80 中的描述——直接退出
  2. 如果coreGo < totalGo有任务,那么继续去抢任务.
  3. 如果initGo < totalGo 表明当前协程处于(initGo, coreGo]区间,根据需求要给它一个最大空闲时间计时器.防止没有任务一直等待. 这里之所没有用initGo < totalGo && len(queue) == 0是因为queue中任务数量是随时变化的,现在看有任务但可能真到case task, ok := <-b.queue:的时候又拿不到,所以只要超过initGo初始协程数这个范围的协程,都会得到一个最大空闲时间计时器. 这也符合需求的描述,initGo个协程是常驻的,(initGo,coreGo]协程是有最大空闲时间的,(coreGo,maxGo]协程是“一次性的”运行完任务没有下一个可以运行的任务立即退出.

@longyue0521
Copy link
Collaborator Author

可以画一个流程图吗?我确实有点没太看懂

先看最新提交,具体说说哪里没看懂

@flycash
Copy link
Contributor

flycash commented Sep 22, 2022

@flycash

  • 要不要开 goroutine:应该是在 submit task 的时候。每次 submit 之后看一下队列和goroutine 的数量,然后决定开不开;

submit根据需求是可以在未start的情况下直接调用的即一次性填满队列再调用start若此后不再submit任何任务,那么此时“开新协程”的逻辑在start种也要有.

  • 在开 goroutine 的时候,每次应该最多开一个。而 initGo 应该是在 Start 的时候就创建起来了

为什么每次应该最多开一个? 这种决策处于什么考量?

举例,initGo=10, coreGo=20, maxGo=30,queueSize = 50,用户在未start的情况下将queue填满,再调用start并在此后不再submit任何任务.此时start除了要开initGo个协程外(此时totalGo=initGo=10),还要再开多少个协程?怎么开,一个一个开还是开多个?什么时机开,submit不再调用,start一旦返回不可重复调用?

  1. 根据totalGo,maxGo与len(queue)的关系一次开一个直到关系不满足?
  2. 还是根据三者之间的关系一次开多个?

依照上面的例子,当totalGo < maxGo && len(queue) != 0 成立时,为什么不直接开 min(maxGo-totalGo, len(queue)) = min(30-10,40) = 20个新协程呢? 如果不在Start方法内开到maxGo个,后续将不再有机会开协程,因为用户不再submit任务

  • 要不要关掉 goroutine 是在每个 goroutine 从队列里面拿 task 的时候。相当于最大空闲时间创建一个 context,作为一个 case,和队列里面拿一个 task 作为一个 case,进行 select
func goroutine() {
    ctx, _ := context.WithTimeout(context.Background(), maxIdleTime)
    select {
       case <-ctx.Done():
           // 根据最大空闲时间创建的ctx
           return
       case task, ok := <-b.queue:
          // 执行任务
    }
}

是上面这段伪代码的意思吗? 如果是,你把context当成现有代码中的idleTimer就行了,思路是一样的.

之所以用timer而不是context详见 #65 以及“最大空闲时间”的定义.当前协程如果处在(初始,核心]范围内并且一直没有抢到任务进入case task, ok := <-b.queue:case,则因上面为代码中的case <-ctx.Done():或现有实现中的case <-idleTimer.C而退出.

判断当前goroutine是否要关闭的时机是在task.Run()之后但还没有离开case task, ok := <-b.queue:之前

  • 在关 goroutine 的时候,则需要判断有没有下降到 initGo。

task.Run()之后的代码逻辑,就是判断goroutine的总数是如何变化的

  1. 如果coreGo < totalGo(当前协程总数)且len(queue)==0,那么说明当前协程处于(coreGo,maxGo]区间,根据需求 WIP 实现复用Go协程的调度模式 #80 中的描述——直接退出
  2. 如果coreGo < totalGo有任务,那么继续去抢任务.
  3. 如果initGo < totalGo 表明当前协程处于(initGo, coreGo]区间,根据需求要给它一个最大空闲时间计时器.防止没有任务一直等待. 这里之所没有用initGo < totalGo && len(queue) == 0是因为queue中任务数量是随时变化的,现在看有任务但可能真到case task, ok := <-b.queue:的时候又拿不到,所以只要超过initGo初始协程数这个范围的协程,都会得到一个最大空闲时间计时器. 这也符合需求的描述,initGo个协程是常驻的,(initGo,coreGo]协程是有最大空闲时间的,(coreGo,maxGo]协程是“一次性的”运行完任务没有下一个可以运行的任务立即退出.

我草,我都忘了可以提交完任务之后再开始了,你忽略

Copy link
Contributor

@flycash flycash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

稍等一下,我试试在你的合并请求的基础上,我看看能不能去掉 timeoutGroup 和 timer,因为这两个是代码难以理解的根源。

基本想法是在进来 goroutine 方法的时候使用 context 来控制最大空闲时间,然后每次超时看一下自己是不是能够被关掉,能的话就关掉。

Comment on lines +206 to +210
if b.coreGo != b.initGo && b.maxGo == b.initGo {
b.maxGo = b.coreGo
} else if b.coreGo == b.initGo && b.maxGo != b.initGo {
b.coreGo = b.maxGo
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得这个调整和下面的校验,可以只保留一种做法。也就是说,如果你传入的参数不对,我自己去调整对,这是一种做法;另外一种做法就是我完全不会帮你调整,直接报错。

这里面就相当于,我调整了,但是我只调整了一点点

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我希望WithCoreGo(coreGo)和WithMaxGo(maxGo)是相互对立的且可以单独使用的,所以这里的调整是必要的.

默认情况下initGo=coreGo=maxGo, 当用户只使用WithCoreGo(coreGo)时,走下面的校验就可能会因为initGo=maxGo < coreGo报错, 这样用户就会比较困惑.

@longyue0521
Copy link
Collaborator Author

longyue0521 commented Sep 22, 2022

基本想法是在进来 goroutine 方法的时候使用 context 来控制最大空闲时间,然后每次超时看一下自己是不是能够被关掉,能的话就关掉。

起初我考虑过使用context但想到上次的讨论 #65 以及这次需求场景才改为timer的.

有这样一种情况,当执行流程进入case task, ok := <-queue后,如果此时不“关闭”context,那么再次select的时候就很可能因为超时进入case <-context.Done()而此时queue中是有任务的,我们是希望它继续执行任务的. 这即不符合需求,也不符合“最大空闲时间”的定义,因为当前协程已拿到了task,再次select的时候需要重新计时.虽然可以在case task, ok := <-queue中重新赋予一个新的context表示重新计时,但这显然不是context的用法,所以才改为了timer.

timer也符合语义,运行完任务就得到一个计时器——如果得到的是stop的timer,那它就是initGo区间内的常驻协程.如果是未stop的timer,倒计时到了你就退出.

@longyue0521
Copy link
Collaborator Author

稍等一下,我试试在你的合并请求的基础上,我看看能不能去掉 timeoutGroup 和 timer,因为这两个是代码难以理解的根源。

timeoutGroup原本是一个int32原子变量用于计数,用if idleTimer.Stop() { atomic.AddInt32(&timeoutGroup, -1)},也能达到相同效果.但是idleTimer的Stop方法偏偏不能保证一定成功——刚进入case task, ok := <-queue后就超时了,此时它已经拿到任务了应该从timeoutGroup中减一,但因为恰好超时没走减1的流程就会导致后续逻辑不对.所以改为依赖稳定的timeoutGroup,它即解决了“计数”问题又解决了“timer.Stop”不保证一定成功的问题.

@flycash
Copy link
Contributor

flycash commented Sep 23, 2022

确实没办法去掉。我先合并了,后面我准备弄一个模糊测试来测试长时间运行的这个 pool 会不会出现没有调度或者死锁之类的问题

@flycash flycash merged commit 644ced1 into ecodeclub:dev Sep 23, 2022
@longyue0521 longyue0521 mentioned this pull request Sep 23, 2022
@longyue0521 longyue0521 deleted the refactor/TaskPool branch September 24, 2022 03:44
@longyue0521 longyue0521 mentioned this pull request Apr 11, 2023
8 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants