Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: 延迟队列 #111

Closed
wants to merge 4 commits into from
Closed

WIP: 延迟队列 #111

wants to merge 4 commits into from

Conversation

flycash
Copy link
Contributor

@flycash flycash commented Oct 11, 2022

延迟队列的大概设计。
有两个设计,第一个设计在 queue_1.go 中,第二个设计在 queue_2.go 中。

两个设计的最大问题都是 sync.Condition 并没有提供一个 Wait(time) 的 API,即我希望有一个 Wait 的API,它接收一个时间参数,到点了就会自己醒来。

但是以为 sync.Condition 没有,所以我们必须要要利用 sleep 或者 ticker 来达成类似的问题。

两个实现的基本理念都是类似的:

  • 入队的时候,如果新的元素被放过去了队首,那么就唤醒等待的 goroutine
  • 出队的时候,如果队首的元素时间还没到,那么就 sleep。但是以为在 sleep 的期间可能有元素入队,所以实际上这个地方需要监听三个:
    • ctx 超时
    • sleep 到期
    • Signal 信号

注意的是,sleep 到期,或者收到 signal 信号,都需要考虑再次检测队首元素,确保元素的确已经过期了。

两者实现的差异在于:

  • 第一种实现很难解决 ctx 超时之后, 调用者之前开启的 gorroutine 被唤醒的问题,以及 Dequeue 里面的 channel 关闭问题
  • 第二种实现的问题在于开启了一个 goroutine,那么意味着我们可能需要引入 Close 方法,否则这个 goroutine 难以退出。

总结就是:一切的根源都在于 sync.Condition 并不友好。所以另外一个思路是能不能利用 cgo 之类的工具,为 sync.Condition 提供额外的 Wait(time) 的方法

@flycash
Copy link
Contributor Author

flycash commented Oct 11, 2022

有没有谁了解其它语言是如何实现延时队列的,可以在这里讨论

@longyue0521
Copy link
Collaborator

longyue0521 commented Oct 12, 2022

DelayQueue需求澄清

假定T的Dealy方法返回1s,请问这1s是指从T,Enqueue成功到Dequeue成功这整个过程1s,这1s包含了T在优先级队列中排队、以及Dequeue的阻塞过程.还是说只是在Dequeue T的时候阻塞1s,这个时间就没法预测了,假定T前面有10个元素每个都1s那调度到T的时候至少10s后了.

我认为应该是前者,从入队到出队这整个过程1s.. 从使用者的角度,我在t1时刻Enqueue,在t1+1s去Dequeue时应该能拿到T(即便有插队元素,多次调用也能快速拿到T),或者Enqueue后立即Dequeue能够阻塞住直到t1+1s返回T

longyue0521
longyue0521 previously approved these changes Oct 12, 2022
if err != nil {
return err
}
head, err := d.q.peek()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

入队成功后为什么还要peek()? 是想确定入队操作是否完成?t.CompareTo(head) == 0表示t与队头“优先级相等”,以优先级队列的逻辑语义来说,返回哪个都是可以.只有优先级更高的t入队后才会影响队头.

另外,这里要是我实现我会封装并转换一下,将Delay()的time.Duration转换为Deadline. 即 deadline := time.Now().Add(t.Delay()) 换句话说d.q中都是用deadline比较.

那么在Enqueue只干三件事,转换deadline,入队,发信号(优先级高于当前队头)

               // t.Delay() 转换为deadline = time.Now().Add(t.Delay())
               // 入队
                err := d.q.Enqueue(ctx, t)
		if err != nil {
			return err
		}
                // 比较队头与新入队的优先级
		head, err := d.q.peek()
		if err != nil {
			return err
		}
                // t的截止日期更近,即t的优先级更高则发送信号
                // 队列由空,变为1时也需要传递信号,
		if d.q.Len() == 1 || t.CompareTo(head) < 0 {
			headOfQueueChanged <- struct{}{}
		}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

通过判断插入的元素是不是队首可以避免无意义唤醒等待 Dequeue 的。不过确实可以去掉的 CompareTo 方法,只需要判断插入元素的 Delay 是不是比队首的 Delay 更小就可以。

这里应该不需要使用 deadline,因为我们过期 Delay 表达的都是基于当前时间点的相对量,可以少一个 time 的系统调用

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不用 deadline 也可以用newElemComing

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里应该不需要使用 deadline,因为我们过期 Delay 表达的都是基于当前时间点的相对量,可以少一个 time 的系统调用

如果按Java中的设计思路, 那么Delay方法的实现是deadline.Sub(time.Now()) ,其中deadline = time.Now().Add(expire), , 而expire就是用户NewT时要传入的期望延时; 这种做法只是将系统调用从队列内部移动到Delay方法中,因为time.Now也是一个系统调用

我上面的做法,假设的前提是,Delay方法返回的是固定值,即上面的expire

queue/delay_queue_1.go Outdated Show resolved Hide resolved
@flycash
Copy link
Contributor Author

flycash commented Oct 12, 2022

DelayQueue需求澄清

假定T的Dealy方法返回1s,请问这1s是指从T,Enqueue成功到Dequeue成功这整个过程1s,这1s包含了T在优先级队列中排队、以及Dequeue的阻塞过程.还是说只是在Dequeue T的时候阻塞1s,这个时间就没法预测了,假定T前面有10个元素每个都1s那调度到T的时候至少10s后了.

我认为应该是前者,从入队到出队这整个过程1s.. 从使用者的角度,我在t1时刻Enqueue,在t1+1s去Dequeue时应该能拿到T(即便有插队元素,多次调用也能快速拿到T),或者Enqueue后立即Dequeue能够阻塞住直到t1+1s返回T

延时队列和调用者被阻塞多久没有关系,也和入队出队多久没有关系。我举个例子,我设计了一个本地缓存,那么我可以将每一个元素丢进去优先级队列里面,Delay 返回的就是还有多久我这个缓存的键值对会过期。

因此这样一来,如果我们的队列本身一直阻塞,以至于这个键值对其实已经过期很久了,但是因为一直没有入队成功,也不能出队,那么就会导致无法及时删除过期键值对的问题。

另外一方面来说,严格的时间控制基本上是不现实的。如果用户希望避免阻塞引起这种问题,那么可以考虑使用无界的延时队列。但是即便如此,依旧会有时间不精确的问题,比如说我过期,但是过了几毫秒之后,才把我从队头取出来。但是这种不精确我觉得可以忍受。

@longyue0521
Copy link
Collaborator

longyue0521 commented Oct 12, 2022

我设计了一个本地缓存,那么我可以将每一个元素丢进去优先级队列里面,Delay 返回的就是还有多久我这个缓存的键值对会过期。

这里的意思是说,对于元素T来说,在不同时刻多次调用T.Delay()返回的值是逐渐递减的?即1s->900ms -> 800ms ->0,“过期”的判定标准就是T.Delay() <= 0

这是Java中DelayQueue的做法

https://segmentfault.com/a/1190000022718540
https://www.baeldung.com/java-delay-queue

我觉得这种做法存在“逻辑重复”,每个要用DelayQueue的类型需要多声明两个方法且不同类型实现这两个方法的逻辑基本相同。

  • Delay方法,不断和time.Now比较动态地返回剩余的delay时间
  • CompareTo方法,用Delay方法或者那个过期时间来比较以得出优先级

函数是Go的一等公民,类比于正在实现的PriorityQueue,我觉得用NewDelayQueue[T Delayable](cap int, compare Comparator[T])更符合习惯,如果没有复杂的优先级比较策略,那么直接NewDelayQueue[T Delayable](cap int),内部用Dealy()来比较,因为延迟队列本身就限定了语义——延迟小的先出队。

如果我们的队列本身一直阻塞,以至于这个键值对其实已经过期很久了,但是因为一直没有入队成功,也不能出队,那么就会导致无法及时删除过期键值对的问题

队列阻塞导致键值不能入队,可以用无界延迟队列解决,或者试着将一个队列改为多个队列,即将大锁改为多个小锁

另外一方面来说,严格的时间控制基本上是不现实的。如果用户希望避免阻塞引起这种问题,那么可以考虑使用无界的延时队列。但是即便如此,依旧会有时间不精确的问题,比如说我过期,但是过了几毫秒之后,才把我从队头取出来。但是这种不精确我觉得可以忍受。

是的,我在 https://github.com/gotomicro/ekit/issues/106#issuecomment-1274129336 提到了时间误差问题

@flycash
Copy link
Contributor Author

flycash commented Oct 20, 2022

@longyue0521 Delay 和 Compare 的我再想想。先看看这个写法,大概帮我捋一下看看有没有并发问题。

我这几天也会分析一下,然后加上测试。

@flyhigher139 一起来捋一捋。

这里面有一个很关键的点,就是两个 channel 都至少有一个缓存,也就是如果对面没有人等,那么这边至少会放一个信号。后面有人等了,立刻就能拿到这个信号。这个是解决时序问题的一个关键点。

@flycash
Copy link
Contributor Author

flycash commented Oct 20, 2022

这里面有一个基本假设,就是用户是能够容许一定的时间误差的。这个误差在这里主要就是体现为 go runtime 唤醒 goroutine 并且调度执行,以及在 time.Ticker 计时的问题

@flyhigher139
Copy link
Collaborator

@longyue0521 Delay 和 Compare 的我再想想。先看看这个写法,大概帮我捋一下看看有没有并发问题。

我这几天也会分析一下,然后加上测试。

@flyhigher139 一起来捋一捋。

这里面有一个很关键的点,就是两个 channel 都至少有一个缓存,也就是如果对面没有人等,那么这边至少会放一个信号。后面有人等了,立刻就能拿到这个信号。这个是解决时序问题的一个关键点。

好的~ 我先把代码和前面讨论的内容,好好消化一下

@flycash
Copy link
Contributor Author

flycash commented Oct 21, 2022

我已经发现了 BUG,我觉得因为 Conn 没有 awaitWait 的调用,没有办法说睡一段时间之后醒过来,我只能考虑用类似于连接池的解决思路了

queue/delay_queue.go Show resolved Hide resolved
queue/delay_queue.go Outdated Show resolved Hide resolved
queue/delay_queue.go Outdated Show resolved Hide resolved
queue/delay_queue.go Show resolved Hide resolved
@codecov
Copy link

codecov bot commented Oct 31, 2022

Codecov Report

Merging #111 (2aa08d0) into dev (27357ee) will decrease coverage by 0.38%.
The diff coverage is 90.38%.

@@            Coverage Diff             @@
##              dev     #111      +/-   ##
==========================================
- Coverage   95.42%   95.04%   -0.39%     
==========================================
  Files          28       29       +1     
  Lines        1269     1373     +104     
==========================================
+ Hits         1211     1305      +94     
- Misses         43       50       +7     
- Partials       15       18       +3     
Impacted Files Coverage Δ
queue/delay_queue.go 90.38% <90.38%> (ø)

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

head, err := d.q.Peek()
if err != nil && err != queue.ErrEmptyQueue {
var t T
return t, err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return前需要解锁d.mutex.Unlock()

q: *queue.NewPriorityQueue[T](c, func(src T, dst T) int {
srcDelay := src.Delay()
dstDelay := dst.Delay()
if srcDelay > dstDelay {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delay()内部依赖time.Until(deadline)系统调用,底层优先级队列需要维持内部不变性,每次操作需要在堆中比较O(logN)次,而每次比较又需要两次系统调用(src和dst).
当N=10时,每次操作最多要有3 * 2= 6次系统调用.
当N=100时,每次操作最多要有6 * 2 = 12次系统调用.

每次入队/出队时,因为调整堆结构而引入的至少两次额外的系统调用,是否可接受?是否会有性能问题?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

实际上 Delay 内部究竟依赖不依赖 time.Until 调用,这个是用户自己决定的。比如说如果他们能够做到 Delay 实现不调用,那就不调用。
我突然想到,要是我们这个接口改成返回 Deadline 呢?
对于用户来说,只需要计算一次 Deadline,我们优先级队列把 Deadline 比较大的放在后面

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得可以,其实我那个实现就是这么干的,Delayable接口名字我没改,将方法Delay改为了Deadline了。返回time.Time,这个问题就解决了。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

=.= 我试了一下,用 deadline 的话,那么就是我们自己调用 time.Now,始终都是需要有人调用。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time.Now是一定要调用的,因为需要计算delay。但是在向底层堆插入/删除数据的时候即d.q.Enqueue/d.q.Dequeue就没有系统调用了(comparator内调用Deadline方法比较)。

case <-ticker.C:
var t T
d.mutex.Lock()
t, err = d.q.Dequeue()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. 不能直接d.q.Dequeue().
    假设queue= {10, 100}, G1和G2并发,Peek到10, G1等待前调用ticker.Reset(10)和G2等待前调用ticker.Reset(8)其中各自的delay是不同的但来自同一个head(10). 当G1先被唤醒,直接将10出队后.G2被唤醒后,原head(10)被G1拿走,而直接将新head(100)出队,此时head(100)的Delay()可能不为0. 所以出队前一定要判定head.Delay()<= 0
  2. 队头插队敏感性及过度延迟问题
    queue = {100}, G1和G2并发等待出队,此时G3 Enqueue(10),queue={10, 100}, 当G1/G2返回元素10时已经远远超出预计的截止日期, 因为G1和G2是按照元素100的delay在等待,G3 Enqueue(10)后并没有唤醒G1/G2重新等待新队头.

这里即便加入判断 head.Delay()<= 0再出队,仍会有插入高优先级元素后过度延迟出队的问题.

// 再拿出长时间的
ele, err = q.Dequeue(ctx)
require.NoError(t, err)
require.Equal(t, 234, ele.val)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

出队元素不但要验证(ID),还要验证是否过期. 加上过期验证的require后234这个case会报错.
如果没有报错,将123的deadline继续调小,增大两者之间的差值.

@flycash
Copy link
Contributor Author

flycash commented Nov 1, 2022

这种实现方式不太稳定,而且引入了很多中间结构。我再考虑考虑

@longyue0521
Copy link
Collaborator

这种实现方式不太稳定,而且引入了很多中间结构。我再考虑考虑

我曾花大量时间分析和尝试,分享一下我的结论:在Dequeue调用者协程内部等待对头元素过期这种实现方式是不可行的。

这种实现有如下问题:

  1. 频繁唤醒/阻塞Dequeue调用者协程,大部分做的都是无用功,空耗CPU资源。
    举例:queue={10, 100}, N个Dequeue调用者协程,等待元素10,N个ticker同时过期,却只有一个能拿到10,剩下的N-1个协程再去抢元素100,接着再N-1个ticker同时过期,N-1个协程被唤醒,只有1个拿到100,剩下N-2阻塞/ctx超时/永久阻塞。
    中间这些2×N-3次的唤醒是可以省掉的。
  2. 大量的锁竞争,效率低下。
    M个Enqueue和N个Dequeue竞争同一把锁,其中N个Dequeue中有N-1个都是无意义的竞争者,却导致Enqueue效率下降。
  3. 队头插队敏感性及元素过度延迟出队的问题
    queue = {100, 1000}, 当Enqueue(10)的时候,N个阻塞等待100出队的Dequeue调用者协程,该唤醒谁?唤醒几个?
    只唤醒一个?在等待10过期期间,恰巧它的ctx超时了呢?是不是又会导致元素10过度延迟出队。全部唤醒?又是上面1的问题。

针对这三个问题,我采用了如下应对方法:

  1. Enqueue和Dequeue使用代理协程来代替调用者协程去插入数据和等待队头过期,有新队头入队时只需唤醒代理协程,即便用户故意刁难,采用元素deadline倒序插入,协程的唤醒次数也是可以接受的。
  2. 将一把锁拆成三把锁,Enqueue调用者协程与enqueueProxy协程竞争一把锁(Channel),Dequeue调用者协程与dequeueProxy协程竞争一把锁(Channel),两个代理协程竞争一把锁来操作底层队列,其入队/出队时的读写竞争从M:N 降为1:1。
  3. Enqueue后发现队头更新,只需通知dequeueProxy代理协程,让其重新等待队头。不会出现“队头插队敏感性及元素过度延迟出队的问题”

以上解决方法行得通的重要前提:

  1. Dequeue代理协程必须是单例的,Enqueue代理协程最好也是
  2. 两个代理协程的生命周期要正确管理
  3. 代理协程中途异常退出后要能被检测到并重启。

因此我在 #114 的实现中使用了不好理解的代码来保证这些前提。关于代理协程的单例问题,生命周期管理问题,异常退出后检测重启的问题,详见 https://github.com/gotomicro/ekit/pull/114#issuecomment-1297918364

@flycash
Copy link
Contributor Author

flycash commented Nov 1, 2022 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants