Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP 实现复用Go协程的调度模式 #80

Closed
wants to merge 0 commits into from

Conversation

longyue0521
Copy link
Collaborator

@longyue0521 longyue0521 commented Sep 8, 2022

  1. 原有schedulingTasks重名为schedulingTasksOnDemand
  2. 现在schedulingTasks一次性开启maxGo个协程来接任务
  3. 暂时未改动接口,通过引入schedulingMod,并以硬编码的方式使调度流程走schedulingTasks即“复用Go协程”的调度模式,所以测试覆盖率会有所下降

@codecov
Copy link

codecov bot commented Sep 8, 2022

Codecov Report

Merging #80 (48bb2e1) into dev (a8dc2a7) will decrease coverage by 12.76%.
The diff coverage is 87.09%.

❗ Current head 48bb2e1 differs from pull request most recent head 74bd9bf. Consider uploading reports for the commit 74bd9bf to get more accurate results

@@             Coverage Diff              @@
##               dev      #80       +/-   ##
============================================
- Coverage   100.00%   87.23%   -12.77%     
============================================
  Files            1       20       +19     
  Lines           10      752      +742     
============================================
+ Hits            10      656      +646     
- Misses           0       88       +88     
- Partials         0        8        +8     
Impacted Files Coverage Δ
slice/contains.go 0.00% <0.00%> (ø)
slice/index.go 0.00% <0.00%> (ø)
slice/map.go 0.00% <0.00%> (ø)
sqlx/encrypt.go 0.00% <0.00%> (ø)
pool/task_pool.go 83.16% <83.16%> (ø)
sqlx/json.go 86.36% <86.36%> (ø)
bean/copier/pure_reflect_copier.go 88.00% <88.00%> (ø)
list/concurrent_list.go 88.88% <88.88%> (ø)
bean/copier/reflect_copier.go 92.80% <92.80%> (ø)
list/array_list.go 96.42% <96.42%> (ø)
... and 10 more

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@@ -23,6 +23,11 @@ import (
"sync/atomic"
)

