Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncx.Cond: Wait(time.Duration) 调用 #116

Closed
flycash opened this issue Nov 2, 2022 · 6 comments
Closed

syncx.Cond: Wait(time.Duration) 调用 #116

flycash opened this issue Nov 2, 2022 · 6 comments

Comments

@flycash
Copy link
Contributor

flycash commented Nov 2, 2022

仅限中文

使用场景

在实现并发阻塞队列的时候,我发现 sync.Cond 缺乏一个关键调用,即等待一段时间。如果等待的时候这个 goroutine 被唤醒,那么可以继续;又或者到期了,这个 goroutine 可以继续。

需求这种 API 是因为我们希望同时控制住阻塞和超时。

理论上来说,一种简单的做法是:

func Dequeue(ctx context.Context) {
    go func() {
         signal.Wait()
        // do something
        select {
        case  wake <- struct{}{}:
        default
        }
    }
    select {
        case <- wake:
        case <- ctx.Done()
    }
}

这种做法的缺陷非常明显,我们没有办法在 ctx.Done 返回的时候中断掉 goroutine 的执行。例如在我们的代码里面,do something 其实代表的就是出队。类似地,入队也有这种问题,而且入队没有办法真的中断,导致即便超时了,最终元素还是会放进去队列里面。

第二个缺点是这迫使我们开启一个 goroutine,性能有影响。在 goroutine 里面我们必须用 select + default 的形式,避免 goroutine 泄露,那么这种写法就会引起下面这个问题。

第三个缺点则是,我们其实希望的是在 Wait 的时候主动释放锁。这种写法迫使我们在进入 select 之前必须释放锁。那么就会造成释放锁之后还没进去 select,那么就有人唤醒了 goroutine,以至于我们直接丢掉了信号。
目前来看, sync.Cond 只提供了一个 Wait 调用,我们需要的是一个类似于 Wait(time.Duration) 的调用。在发起这个调用的时候,锁会被释放掉。

行业分析

最典型的就是 Java 的 Condition 设计,里面就提供了 Await 的方法,满足我们的要求:https://github.com/AdoptOpenJDK/openjdk-jdk12u/blob/master/src/java.base/share/classes/java/util/concurrent/locks/Condition.java#L270

可行方案

在 Go 里面有了类似的建议,但是目前还看不到什么时候才会实现:

实际上我需要 API 是:

// 调用这个 goroutine 会被阻塞,直到超时,或者别人唤醒
// 在阻塞期间,c 上的锁会被释放
func (c Cond) Await(time.Duration) {

}

其它

目前来看,依赖于 Go SDK 难以实现,所以可以考虑使用 cgo 引入一些 C 的API 来支持。当然,如果你能用 Go 原生 API 支持是最好的。

在缺乏这个关键调用的情况下,实现支持超时的并发阻塞队列会非常复杂,可以参考:

111 和 115 都可以通过重构修改为真的线程安全的,但是其复杂度就不会比 #114 更低。所以我并不希望引入如此复杂的实现。

你使用的是 ekit 哪个版本?

你设置的的 Go 环境?

上传 go env 的结果

@flycash flycash changed the title syncx.Cond syncx.Cond: Wait(time.Duration) 调用 Nov 2, 2022
@flyhigher139
Copy link
Collaborator

@flycash gist里找到一个实现:cond2.go

@flycash
Copy link
Contributor Author

flycash commented Nov 2, 2022

我学习学习,看起来思路很奇诡的样子。昨晚我搞了一晚上也没搞出来

@longyue0521
Copy link
Collaborator

我们可以这样,假设这个API已经实现了(用注释/伪码描述其逻辑语义)并根据这个API来实现一下并发阻塞队列。如果实现出的并发阻塞队列没有https://github.com/gotomicro/ekit/pull/111#issuecomment-1298110639 中提到的问题及ctx超时过度延迟的问题,那么我们就集中力量来实现这个API。如果实现出的队列仍有上面的问题,那么我们就应该考虑换一种解决思路。

本质上,实现并发阻塞队列需要考虑三部分:业务需求、并发安全及协程调度。在大多数时候后两者由sync.Mutex来承担了。我们只需要专注于业务需求,并发版的list和queue就是很好的例子,所以能写出简洁优雅的业务代码。但这也给了我们一个假象,面对所有的业务需求似乎都能如此。

因为并发阻塞队列的业务需求关系,我们希望Enqueue和Dequeue的调用者协程们能够具备超时退出,阻塞唤起,提交/获取数据的功能。其中,阻塞唤起是代码实现复杂度的根本来源,因为我们期望的是“在特定时间点阻塞/唤起指定个数的协程”,这时Go内置的工具就帮不上忙了,我们不得不将实现业务需求、并发安全及协程调度的代码揉在一起,这是功能性需求和非功能性需求决定的,而不是说本来需求很简单但实现者故意搞复杂了。

以Enqueue高优先级元素使队首改变为例,Enqueue协程需要唤起Dequeue上阻塞的调用者协程,因为超时退出的需求在,导致这个唤起具有特殊性——一次唤起一个Dequeue协程,直到有Dequeue协程成功检查到新队首;或者直到所有Dequeue协程超时退出。同样的,Dequeue协程有时也需要唤起Enqueue调用者协程。在Go中阻塞/耗时类的API最佳实践中,又要求有超时控制。因此,我们不得不自己来实现这种特定的协程调度机制。如果我们采用广播唤起所有协程的方案,确实省事,代码也优雅,同时满足前面的唤起需求,但这也导致了惊群效应,而惊群效应在生产环境中是不可接受的。

总结,这种特定的阻塞唤起方式,是我们的功能需求和非功能需求共同导致的,不是普遍性需求,Go语言不提供也正常。为了实现这种特定的阻塞唤起协程的协程调度机制,代码复杂点也是正常的。代码复杂度高不高要看参照物,如果和其他可行可用实现比,那么高就是高没什么可说的;如果和脑海中的预期比,那么很可能是我们把问题想简单了,而事实上要用Go实现这个需求的代码的固有复杂度可能就是这么高。

@flycash
Copy link
Contributor Author

flycash commented Nov 4, 2022

我在 #115 里面没有实现这个 API,但是也间接达到了效果了。不过缺点就是这种 channel 的 conn 是只能广播唤醒全部阻塞的。我觉得迟早是需要这个东西,我们可以考虑提前设计起来

@flycash
Copy link
Contributor Author

flycash commented Nov 4, 2022

另外就是,如果我们希望利用 ctx 来超时,那么 await 这一类的方法最好是返回 channel,接近 time.Timer 的API设计

@flycash flycash added the 调研 label Nov 7, 2022
@github-actions
Copy link

github-actions bot commented Jan 6, 2023

This issue is inactive for a long time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants