Skip to content

Commit

Permalink
!str - 18808 - Removes Sink.fanoutPublisher and makes Sink.publisher …
Browse files Browse the repository at this point in the history
…specify number of subscribers

Sink.publisher now takes a max number of Subscribers and
the elasticity between concurrent Subscribers.
  • Loading branch information
viktorklang committed Nov 4, 2015
1 parent 33444c5 commit f839a1f
Show file tree
Hide file tree
Showing 54 changed files with 247 additions and 239 deletions.
13 changes: 6 additions & 7 deletions akka-docs-dev/rst/java/stream-integrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ This is how it can be used as input :class:`Source` to a :class:`Flow`:

.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ActorPublisherDocTest.java#actor-publisher-usage

You can only attach one subscriber to this publisher. Use a ``Broadcast``
element or attach a ``Sink.fanoutPublisher`` to enable multiple subscribers.
You can only attach one subscriber to this publisher. Increase the max number of subscribers parameter or use a `Broadcast` element
in order to support multiple subscribers.

ActorSubscriber
^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -414,18 +414,17 @@ by using the Publisher-:class:`Sink`:

.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#source-publisher

A publisher that is created with ``Sink.publisher`` only supports one subscriber. A second
subscription attempt will be rejected with an :class:`IllegalStateException`.
A publisher that is created with ``Sink.publisher`` supports a specified number of subscribers. Additional
subscription attempts will be rejected with an :class:`IllegalStateException`.

A publisher that supports multiple subscribers can be created with ``Sink.fanoutPublisher``
instead:
A publisher that supports multiple subscribers is created as follows:

.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java
:include: author-alert-subscriber,author-storage-subscriber

.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#source-fanoutPublisher

The buffer size controls how far apart the slowest subscriber can be from the fastest subscriber
The input buffer size of the stage controls how far apart the slowest subscriber can be from the fastest subscriber
before slowing down the stream.

To make the picture complete, it is also possible to expose a :class:`Sink` as a :class:`Subscriber`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {

val impl = new Fixture {
override def tweets: Publisher[Tweet] =
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher)
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher(1))

override def storage = TestSubscriber.manualProbe[Author]

Expand Down Expand Up @@ -92,7 +92,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {

//#source-publisher
val authorPublisher: Publisher[Author] =
Source(tweets).via(authors).runWith(Sink.publisher)
Source(tweets).via(authors).runWith(Sink.publisher(1))

authorPublisher.subscribe(storage)
//#source-publisher
Expand All @@ -108,7 +108,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
//#source-fanoutPublisher
val authorPublisher: Publisher[Author] =
Source(tweets).via(authors)
.runWith(Sink.fanoutPublisher(initialBufferSize = 8, maximumBufferSize = 16))
.runWith(Sink.publisher(maxNumberOfSubscribers = Int.MaxValue))

authorPublisher.subscribe(storage)
authorPublisher.subscribe(alert)
Expand Down
9 changes: 4 additions & 5 deletions akka-docs-dev/rst/scala/stream-integrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ This is how it can be used as input :class:`Source` to a :class:`Flow`:

.. includecode:: code/docs/stream/ActorPublisherDocSpec.scala#actor-publisher-usage

You can only attach one subscriber to this publisher. Use a ``Broadcast``
element or attach a ``Sink.fanoutPublisher`` to enable multiple subscribers.
A publisher that is created with ``Sink.publisher`` supports a specified number of subscribers. Additional
subscription attempts will be rejected with an :class:`IllegalStateException`.

ActorSubscriber
^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -412,15 +412,14 @@ by using the Publisher-:class:`Sink`:
A publisher that is created with ``Sink.publisher`` only supports one subscriber. A second
subscription attempt will be rejected with an :class:`IllegalStateException`.

A publisher that supports multiple subscribers can be created with ``Sink.fanoutPublisher``
instead:
A publisher that supports multiple subscribers is created as follows:

.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala
:include: author-alert-subscriber,author-storage-subscriber

.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-fanoutPublisher

The buffer size controls how far apart the slowest subscriber can be from the fastest subscriber
The input buffer size of the stage controls how far apart the slowest subscriber can be from the fastest subscriber
before slowing down the stream.

To make the picture complete, it is also possible to expose a :class:`Sink` as a :class:`Subscriber`
Expand Down
2 changes: 1 addition & 1 deletion akka-docs-dev/rst/stream-design.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Akka Streams fully implement the Reactive Streams specification and interoperate

All stream Processors produced by the default materialization of Akka Streams are restricted to having a single Subscriber, additional Subscribers will be rejected. The reason for this is that the stream topologies described using our DSL never require fan-out behavior from the Publisher sides of the elements, all fan-out is done using explicit elements like :class:`Broadcast[T]`.

This means that ``Sink.fanoutPublisher`` must be used where multicast behavior is needed for interoperation with other Reactive Streams implementations.
This means that ``Sink.publisher(<max number of Subscribers>)`` must be used where broadcast behavior is needed for interoperation with other Reactive Streams implementations.

What shall users of streaming libraries expect?
-----------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ private[http] object StreamUtils {
case Nil Nil
case Seq(one) Vector(input.via(one))
case multiple
val (fanoutSub, fanoutPub) = Source.subscriber[ByteString].toMat(Sink.fanoutPublisher(16, 16))(Keep.both).run()
val (fanoutSub, fanoutPub) = Source.subscriber[ByteString].toMat(Sink.publisher(transformers.size))(Keep.both).run()
val sources = transformers.map { flow
// Doubly wrap to ensure that subscription to the running publisher happens before the final sources
// are exposed, so there is no race
Source(Source(fanoutPub).viaMat(flow)(Keep.right).runWith(Sink.publisher))
Source(Source(fanoutPub).viaMat(flow)(Keep.right).runWith(Sink.publisher(1)))
}
// The fanout publisher must be wired to the original source after all fanout subscribers have been subscribed
input.runWith(Sink(fanoutSub))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
def acceptConnection(): (TestSubscriber.ManualProbe[HttpRequest], TestPublisher.ManualProbe[HttpResponse]) = {
connSourceSub.request(1)
val incomingConnection = connSource.expectNext()
val sink = Sink.publisher[HttpRequest]
val sink = Sink.publisher[HttpRequest](1)
val source = Source.subscriber[HttpResponse]

val handler = Flow.fromSinkAndSourceMat(sink, source)(Keep.both)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.reactivestreams.Publisher
class ConcatTest extends AkkaPublisherVerification[Int] {

def createPublisher(elements: Long): Publisher[Int] = {
Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher)
Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher(1))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class FanoutPublisherTest extends AkkaPublisherVerification[Int] {
if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else 0 until elements.toInt

Source(iterable).runWith(Sink.fanoutPublisher(initialBufferSize = 2, maximumBufferSize = 4))
Source(iterable).runWith(Sink.publisher(Int.MaxValue))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class FlattenTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val s1 = Source(iterable(elements / 2))
val s2 = Source(iterable((elements + 1) / 2))
Source(List(s1, s2)).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink.publisher)
Source(List(s1, s2)).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink.publisher(1))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class FuturePublisherTest extends AkkaPublisherVerification[Int] {

def createPublisher(elements: Long): Publisher[Int] = {
val p = Promise[Int]()
val pub = Source(p.future).runWith(Sink.publisher)
val pub = Source(p.future).runWith(Sink.publisher(1))
p.success(0)
pub
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class GroupByTest extends AkkaPublisherVerification[Int] {
val futureGroupSource =
Source(iterable(elements)).groupBy(elem "all").map { case (_, group) group }.runWith(Sink.head)
val groupSource = Await.result(futureGroupSource, 3.seconds)
groupSource.runWith(Sink.publisher)
groupSource.runWith(Sink.publisher(1))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.reactivestreams._
class IterablePublisherTest extends AkkaPublisherVerification[Int] {

override def createPublisher(elements: Long): Publisher[Int] = {
Source(iterable(elements)).runWith(Sink.publisher)
Source(iterable(elements)).runWith(Sink.publisher(1))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.stream.scaladsl.{ Keep, Source, Sink }
class MaybeSourceTest extends AkkaPublisherVerification[Int] {

def createPublisher(elements: Long): Publisher[Int] = {
val (p, pub) = Source.maybe[Int].toMat(Sink.publisher)(Keep.both).run()
val (p, pub) = Source.maybe[Int].toMat(Sink.publisher(1))(Keep.both).run()
p success Some(1)
pub
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class PrefixAndTailTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val futureTailSource = Source(iterable(elements)).prefixAndTail(0).map { case (_, tail) tail }.runWith(Sink.head)
val tailSource = Await.result(futureTailSource, 3.seconds)
tailSource.runWith(Sink.publisher)
tailSource.runWith(Sink.publisher(1))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.reactivestreams._
class SingleElementPublisherTest extends AkkaPublisherVerification[Int] {

def createPublisher(elements: Long): Publisher[Int] = {
Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher)
Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher(1))
}

override def maxElementsFromPublisher(): Long = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.reactivestreams.Publisher
class SingleElementSourceTest extends AkkaPublisherVerification[Int] {

def createPublisher(elements: Long): Publisher[Int] =
Source.single(1).runWith(Sink.publisher)
Source.single(1).runWith(Sink.publisher(1))

override def maxElementsFromPublisher(): Long = 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class SplitWhenTest extends AkkaPublisherVerification[Int] {
else {
val futureSource = Source(iterable(elements)).splitWhen(elem false).runWith(Sink.head)
val source = Await.result(futureSource, 3.seconds)
source.runWith(Sink.publisher)
source.runWith(Sink.publisher(1))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SynchronousFilePublisherTest extends AkkaPublisherVerification[ByteString]
def createPublisher(elements: Long): Publisher[ByteString] =
SynchronousFileSource(file, chunkSize = 512)
.take(elements)
.runWith(Sink.publisher)
.runWith(Sink.publisher(1))

@AfterClass
def after = file.delete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ abstract class BaseTwoStreamsSetup extends AkkaSpec {

def completedPublisher[T]: Publisher[T] = TestPublisher.empty[T]

def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher)
def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher(1))

def soonToFailPublisher[T]: Publisher[T] = TestPublisher.lazyError[T](TestException)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ trait ScriptedTest extends Matchers {
class ScriptException(msg: String) extends RuntimeException(msg)

def toPublisher[In, Out]: (Source[Out, _], ActorMaterializer) Publisher[Out] =
(f, m) f.runWith(Sink.publisher)(m)
(f, m) f.runWith(Sink.publisher(1))(m)

object Script {
def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class TestPublisherSubscriberSpec extends AkkaSpec {
"have all events accessible from manual probes" in assertAllStagesStopped {
val upstream = TestPublisher.manualProbe[Int]()
val downstream = TestSubscriber.manualProbe[Int]()
Source(upstream).runWith(Sink.publisher)(materializer).subscribe(downstream)
Source(upstream).runWith(Sink.publisher(1))(materializer).subscribe(downstream)

val upstreamSubscription = upstream.expectSubscription()
val downstreamSubscription: Subscription = downstream.expectEventPF { case OnSubscribe(sub) sub }
Expand All @@ -46,7 +46,7 @@ class TestPublisherSubscriberSpec extends AkkaSpec {
"handle gracefully partial function that is not suitable" in assertAllStagesStopped {
val upstream = TestPublisher.manualProbe[Int]()
val downstream = TestSubscriber.manualProbe[Int]()
Source(upstream).runWith(Sink.publisher)(materializer).subscribe(downstream)
Source(upstream).runWith(Sink.publisher(1))(materializer).subscribe(downstream)
val upstreamSubscription = upstream.expectSubscription()
val downstreamSubscription: Subscription = downstream.expectEventPF { case OnSubscribe(sub) sub }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void mustBeAbleToUseMerge() throws Exception {
final Source<String, BoxedUnit> in1 = Source.from(Arrays.asList("a", "b", "c"));
final Source<String, BoxedUnit> in2 = Source.from(Arrays.asList("d", "e", "f"));

final Sink<String, Publisher<String>> publisher = Sink.publisher();
final Sink<String, Publisher<String>> publisher = Sink.publisher(1);

final Source<String, BoxedUnit> source = Source.fromGraph(
FlowGraph.create(new Function<FlowGraph.Builder<BoxedUnit>, SourceShape<String>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public void mustBeAbleToUseMerge() throws Exception {
final Source<String, BoxedUnit> in1 = Source.from(Arrays.asList("a", "b", "c"));
final Source<String, BoxedUnit> in2 = Source.from(Arrays.asList("d", "e", "f"));

final Sink<String, Publisher<String>> publisher = Sink.publisher();
final Sink<String, Publisher<String>> publisher = Sink.publisher(1);

final Source<String, BoxedUnit> source = Source.fromGraph(
FlowGraph.create(new Function<FlowGraph.Builder<BoxedUnit>, SourceShape<String>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public SinkTest() {

@Test
public void mustBeAbleToUseFanoutPublisher() throws Exception {
final Sink<Object, Publisher<Object>> pubSink = Sink.fanoutPublisher(2, 2);
final Sink<Object, Publisher<Object>> pubSink = Sink.publisher(Integer.MAX_VALUE);
@SuppressWarnings("unused")
final Publisher<Object> publisher = Source.from(new ArrayList<Object>()).runWith(pubSink, materializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest {
map(_.toString), duration probe.ref ! duration).
map { s: String s + "!" }

val (flowIn: Subscriber[Int], flowOut: Publisher[String]) = flow.runWith(Source.subscriber[Int], Sink.publisher[String])
val (flowIn: Subscriber[Int], flowOut: Publisher[String]) = flow.runWith(Source.subscriber[Int], Sink.publisher[String](1))

val c1 = TestSubscriber.manualProbe[String]()
val c2 = flowOut.subscribe(c1)

val p = Source(0 to 100).runWith(Sink.publisher)
val p = Source(0 to 100).runWith(Sink.publisher(1))
p.subscribe(flowIn)

val s = c1.expectSubscription()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {

val p = SynchronousFileSource(testFile, chunkSize)
.withAttributes(bufferAttributes)
.runWith(Sink.publisher)
.runWith(Sink.publisher(1))
val c = TestSubscriber.manualProbe[ByteString]()
p.subscribe(c)
val sub = c.expectSubscription()
Expand Down Expand Up @@ -113,7 +113,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {

val p = SynchronousFileSource(testFile, chunkSize)
.withAttributes(bufferAttributes)
.runWith(Sink.publisher)
.runWith(Sink.publisher(1))

val c = TestSubscriber.manualProbe[ByteString]()
p.subscribe(c)
Expand All @@ -140,7 +140,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
}

"onError whent trying to read from file which does not exist" in assertAllStagesStopped {
val p = SynchronousFileSource(notExistingFile).runWith(Sink.publisher)
val p = SynchronousFileSource(notExistingFile).runWith(Sink.publisher(1))
val c = TestSubscriber.manualProbe[ByteString]()
p.subscribe(c)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ class FlowCompileSpec extends AkkaSpec {
val closedSource: Source[Int, _] = intSeq.via(open3)
"closedSource.run()" shouldNot compile

val closedSink: Sink[Int, _] = open3.to(Sink.publisher[Int])
val closedSink: Sink[Int, _] = open3.to(Sink.publisher[Int](1))
"closedSink.run()" shouldNot compile

closedSource.to(Sink.publisher[Int]).run()
closedSource.to(Sink.publisher[Int](1)).run()
intSeq.to(closedSink).run()
}
"append Sink" in {
val open: Flow[Int, String, _] = Flow[Int].map(_.toString)
val closedSink: Sink[String, _] = Flow[String].map(_.hashCode).to(Sink.publisher[Int])
val closedSink: Sink[String, _] = Flow[String].map(_.hashCode).to(Sink.publisher[Int](1))
val appended: Sink[Int, _] = open.to(closedSink)
"appended.run()" shouldNot compile
"appended.connect(Sink.head[Int])" shouldNot compile
Expand All @@ -61,13 +61,13 @@ class FlowCompileSpec extends AkkaSpec {
val closedSource2: Source[String, _] = closedSource.via(open)
"closedSource2.run()" shouldNot compile
"strSeq.connect(closedSource2)" shouldNot compile
closedSource2.to(Sink.publisher[String]).run
closedSource2.to(Sink.publisher[String](1)).run
}
}

"Sink" should {
val openSource: Sink[Int, _] =
Flow[Int].map(_.toString).to(Sink.publisher[String])
Flow[Int].map(_.toString).to(Sink.publisher[String](1))
"accept Source" in {
intSeq.to(openSource)
}
Expand All @@ -83,7 +83,7 @@ class FlowCompileSpec extends AkkaSpec {
val openSource: Source[String, _] =
Source(Seq(1, 2, 3)).map(_.toString)
"accept Sink" in {
openSource.to(Sink.publisher[String])
openSource.to(Sink.publisher[String](1))
}
"not be accepted by Source" in {
"openSource.connect(intSeq)" shouldNot compile
Expand All @@ -96,7 +96,7 @@ class FlowCompileSpec extends AkkaSpec {
"RunnableGraph" should {
Sink.head[String]
val closed: RunnableGraph[Publisher[String]] =
Source(Seq(1, 2, 3)).map(_.toString).toMat(Sink.publisher[String])(Keep.right)
Source(Seq(1, 2, 3)).map(_.toString).toMat(Sink.publisher[String](1))(Keep.right)
"run" in {
closed.run()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class FlowConcatSpec extends BaseTwoStreamsSetup {
val s2: Source[String, _] = Source(List(4, 5, 6)).map(_.toString + "-s")

val subs = TestSubscriber.manualProbe[Any]()
val subSink = Sink.publisher[Any]
val subSink = Sink.publisher[Any](1)

val (_, res) = f1.concat(s2).runWith(s1, subSink)

Expand Down
Loading

0 comments on commit f839a1f

Please sign in to comment.