Skip to content

Commit

Permalink
Convenience stream operators for publishing and subscribing to typed …
Browse files Browse the repository at this point in the history
…pub sub #31037
  • Loading branch information
johanandren committed Jan 12, 2022
1 parent ccd5219 commit b9a1a66
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 1 deletion.
17 changes: 17 additions & 0 deletions akka-docs/src/main/paradox/stream/actor-interop.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,20 @@ Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorR
@@@ note
See also: @ref[ActorSink.actorRefWithBackpressure operator reference docs](operators/ActorSink/actorRefWithBackpressure.md)
@@@


### Topic.source

A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic.

@@@ note
See also: @ref[ActorSink.actorRefWithBackpressure operator reference docs](operators/PubSub/source.md)
@@@

### Topic.sink

A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$].

@@@ note
See also: @ref[ActorSink.actorRefWithBackpressure operator reference docs](operators/PubSub/sink.md)
@@@
37 changes: 37 additions & 0 deletions akka-docs/src/main/paradox/stream/operators/PubSub/sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# PubSub.sink

A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$].

@ref[Actor interop operators](../index.md#actor-interop-operators)

Note that there is no backpressure from the topic, so care must be taken to not publish messages at a higher rate than that can be handled
by subscribers.

If the topic does not have any subscribers when a message is published, or the topic actor is stopped, the message is sent to dead letters.

## Dependency

This operator is included in:

@@dependency[sbt,Maven,Gradle] {
bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion
symbol1=AkkaVersion
value1="$akka.version$"
group="com.typesafe.akka"
artifact="akka-stream-typed_$scala.binary.version$"
version=AkkaVersion
}

## Signature

@apidoc[PubSub.sink](akka.stream.typed.*.PubSub$) { scala="#sink[T](topic:akka.actor.typed.Toppic[T]):akka.stream.scaladsl.Sink[T,akka.NotUsed]" java="#sink(akka.actor.typed.Topic)" }

## Reactive Streams semantics

@@@div { .callout }

**cancels** never

**backpressures** never

@@@
40 changes: 40 additions & 0 deletions akka-docs/src/main/paradox/stream/operators/PubSub/source.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# PubSub.source

A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic.

@ref[Actor interop operators](../index.md#actor-interop-operators)

The source can be materialized multiple times, each materialized stream will stream messages published to the topic after the stream has started.

Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic,
if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit
the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure`
strategy.


## Dependency

This operator is included in:

@@dependency[sbt,Maven,Gradle] {
bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion
symbol1=AkkaVersion
value1="$akka.version$"
group="com.typesafe.akka"
artifact="akka-stream-typed_$scala.binary.version$"
version=AkkaVersion
}

## Signature

@apidoc[PubSub.source](akka.stream.typed.*.PubSub$) { scala="#source[T](topic:akka.actor.typed.Toppic[T]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#source(akka.actor.typed.Topic)" }

## Reactive Streams semantics

@@@div { .callout }

**emits** a message published to the topic is emitted as soon as there is demand from downstream

**completes** when the topic actor terminates

@@@
4 changes: 4 additions & 0 deletions akka-docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ Operators meant for inter-operating between Akka Streams and Actors:
|Source/Flow|<a name="ask"></a>@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).|
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.|
|ActorFlow|<a name="askwithstatus"></a>@ref[askWithStatus](ActorFlow/askWithStatus.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply<T>`] where the T will be unwrapped and emitted downstream.|
|PubSub|<a name="sink"></a>@ref[sink](PubSub/sink.md)|A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$].|
|PubSub|<a name="source"></a>@ref[source](PubSub/source.md)|A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic. |
|Source/Flow|<a name="watch"></a>@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.|

## Compression operators
Expand Down Expand Up @@ -525,7 +527,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [setup](Source-or-Flow/setup.md)
* [setup](Sink/setup.md)
* [single](Source/single.md)
* [sink](PubSub/sink.md)
* [sliding](Source-or-Flow/sliding.md)
* [source](PubSub/source.md)
* [splitAfter](Source-or-Flow/splitAfter.md)
* [splitWhen](Source-or-Flow/splitWhen.md)
* [statefulMapConcat](Source-or-Flow/statefulMapConcat.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.typed.javadsl

import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.actor.typed.pubsub.Topic
import akka.annotation.ApiMayChange
import akka.stream.OverflowStrategy
import akka.stream.javadsl.Sink
import akka.stream.javadsl.Source

/**
* Sources and sinks to integrate [[akka.actor.typed.pubsub.Topic]] with streams allowing for local or distributed
* publishing and subscribing of elements through a stream.
*/
object PubSub {

/**
* Create a source that will subscribe to a topic and stream messages published to the topic. Can be materialized
* multiple times, each materialized stream will contain messages published after it was started.
*
* Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic,
* if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit
* the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure`
* strategy.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @param bufferSize The maximum number of messages to buffer if the stream applies backpressure
* @param overflowStrategy Strategy to use once the buffer is full.
* @tparam T The type of the published messages
*/
@ApiMayChange
def source[T](
topicActor: ActorRef[Topic.Command[T]],
bufferSize: Int,
overflowStrategy: OverflowStrategy): Source[T, NotUsed] =
akka.stream.typed.scaladsl.PubSub.source(topicActor, bufferSize, overflowStrategy).asJava

/**
* Create a sink that will publish each message to the given topic. Note that there is no backpressure
* from the topic, so care must be taken to not publish messages at a higher rate than that can be handled
* by subscribers. If the topic does not have any subscribers when a message is published the message is
* sent to dead letters.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @tparam T the type of the messages that can be published
*/
@ApiMayChange
def sink[T](topicActor: ActorRef[Topic.Command[T]]): Sink[T, NotUsed] =
akka.stream.typed.scaladsl.PubSub.sink[T](topicActor).asJava

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.typed.scaladsl

import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.actor.typed.pubsub.Topic
import akka.annotation.ApiMayChange
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

/**
* Sources and sinks to integrate [[akka.actor.typed.pubsub.Topic]] with streams allowing for local or distributed
* publishing and subscribing of elements through a stream.
*/
object PubSub {

/**
* Create a source that will subscribe to a topic and stream messages published to the topic. Can be materialized
* multiple times, each materialized stream will contain messages published after it was started.
*
* Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic,
* if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit
* the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure`
* strategy.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @param bufferSize The maximum number of messages to buffer if the stream applies backpressure
* @param overflowStrategy Strategy to use once the buffer is full.
* @tparam T The type of the published messages
*/
@ApiMayChange
def source[T](
topicActor: ActorRef[Topic.Command[T]],
bufferSize: Int,
overflowStrategy: OverflowStrategy): Source[T, NotUsed] =
ActorSource
.actorRef[T](PartialFunction.empty, PartialFunction.empty, bufferSize, overflowStrategy)
.mapMaterializedValue { ref =>
topicActor ! Topic.Subscribe(ref)
NotUsed
}

/**
* Create a sink that will publish each message to the given topic. Note that there is no backpressure
* from the topic, so care must be taken to not publish messages at a higher rate than that can be handled
* by subscribers. If the topic does not have any subscribers when a message is published or the topic actor is stopped,
* the message is sent to dead letters.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @tparam T the type of the messages that can be published
*/
@ApiMayChange
def sink[T](topicActor: ActorRef[Topic.Command[T]]): Sink[T, NotUsed] = {
Sink
.foreach[T] { message =>
topicActor ! Topic.Publish(message)
}
.mapMaterializedValue(_ => NotUsed)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/

/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.typed.scaladsl

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.internal.pubsub.TopicImpl
import akka.actor.typed.pubsub.Topic
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.wordspec.AnyWordSpecLike

class PubSubSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {

"PubSub.source" should {

"emit messages from the topic" in {
val topic = testKit.spawn(Topic[String]("my-topic-1"))

val source = PubSub.source(topic, 100, OverflowStrategy.fail)
val sourceProbe = source.runWith(TestSink())
sourceProbe.ensureSubscription()

// wait until subscription has been seen
val probe = testKit.createTestProbe[TopicImpl.TopicStats]()
probe.awaitAssert {
topic ! TopicImpl.GetTopicStats(probe.ref)
probe.expectMessageType[TopicImpl.TopicStats].localSubscriberCount should ===(1)
}

topic ! Topic.Publish("published")
sourceProbe.requestNext("published")
sourceProbe.cancel()
}

}

"PubSub.sink" should {
"publish messages" in {
val topic = testKit.spawn(Topic[String]("my-topic-2"))

val subscriberProbe = testKit.createTestProbe[String]()
topic ! Topic.Subscribe(subscriberProbe.ref)

// wait until subscription has been seen
val probe = testKit.createTestProbe[TopicImpl.TopicStats]()
probe.awaitAssert {
topic ! TopicImpl.GetTopicStats(probe.ref)
probe.expectMessageType[TopicImpl.TopicStats].localSubscriberCount should ===(1)
}

Source.single("published").runWith(PubSub.sink(topic))

subscriberProbe.expectMessage("published")
}
}

}
4 changes: 3 additions & 1 deletion project/StreamOperatorsIndexGenerator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala").flatMap { f =>
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/PubSub.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/PubSub.scala").flatMap { f =>
val slashesNr = f.count(_ == '/')
val element = f.split("/")(slashesNr).split("\\.")(0)
IO.read(new File(f))
Expand Down

0 comments on commit b9a1a66

Please sign in to comment.