From e4af8f005ba364d0593c79892a03d4407dc5c933 Mon Sep 17 00:00:00 2001
From: nabond251
Date: Wed, 8 Jun 2022 22:00:42 -0400
Subject: [PATCH] Update integration.md (#5985)

Corrected various punctuation, spelling, grammar, and style issues; also corrected slow service SelectAsyncUnordered example
---
 docs/articles/streams/integration.md | 184 +++++++++++++--------------
 1 file changed, 92 insertions(+), 92 deletions(-)

diff --git a/docs/articles/streams/integration.md b/docs/articles/streams/integration.md
index da273c42b19..f77d48d7a28 100644
--- a/docs/articles/streams/integration.md
+++ b/docs/articles/streams/integration.md
@@ -7,7 +7,7 @@ title: Integration

 ## Integrating with Actors

-For piping the elements of a stream as messages to an ordinary actor you can use ``Ask`` in a
+For piping the elements of a stream as messages to an ordinary actor, you can use ``Ask`` in a
 ``SelectAsync`` or use ``Sink.ActorRefWithAck``.

 Messages can be sent to a stream with ``Source.Queue`` or via the ``IActorRef`` that is
@@ -16,7 +16,7 @@ materialized by ``Source.ActorRef``.
 ### SelectAsync + Ask

 A nice way to delegate some processing of elements in a stream to an actor is to use ``Ask``
-in ``SelectAsync``. The back-pressure of the stream is maintained by the ``Task`` of the ``Ask``
+in ``SelectAsync``. The back-pressure of the stream is maintained by the ``Task`` of the ``Ask``,
 and the mailbox of the actor will not be filled with more messages than the given
 ``parallelism`` of the ``SelectAsync`` stage.

@@ -29,13 +29,13 @@ words
     .RunWith(Sink.Ignore(), _actorMaterializer);
 ```

-Note that the messages received in the actor will be in the same order as the stream elements,
-i.e. the `parallelism` does not change the ordering of the messages. There is a performance
-advantage of using parallelism > 1 even though the actor will only process one message at a time
+Note that the messages received in the actor will be in the same order as the stream elements;
+i.e., the `parallelism` does not change the ordering of the messages. There is a performance
+advantage of using parallelism > 1 even though the actor will only process one message at a time,
 because then there is already a message in the mailbox when the actor has completed previous message.

 The actor must reply to the `Sender` for each message from the stream. That reply will complete
-the `CompletionStage` of the `Ask` and it will be the element that is emitted downstreams
+the `CompletionStage` of the `Ask` and it will be the element that is emitted downstream
 from `SelectAsync`.

 ```csharp
@@ -55,62 +55,62 @@ public class Translator : ReceiveActor

 The stream can be completed with failure by sending `Akka.Actor.Status.Failure` as reply from the actor.

-If the `Ask` fails due to timeout the stream will be completed with `TimeoutException` failure.
-If that is not desired outcome you can use `Recover` on the `Ask` `CompletionStage`.
+If the `Ask` fails due to a timeout, the stream will be completed with `TimeoutException` failure.
+If that is not the desired outcome, you can use `Recover` on the `Ask` `CompletionStage`.

-If you don't care about the replies you can use `Sink.Ignore` after the `SelectAsync` stage
-and then actor is effectively a sink of the stream.
+If you do not care about the replies, you can use `Sink.Ignore` after the `SelectAsync` stage
+and then the actor is effectively a sink of the stream.

 The same pattern can be used with [Actor routers](xref:routers). Then you can use
-`SelectAsyncUnordered` for better efficiency if you don't care about the order of the emitted
+`SelectAsyncUnordered` for better efficiency, if you do not care about the order of the emitted
 downstream elements (the replies).

 ### Sink.ActorRefWithAck

 The sink sends the elements of the stream to the given `IActorRef` that sends back back-pressure signal.
-First element is always `OnInitMessage`, then stream is waiting for the given acknowledgement message
-from the given actor which means that it is ready to process elements.
+The first element is always `OnInitMessage`; then the stream is waiting for the given acknowledgement message
+from the given actor, which means that it is ready to process elements.
 It also requires the given acknowledgement message after each stream element to make back-pressure work.

-If the target actor terminates the stream will be cancelled. When the stream is completed successfully
+If the target actor terminates, the stream will be cancelled. When the stream is completed successfully,
 the given `OnCompleteMessage` will be sent to the destination actor. When the stream
-is completed with failure a `Akka.Actor.Status.Failure` message will be sent to the destination actor.
+is completed with failure, an `Akka.Actor.Status.Failure` message will be sent to the destination actor.

 > [!NOTE]
->Using `Sink.ActorRef` or ordinary `Tell` from a `Select` or `ForEach` stage means that there is
->no back-pressure signal from the destination actor, i.e. if the actor is not consuming the messages
+>Using `Sink.ActorRef` or the ordinary `Tell` from a `Select` or `ForEach` stage means that there is
+>no back-pressure signal from the destination actor; i.e., if the actor is not consuming the messages
 >fast enough the mailbox of the actor will grow, unless you use a bounded mailbox with zero
 >`mailbox-push-timeout-time` or use a rate limiting stage in front.
->It's often better to use `Sink.ActorRefWithAck` or `Ask` in `SelectAsync`, though.
+>It is often better to use `Sink.ActorRefWithAck` or `Ask` in `SelectAsync`, though.

 ### Source.Queue

 `Source.Queue` can be used for emitting elements to a stream from an actor (or from anything running
 outside the stream). The elements will be buffered until the stream can process them. You can `Offer`
-elements to the queue and they will be emitted to the stream if there is demand from downstream,
-otherwise they will be buffered until request for demand is received.
+elements to the queue and they will be emitted to the stream if there is demand from downstream;
+otherwise, they will be buffered until request for demand is received.

 Use overflow strategy `Akka.Streams.OverflowStrategy.Backpressure` to avoid dropping of elements if the
 buffer is full.

 `ISourceQueueWithComplete.OfferAsync` returns `Task`
-which completes with `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream.
-It completes with `QueueOfferResult.Dropped` if element was dropped. It can also complete with
-`QueueOfferResult.Failure` when stream failed or `QueueOfferResult.QueueClosed`
-when downstream is completed.
+which completes with `QueueOfferResult.Enqueued` if the element was added to the buffer or sent downstream.
+It completes with `QueueOfferResult.Dropped` if the element was dropped. It can also complete with
+`QueueOfferResult.Failure` when the stream failed or `QueueOfferResult.QueueClosed`
+when downstream processing is completed.

-When used from an actor you typically `pipe` the result of the `Task` back to the actor
+When used from an actor, you typically `pipe` the result of the `Task` back to the actor
 to continue processing.

 ### Source.ActorRef

 Messages sent to the actor that is materialized by ``Source.ActorRef`` will be emitted to the
-stream if there is demand from downstream, otherwise they will be buffered until request for
+stream if there is demand from downstream; otherwise, they will be buffered until request for
 demand is received.

-Depending on the defined `OverflowStrategy` it might drop elements if there is no space
+Depending on the defined `OverflowStrategy`, it might drop elements if there is no space
 available in the buffer. The strategy ``OverflowStrategy.Backpressure`` is not supported
-for this Source type, i.e. elements will be dropped if the buffer is filled by sending
+for this Source type; i.e., elements will be dropped if the buffer is filled by sending
 at a rate that is faster than the stream can consume. You should consider using
 ``Source.Queue`` if you want a backpressured actor interface.

@@ -119,12 +119,12 @@ The stream can be completed successfully by sending `Akka.Actor.Status.Success`
 The stream can be completed with failure by sending ``Akka.Actor.Status.Failure`` to the
 actor reference.

-The actor will be stopped when the stream is completed, failed or cancelled from downstream,
-i.e. you can watch it to get notified when that happens.
+The actor will be stopped when the stream is completed, failed, or cancelled from downstream;
+i.e., you can watch it to get notified when that happens.

 ## Integrating with External Services

-Stream transformations and side effects involving external non-stream based services can be
+Stream transformations and side effects involving external non-stream-based services can be
 performed with ``SelectAsync`` or ``SelectAsyncUnordered``.

 For example, sending emails to the authors of selected tweets using an external
@@ -142,18 +142,18 @@ var authors = tweets
     .Select(t => t.Author);
 ```

-Assume that we can lookup their email address using:
+Assume that we can look up their email address using:

 ```csharp
-Task LookupEmail(string handle)
+Task LookUpEmail(string handle)
 ```

-Transforming the stream of authors to a stream of email addresses by using the ``LookupEmail``
+Transforming the stream of authors to a stream of email addresses by using the ``LookUpEmail``
 service can be done with ``SelectAsync``:

 ```csharp
 var emailAddresses = authors
-    .SelectAsync(4, author => AddressSystem.LookupEmail(author.Handle))
+    .SelectAsync(4, author => AddressSystem.LookUpEmail(author.Handle))
     .Collect(s => string.IsNullOrWhiteSpace(s) ? null : s);
 ```

@@ -170,20 +170,20 @@ sendEmails.Run(materializer);
 ```

 ``SelectAsync`` is applying the given function that is calling out to the external service to
-each of the elements as they pass through this processing step. The function returns a `Task`
-and the value of that task will be emitted downstreams. The number of Tasks
+each of the elements as they pass through this processing step. The function returns a `Task`,
+and the value of that task will be emitted downstream. The number of Tasks
 that shall run in parallel is given as the first argument to ``SelectAsync``.
 These Tasks may complete in any order, but the elements that
 are emitted downstream are in the same order as received from upstream.

-That means that back-pressure works as expected. For example if the ``EmailServer.Send``
-is the bottleneck it will limit the rate at which incoming tweets are retrieved and
+That means that back-pressure works as expected. For example, if the ``EmailServer.Send``
+is the bottleneck, it will limit the rate at which incoming tweets are retrieved and
 email addresses looked up.

 The final piece of this pipeline is to generate the demand that pulls the tweet
 authors information through the emailing pipeline: we attach a ``Sink.Ignore``
 which makes it all run. If our email process would return some interesting data
-for further transformation then we would of course not ignore it but send that
+for further transformation, then we would of course not ignore it but send that
 result stream onwards for further processing or storage.

 Note that ``SelectAsync`` preserves the order of the stream elements. In this example the order
@@ -195,7 +195,7 @@ var authors = tweets
     .Select(t => t.Author);

 var emailAddresses = authors
-    .SelectAsyncUnordered(4, author => AddressSystem.LookupEmail(author.Handle))
+    .SelectAsyncUnordered(4, author => AddressSystem.LookUpEmail(author.Handle))
     .Collect(s => string.IsNullOrWhiteSpace(s) ? null : s);

 var sendEmails = emailAddresses.SelectAsyncUnordered(4, address =>
@@ -208,7 +208,7 @@ sendEmails.Run(materializer);
 ```

 In the above example the services conveniently returned a `Task` of the result.
-If that is not the case you need to wrap the call in a `Task`.
+If that is not the case, you need to wrap the call in a `Task`.

 For a service that is exposed as an actor, or if an actor is used as a gateway in front of an
 external service, you can use ``Ask``:
@@ -222,7 +222,7 @@ var saveTweets = akkaTweets
 ```

 Note that if the ``Ask`` is not completed within the given timeout the stream is completed with failure.
-If that is not desired outcome you can use ``Recover`` on the ``Ask`` `Task`.
+If that is not the desired outcome, you can use ``Recover`` on the ``Ask`` `Task`.

 ### Illustrating Ordering and Parallelism

@@ -230,19 +230,19 @@ Let us look at another example to get a better understanding of the ordering
 and parallelism characteristics of ``SelectAsync`` and ``SelectAsyncUnordered``.

 Several ``SelectAsync`` and ``SelectAsyncUnordered`` tasks may run concurrently.
-The number of concurrent tasks are limited by the downstream demand.
-For example, if 5 elements have been requested by downstream there will be at most 5
+The number of concurrent tasks is limited by the downstream demand.
+For example, if 5 elements have been requested by downstream, there will be at most 5
 tasks in progress.

 ``SelectAsync`` emits the task results in the same order as the input elements
 were received. That means that completed results are only emitted downstream
-when earlier results have been completed and emitted. One slow call will thereby
+when earlier results have been completed and emitted. One slow call will therefore
 delay the results of all successive calls, even though they are completed before
 the slow call.

-``SelectAsyncUnordered`` emits the task results as soon as they are completed, i.e.
+``SelectAsyncUnordered`` emits the task results as soon as they are completed; i.e.,
 it is possible that the elements are not emitted downstream in the same order as
-received from upstream. One slow call will thereby not delay the results of faster
+received from upstream. One slow call will therefore not delay the results of faster
 successive calls as long as there is downstream demand of several elements.

 Here is a fictive service that we can use to illustrate these aspects.
@@ -270,7 +270,7 @@ public class SometimesSlowService
 }
 ```

-Elements starting with a lower case character are simulated to take longer time to process.
+Elements starting with a lower-case character are simulated to take longer time to process.

 Here is how we can use it with ``SelectAsync``:

@@ -334,13 +334,13 @@ after: I
 after: J
 ```

-Note that ``after`` lines are in the same order as the ``before`` lines even
-though elements are ``completed`` in a different order. For example ``H``
+Note that the ``after`` lines are in the same order as the ``before`` lines, even
+though elements are ``completed`` in a different order. For example, ``H``
 is ``completed`` before ``g``, but still emitted afterwards.

 The numbers in parenthesis illustrates how many calls that are in progress at
-the same time. Here the downstream demand and thereby the number of concurrent
-calls are limited by the buffer size (4) of the `ActorMaterializerSettings`.
+the same time. Here, the downstream demand and therefore the number of concurrent
+calls is limited by the buffer size (4) of the `ActorMaterializerSettings`.

 Here is how we can use the same service with ``SelectAsyncUnordered``:

@@ -355,7 +355,7 @@ var result = Source.From(new[] {"a", "B", "C", "D", "e", "F", "g", "H", "i", "J"
         Console.WriteLine($"before {x}");
         return x;
     })
-    .SelectAsync(4, service.Convert)
+    .SelectAsyncUnordered(4, service.Convert)
     .RunForeach(x => Console.WriteLine($"after: {x}"), materializer);
 ```

@@ -404,41 +404,41 @@ completed: i (0)
 after: I
 ```

-Note that ``after`` lines are not in the same order as the ``before`` lines. For example ``H`` overtakes the slow ``G``.
+Note that the ``after`` lines are not in the same order as the ``before`` lines. For example, ``H`` overtakes the slow ``G``.

 The numbers in parenthesis illustrates how many calls that are in progress at
-the same time. Here the downstream demand and thereby the number of concurrent
-calls are limited by the buffer size (4) of the `ActorMaterializerSettings`.
+the same time. Here, the downstream demand and therefore the number of concurrent
+calls is limited by the buffer size (4) of the `ActorMaterializerSettings`.

 ### Integrating with Observables

-Starting from version 1.3.2, Akka.Streams offers integration with observables - both as possible sources and sinks for incoming events. In order to expose Akka.Streams runnable graph as an observable, use `Sink.AsObservable` method. Example:
+Starting from version 1.3.2, Akka.Streams offers integration with observables - both as possible sources and sinks for incoming events. In order to expose Akka.Streams runnable graph as an observable, use the `Sink.AsObservable` method. Example:

 ```csharp
 IObservable observable = Source.From(new []{ 1, 2, 3 })
     .RunWith(Sink.AsObservable(), materializer);
 ```

-In order to use an observable as an input source to Akka graph, you may want to use `Source.FromObservable` method. Example:
+In order to use an observable as an input source to Akka graph, you may want to use the `Source.FromObservable` method. Example:

 ```csharp
 await Source.FromObservable(observable, maxBufferCapacity: 128, overflowStrategy: OverflowStrategy.DropHead)
     .RunForEach(Console.WriteLine, materializer);
 ```

-You may notice two extra parameters here. One of the advantages of Akka.Streams (and reactive streams in general) over Reactive Extensions is notion of backpressure - absent in Rx.NET. This puts a constraint of rate limiting the events incoming form upstream. If an observable will be producing events faster, than downstream is able to consume them, source stage will start to buffer them up to a provided `maxBufferCapacity` limit. Once that limit is reached, an overflow strategy will be applied. There are several different overflow strategies to choose from:
+You may notice two extra parameters here. One of the advantages of Akka.Streams (and reactive streams in general) over Reactive Extensions is the notion of backpressure - absent in Rx.NET. This puts a constraint of limiting the rate of incoming events from upstream. If an observable will be producing events faster than downstream is able to consume them, the source stage will start to buffer them up to a provided `maxBufferCapacity` limit. Once that limit is reached, an overflow strategy will be applied. There are several different overflow strategies to choose from:

-* `OverflowStrategy.DropHead` (default) will drop the oldest element. In this mode source works in circular buffer fashion.
-* `OverflowStrategy.DropTail` will cause a current element to replace a one set previously in a buffer.
-* `OverflowStrategy.DropNew` will cause current event to be dropped. This effectively will cause dropping any new incoming events until a buffer will get some free space.
-* `OverflowStrategy.Fail` will cause a `BufferOverflowException` to be send as an error signal.
-* `OverflowStrategy.DropBuffer` will cause a whole buffer to be cleared once it's limit has been reached.
+* `OverflowStrategy.DropHead` (default) will drop the oldest element. In this mode, the source works in circular buffer fashion.
+* `OverflowStrategy.DropTail` will cause a current element to replace one set previously in a buffer.
+* `OverflowStrategy.DropNew` will cause a current element to be dropped. This effectively will cause dropping any new incoming elements until a buffer gets some free space.
+* `OverflowStrategy.Fail` will cause a `BufferOverflowException` to be sent as an error signal.
+* `OverflowStrategy.DropBuffer` will cause a whole buffer to be cleared once its limit has been reached.

 Any other `OverflowStrategy` option is not supported by `Source.FromObservable` stage.

 ### Integrating with Event Handlers

-C# events can also be used as a potential source of an Akka.NET stream. It's possible using `Source.FromEvent` methods. Example:
+C# events can also be used as a potential source of an Akka.NET stream. This is possible using `Source.FromEvent` methods. Example:

 ```csharp
 Source.FromEvent(
@@ -456,12 +456,12 @@ Source.FromEvent, RoutedEventArgs>(
     .RunForEach(e => Console.WriteLine($"Captured click from {e.Source}"), materializer);
 ```

-Just like in case of `Source.FromObservable`, `Source.FromEvents` can take optional parameters used to configure buffering strategy applied for incoming events.
+Just like in case of `Source.FromObservable`, `Source.FromEvents` can take optional parameters used to configure the buffering strategy applied for incoming events.

 ### Integrating with Reactive Streams

 `Reactive Streams` defines a standard for asynchronous stream processing with non-blocking
-back pressure. It makes it possible to plug together stream libraries that adhere to the standard.
+backpressure. It makes it possible to plug together stream libraries that adhere to the standard.
 Akka Streams is one such library.

 * Reactive Streams:
@@ -485,7 +485,7 @@ and another library knows how to store author handles in a database:
 ISubscriber Storage
 ```

-Using an Akka Streams Flow we can transform the stream and connect those:
+Using an Akka Streams `Flow`, we can transform the stream and connect those:

 ```csharp
 var authors = Flow.Create()
@@ -570,15 +570,15 @@ Please note that a factory is necessary to achieve reusability of the resulting

 ### Implementing Reactive Streams Publisher or Subscriber

-As described above any Akka Streams ``Source`` can be exposed as a Reactive Streams ``Publisher``
-and any ``Sink`` can be exposed as a Reactive Streams ``Subscriber``. Therefore we recommend that you
+As described above any Akka Streams ``Source`` can be exposed as a Reactive Streams ``Publisher``,
+and any ``Sink`` can be exposed as a Reactive Streams ``Subscriber``. Therefore, we recommend that you
 implement Reactive Streams integrations with built-in stages or [custom stages](xref:custom-stream-processing).

-For historical reasons the `ActorPublisher` and `ActorSubscriber` are
-provided to support implementing Reactive Streams `Publisher` class and `Subscriber` class with
+For historical reasons, the `ActorPublisher` and `ActorSubscriber` are
+provided to support implementing the Reactive Streams `Publisher` class and the `Subscriber` class with
 an `Actor` class.

-These can be consumed by other Reactive Stream libraries or used as an Akka Streams `Source` class or `Sink` class.
+These can be consumed by other Reactive Stream libraries, or used as an Akka Streams `Source` class or `Sink` class.


 > [!WARNING]
@@ -587,13 +587,13 @@ These can be consumed by other Reactive Stream libraries or used as an Akka Stre

 > [!WARNING]
 > `ActorPublisher` class and `ActorSubscriber` class cannot be used with remote actors,
-> because if signals of the Reactive Streams protocol (e.g. ``Request``) are lost the
-> the stream may deadlock.
+> because if signals of the Reactive Streams protocol (e.g., ``Request``) are lost the
+> stream may deadlock.

+> because if signals of the Reactive Streams protocol (e.g., ``Request``) are lost the +> stream may deadlock. ### ActorPublisher -Extend `Akka.Streams.Actor.ActorPublisher` to implement a stream publisher that keeps track of the subscription life cycle and requested elements. +Extend `Akka.Streams.Actor.ActorPublisher` to implement a stream publisher that keeps track of the subscription lifecycle and requested elements. Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber: @@ -684,21 +684,21 @@ public class JobManager : Actors.ActorPublisher You send elements to the stream by calling ``OnNext``. You are allowed to send as many elements as have been requested by the stream subscriber. This amount can be inquired with -``TotalDemand``. It is only allowed to use ``OnNext`` when ``IsActive`` and ``TotalDemand > 0``, -otherwise ``OnNext`` will throw ``IllegalStateException``. +``TotalDemand``. It is only allowed to use ``OnNext`` when ``IsActive`` and ``TotalDemand > 0``; +otherwise, ``OnNext`` will throw ``IllegalStateException``. -When the stream subscriber requests more elements the ``ActorPublisherMessage.Request`` message +When the stream subscriber requests more elements, the ``ActorPublisherMessage.Request`` message is delivered to this actor, and you can act on that event. The ``TotalDemand`` is updated automatically. When the stream subscriber cancels the subscription the ``ActorPublisherMessage.Cancel`` message -is delivered to this actor. After that subsequent calls to ``OnNext`` will be ignored. +is delivered to this actor. After that, subsequent calls to ``OnNext`` will be ignored. You can complete the stream by calling ``OnComplete``. After that you are not allowed to -call ``OnNext``, ``OnError`` and ``OnComplete``. +call ``OnNext``, ``OnError``, and ``OnComplete``. You can terminate the stream with failure by calling ``OnError``. After that you are not allowed to -call ``OnNext``, ``OnError`` and ``OnComplete``. 
+call ``OnNext``, ``OnError``, and ``OnComplete``.

 If you suspect that this ``ActorPublisher`` may never get subscribed to, you can set the ``SubscriptionTimeout``
 property to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when
@@ -706,11 +706,11 @@ the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`
 cleanup and stop itself.

 If the actor is stopped the stream will be completed, unless it was not already terminated with
-failure, completed or canceled.
+failure, completed, or canceled.

 More detailed information can be found in the API documentation.

-This is how it can be used as input `Source` to a `Flow`:
+This is how it can be used as an input `Source` to a `Flow`:

 ```csharp
 var jobManagerSource = Source.ActorPublisher(JobManager.Props);
@@ -733,7 +733,7 @@ You can only attach one subscriber to this publisher. Use a ``Broadcast``-elemen

 ### ActorSubscriber

-Extend `Akka.Streams.Actor.ActorSubscriber` to make your class a stream subscriber with full control of stream back pressure. It will receive `OnNext`, `OnComplete` and `OnError` messages from the stream. It can also receive other, non-stream messages, in the same way as any actor.
+Extend `Akka.Streams.Actor.ActorSubscriber` to make your class a stream subscriber with full control of stream backpressure. It will receive `OnNext`, `OnComplete`, and `OnError` messages from the stream. It can also receive other, non-stream messages, in the same way as any actor.

 Here is an example of such an actor. It dispatches incoming jobs to child worker actors:

@@ -852,21 +852,21 @@ public class Worker : ReceiveActor
 }
 ```

-Subclass must define the ``RequestStrategy`` to control stream back pressure.
-After each incoming message the ``ActorSubscriber`` will automatically invoke
+Subclass must define the ``RequestStrategy`` to control stream backpressure.
+After each incoming message, the ``ActorSubscriber`` will automatically invoke
 the ``IRequestStrategy.RequestDemand`` and propagate the returned demand to the stream.

 * The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself.
 * The provided ``MaxInFlightRequestStrategy`` is useful if messages are
   queued internally or delegated to other actors.
-* You can also implement a custom ``IRequestStrategy`` or call ``Request`` manually together with
-  ``ZeroRequestStrategy`` or some other strategy. In that case
-  you must also call ``Request`` when the actor is started or when it is ready, otherwise
+* You can also implement a custom ``IRequestStrategy``, or call ``Request`` manually together with
+  ``ZeroRequestStrategy`` or some other strategy. In that case,
+  you must also call ``Request`` when the actor is started or when it is ready; otherwise,
   it will not receive any elements.

 More detailed information can be found in the API documentation.

-This is how it can be used as output `Sink` to a `Flow`:
+This is how it can be used as an output `Sink` to a `Flow`:

 ```csharp
 var n = 118;