New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

+str #18142 ask pattern integration for akka streams #24325

Merged
merged 2 commits into from Feb 22, 2018

Conversation

Projects
None yet
6 participants
@ktoso
Member

ktoso commented Jan 16, 2018

Introduces nicer integration for asking actors in akka streams.

I was hacking on this in breaks and flights recently.
Relates to #18142 and potentially resolves it.

It needs the askUnordered version as well though, which I'll add soon.

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Jan 16, 2018

Collaborator

Test FAILed.

Collaborator

akka-ci commented Jan 16, 2018

Test FAILed.

@patriknw

Good to add this. Let's see if I understand the benefits over mapAsync+ask:

  • watch for early stream failure
  • map response type
Show outdated Hide outdated akka-stream/src/main/scala/akka/stream/impl/Stages.scala Outdated
Show outdated Hide outdated akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala Outdated
Show outdated Hide outdated akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala Outdated
Show outdated Hide outdated akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala Outdated
Show outdated Hide outdated akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala Outdated
*
* Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`.
* An `akka.util.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message.
*

This comment has been minimized.

@patriknw

patriknw Jan 16, 2018

Member

also mention the watch

@patriknw

patriknw Jan 16, 2018

Member

also mention the watch

This comment has been minimized.

@ktoso

ktoso Jan 17, 2018

Member

added

@ktoso

ktoso Jan 17, 2018

Member

added

@ktoso

This comment has been minimized.

Show comment
Hide comment
@ktoso

ktoso Jan 17, 2018

Member

Let's see if I understand the benefits over mapAsync+ask:

watch for early stream failure
map response type

Correct, that's the benefits. The first one being the more important one I think - to watch the target and bind the streams failure with that actors termination, so we'd fail with that rather than a later-on timeout in some of the asks.

Member

ktoso commented Jan 17, 2018

Let's see if I understand the benefits over mapAsync+ask:

watch for early stream failure
map response type

Correct, that's the benefits. The first one being the more important one I think - to watch the target and bind the streams failure with that actors termination, so we'd fail with that rather than a later-on timeout in some of the asks.

@ktoso

This comment has been minimized.

Show comment
Hide comment
@ktoso

ktoso Jan 17, 2018

Member

Addressing comments and adding docs; then back to stream refs :)

Member

ktoso commented Jan 17, 2018

Addressing comments and adding docs; then back to stream refs :)

@ktoso

This comment has been minimized.

Show comment
Hide comment
@ktoso

ktoso Jan 17, 2018

Member

Should I include askUnordered as well right now in this PR?
Decided to not add it yet

Member

ktoso commented Jan 17, 2018

Should I include askUnordered as well right now in this PR?
Decided to not add it yet

@patriknw

This comment has been minimized.

Show comment
Hide comment
@patriknw

patriknw Jan 17, 2018

Member

askUnordered isn't adding any value unless the destination actorRef is delegating the work to other actors that can complete independent of each other. I'd vote for not adding it, at least not until requested.

Member

patriknw commented Jan 17, 2018

askUnordered isn't adding any value unless the destination actorRef is delegating the work to other actors that can complete independent of each other. I'd vote for not adding it, at least not until requested.

@ktoso

This comment has been minimized.

Show comment
Hide comment
@ktoso

ktoso Jan 18, 2018

Member

Agreed on avoiding to copy mapAsync's code, it indeed is hairy. When I first copied it I did not realise how hairy it really is.

I think I can pull off the exact same semantics with a combo like this:

  def ask[S](parallelism: Int)(ref: ActorRef)(implicit timeout: Timeout, tag: ClassTag[S]): Repr[S] = {
    via(Flow[Out].watch(ref).mapAsync(parallelism)(el  akka.pattern.ask(ref).?(el)(timeout).mapTo[S](tag)).named("ask"))
  }

  /**
   * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
   */
  def watch(ref: ActorRef): Repr[Out] =
    via(Watch(ref))

which by itself is pretty nice if you'd want to bind a lifecycle of a stream to some actor.
I do think the watch is nice, esp if people put big timeouts in there, and use it in a distributed setting. Perhaps the stream refs address much of those use cases, but maybe not all, so would want to provide the proper way (I do feel the watch is "proper", and lack thereof a bit ugly) :)

I'll give this a shot after the stream refs are merged (hacked a bit, have the watch, but need to separate tests etc)

Member

ktoso commented Jan 18, 2018

Agreed on avoiding to copy mapAsync's code, it indeed is hairy. When I first copied it I did not realise how hairy it really is.

I think I can pull off the exact same semantics with a combo like this:

  def ask[S](parallelism: Int)(ref: ActorRef)(implicit timeout: Timeout, tag: ClassTag[S]): Repr[S] = {
    via(Flow[Out].watch(ref).mapAsync(parallelism)(el  akka.pattern.ask(ref).?(el)(timeout).mapTo[S](tag)).named("ask"))
  }

  /**
   * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
   */
  def watch(ref: ActorRef): Repr[Out] =
    via(Watch(ref))

which by itself is pretty nice if you'd want to bind a lifecycle of a stream to some actor.
I do think the watch is nice, esp if people put big timeouts in there, and use it in a distributed setting. Perhaps the stream refs address much of those use cases, but maybe not all, so would want to provide the proper way (I do feel the watch is "proper", and lack thereof a bit ugly) :)

I'll give this a shot after the stream refs are merged (hacked a bit, have the watch, but need to separate tests etc)

@ktoso

This comment has been minimized.

Show comment
Hide comment
@ktoso

ktoso Jan 18, 2018

Member

I also agree with regards to the askUnordered -- was thinking about it in flight, and came to the same conclusion, that it only matters for routers and perhaps cluster sharding, but for sharding perhaps a different specialized Sink would be better actually. If anything

Member

ktoso commented Jan 18, 2018

I also agree with regards to the askUnordered -- was thinking about it in flight, and came to the same conclusion, that it only matters for routers and perhaps cluster sharding, but for sharding perhaps a different specialized Sink would be better actually. If anything

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Feb 19, 2018

Collaborator

Test FAILed.

Collaborator

akka-ci commented Feb 19, 2018

Test FAILed.

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Feb 19, 2018

Collaborator

Test FAILed.

Collaborator

akka-ci commented Feb 19, 2018

Test FAILed.

@ktoso

This comment has been minimized.

Show comment
Hide comment
@ktoso

ktoso Feb 19, 2018

Member

PLS BUILD

Member

ktoso commented Feb 19, 2018

PLS BUILD

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Feb 19, 2018

Collaborator

Test FAILed.

Collaborator

akka-ci commented Feb 19, 2018

Test FAILed.

@patriknw

rebase and fix filename, then merge

@johanandren

LGTM

* '''Cancels when''' downstream cancels
*/
@implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage")
def ask[S](parallelism: Int)(ref: ActorRef)(implicit timeout: Timeout, tag: ClassTag[S]): Repr[S] = {

This comment has been minimized.

@johanandren

johanandren Feb 22, 2018

Member

I'd want something more in the api-docs about what parallelism means in this context. Would also be nice with an overload where you don't have to specify the parallelism - ask[Res](actor) for the simple case of single actor doing work and responding case.

@johanandren

johanandren Feb 22, 2018

Member

I'd want something more in the api-docs about what parallelism means in this context. Would also be nice with an overload where you don't have to specify the parallelism - ask[Res](actor) for the simple case of single actor doing work and responding case.

Show outdated Hide outdated akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala Outdated
@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Feb 22, 2018

Collaborator

Test FAILed.

Collaborator

akka-ci commented Feb 22, 2018

Test FAILed.

+str #18142 ask pattern integration for akka streams
progressed with cleanup, removing the same thread exec context is
weird... causes issues :-/ Need to debug more, could be that some race
also exists in mapAsync then :\

WIP

finish ask impl via watch stage

mima

consistency spec

fix paradox, and fix adding ask/watch to javadsl source

follow up review
@ktoso

This comment has been minimized.

Show comment
Hide comment
@ktoso

ktoso Feb 22, 2018

Member

Fully rebased and applied all comments

Member

ktoso commented Feb 22, 2018

Fully rebased and applied all comments

applied

@akka-ci akka-ci added validating and removed needs-attention labels Feb 22, 2018

@raboof

raboof approved these changes Feb 22, 2018

👍, one question on scaladoc style

implicit val askTimeout = Timeout(5.seconds)
val words: Source[String, NotUsed] =
Source(List("hello", "hi"))
words
.mapAsync(parallelism = 5)(elem (ref ? elem).mapTo[String])
.ask[String](parallelism = 5)(ref)

This comment has been minimized.

@raboof

raboof Feb 22, 2018

Member

👍

@raboof

raboof Feb 22, 2018

Member

👍

* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]].
*
* Do not forget to include the expected response type in the method call, like so:

This comment has been minimized.

@raboof

raboof Feb 22, 2018

Member

👍

@raboof

raboof Feb 22, 2018

Member

👍

Show outdated Hide outdated akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala Outdated
@ktoso

This comment has been minimized.

Show comment
Hide comment
@ktoso

ktoso Feb 22, 2018

Member

Addressed last last comments ;)

Member

ktoso commented Feb 22, 2018

Addressed last last comments ;)

@ktoso ktoso merged commit d6000df into akka:master Feb 22, 2018

1 check passed

typesafe-cla-validator All users have signed the CLA
Details

@ktoso ktoso deleted the ktoso:wip-ask-in-streams branch Feb 22, 2018

@ktoso ktoso restored the ktoso:wip-ask-in-streams branch Feb 22, 2018

@akka-ci akka-ci added tested and removed validating labels Feb 22, 2018

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Feb 22, 2018

Collaborator

Test PASSed.

Collaborator

akka-ci commented Feb 22, 2018

Test PASSed.

@akka-ci akka-ci added validating and removed tested labels Feb 22, 2018

@ktoso ktoso deleted the ktoso:wip-ask-in-streams branch Feb 22, 2018

@akka-ci akka-ci added tested and removed validating labels Feb 22, 2018

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Feb 22, 2018

Collaborator

Test PASSed.

Collaborator

akka-ci commented Feb 22, 2018

Test PASSed.

@patriknw patriknw added this to the 2.5.10 milestone Feb 23, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment