Request: Overloaded version of .alsoTo that takes a function #28524

Open
otto-dev opened this issue Jan 27, 2020 · 3 comments
Labels
0 - new (Ticket is unclear on its purpose or if it is valid or not), t:stream

Comments

@otto-dev

otto-dev commented Jan 27, 2020

It would be nice for .alsoTo, the back-pressuring version of .wireTap, to accept a T => Unit in addition to the current Sink signature, so that element-wise side effects read clearly. .wireTap already provides both signatures.

Context: .wireTap was added for execution of side-effects in a Flow without transforming the elements (see #23512). The issue is that it drops elements when the side-effect back-pressures, which makes it unsuitable in situations where the side-effect is an essential part of the stream processing.
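To make the difference concrete, here is a minimal sketch of the two forms that exist today, assuming Akka 2.6 (where an implicit ActorSystem provides the materializer). The object name TapComparison and the helper runAlsoTo are made up for illustration; wireTap, alsoToMat and Sink.foreach are the existing operators.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object TapComparison {

  // wireTap already accepts a plain function. If the tap cannot keep up,
  // elements are skipped for the side effect rather than slowing the stream.
  def tapped(f: Int => Unit): Source[Int, NotUsed] =
    Source(1 to 5).wireTap(f)

  // alsoTo only accepts a Sink, so the function is wrapped in Sink.foreach.
  // The side sink back-pressures the main stream instead of dropping elements.
  def runAlsoTo(n: Int)(implicit system: ActorSystem): (Seq[Int], Int) = {
    val sum = new AtomicInteger(0)
    val (sideDone, mainDone) =
      Source(1 to n)
        .alsoToMat(Sink.foreach[Int] { elem => sum.addAndGet(elem); () })(Keep.right)
        .toMat(Sink.seq[Int])(Keep.both)
        .run()
    // Wait for both the side sink and the main sink before reading the sum.
    Await.result(sideDone, 5.seconds)
    val out = Await.result(mainDone, 5.seconds)
    (out, sum.get)
  }
}
```

The request amounts to letting the second form be written as tersely as the first.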

If .alsoTo is not a suitable operator name for this overload, my suggestion is .sideEffect. An example use case would be telling an actor that an element has been processed (and can be removed from the persisted stack or similar):

    SomeSource()
      .ask(2)(actor)
      .via(???)
      .sideEffect(actor ! _) // tell actor the element is processed
      .sideEffect(println)
      .via(???) // do more with the element stream

Relevant historic PR: #24610 (.wireTap)

@otto-dev changed the title from "Request: Version of .alsoTo that takes a function" to "Request: Overloaded version of .alsoTo that takes a function" Jan 27, 2020
@johanandren
Member

I don't think we should add that.

For the actor use case, if that message interaction is not an ask with a reply that arrives in the stream, you won't know that it was delivered, and it will not slow the stream down if the actor cannot keep up, but instead fill the mailbox and potentially the heap. I don't think fire-and-forget without backpressure is something we should encourage, even if it can make sense in some use cases.

If the operation should delay the stream element and you do not want to fan out for some reason, I think map { n => doStuff(n); n } or mapAsync(1) { n => futureOp(n).map(_ => n) } is not an extreme syntax overhead relative to how common I'd expect that need to be. If it can happen in parallel, the same goes for .alsoTo(Sink.foreach(doStuff).async).
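Spelled out as reusable flows, the three alternatives above might look like the following sketch. It assumes Akka 2.6; doStuff and futureOp are placeholders for whatever side effect is needed, and the object name is invented for this example.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

// Hypothetical names throughout; only map, mapAsync, alsoTo, Sink.foreach
// and async are existing Akka Streams API.
object BackpressuredSideEffects {

  // Placeholder synchronous side effect.
  def doStuff(n: Int): Unit = println(s"processed $n")

  // Placeholder asynchronous side effect.
  def futureOp(n: Int): Future[Unit] = Future(doStuff(n))

  // Side effect runs inline; the element is emitted after doStuff returns.
  val viaMap: Flow[Int, Int, NotUsed] =
    Flow[Int].map { n => doStuff(n); n }

  // Side effect runs asynchronously, one element at a time; the element is
  // emitted once the Future completes.
  val viaMapAsync: Flow[Int, Int, NotUsed] =
    Flow[Int].mapAsync(1) { n => futureOp(n).map(_ => n) }

  // Side effect runs in a separate async island and back-pressures the
  // main stream when it falls behind.
  val viaAlsoTo: Flow[Int, Int, NotUsed] =
    Flow[Int].alsoTo(Sink.foreach[Int](doStuff).async)
}
```

Each flow passes elements through unchanged, so any of them can be dropped into a pipeline with .via(...).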

Note that if this is something you need very often in a specific project you should be able to add your own operators to the stream factories through implicit decoration/extension methods.
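A project-local extension along those lines could be sketched as below. The name sideEffect is the one proposed in this issue and is not part of Akka; the implementation here is just the map-based variant, which is one possible choice, not an official operator.

```scala
import akka.stream.scaladsl.{Flow, Source}

// Hypothetical project-local syntax; import SideEffectSyntax._ to enable it.
object SideEffectSyntax {

  implicit class SourceSideEffect[Out, Mat](private val source: Source[Out, Mat]) extends AnyVal {
    // Runs f for every element, then passes the element on unchanged.
    def sideEffect(f: Out => Unit): Source[Out, Mat] =
      source.map { elem => f(elem); elem }
  }

  implicit class FlowSideEffect[In, Out, Mat](private val flow: Flow[In, Out, Mat]) extends AnyVal {
    // Same operator for Flow, so it can be used mid-pipeline.
    def sideEffect(f: Out => Unit): Flow[In, Out, Mat] =
      flow.map { elem => f(elem); elem }
  }
}
```

With the import in scope, Source(1 to 3).sideEffect(println) reads like the example in the original request. Because the function runs inline, a slow f slows the stream down rather than dropping elements.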

@johanandren added the 0 - new (Ticket is unclear on its purpose or if it is valid or not) and t:stream labels Jan 28, 2020
@otto-dev
Author

otto-dev commented Jan 28, 2020

Thanks for getting back to me.

> you should be able to add your own operators to the stream factories through implicit decoration/extension methods

Thanks, I may do that.

Just for reference, the question of syntax and sensibility has been discussed along the same lines in #23512. My concern is that the current implementation for that issue (.wireTap) drops elements on back-pressure, making it unsuitable for essential parts of the stream. I can see nothing in the original discussion leading up to this design choice, and I think it may well have slipped under the radar, e.g. due to the later choice of .wireTap for the operator name.

> it will not slow the stream down if the actor cannot keep up

Sending a message to an actor was an example use case, but it could be any side effect that does not need back-pressure. Having said that, in the above example back-pressure is already generated by the .ask on the same actor, which precedes the .sideEffect (first ask for the element, then inform the actor that it is processed).

Just adding my two cents. I'm coming from the same position as the original issue's author, having run into a need for this operator a few times.

@huntc
Contributor

huntc commented Mar 30, 2020

Having just come across this again, I also think that there's still utility in having a foreach style of stage. The main reason is to convey that a function is side-effecting.
