Permalink
Browse files

Bugfix/queue get (#1308)

* Introduces Resource on PubSub#get

* Added getStream to PubSub to allow for correct unSubscription for interrupted streams.

* Added cancel Spec for Queue.dequeue1

* Added mima exclusions for pubsub

* corrected mima
  • Loading branch information...
pchlupacek authored and mpilquist committed Nov 6, 2018
1 parent aa7e365 commit b847c3557d31120876ac49c775d306d193765f96
@@ -196,7 +196,10 @@ lazy val mimaSettings = Seq(
}.toSet,
mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[Problem]("fs2.internal.*"),
ProblemFilters.exclude[Problem]("fs2.Stream#StepLeg.this")
ProblemFilters.exclude[Problem]("fs2.Stream#StepLeg.this"),
ProblemFilters.exclude[Problem]("fs2.concurrent.Publish.*"),
ProblemFilters.exclude[Problem]("fs2.concurrent.Subscribe.*"),
ProblemFilters.exclude[Problem]("fs2.concurrent.PubSub.*")
)
)
@@ -75,6 +75,37 @@ class QueueSpec extends Fs2Spec {
}
}
}
"dequeue releases subscriber on " - {
"interrupt" in {
Queue
.unbounded[IO, Int]
.flatMap { q =>
q.dequeue.interruptAfter(1.second).compile.drain *>
q.enqueue1(1) *>
q.enqueue1(2) *>
q.dequeue1
}
.unsafeRunSync shouldBe 1
}
"cancel" in {
Queue
.unbounded[IO, Int]
.flatMap { q =>
q.dequeue1.timeout(1.second).attempt *>
q.enqueue1(1) *>
q.enqueue1(2) *>
q.dequeue1
}
.unsafeRunSync shouldBe 1
}
}
"size signal is initialized to zero" in {
runLog(
Stream
@@ -46,8 +46,8 @@ object Balance {
def apply[F[_]: Concurrent, O](chunkSize: Int): Pipe[F, O, Stream[F, O]] = { source =>
Stream.eval(PubSub(PubSub.Strategy.closeDrainFirst(strategy[O]))).flatMap { pubSub =>
def subscriber =
Stream
.repeatEval(pubSub.get(chunkSize))
pubSub
.getStream(chunkSize)
.unNoneTerminate
.flatMap(Stream.chunk)
@@ -43,12 +43,12 @@ object Broadcast {
pubSub =>
def subscriber =
Stream.bracket(Sync[F].delay(new Token))(pubSub.unsubscribe).flatMap { selector =>
Stream
.repeatEval(pubSub.get(selector))
pubSub
.getStream(selector)
.unNoneTerminate
.flatMap(Stream.chunk)
}
def publish =
source.chunks
.evalMap(chunk => pubSub.publish(Some(chunk)))
@@ -4,7 +4,7 @@ import cats.{Applicative, Eq}
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.{Concurrent, ExitCase, Sync}
import cats.syntax.all._
import fs2.Chunk
import fs2._
import fs2.internal.Token
import scala.annotation.tailrec
@@ -36,6 +36,15 @@ private[fs2] trait Subscribe[F[_], A, Selector] {
*/
def get(selector: Selector): F[A]
/**
* A variant of `get`, that instead or returning one element will return multiple elements
* in form of stream.
*
* @param selector selector describing which `A` to receive
* @return
*/
def getStream(selector: Selector): Stream[F, A]
/**
* Like `get`, but instead of semantically blocking for a matching element, returns immediately
* with `None` if such an element is not available.
@@ -196,14 +205,16 @@ private[fs2] object PubSub {
}
}
def clearSubscriber(token: Token)(exitCase: ExitCase[Throwable]): F[Unit] = exitCase match {
case ExitCase.Completed => Applicative[F].unit
case ExitCase.Error(_) | ExitCase.Canceled =>
state.update { ps =>
ps.copy(subscribers = ps.subscribers.filterNot(_.token == token))
}
def clearSubscriber(token: Token): F[Unit] =
state.update { ps =>
ps.copy(subscribers = ps.subscribers.filterNot(_.token == token))
}
}
def clearSubscriberOnCancel(token: Token)(exitCase: ExitCase[Throwable]): F[Unit] =
exitCase match {
case ExitCase.Completed => Applicative[F].unit
case ExitCase.Error(_) | ExitCase.Canceled => clearSubscriber(token)
}
new PubSub[F, I, O, Selector] {
def publish(i: I): F[Unit] =
@@ -235,17 +246,39 @@ private[fs2] object PubSub {
update { ps =>
tryGet_(selector, ps) match {
case (ps, None) =>
val token = new Token
val sub =
Subscriber(new Token, selector, Deferred.unsafe[F, O])
Subscriber(token, selector, Deferred.unsafe[F, O])
def cancellableGet =
Sync[F].guaranteeCase(sub.signal.get)(clearSubscriber(sub.token))
Sync[F].guaranteeCase(sub.signal.get)(clearSubscriberOnCancel(token))
(ps.copy(subscribers = ps.subscribers :+ sub), cancellableGet)
case (ps, Some(o)) =>
(ps, Applicative[F].pure(o))
}
}
def getStream(selector: Selector): Stream[F, O] =
Stream.bracket(Sync[F].delay(new Token))(clearSubscriber).flatMap { token =>
def get_ =
update { ps =>
tryGet_(selector, ps) match {
case (ps, None) =>
val sub =
Subscriber(token, selector, Deferred.unsafe[F, O])
(ps.copy(subscribers = ps.subscribers :+ sub), sub.signal.get)
case (ps, Some(o)) =>
(ps, Applicative[F].pure(o))
}
}
Stream.repeatEval(get_)
}
def tryGet(selector: Selector): F[Option[O]] =
update { ps =>
val (ps1, result) = tryGet_(selector, ps)
@@ -209,10 +209,12 @@ object Queue {
pubSub.tryGet(maxSize)
def dequeueChunk(maxSize: Int): Stream[F, A] =
Stream.evalUnChunk(pubSub.get(maxSize)).repeat
pubSub.getStream(maxSize).flatMap(Stream.chunk)
def dequeueBatch: Pipe[F, Int, A] =
_.flatMap(sz => Stream.evalUnChunk(pubSub.get(sz)))
_.flatMap { sz =>
Stream.evalUnChunk(pubSub.get(sz))
}
}
}
@@ -229,10 +231,14 @@ object Queue {
pubSub.tryPublish(a)
def dequeueChunk(maxSize: Int): Stream[F, A] =
Stream.repeatEval(pubSub.get(maxSize)).unNoneTerminate.flatMap(Stream.chunk)
pubSub
.getStream(maxSize)
.unNoneTerminate
.flatMap(Stream.chunk)
def dequeueBatch: Pipe[F, Int, A] =
_.flatMap(sz => Stream.eval(pubSub.get(sz))).unNoneTerminate.flatMap(Stream.chunk)
_.evalMap(pubSub.get).unNoneTerminate
.flatMap(Stream.chunk)
def tryDequeue1: F[Option[Option[A]]] =
pubSub.tryGet(1).flatMap {
@@ -427,13 +433,10 @@ object InspectableQueue {
pubSub.tryGet(Right(maxSize)).map(_.map(_.right.toOption.getOrElse(Chunk.empty)))
def dequeueChunk(maxSize: Int): Stream[F, A] =
Stream
.evalUnChunk(
pubSub.get(Right(maxSize)).map {
_.right.toOption.getOrElse(Chunk.empty)
}
)
.repeat
pubSub.getStream(Right(maxSize)).flatMap {
case Left(_) => Stream.empty
case Right(chunk) => Stream.chunk(chunk)
}
def dequeueBatch: Pipe[F, Int, A] =
_.flatMap { sz =>
@@ -467,7 +470,7 @@ object InspectableQueue {
Stream
.bracket(Sync[F].delay(new Token))(token => pubSub.unsubscribe(Left(Some(token))))
.flatMap { token =>
Stream.repeatEval(pubSub.get(Left(Some(token)))).flatMap {
pubSub.getStream(Left(Some(token))).flatMap {
case Left(s) => Stream.emit(sizeOf(s))
case Right(_) => Stream.empty // impossible
}
@@ -196,7 +196,7 @@ object SignallingRef {
def discrete: Stream[F, A] =
Stream.bracket(Sync[F].delay(Some(new Token)))(pubSub.unsubscribe).flatMap { selector =>
Stream.repeatEval(pubSub.get(selector))
pubSub.getStream(selector)
}
}
@@ -101,12 +101,10 @@ object Topic {
)(selector => pubSub.unsubscribe(Right(selector)))
.map { selector =>
selector ->
Stream
.repeatEval(pubSub.get(Right(selector)))
.flatMap {
case Right(q) => Stream.emit(q)
case Left(_) => Stream.empty // impossible
}
pubSub.getStream(Right(selector)).flatMap {
case Right(q) => Stream.emit(q)
case Left(_) => Stream.empty // impossible
}
}
@@ -140,9 +138,10 @@ object Topic {
Stream
.bracket(Sync[F].delay(new Token))(token => pubSub.unsubscribe(Left(Some(token))))
.flatMap { token =>
Stream.repeatEval(pubSub.get(Left(Some(token)))).flatMap {
pubSub.getStream(Left(Some(token))).flatMap {
case Left(s) => Stream.emit(s.subscribers.size)
case Right(_) => Stream.empty //impossible
}
}
}
@@ -19,6 +19,7 @@ object PubSub {
fs2.concurrent.PubSub(strategy).map { self =>
new PubSub[F, I, O, Selector] {
def get(selector: Selector): F[O] = self.get(selector)
def getStream(selector: Selector): Stream[F, O] = self.getStream(selector)
def tryGet(selector: Selector): F[Option[O]] = self.tryGet(selector)
def subscribe(selector: Selector): F[Boolean] = self.subscribe(selector)
def unsubscribe(selector: Selector): F[Unit] = self.unsubscribe(selector)

0 comments on commit b847c35

Please sign in to comment.