Outstanding dequeues not completed on tryOffer1 #246
Conversation
@@ -79,15 +78,15 @@ internal interface Subscribe<A, Selector> { | |||
|
|||
internal interface PubSub<I, O, Selector> : Publish<I>, Subscribe<O, Selector> { | |||
|
|||
data class Publisher<A>(val token: Token, val i: A, val signal: Promise<Unit>) { | |||
suspend fun complete(): Unit { | |||
ForkConnected { signal.complete(Unit) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not necessary to launch completion in a Fiber
since the join
operations already intercept
before returning, meaning they always get properly re-scheduled before returning from join
.
Pair(ps2, result) | ||
} | ||
val (ps2, action) = loop(ps1) { Unit } | ||
Pair(ps2, { action.invoke(); result }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the bug fix, in case an outstanding offer was found. Its completion needs to be invoked.
@@ -659,13 +658,13 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy< | |||
*/ | |||
private tailrec fun loop( | |||
ps: PubSub.PubSubState<I, O, QS, S>, | |||
action: suspend () -> Unit | |||
): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The actions running in the loop
no longer need to be suspending since we rely on UnsafePromise
for signaling.
Instead the functionality of suspend
is exposed through the modify
operator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great!! thank you @nomisRev !! 🙌
The
update
function inPubSub
was not correctly invoking the completion for outstanding dequeues.To fix this I replaced the usages of
Promise
toUnsafePromise
so we can complete the promises in a non-suspending manner. This allows for bothmodify
andupdate
which can modify the state withsuspend
, andnon-suspend
support.