Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

基于SingleFlight思想实现基于时间的线程安全优先级队列 #114

Closed
wants to merge 14 commits into from

Conversation

longyue0521
Copy link
Collaborator

@longyue0521 longyue0521 commented Oct 28, 2022

解决 #106

// g1  - \
// g2  - - Enqueue() -- N --> channel(多个通道) -- 1 --> enqueueProxy 协程 --
// g3  - /                                                                   \
//                                                      两个代理协程之间通过互斥锁、通道协作,详见下方说明3
// g4  - \                                                                   /
// g5  - - Dequeue() -- N --> channel(多个通道)-- 1 --> dequeueProxy 协程 --
// g6  - /
//
// 说明:
//    1. g1、g2、g3并发调用Enqueue方法,Enqueue方法内部启动一个代理协程 enqueueProxy,
//       - enqueueProxy 职责
//         - 从调用者协程们接收数据
//         - 与下方 dequeueProxy 协程并发访问底层无锁优先级队列,将收到的数据入队
//         - 将入队结果返回给调用者协程们
//         - 如果有高优先级元素入队,导致队头变更,向 d.wakeupSignalForDequeueProxy 发信号通知下方 dequeueProxy 协程
//         - 监听退出信号,来自最后一个调用者协程发送的退出信号
//       - g1、g2、g3 通过channel与 enqueueProxy 协程通信
//         - 调用协程们通过 d.newElementsChan 向 enqueueProxy 发送数据
//         - 调用协程们通过 d.enqueueErrorChan 从 enqueueProxy 接收错误信息
//         - 第一/最后一个调用协程通过 d.quitSignalForEnqueueProxy 和 d.continueSignalFromEnqueueProxy 
//           启动/关闭 enqueueProxy 协程,在非并发场景下,g1既是第一个又是最后一个需要负责启动和关闭 enqueueProxy
//           并发下,g1负责启动 enqueueProxy,g3 负责关闭 enqueueProxy
//
//    2. g3、g4、g5并发调用Dequeue方法,Dequeue方法内部启动一个代理协程 dequeueProxy,
//       - dequeueProxy 职责
//         - 获取队头,等待其过期;
// 		- 与上方 enqueueProxy 协程并发访问底层无锁优先级队列,将过期队头出队
//         - 将出队结果返回给调用者协程们
//         - 监听唤醒信号,来自 enqueueProxy 协程,重新检查队头
//         - 监听退出信号,来自最后一个调用者协程发送的退出信号
//       - g3、g4、g5 通过channel与 DequeueProxy 协程通信
//         - 调用者协程们从 d.expiredElements 获取过期元素
//         - dequeueProxy 协程通过 d.wakeupSignalForDequeueProxy 获取通知以重新检查队头
//         - 第一/最后一个调用协程通过 d.quitSignalForDequeueProxy 和 d.continueSignalFromDequeueProxy
//           启动/关闭 dequeueProxy 协程
//         - Dequeue的逻辑语义是"拿到队头,等待队头超时或自己超时返回; 拿不到队头,阻塞等待直到ctx过期"
//           故Dequeue不返回延迟队列为空的错误,而是让调用者阻塞等待ctx超时;
//           如果调用者未传递具有超时的ctx,导致永久阻塞是他自己的问题
//
//    3. enqueueProxy 与 dequeueProxy 协程之间通过互斥锁、通道来协作
//      - 用 d.mutex 并发操作底层优先级队列 d.q
//      - 用 d.wakeupSignalForDequeueProxy && d.wakeupSignalForEnqueueProxy 在队列状态变化时相互唤醒
//      - 无锁队列 d.q 上只有 enqueueProxy 与 dequeueProxy 两个协程并发访问

  1. 优点
    • 借鉴SingleFlight思想,用enqueueProxydequeueProxy两个协程将原内部互斥锁上的并发量从M:N降为1:1,减少不必要的锁竞争.
    • 借助select + channel,将Enqueue及Dequeue上大量并发调用者协程的阻塞与唤醒工作委托给Go运行时处理
      • 大量Enqueue高优先级元素导致队头频繁改变时,只需要enqueueProxy通知dequeueProxy协程重新获取队头即可,不要频繁唤醒所有阻塞在Dequeue上的并发调用者协程
    • 借助select + ctx.Done,使Enqueue和Dequeue中ctx超时控制更加精准
      • 不会出现在互斥锁上排队过程中超时无法中断,也不用陷入两难:好不容易拿到锁,不入队/出队吧白排队了,入队/出队操作吧又严重超时. 用户体验差——设置超时你不准,不设置超时你永久阻塞.
  2. 不足
    • 入队元素要实现的接口设计及命名问题,当前实现要求用户实现Deadline方法返回一个time.Time的截止日期——time.Now().Add(expire)
    • 时间间隔较大的多个Enqueue/Dequeue调用被视为串行/单次调用,这时enqueueProxydequeueProxy协程无法被复用,当这样调用较多时会有创建/销毁代理协程的开销。
    • 可以考虑添加两个方法来显示创建/销毁enqueueProxydequeueProxy协程
       // 启动/销毁代理协程
       delayQueue.Star()
       defer delayQueue.Stop()
       
       delayQueue.Enqueue(ctx, xx)
      // 任意时间间隔
       delayQueue.Dequeue

@codecov
Copy link

codecov bot commented Oct 28, 2022

Codecov Report

Merging #114 (5c47a0b) into dev (27357ee) will increase coverage by 0.38%.
The diff coverage is 97.88%.

@@            Coverage Diff             @@
##              dev     #114      +/-   ##
==========================================
+ Coverage   95.42%   95.81%   +0.38%     
==========================================
  Files          28       29       +1     
  Lines        1269     1505     +236     
==========================================
+ Hits         1211     1442     +231     
- Misses         43       48       +5     
  Partials       15       15              
Impacted Files Coverage Δ
queue/delay_queue.go 97.88% <97.88%> (ø)

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@longyue0521 longyue0521 requested review from flycash and flyhigher139 and removed request for flyhigher139 October 28, 2022 15:49
queue/delay_queue.go Outdated Show resolved Hide resolved
Copy link
Contributor

@flycash flycash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我大概理解这个思路,我试试能不能简化一下这个代码。我觉得可以启用显式地 Start 和 Close 的方法,或者我们在创建的时候直接 Start,在关闭的时候要求用户主动关闭。
这样子的话在 Enqueue 和 Dequeue 里面的代码就简单多了。

我觉得从思路上来说应该要简单很淡。你可以看一下我的实现,我的实现接近 DB pool 的那种搞法,就是用了 req,然后 req 里面维持了一个 channel 来接收唤醒信号。

缺陷就是性能会很差,主要是 make channel 的问题。好处就是不需要 Close 也不需要 Start。
这两个实现暂时先留着,我后面做做性能分析,然后确定最终的方案。

主要是我觉得这个思路没问题,但是代码不知道能不能继续简化。我们可以通过要求用户主动 Start 或者 Close 来丢弃一部分状态变迁的代码,应该会更加清晰一些

@flycash
Copy link
Contributor

flycash commented Oct 31, 2022

@flyhigher139 你可以看看我们两种实现,看看的你的感受,或者也顺便做个benchmark 测试。

@flyhigher139
Copy link
Collaborator

@flyhigher139 你可以看看我们两种实现,看看的你的感受,或者也顺便做个benchmark 测试。

@flycash 老师的那个实现方案,我大概看了1遍半就差不多了,主要是我会用类似的思路考虑如何实现,逻辑很容易就补齐了
@longyue0521 的代码,配合注释,大概看了一下,能基本理解实现思路,但仔细看细节,理顺每一步逻辑,开始要看两三遍,很大程度上是花在熟悉代码和脑补业务逻辑上了

另外,我个人之前对CSP的并发模型理解比较浅,最近打算强化一下,暂时对这两个方案没有倾向性,如果不是这个情况,我估计大概率我会倾向于老师那个方案

@longyue0521
Copy link
Collaborator Author

longyue0521 commented Nov 1, 2022

@flycash

  1. API 设计问题
  • 现在的Delayable接口中有一个Deadline() time.Time方法,接口的命名、方法的命名及返回值是否要修改?
  • New方法现在接收capacity和comparator两个参数,而comparator的实现是依赖Delayable接口的方法,加上延迟队列的逻辑语义(截止日期近的,小的先出队),我准备把New方法改为只接收一个capacity参数,在New方法内部提供comparator的默认实现——截止日期小的、近的元素先出队,不再让用户传入.
  1. 代理协程生命周期管理问题,如果启用显示的Start和Stop方式管理的话,大概需要如下重构.
    以Enqueue为例(Dequeue是类似的), 它承担着开启/关闭代理协程enqueueProxy职责.代码如下:
func (d *DelayQueue[T]) Enqueue(ctx context.Context, t T) error {
         // ....
	d.startEnqueueProxy()
	defer d.closeEnqueueProxy()
        // ....
}
func (d *DelayQueue[T]) startEnqueueProxy() {
	d.enqueueMutex.Lock()
	defer d.enqueueMutex.Unlock()

	d.enqueueCallers++
         // 这里不但承担着“开启代理协程”的职责
         // 还承担着检测代理协程是否退出(因panic而退出),如果是就重启的职责.
         // 使用显示的Start方法只能移除“开启代理协程“的职责
         // 移除”检测/重启代理协程“的职责,就需要确保代理协程绝不会因panic或其他异常退出.
	if atomic.LoadInt64(&d.numOfEnqueueProxyGo) == 0 {
		go d.enqueueProxy()
		<-d.continueSignalFromEnqueueProxy
	}
}
func (d *DelayQueue[T]) closeEnqueueProxy() {
	d.enqueueMutex.Lock()
	defer d.enqueueMutex.Unlock()

	d.enqueueCallers--
        // “关闭代理协程”的逻辑比较容易移到Stop/Close方法中,
        // 可以用close(closeSignalChan) + wg.Wait()的方式,或者 closeCtxCancelFunc() + wg.Wait()来实现 
	if d.enqueueCallers == 0 && atomic.LoadInt64(&d.numOfEnqueueProxyGo) == 1 {
		d.quitSignalForEnqueueProxy <- struct{}{}
		<-d.continueSignalFromEnqueueProxy
	}
}

enqueueProxy 协程中也包含与Enqueue同步、处理enqueueProxy内部panic等非核心逻辑代码.

func (d *DelayQueue[T]) enqueueProxy() {

	defer func() {
	        // 如果有panic的话,吞掉panic
		_ = recover()
		// 重置状态——enqueueProxy已退出
		atomic.CompareAndSwapInt64(&d.numOfEnqueueProxyGo, 1, 0)
                // 与Enqueue同步,退出
		d.continueSignalFromEnqueueProxy <- struct{}{}
                // 上面两行代码可以使用
                // d.stopWaitGroup.Done()来替代
	}()

	// 设置状态——enqueueProxy已启动
	atomic.CompareAndSwapInt64(&d.numOfEnqueueProxyGo, 0, 1)
        //  与Enqueue同步,开始执行
	d.continueSignalFromEnqueueProxy <- struct{}{}
       // 上面两行代码可以使用
       //  d.startWaitGroup.Done()来替代.
       // .....
}

那么Start和Stop方法可以这样实现

func (d *DelayQueue[T]) Start() {
	d.stopWaitGroup.Add(2)
	d.startWaitGroup.Add(2)
	go d.enqueueProxy()
	go d.dequeueProxy()
	d.startWaitGroup.Wait()
}

func (d *DelayQueue[T]) Stop() {
         d.stopCtxCancelFunc() // close(d.stopSignalChan)
	d.stopWaitGroup.Wait()
}

最后,代理协程异常退出用户是无感知的,异常退出会导致Enqueue/Dequeue上的并发协程要么ctx超时退出,要么永久阻塞(如果没设置超时).
解决方法,要么通过review确保enqueueProxy/dequeueProxy没有代码会panic,要么使用上面的“检测/重启代理协程”的方式.

@longyue0521 longyue0521 mentioned this pull request Nov 1, 2022
Copy link
Contributor

@flycash flycash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

大体上,这种思路确实可以做出来,但是还是很难让人理解清楚。这个调度比之前我们的 TaskPool 的实现还要复杂一点,原因在于里面使用了很多个 channel 和 sync 工具,所以对于用户来说,包括对于我来说,只能做到大概理解这个算法。

queue/delay_queue.go Show resolved Hide resolved
Comment on lines 100 to 106
d.stoppedProxiesWaitGroup.Add(proxies)
d.startedProxiesWaitGroup.Add(proxies)
go d.enqueueProxy()
go d.dequeueProxy()
d.startedProxiesWaitGroup.Wait()
atomic.AddInt64(&d.numOfEnqueueProxyGo, 1)
atomic.AddInt64(&d.numOfDequeueProxyGo, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我有点困惑,就是如果我们采用这种方案的话,我的理解是只需要一个 queue 维持一个 goroutine 来处理入队,一个 goroutine 来处理出队。那么并不需要这个 waitGroup,也不需要维持住这些计数。

Comment on lines 61 to 63
if capacity <= 0 {
return nil, fmt.Errorf("%w: capacity必须大于0", errInvalidArgument)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个限制我觉得不太好。虽然我之前吐槽过用户应该永远使用有界队列,但是在优先级队列本身允许无界的情况下,那么这个也应该考虑允许它是无界队列。这就限制住了你并不能使用 expiredElement 这个 channel。
之前我大概沿着你的思路去,我是尝试额外引入了一个信号量,用于接收入队的信号的。后来发现还是需要引入一些中间结构,所以我最终觉得这种思路复杂度也太高。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expireElement之前承担着“缓存过期数据”及“与调用者协程们通信”两项职责.现在前一个职责由d.qCache承担,它现在只承担“与调用者协程们通信”的职责,故将其重命名为dequeueElemsChan并且取消了缓冲区.

Signed-off-by: longyue0521 <longyueli0521@gmail.com>
queue/delay_queue.go Show resolved Hide resolved
queue/delay_queue.go Show resolved Hide resolved
Copy link
Contributor

@flycash flycash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • 我觉得对 proxy 的复杂生命周期控制是不需要的。只需要有一个 Close 的方法,收到这个就信号就直接退出。再之后也不用管用户 Close 了是不是还在用,都不需要;
  • 我认为 expireElement 这个东西太影响性能了,尤其是它必须保持和 capacity 一样的大小

我现在的想法是,等待 sync.Cond 支持 Wait 调用,在这之前不支持 ctx 超时控制的阻塞队列。

Signed-off-by: longyue0521 <longyueli0521@gmail.com>
@longyue0521
Copy link
Collaborator Author

longyue0521 commented Nov 2, 2022

@flycash @flyhigher139

  1. 实现了无界队列,取消了capacity不能等于零的限制.
  2. expireElement之前承担着“缓存过期数据”及“与调用者协程们通信”两项职责.现在前一个职责由d.qCache承担,它现在只承担“与调用者协程们通信”的职责,故将其重命名为dequeueElemsChan并且取消了缓冲区.
  3. waitGroup在测试中很有用,用于提供可靠的时间点插入断言,判断延队列的运行情况.

如果图小看不清,可以右击在新标签页中打开图片链接.

delay_queue_data_flow

程序实体介绍

  1. 最上方逻辑并发优先级队列(以下简称,逻辑队列)是一个抽象的程序实体(代码中不存在),它由d.qd.qCached.mutex这三个具体实体构成.
    • d.q 是非并发安全的优先级队列,用于存储未过期的用户数据
    • d.qCache是非并发安全的优先级队列,用于存储已过期的用户数据
    • d.mutex 并发控制对二者的访问,从而形成逻辑队列抽象实体.
  2. enqueueProxy协程是逻辑队列Enqueue端的代理协程,即如果想要向逻辑队列Enqueue数据,必须通过enqueueProxy协程,该协程暴露了两个“接口”用于与其通信, 其中:
    • enqueueElemsChan通道用于客户端协程向enqueueProxy协程传递要插入逻辑队列的数据
    • enqueueErrorChan通道用于接收插入逻辑队列的结果.
    • enqueueProxy协程只访问逻辑队列d.q部分
  3. dequeueProxy协程是逻辑队列Dequeue端的代理协程,即如果想要从逻辑队列Dequeue数据,必须通过dequeueProxy协程,该协程同样暴露了两个“接口”,
    • dequeueElemsChan通道用于获取数据
    • dequeueErrorChan通道用于获取错误
    • dequeueProxy协程只访问逻辑队列d.qCache部分
  4. workerProxy协程,是逻辑队列的内部工作协程,负责将逻辑队列内部d.q中的未过期数据,等待其过期后,再搬运到逻辑队列内部d.qCache中,在搬运结束后,选择恰当时机向dequeueProxy协程发送唤醒信号.

Enqueue流程

  1. 并发调用G1、G2、G3, 通过“客户端”(Enqueue方法)中的enqueueElemsChan通道,向enqueueProxy协程发送数据,期望数据进入逻辑队列
  2. enqueueProxy协程收到数据后,操纵逻辑队列内部d.q将数据入队,并通过enqueueErrorChan通道将返回结果返回给“客户端”协程.
  3. 如果新入队的元素,改变队首,enqueueProxy协程通过wakeupWorkerProxy通道向workerProxy协程发送通知信号.

Dequeue流程

  1. 并发调用G4、G5、G6(图上画错了), 通过“客户端”(Dequeue方法)中的dequeueElemsChan通道等待来自逻辑队列中的过期数据
  2. dequeueProxy协程自启动后,就不断尝试从逻辑队列d.qCache中获取过期数据,如果能够获取到就通过dequeueElemsChan通道发给“客户端”协程,如果获取不到,阻塞自己直到workerProxy写成成功搬运数据后唤醒自己.

@flycash
Copy link
Contributor

flycash commented Nov 4, 2022

嗯,我确实理解了你的设计和调度,所以我依旧认为过于复杂。相比之下,我在 #115 中的实现就要清晰很多。从性能上来说,#115 核心在于会频繁创建一个 singnal 的channel,但是相比可读性带来的提升,我觉得是可以接受的。所以这个我关闭了,除非说我 #115 中的实现有无法解决的并发问题。

感谢

@flycash flycash closed this Nov 4, 2022
@longyue0521
Copy link
Collaborator Author

好的

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants