Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a variant of Source.actorRef that uses simple acking for backpressure #17610

Closed
jrudolph opened this issue May 29, 2015 · 26 comments
Closed
Labels
1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted t:stream
Milestone

Comments

@jrudolph
Copy link
Member

This would allow a simpler migration path for systems which have used an application-level Ack-based model in the past and would like to update parts of the processing pipeline to streams without having to convert everything at one go or having to implement ActorPublisher manually.

One possibility would be to introduce another overload

def actorRef[T](bufferSize: Int, ack: AnyRef): Source[T, ActorRef]

which responds to a message with an ack as long as (or as soon as) buffer space is available.

See also http://stackoverflow.com/questions/30479777/integrating-an-ack-based-actor-with-akka-stream?noredirect=1#comment49124723_30479777

@jrudolph
Copy link
Member Author

/cc @fommil

@drewhk
Copy link
Member

drewhk commented May 29, 2015

This is actually similar to mapAsync but reversed. It would be interesting to see if this can be implemented in a more generic way making it a Flow instead (which then can be used to create the desired Source in turn) using the new AsyncStage.

@drewhk drewhk added the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label May 29, 2015
@drewhk drewhk added this to the streams-1.x milestone May 29, 2015
@jrudolph
Copy link
Member Author

The point is to expose an ActorRef target, how would that work with a Flow?

@fommil
Copy link
Contributor

fommil commented May 29, 2015

a Sink variant would also be good.

It would be good if the actual ack message could be defined, as it is in akka-io, as a function. e.g. some people will have a wrapper class for the Ack, containing markup about the message that was received.

For Sink, a partial function to acknowledge that a message is an ack should be sent.

A simple "error" message to close the stream should also be supported. Advanced error behaviour should probably require a custom implementation.

@fommil
Copy link
Contributor

fommil commented May 29, 2015

btw, I don't care about the buffer here. If the underlying actor sends a message without having received an Ack that's an API error. Buffering (if needed) should be handled in the user actor code.

@drewhk
Copy link
Member

drewhk commented May 29, 2015

The point is to expose an ActorRef target, how would that work with a Flow?

It could only work if there is a separate registration message, true.

@fommil
Copy link
Contributor

fommil commented May 29, 2015

@jrudolph are you going to scaladays? Give me a ping if you are (I'll be talking about ENSIME, sadly at the same time as sirthius)... I'll owe you a few beers if you're working on this 😄

@loicdescotte
Copy link

I'm not sure to understand why you need a ack parameter, but it would be definitely great to be able to get an ActorRef from a Source, without writing one manually.
Especially for basic cases where the ActorPublisher simply stores source items in a simple bounded list.
For more specific strategies it makes more sense to write a custom ActorPublisher.

@drewhk
Copy link
Member

drewhk commented Jun 18, 2015

@jrudolph
Copy link
Member Author

Especially for basic cases where the ActorPublisher simply stores source items in a simple bounded list.

The ack is the mechanism by which you can make the upstream sender of items stop sending more items if the bounded list is full.

As @drewhk says, there's already Source.actorRef which operates on a bounded buffer but which isn't able to communicate to the upstream sender when the list is full. That's why there's this ticket.

@loicdescotte
Copy link

@drewhk @jrudolph Sorry my comment was not clear at all :)

The problem I see with Source.actorRef, but I must be misunderstanding how it works, is that when you define a source like this :

val source = Source.actorRef[Item](Int.MaxValue, OverflowStrategy.fail)

Then to get the ref, you need to do something like this to get the actor ref :

  val ref = Flow[Item].to(Sink.ignore).runWith(source)

You can replace Sink.ignore by any Sink, but if you don't want to define the way the source will be consumed right now, it's a problem. For example if I want to keep the source without consuming items and merge it with another source later, in my understanding it's not possible with Source.actorRef.

So I think it would be handful to be able to get a "default" ActorPublisher like this one, to be able to elements into a source : http://stackoverflow.com/a/29077212/591922

Did I miss something?

@drewhk
Copy link
Member

drewhk commented Jun 18, 2015

You can only get the ActorRef after materialization. Before that, the Source is not running, there is no actor to get the reference of. Therefore, if you want to reuse that Source in various settings, you need to handle its materialized value via the xMat methods, and propagate it to the use site.

@jrudolph
Copy link
Member Author

@loicdescotte the topic of materialization is conceptually separate from the topic of how to implement a source / publisher. Materialization allows you to reuse a Source. Any hand-written reusable ActorPublisher would exhibit the exact same problem with materialization as Source.actorPublisher also gives you the ActorRef only after materialization.

So, either you go with reusability and, as @drewhk said, feed the result of materialization back to where it's needed, or if you don't care for reusability you can use this trick to create a one-time Source and instantly get to the ActorRef (which is still a lot better than writing your own Publisher):

val (ref, publisher) = Flow[Item].toMat(Sink.publisher)((a, b) => (a, b) /* or `Keep.both` */).runWith(source)
val source = Source(publisher)

@drewhk
Copy link
Member

drewhk commented Jun 18, 2015

Maybe reverse it (no need for Flow):

 val (ref, publisher) =  Source.actorRef[Item].toMat(Sink.publisher)(Keep.both).run()
 val source = Source(publisher)

Btw, it might make sense to expose this pattern as a built-in method

 val (ref, source) = Source.actorRef[Item].preMaterialize()

@jrudolph
Copy link
Member Author

(Of course, I just stupidly copied the code from above)

@drewhk
Copy link
Member

drewhk commented Jun 18, 2015

Added ticket for built-in support: #17769

@loicdescotte
Copy link

@drewhk @jrudolph thanks this is great!

@drewhk
Copy link
Member

drewhk commented Jun 18, 2015

I still recommend propagating the materialized value explicitly though.

@loicdescotte
Copy link

Sorry, I hope I'm not polluting the issue...

@jrudolph

Any hand-written reusable ActorPublisher would exhibit the exact same problem with materialization as Source.actorPublisher also gives you the ActorRef only after materialization.

In fact , if I use a custom actor like in this exemple, I can use the source to merge it with another and it's working very well. I can consume a merged stream in the end with data from several sources.

@drewhk

I still recommend propagating the materialized value explicitly though.

I need to go deeper in the API to udnerstand how to do this. Thanks a lot for your help.

@jrudolph
Copy link
Member Author

In fact , if I use a custom actor like in this exemple, I can use the source to merge it with another and it's working very well. I can consume a merged stream in the end with data from several sources.

The example uses Source(ActorPublisher[Int](actorRef)) which means the resulting Source isn't reusable, i.e. you start with the actorRef to the publisher and create a one-time Source from it. You could use Source.actorPublisher(props), instead, which would make it reusable but would give you actorRef after materialization...

@loicdescotte
Copy link

@jrudolph What does reusable mean for a source ?
With Source.actorPublisher(props), I need materialization, for example with Sink.ignore, which will drop the items in the source. I've tried to use the source after that (for a merge) but there is no more elements inside...
Thanks :)

@jrudolph
Copy link
Member Author

A Publisher can only be subscribed to once whereas a Source is thought to be materialized several times.

See this document for more information:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/stream-design.html

@loicdescotte
Copy link

Thanks for the link and sorry again for spamming in the issue :)

@drewhk drewhk modified the milestones: streams-backlog, streams-2.0 Dec 8, 2015
@rkuhn rkuhn removed the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Mar 9, 2016
@rkuhn
Copy link
Contributor

rkuhn commented Mar 23, 2016

The Sink.actorRefWithAck has been implemented, the Source should be added as a matter of symmetry.

@rkuhn rkuhn added the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Mar 23, 2016
@gortiz
Copy link

gortiz commented Mar 26, 2019

It seems that at some point the topic changed to the materialization problem and the ACK problem was forgotten. Did I miss something? It would be great to have a Source.actorRefWithAck (and something similar with typed actors)

@nvollmar
Copy link
Contributor

I quickly implemented a version with back-pressure based upon the new graph stage implementation (see #26054)

patriknw pushed a commit that referenced this issue May 20, 2019
* Implements actorRef source variant with backpressure #17610

* Small improvements to documentation and source #17610

* Small improvements to test #17610

* Small improvements to implementation and tests #17610

* Adds API for akka-typed #17610

* Adds ack sender and java api for typed #17610
@patriknw patriknw modified the milestones: stream-backlog, 2.6.0-M2 May 20, 2019
patriknw pushed a commit that referenced this issue May 20, 2019
* Implements actorRef source variant with backpressure #17610

* Small improvements to documentation and source #17610

* Small improvements to test #17610

* Small improvements to implementation and tests #17610

* Adds API for akka-typed #17610

* Adds ack sender and java api for typed #17610

(cherry picked from commit f37f415)
@patriknw patriknw modified the milestones: 2.6.0-M2, 2.5.23 May 20, 2019
@patriknw patriknw reopened this May 20, 2019
patriknw added a commit that referenced this issue May 20, 2019
…triknw

actorRef source variant with backpressure #17610
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted t:stream
Projects
None yet
Development

No branches or pull requests

8 participants