Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Commit

Permalink
Outstanding dequeues not completed on tryOffer1 (#246)
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev committed Aug 7, 2020
1 parent ce93e66 commit 882b650
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import arrow.core.None
import arrow.core.Option
import arrow.core.Some
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.ForkConnected
import arrow.fx.coroutines.IQueue
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.Token
import arrow.fx.coroutines.UnsafePromise
import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.stream.Chunk
import arrow.fx.coroutines.stream.Stream
Expand Down Expand Up @@ -79,15 +78,15 @@ internal interface Subscribe<A, Selector> {

internal interface PubSub<I, O, Selector> : Publish<I>, Subscribe<O, Selector> {

data class Publisher<A>(val token: Token, val i: A, val signal: Promise<Unit>) {
suspend fun complete(): Unit {
ForkConnected { signal.complete(Unit) }
data class Publisher<A>(val token: Token, val i: A, val signal: UnsafePromise<Unit>) {
fun complete(): Unit {
signal.complete(Result.success(Unit))
}
}

data class Subscriber<A, Selector>(val token: Token, val selector: Selector, val signal: Promise<A>) {
suspend fun complete(a: A): Unit {
ForkConnected { signal.complete(a) }
data class Subscriber<A, Selector>(val token: Token, val selector: Selector, val signal: UnsafePromise<A>) {
fun complete(a: A): Unit {
signal.complete(Result.success(a))
}
}

Expand Down Expand Up @@ -566,9 +565,9 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
fun <X> update(f: (PubSub.PubSubState<I, O, QS, S>) -> Pair<PubSub.PubSubState<I, O, QS, S>, X>): X =
state.modify { ps ->
val (ps1, result) = f(ps)
val (ps2, _) = loop(ps1) { Unit }
Pair(ps2, result)
}
val (ps2, action) = loop(ps1) { Unit }
Pair(ps2, { action.invoke(); result })
}.invoke()

suspend fun <X> modify(f: (PubSub.PubSubState<I, O, QS, S>) -> Pair<PubSub.PubSubState<I, O, QS, S>, suspend () -> X>): X =
state.modify { ps ->
Expand Down Expand Up @@ -599,15 +598,15 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
queue: QS,
remains: IQueue<PubSub.Subscriber<O, S>>,
keep: IQueue<PubSub.Subscriber<O, S>>,
acc: (suspend () -> Unit)?
): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)?> =
acc: (() -> Unit)?
): Pair<PubSub.PubSubState<I, O, QS, S>, (() -> Unit)?> =
when (val sub = remains.firstOrNull()) {
null -> Pair(ps.copy(queue = queue, subscribers = keep), acc)
else -> {
val (queue, chunk) = strategy.get(sub.selector, queue)
when (chunk) {
is Some -> {
val action = suspend { acc?.invoke(); sub.complete(chunk.t) }
val action = { acc?.invoke(); sub.complete(chunk.t) }
if (!strategy.empty(queue)) consumeSubscribersLoop(ps, queue, remains.tail(), keep, action)
else Pair(ps.copy(queue = queue, subscribers = keep.enqueue(remains.tail())), action)
}
Expand All @@ -621,22 +620,22 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
// yields to Some(nextPS, completeAction) whenever at least one subscriber completes
// before this finishes this always tries to consume all subscribers in order they have been
// registered unless strategy signals it is empty.
private fun consumeSubscribers(ps: PubSub.PubSubState<I, O, QS, S>): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)?> =
private fun consumeSubscribers(ps: PubSub.PubSubState<I, O, QS, S>): Pair<PubSub.PubSubState<I, O, QS, S>, (() -> Unit)?> =
consumeSubscribersLoop(ps, ps.queue, ps.subscribers, IQueue.empty(), null)

private tailrec fun publishPublishersLoop(
ps: PubSub.PubSubState<I, O, QS, S>,
queue: QS,
remains: IQueue<PubSub.Publisher<I>>,
keep: IQueue<PubSub.Publisher<I>>,
acc: (suspend () -> Unit)?
): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)?> =
acc: (() -> Unit)?
): Pair<PubSub.PubSubState<I, O, QS, S>, (() -> Unit)?> =
when (val first = remains.firstOrNull()) {
null -> Pair(ps.copy(queue = queue, publishers = keep), acc)
else -> {
if (strategy.accepts(first.i, queue)) {
val queue1 = strategy.publish(first.i, queue)
val action = suspend { acc?.invoke(); first.complete() }
val action = { acc?.invoke(); first.complete() }
publishPublishersLoop(ps, queue1, remains.tail(), keep, action)
} else {
publishPublishersLoop(ps, queue, remains.tail(), keep.enqueue(first), acc)
Expand All @@ -648,7 +647,7 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
// yields to None if no single publisher published
// yields to Some(nextPS, publishSignal) if at least one publisher was publishing to the queue
// always tries to publish all publishers reminaing in single cycle, even when one publisher succeeded
private fun publishPublishers(ps: PubSub.PubSubState<I, O, QS, S>): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)?> =
private fun publishPublishers(ps: PubSub.PubSubState<I, O, QS, S>): Pair<PubSub.PubSubState<I, O, QS, S>, (() -> Unit)?> =
publishPublishersLoop(ps, ps.queue, ps.publishers, IQueue.empty(), null)

/*
Expand All @@ -659,13 +658,13 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
*/
private tailrec fun loop(
ps: PubSub.PubSubState<I, O, QS, S>,
action: suspend () -> Unit
): Pair<PubSub.PubSubState<I, O, QS, S>, (suspend () -> Unit)> {
action: () -> Unit
): Pair<PubSub.PubSubState<I, O, QS, S>, (() -> Unit)> {
val (ps1, resultPublish) = publishPublishers(ps)
val (ps2, resultConsume) = consumeSubscribers(ps1)
return if (resultConsume == null && resultPublish == null) Pair(ps2, action)
else {
val nextAction = suspend {
val nextAction = {
resultConsume?.invoke()
action.invoke()
resultPublish?.invoke()
Expand Down Expand Up @@ -693,10 +692,10 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
val ps1 = publish_(a, ps)
Pair(ps1, suspend { Unit })
} else {
val publisher = PubSub.Publisher(Token(), a, Promise.unsafe())
val publisher = PubSub.Publisher(Token(), a, UnsafePromise())

val awaitCancellable = suspend {
guaranteeCase({ publisher.signal.get() }) { ex ->
guaranteeCase({ publisher.signal.join() }) { ex ->
clearPublisherOnErrorOrCancel(publisher.token, ex)
}
}
Expand All @@ -720,9 +719,9 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
when (option) {
None -> {
val token = Token()
val sub = PubSub.Subscriber(token, selector, Promise.unsafe<O>())
val sub = PubSub.Subscriber(token, selector, UnsafePromise<O>())
val cancellableGet = suspend {
guaranteeCase({ sub.signal.get() }) { ex ->
guaranteeCase({ sub.signal.join() }) { ex ->
clearSubscriberOnErrorOrCancel(token, ex)
}
}
Expand All @@ -738,8 +737,8 @@ internal class DefaultPubSub<I, O, QS, S>(private val strategy: PubSub.Strategy<
when (option) {
is Some -> Pair(ps, suspend { option.t })
None -> {
val sub = PubSub.Subscriber(token, selector, Promise.unsafe<O>())
Pair(ps.copy(subscribers = ps.subscribers.enqueue(sub)), suspend { sub.signal.get() })
val sub = PubSub.Subscriber(token, selector, UnsafePromise<O>())
Pair(ps.copy(subscribers = ps.subscribers.enqueue(sub)), suspend { sub.signal.join() })
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package arrow.fx.coroutines.stream.concurrent

import arrow.core.Option
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.StreamSpec
import arrow.fx.coroutines.parTupledN
import arrow.fx.coroutines.milliseconds
import arrow.fx.coroutines.stream.Stream
import arrow.fx.coroutines.stream.append
import arrow.fx.coroutines.stream.compile
import arrow.fx.coroutines.stream.noneTerminate
import arrow.fx.coroutines.stream.parJoinUnbounded
import arrow.fx.coroutines.stream.terminateOnNone
import arrow.fx.coroutines.timeOutOrNull
import io.kotest.assertions.assertSoftly
import io.kotest.matchers.ints.shouldBeLessThan
import io.kotest.matchers.shouldBe
Expand All @@ -17,22 +21,45 @@ import io.kotest.property.arbitrary.positiveInts

class QueueTest : StreamSpec(spec = {

"Queue with capacity can always take tryOffer1" {
checkAll(Arb.int()) { i ->
val q = Queue.unbounded<Int>()

q.tryOffer1(i) shouldBe true
q.dequeue().take(1).compile().toList() shouldBe listOf(i)
}
}

"Outstanding taker received tryOffer1 value" {
checkAll(Arb.int()) { i ->
val q = Queue.unbounded<Int>()
val start = Promise<Unit>()

val f = ForkAndForget {
start.complete(Unit)
q.dequeue1()
}

start.get()

q.tryOffer1(i) shouldBe true
f.join() shouldBe i
}
}

"unbounded producer/consumer" {
checkAll(Arb.stream(Arb.int())) { s ->
val expected = s.compile().toList()
val n = expected.size
val q = Queue.unbounded<Int>()

parTupledN({
q.dequeue()
.take(n)
.compile()
.toList()
}, {
s.through(q.enqueue())
.compile()
.drain()
}).first shouldBe expected
Stream(
q.dequeue(),
s.through(q.enqueue()).drain()
).parJoinUnbounded()
.take(n)
.compile()
.toList() shouldBe expected
}
}

Expand Down Expand Up @@ -113,4 +140,24 @@ class QueueTest : StreamSpec(spec = {
.toList() shouldBe expected
}
}

"dequeue releases subscriber on " - {
"interrupt" {
val q = Queue.unbounded<Int>()
q.dequeue().interruptAfter(100.milliseconds).compile().drain()
q.enqueue1(1)
q.enqueue1(2)
q.dequeue1() shouldBe 1
}

"cancel" {
val q = Queue.unbounded<Int>()
timeOutOrNull(100.milliseconds) {
q.dequeue1()
} shouldBe null
q.enqueue1(1)
q.enqueue1(2)
q.dequeue1() shouldBe 1
}
}
})

0 comments on commit 882b650

Please sign in to comment.