Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Outstanding dequeues not completed on tryOffer1 #246

Merged
merged 1 commit into from Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -5,10 +5,9 @@ import arrow.core.None
import arrow.core.Option
import arrow.core.Some
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.ForkConnected
import arrow.fx.coroutines.IQueue
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.Token
import arrow.fx.coroutines.UnsafePromise
import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.stream.Chunk
import arrow.fx.coroutines.stream.Stream
Expand Down Expand Up @@ -79,15 +78,15 @@ internal interface Subscribe<A, Selector> {

internal interface PubSub<I, O, Selector> : Publish<I>, Subscribe<O, Selector> {

data class Publisher<A>(val token: Token, val i: A, val signal: Promise<Unit>) {
suspend fun complete(): Unit {
ForkConnected { signal.complete(Unit) }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessary to launch completion in a Fiber since the join operations already intercept before returning, meaning they always get properly re-scheduled before returning from join.

data class Publisher<A>(val token: Token, val i: A, val signal: UnsafePromise<Unit>) {
fun complete(): Unit {
signal.complete(Result.success(Unit))
}
}

data class Subscriber<A, Selector>(val token: Token, val selector: Selector, val signal: Promise<A>) {
suspend fun complete(a: A): Unit {
ForkConnected { signal.complete(a) }
data class Subscriber<A, Selector>(val token: Token, val selector: Selector, val signal: UnsafePromise<A>) {
fun complete(a: A): Unit {
signal.complete(Result.success(a))
}
}

Expand Down Expand Up @@ -566,9 +565,9 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
fun <X> update(f: (PubSub.PubSubState<I, O, QS, S>) -> Pair<PubSub.PubSubState<I, O, QS, S>, X>): X =
state.modify { ps ->
val (ps1, result) = f(ps)
val (ps2, _) = loop(ps1) { Unit }
Pair(ps2, result)
}
val (ps2, action) = loop(ps1) { Unit }
Pair(ps2, { action.invoke(); result })
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the bug fix, in case an outstanding offer was found. Its completion needs to be invoked.

}.invoke()

suspend fun <X> modify(f: (PubSub.PubSubState<I, O, QS, S>) -> Pair<PubSub.PubSubState<I, O, QS, S>, suspend () -> X>): X =
state.modify { ps ->
Expand Down Expand Up @@ -599,15 +598,15 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
queue: QS,
remains: IQueue<PubSub.Subscriber<O, S>>,
keep: IQueue<PubSub.Subscriber<O, S>>,
acc: (suspend () -> Unit)?
): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)?> =
acc: (() -> Unit)?
): Pair<PubSub.PubSubState<I, O, QS, S>, (() -> Unit)?> =
when (val sub = remains.firstOrNull()) {
null -> Pair(ps.copy(queue = queue, subscribers = keep), acc)
else -> {
val (queue, chunk) = strategy.get(sub.selector, queue)
when (chunk) {
is Some -> {
val action = suspend { acc?.invoke(); sub.complete(chunk.t) }
val action = { acc?.invoke(); sub.complete(chunk.t) }
if (!strategy.empty(queue)) consumeSubscribersLoop(ps, queue, remains.tail(), keep, action)
else Pair(ps.copy(queue = queue, subscribers = keep.enqueue(remains.tail())), action)
}
Expand All @@ -621,22 +620,22 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
// yields to Some(nextPS, completeAction) whenever at least one subscriber completes
// before this finishes this always tries to consume all subscribers in order they have been
// registered unless strategy signals it is empty.
private fun consumeSubscribers(ps: PubSub.PubSubState<I, O, QS, S>): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)?> =
private fun consumeSubscribers(ps: PubSub.PubSubState<I, O, QS, S>): Pair<PubSub.PubSubState<I, O, QS, S>, (() -> Unit)?> =
consumeSubscribersLoop(ps, ps.queue, ps.subscribers, IQueue.empty(), null)

private tailrec fun publishPublishersLoop(
ps: PubSub.PubSubState<I, O, QS, S>,
queue: QS,
remains: IQueue<PubSub.Publisher<I>>,
keep: IQueue<PubSub.Publisher<I>>,
acc: (suspend () -> Unit)?
): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)?> =
acc: (() -> Unit)?
): Pair<PubSub.PubSubState<I, O, QS, S>, (() -> Unit)?> =
when (val first = remains.firstOrNull()) {
null -> Pair(ps.copy(queue = queue, publishers = keep), acc)
else -> {
if (strategy.accepts(first.i, queue)) {
val queue1 = strategy.publish(first.i, queue)
val action = suspend { acc?.invoke(); first.complete() }
val action = { acc?.invoke(); first.complete() }
publishPublishersLoop(ps, queue1, remains.tail(), keep, action)
} else {
publishPublishersLoop(ps, queue, remains.tail(), keep.enqueue(first), acc)
Expand All @@ -648,7 +647,7 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
// yields to None if no single publisher published
// yields to Some(nextPS, publishSignal) if at least one publisher was publishing to the queue
// always tries to publish all publishers reminaing in single cycle, even when one publisher succeeded
private fun publishPublishers(ps: PubSub.PubSubState<I, O, QS, S>): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)?> =
private fun publishPublishers(ps: PubSub.PubSubState<I, O, QS, S>): Pair<PubSub.PubSubState<I, O, QS, S>, (() -> Unit)?> =
publishPublishersLoop(ps, ps.queue, ps.publishers, IQueue.empty(), null)

/*
Expand All @@ -659,13 +658,13 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
*/
private tailrec fun loop(
ps: PubSub.PubSubState<I, O, QS, S>,
action: suspend () -> Unit
): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actions running in the loop no longer need to be suspending since we rely on UnsafePromise for signaling.

Instead the functionality of suspend is exposed through the modify operator.

action: () -> Unit
): Pair<PubSub.PubSubState<I, O, QS, S>, (() -> Unit)> {
val (ps1, resultPublish) = publishPublishers(ps)
val (ps2, resultConsume) = consumeSubscribers(ps1)
return if (resultConsume == null && resultPublish == null) Pair(ps2, action)
else {
val nextAction = suspend {
val nextAction = {
resultConsume?.invoke()
action.invoke()
resultPublish?.invoke()
Expand Down Expand Up @@ -693,10 +692,10 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
val ps1 = publish_(a, ps)
Pair(ps1, suspend { Unit })
} else {
val publisher = PubSub.Publisher(Token(), a, Promise.unsafe())
val publisher = PubSub.Publisher(Token(), a, UnsafePromise())

val awaitCancellable = suspend {
guaranteeCase({ publisher.signal.get() }) { ex ->
guaranteeCase({ publisher.signal.join() }) { ex ->
clearPublisherOnErrorOrCancel(publisher.token, ex)
}
}
Expand All @@ -720,9 +719,9 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
when (option) {
None -> {
val token = Token()
val sub = PubSub.Subscriber(token, selector, Promise.unsafe<O>())
val sub = PubSub.Subscriber(token, selector, UnsafePromise<O>())
val cancellableGet = suspend {
guaranteeCase({ sub.signal.get() }) { ex ->
guaranteeCase({ sub.signal.join() }) { ex ->
clearSubscriberOnErrorOrCancel(token, ex)
}
}
Expand All @@ -738,8 +737,8 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
when (option) {
is Some -> Pair(ps, suspend { option.t })
None -> {
val sub = PubSub.Subscriber(token, selector, Promise.unsafe<O>())
Pair(ps.copy(subscribers = ps.subscribers.enqueue(sub)), suspend { sub.signal.get() })
val sub = PubSub.Subscriber(token, selector, UnsafePromise<O>())
Pair(ps.copy(subscribers = ps.subscribers.enqueue(sub)), suspend { sub.signal.join() })
}
}
}
Expand Down
@@ -1,13 +1,17 @@
package arrow.fx.coroutines.stream.concurrent

import arrow.core.Option
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.StreamSpec
import arrow.fx.coroutines.parTupledN
import arrow.fx.coroutines.milliseconds
import arrow.fx.coroutines.stream.Stream
import arrow.fx.coroutines.stream.append
import arrow.fx.coroutines.stream.compile
import arrow.fx.coroutines.stream.noneTerminate
import arrow.fx.coroutines.stream.parJoinUnbounded
import arrow.fx.coroutines.stream.terminateOnNone
import arrow.fx.coroutines.timeOutOrNull
import io.kotest.assertions.assertSoftly
import io.kotest.matchers.ints.shouldBeLessThan
import io.kotest.matchers.shouldBe
Expand All @@ -17,22 +21,45 @@ import io.kotest.property.arbitrary.positiveInts

class QueueTest : StreamSpec(spec = {

"Queue with capacity can always take tryOffer1" {
checkAll(Arb.int()) { i ->
val q = Queue.unbounded<Int>()

q.tryOffer1(i) shouldBe true
q.dequeue().take(1).compile().toList() shouldBe listOf(i)
}
}

"Outstanding taker received tryOffer1 value" {
checkAll(Arb.int()) { i ->
val q = Queue.unbounded<Int>()
val start = Promise<Unit>()

val f = ForkAndForget {
start.complete(Unit)
q.dequeue1()
}

start.get()

q.tryOffer1(i) shouldBe true
f.join() shouldBe i
}
}

"unbounded producer/consumer" {
checkAll(Arb.stream(Arb.int())) { s ->
val expected = s.compile().toList()
val n = expected.size
val q = Queue.unbounded<Int>()

parTupledN({
q.dequeue()
.take(n)
.compile()
.toList()
}, {
s.through(q.enqueue())
.compile()
.drain()
}).first shouldBe expected
Stream(
q.dequeue(),
s.through(q.enqueue()).drain()
).parJoinUnbounded()
.take(n)
.compile()
.toList() shouldBe expected
}
}

Expand Down Expand Up @@ -113,4 +140,24 @@ class QueueTest : StreamSpec(spec = {
.toList() shouldBe expected
}
}

"dequeue releases subscriber on " - {
"interrupt" {
val q = Queue.unbounded<Int>()
q.dequeue().interruptAfter(100.milliseconds).compile().drain()
q.enqueue1(1)
q.enqueue1(2)
q.dequeue1() shouldBe 1
}

"cancel" {
val q = Queue.unbounded<Int>()
timeOutOrNull(100.milliseconds) {
q.dequeue1()
} shouldBe null
q.enqueue1(1)
q.enqueue1(2)
q.dequeue1() shouldBe 1
}
}
})