const (
OnDemand = iota
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果我们改成 lazy 和 eager 呢?就是一个是懒惰,一个是饥渴。

不过这种命名有一个问题,后面我还想引入初始协程数,核心协程数,以及最大协程数的调度算法,就不太好叫什么了

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

另外一个我是觉得没有必要引入这种 mode 的概念,我们可以直接提供不同的实现。一般来说,如果我们没有公共的接口 TaskPool 的话,那么引入 mode 能够稍微减轻一点用户从一种池子切换到另外一种池子的负担。但是我们现在有这个公共接口,那么就可以要求用户切换不同的形态就创建不同的实现。

从我的个人设计倾向来说,我也觉得 mode 这种标记位总得来说不是很优雅,因为必然要有一堆 switch else。如果我们能够控制住 mode 只在一个分发的地方使用,那么还能控制住,否则就容易出现 Mode 满天飞,一大堆的 if-else 或者 switch case。

而相比之下,直接创建一个新的实现要清晰很多。

在这里,我觉得不同实现之间,完全就是 Start 那里不同而已。所以我们完全可以有一个类似抽象类的东西,它来完成生命周期和状态流转管理,完成任务提交和任务队列管理。

只有在你 switch 那里,让不同的实现传入一个方法。有点像:

type basePool struct {

}

type OnDemandPool struct {
    basePool
}

func NewOnDemandPool() *OnDemandPool {
    res :=  &OnDemandPool{
    }
    res.basePool.scheduleAlgorithm = res.schedule
}

实际上,如果是在有继承机制的语言里面,这里大概是用继承来实现的。Go 的组合还是差点意思

Copy link
Collaborator Author

@longyue0521 longyue0521 Sep 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. 当前这个PR是试验性的,主要想让你看看新的schedulingTasks的代码逻辑有没有问题
  2. 如果没有问题,讨论一下未来的需求”初始协程数,核心协程数,以及最大协程数的调度算法“ 我记得还有最大空闲/等待任务时间
  3. 确认一下API是否变动等(这些参数如何传进来Option模式,还是直接开放共有字段?),可以假定TaskPool已经实现,然后模拟用户使用TaskPool的场景来试着写写代码.
  4. 找到对实现未来需求更有利的那个方向,重构当前代码和测试(按需开协程和一次开启固定协程)
  5. 重构完成后,开始添加未来需求1,2,3等

我有种感觉,讨论完未来那些需求后,可能就不存在什么“按需创建”和“固定个数”模式了,而最终变成“混合”模式了,
根据那些参数在内部自动控制住go协程的数目,创建时机及销毁时机等

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

逻辑看起来没什么问题。

我觉得你的提议很好。选择公共字段还是 Option,取决于你要不要准备默认值,如果你要准备默认值,那么就用 Option 模式。当然如果你的构造过程很复杂,那么就可以用 Builder 模式。

你说的等待任务时间,是不是那种等待调度的时间超过一段时间,这个任务直接就被丢弃了?还是那种任务执行不能超过一分钟这种控制?。

最大空闲我倒是没理解

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一个go协程等了1小时(最大空闲时间)也没从队列里抢到任务运行,需不需要将其关闭,还是让它一直阻塞死等

选Option还是公开字段前我们需要讨论清楚,”初始协程数,核心协程数,以及最大协程数的调度算法“这些参数的意义,怎么用?以及对协程生命周期的影响(何时创建,何时销毁,还是只创建不销毁等)等问题, 这样才能知道是否提供默认参数,进而选择Option还是公开字段或builder模式.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

哦哦哦,就是类似连接的空闲时间。如果空闲了很久自然是要退出的,这个时间可以交给用户来选择。
初始:一开始就会创建出来
核心协程数:这个我在想可能不需要关掉协程。如果需要关掉的,我们可能还需要引入一个额外的最少活跃协程数,类似于最少活跃连接,确保在有任务的时候立刻可用。但是想来也不是有特别大的必要,因为 goroutine 比连接轻量多了,没有最少活跃数,核心协程也可以退出,看起来代价也不高;

  • 最大协程数:肯定不能超过这个协程数量

如果初始 goroutine 用完了,然后又有新的任务过来,那么可以启动新的 goroutine;如果达到了核心线程数,那么我觉得可以考虑允许任务放在队列里,等待任务多,我们就创建一个,直到达到上限。

按照 java 线程池的设计,他们是队列满了才会创建一个新协程,我倒是觉得满了才创建在实际中使用起来效果不是很好。

Copy link
Collaborator Author

@longyue0521 longyue0521 Sep 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我举个例子,看看我是否正确理解了你的需求, 以及需求不明确的地方.

  1. 初始=10, 核心 = 20, 最大=30
  • Start后立即创建10个协程 —— 类似“固定创建”模式
  • 当发现此时len(b.queue) > X, X范围[0, queueSize)且当前协程数 < 核心协程数即20, 创建一个协程, 类似“按需创建“模式
  • 当发现此时当前协程数>=20且<30且len(b.queue) > Y, Y范围(0, queueSize). 创建一个协程, 类似“按需创建“模式
  • 当前协程数 = 最大协程数即30后,不再创建任何协程

X和Y的具体值是多少?

当协程数在(10, 20] 和 (20, 30]范围内时, 这些按需创建出的协程是常驻的与初始创建那10个等效,还是完成任务后自动退出?虽然你说核心协程数:这个我在想可能不需要关掉协程。如果需要关掉的但上面也提到用户指定“空闲时间”让协程自动退出.假设现在有了这个空闲时间,协程号在(10, 30]按需创建出来的协程需要监听空闲时间,如果达到就自动退出吗?

空闲时间是否要有一个检查,至少为多少吗?传0, -1等非法值在创建TaskPool阶段就报错吗?还是不管

  1. 当参数关系不合理时如何处理? 比如:初始>核心=最大; 初始>核心>最大; 初始>最大>核心; 核心>最大>初始; 等
    一刀切,只要不符合初始<=核心<=最大 这个关系,就按初始=核心=最大处理 还是分情况处理?
    初始 ==核心==最大, 我觉得合法, 相当于现有实现的“固定个数”模式
    初始<=核心==最大, 我觉得合法, 相当于现有实现的“按需创建”模式

  2. 是否允许在TaskPool运行中,动态地改变核心协程数及最大协程数?还是创建后不可改!

  3. 假设上面的需求已经实现,现在还有必要分出两个TaskPool实现吗?
    我觉得只需要一个TaskPool实现,让用户通过控制初始,核心,最大,空闲时间等参数就可以模拟出已实现的“按需创建”“固定个数”两个模式

  4. 使用Option还是公开字段?Option可以不破坏现有API(可选参数),但需要提供默认值,初始,核心,最大,空闲时间的默认值是多少?
    初始 ==核心==最大, 固定个数模式
    初始<=核心==最大, 初始<=核心<=最大, 固定个数+按需创建
    空闲时间默认值为多少合适呢?1h?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我思考了一下怎么控制这个东西,我觉得类比 Java 线程池和 go 连接池两个设计,能取得一个很不错的效果。我来描述一下这个过程,也就是 10, 20, 30 这个例子:

  • 最开始的时候创建 10 个协程;
  • 用户提交任务。如果此时 10 个协程都在繁忙中,那么用户提交任务,就直接开启一个 goroutine,直到达到 20(核心数);
  • 如果这个时候用户还继续提交任务,那么就会放到队列中,接下来我们会采用一个算法来判断要不要新建一个 goroutine:
    • 算法1:队列满了,我们直接创建一个 goroutine,直到达到 30
    • 算法2:队列满一半了,以 size/cap 的概率创建一个新的 goroutine
    • 算法3:直接创建新 goroutine,直到达到 30
  • 当 goroutine 数量在 20-30 的时候,一个 Goroutine 发现队列已经空了,那么它会直接退出,不需要等待空闲时间;
  • 当 goroutine 降低到 10-20 以下的时候,我们超过最大空闲时间,就关掉 goroutine
  • 当 goroutine 降低到 10 的时候,我们将不会关闭,即便一直空闲

这里面,有些地方是可以进一步讨论的:

  • 队列满一半了,以 size/cap 的概率创建一个新的 goroutine。这个我们甚至可以固定一个比较低的概率,比如说 0.3, 0.4 什么的。理论上来说,我们可以通过压测来观测队列中等待任务数量,和最终运行的 goroutine 数量来取一个概率,使得队列中不会有太多等待任务,goroutine 也不会有太多。注意,假如说我们的概率是 p,那么连续两次都没有触发创建 goroutine 的概率是 (1-p)^2
  • goroutine 降低到 10 的时候,可以一直保留,因为少量 goroutine 阻塞之后,只是占据了一点点内存。也可以达到最大空闲时间,就关掉;
  1. 参数问题,你可以自由决定校验还是不校验,以及是否兼容;
  2. 暂时不允许。从业务角度来说,用户可能希望能够监控队列,如果队列满了,他可能希望在运行期间动态调整。但是这个的前提是我们暴露了观测接口
  3. 你说得对,没必要,合并为一个就可以了
  4. 采用 Option 模式,默认值你可以根据自己的经验来设置,这方面其实我也没特别好的建议——我也是第一次设计这个。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flycash 我思考了两天还有些疑问

用户提交任务。如果此时 10 个协程都在繁忙中,那么用户提交任务,就直接开启一个 goroutine,直到达到 20(核心数);

  1. 能解释一下核心数的含义是什么? 这个概念我有点模糊, 核心是取自CPU内核的说法,还是go的GMP模型中的P,逻辑处理器个数的意思?

  2. 这个繁忙需要精准吗,就是协程必须处于运行task的过程中?
    因为10个常驻协程除了真正运行task,还可能在监听中断信号等并不一定正在运行task

如果不需要那么精准,下面这个判断是否可以:

初始 <= 当前协程总数 && 当前协程总数< 20(核心数) && len(b.queue) > 0

就是队列中有任务,没达到核心协程数20个就创建.

如果这个时候用户还继续提交任务,那么就会放到队列中,接下来我们会采用一个算法来判断要不要新建一个 goroutine:
算法1:队列满了,我们直接创建一个 goroutine,直到达到 30
算法2:队列满一半了,以 size/cap 的概率创建一个新的 goroutine
算法3:直接创建新 goroutine,直到达到 30

  1. 如果这个时候用户还继续提交任务,那么就会放到队列中读到这让我觉得上面那个需求我好像没理解对,上面那个需求说如果此时10个协程都在繁忙中,那么用户提交任务,就直接开启一个 goroutine,直到达到 20(核心数) 似乎任务不是先放到队列中的,并且用初始 <= 当前协程总数 && 当前协程总数< 20(核心数) && len(b.queue) > 0 这个判断来开goroutine直到核心数是不对的.

队列满一半了,以 size/cap 的概率创建一个新的 goroutine。这个我们甚至可以固定一个比较低的概率,比如说 0.3, 0.4 什么的。
理论上来说,我们可以通过压测来观测队列中等待任务数量,和最终运行的 goroutine 数量来取一个概率,使得队列中不会有太多等待任务,goroutine 也不会有太多。
注意,假如说我们的概率是 p,那么连续两次> 都没有触发创建 goroutine 的概率是 (1-p)^2

  1. 我们的目的是队列中不会有太多等待任务,goroutine也不会有太多但这个太多不好度量.我们只能提供一种机制即根据队列的len/cap概率创建协程,让用户自己去压测来自定义符合其业务场景的协程创建策略及概率.这也意味着len/cap也要抽象为入参.
    • 当len/cap传入1时, 意味着队列满了,就会开新协程,就是算法1
    • 当len/cap传入0.5时,意味着队列满一半了,就会开新协程,就是算法2
    • 当len/cap传入0时, 意味着只要队列中有任务,就会开新协程,就是算法3

当 goroutine 数量在 20-30 的时候,一个 Goroutine 发现队列已经空了,那么它会直接退出,不需要等待空闲时间;
当 goroutine 降低到 10-20 以下的时候,我们超过最大空闲时间,就关掉 goroutine
当 goroutine 降低到 10 的时候,我们将不会关闭,即便一直空闲

  1. 关于被创建出的协程的分类及退出策略的总结:
  • 在(0, 初始]即(0, 10]区间的协程为同一类,退出时机受Shutdown和ShutdownNow方法影响.
  • 在(初始, 核心]即(10, 20]区间的协程为同一类,退出时机受Shutdown和ShutdownNow方法、“最大空闲时间“参数、”队列中是否有任务“的影响.
    • 完成任务后,如果队列为空则等待“最大空闲时间”后退出,如果等待期间拿到任务,停掉计时器->执行任务,结束后—>队列不空,取任务运行;队列为空,则重置计时器.
  • 在(核心,最大]即(20, 30] 区间的协程为同一类,退出时机受到Shutdown和ShutdownNow、“队列中是否有任务”的影响
    • 如果队列有任务则拿任务运行,运行任务结束后,检查发现队列为空,则立即退出;队列不为空,重复前面的操作;
  1. 这意味着我们会有三类协程,它们内部监听着queue及不同退出策略.我需要给它们命名,你有什么好的建议吗?

    • 第一类(0, 10]叫它们,常驻协程(resident goroutine)
      • Resident Goroutine
    • 第二类(10, 20]叫它们,带有超时时间的临时协程
      • Temporary Goroutine With Timeout
    • 第三类(20, 30]叫它们,临时协程
      • Temporary Goroutine
  2. 上面这些理解与总结是否符合你的需求还有没有要补充的?

@@ -23,6 +23,11 @@ import (
"sync/atomic"
)

const (
OnDemand = iota
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

逻辑看起来没什么问题。

我觉得你的提议很好。选择公共字段还是 Option,取决于你要不要准备默认值,如果你要准备默认值,那么就用 Option 模式。当然如果你的构造过程很复杂,那么就可以用 Builder 模式。

你说的等待任务时间,是不是那种等待调度的时间超过一段时间,这个任务直接就被丢弃了?还是那种任务执行不能超过一分钟这种控制?。

最大空闲我倒是没理解

pool/task_pool.go Outdated Show resolved Hide resolved
@flycash flycash changed the title 实现复用Go协程的调度模式 WIP 实现复用Go协程的调度模式 Sep 9, 2022
@flycash
Copy link
Contributor

flycash commented Sep 9, 2022

我加了一个 WIP 前缀,等你都完成了就把它去掉

@longyue0521 longyue0521 marked this pull request as draft September 11, 2022 00:32
@longyue0521
Copy link
Collaborator Author

longyue0521 commented Sep 11, 2022

我研究了一下github的PR github提供将其转换成Draft,表示Work In Progress的功能. 看看我的留言你能否正常接到通知?如果不能我再将PR状态改回来.避免沟通不畅

@flycash
Copy link
Contributor

flycash commented Sep 11, 2022

我看到了!

@longyue0521
Copy link
Collaborator Author

我看到了!

好的

longyue0521 added a commit to longyue0521/ekit that referenced this pull request Sep 19, 2022
Signed-off-by: longyue0521 <longyueli0521@gmail.com>
flycash pushed a commit that referenced this pull request Sep 23, 2022
* 重构TaskPool实现 #80 中讨论的新需求

Signed-off-by: longyue0521 <longyueli0521@gmail.com>

* 删除测试中未使用参数并修改CHANGELOG

Signed-off-by: longyue0521 <longyueli0521@gmail.com>

* 将concurrency重命名为initGo,totalNumOfGo重命名为totalGo

* 去掉调度循环,创建协程的时机改为Submit及Start方法内,解决CPU空转问题

Signed-off-by: longyue0521 <longyueli0521@gmail.com>
@longyue0521 longyue0521 mentioned this pull request Sep 23, 2022
@longyue0521 longyue0521 mentioned this pull request Apr 11, 2023
8 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants