Skip to content

Commit

Permalink
Add Sink.asPublisher example and update doc (#30105)
Browse files Browse the repository at this point in the history
Co-authored-by: Renato Cavalcanti <renato@cavalcanti.be>
  • Loading branch information
muskan3006 and octonato committed May 12, 2021
1 parent 240378f commit dd8b514
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 1 deletion.
29 changes: 28 additions & 1 deletion akka-docs/src/main/paradox/stream/operators/Sink/asPublisher.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,31 @@ Integration with Reactive Streams, materializes into a `org.reactivestreams.Publ

## Description

TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646
This method gives you the capability to publish the data from the `Sink` through a Reactive Streams [Publisher](http://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html).
Generally, in Akka Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asPublisher` provides a `Publisher` materialized value when run.
Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Akka stream through the `fanout` parameter.
In Java 9, the Reactive Stream API was included in the JDK, and `Publisher` is available through [Flow.Publisher](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Publisher.html).
Since those APIs are identical but exist at different package namespaces and does not depend on the Reactive Streams package a separate publisher sink for those is available
through @scala[`akka.stream.scaladsl.JavaFlowSupport.Sink#asPublisher`]@java[`akka.stream.javadsl.JavaFlowSupport.Sink#asPublisher`].


## Example

In the example we are using a source and then creating a Publisher. After that, we see that when `fanout` is true multiple subscribers can subscribe to it,
but when it is false only the first subscriber will be able to subscribe and others will be rejected.

Scala
: @@snip [AsPublisher.scala](/akka-docs/src/test/scala/docs/stream/operators/sink/AsPublisher.scala) { #asPublisher }

Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #asPublisher }

## Reactive Streams semantics

@@@div { .callout }

**emits** the materialized publisher

**completes** after the source is consumed and materialized publisher is created

@@@
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import akka.NotUsed;
import akka.actor.ActorSystem;

import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
// #takeLast-operator-example
import akka.japi.Pair;
import org.reactivestreams.Publisher;
// #takeLast-operator-example
import java.util.*;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -139,6 +141,26 @@ static void ignoreExample() {
// #ignore
}

static void asPublisherExample() {
// #asPublisher
Source<Integer, NotUsed> source = Source.range(1, 5);

Publisher<Integer> publisherFalse =
source.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system);
CompletionStage<Integer> resultFromFirstSubscriberFalse =
Source.fromPublisher(publisherFalse)
.runWith(Sink.fold(0, (acc, element) -> acc + element), system);
CompletionStage<Integer> resultFromSecondSubscriberFalse =
Source.fromPublisher(publisherFalse)
.runWith(Sink.fold(1, (acc, element) -> acc * element), system);

resultFromFirstSubscriberFalse.thenAccept(System.out::println); // 15
resultFromSecondSubscriberFalse.thenAccept(
System.out
::println); // No output, because the source was not able to subscribe to the publisher.
// #asPublisher
}

private static Source<String, NotUsed> readLinesFromFile() {
return Source.empty();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2020-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package docs.stream.operators.sink

import scala.concurrent.{ ExecutionContextExecutor, Future }
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

object AsPublisher {
implicit val system: ActorSystem = ???
implicit val ec: ExecutionContextExecutor = system.dispatcher
def asPublisherExample() = {
def asPublisherExample() = {
//#asPublisher
val source = Source(1 to 5)

val publisher = source.runWith(Sink.asPublisher(false))
Source.fromPublisher(publisher).runWith(Sink.foreach(println)) // 1 2 3 4 5
Source
.fromPublisher(publisher)
.runWith(Sink.foreach(println)) //No output, because the source was not able to subscribe to the publisher.
//#asPublisher
}
}
}

0 comments on commit dd8b514

Please sign in to comment.