add `wireTap` combinator for monitoring #15077

Closed
rkuhn opened this Issue Apr 29, 2014 · 35 comments

7 participants
@rkuhn
Collaborator

rkuhn commented Apr 29, 2014

A tee produces to an external consumer but does not propagate back pressure from this consumer into the main stream; instead, it only copies items to the monitoring stream whenever that stream has also signaled demand, dropping items on that output otherwise.
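
To make the proposed semantics concrete, here is a minimal, synchronous toy model in plain Scala. It is only a sketch and not Akka code: `Tap`, `request`, `offer`, and `wireTap` are hypothetical names invented for illustration, and real stream stages are asynchronous. It shows the one property the description asks for: the main stream sees every element, while the tap receives a copy only when it has outstanding demand.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: plain synchronous Scala, not Akka Streams code.
// All names here (Tap, request, offer, wireTap) are hypothetical.
object WireTapModel {

  /** The monitoring side: it tracks outstanding demand and records what it receives. */
  final class Tap[T] {
    private var demand = 0L
    private val seen = ArrayBuffer.empty[T]

    /** Signal demand for `n` more elements. */
    def request(n: Long): Unit = demand += n

    /** Accept `elem` if there is demand; otherwise report that it was dropped. */
    def offer(elem: T): Boolean =
      if (demand > 0) { demand -= 1; seen += elem; true }
      else false

    def received: List[T] = seen.toList
  }

  /** Every element reaches `downstream`; the tap gets a copy only when it has demand. */
  def wireTap[T](elems: Iterator[T], tap: Tap[T])(downstream: T => Unit): Unit =
    elems.foreach { elem =>
      tap.offer(elem) // result ignored: a slow tap never holds up the main stream
      downstream(elem)
    }

  def main(args: Array[String]): Unit = {
    val tap     = new Tap[Int]
    val mainOut = ArrayBuffer.empty[Int]

    tap.request(2) // the monitor asks for two elements only
    wireTap(Iterator(1, 2, 3, 4), tap) { e => mainOut += e; () }

    println(s"main stream saw: ${mainOut.toList}")
    println(s"tap saw:         ${tap.received}")
  }
}
```

In this model the tap's lack of demand costs it elements but never delays `downstream`, which is the difference from a back-pressured broadcast.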

@rkuhn rkuhn added t:stream labels Apr 29, 2014

Member

drewhk commented Apr 29, 2014

Good idea, but I would call it 'tap' or 'monitor'.

Member

patriknw commented Apr 29, 2014

wire tap is the name in EAIP

Collaborator

rkuhn commented Apr 29, 2014

yes, good point.

Member

drewhk commented Apr 29, 2014

Hm, this is somewhat relevant to #15082 (at least for static splitting), and can be thought of as an "unfair-broadcast".

Contributor

sirthias commented Apr 30, 2014

There is also value in being able to simply attach a T => Unit function to a stream (no back-pressure) or, more generally, an onEvent callback and friends:

sealed trait StreamEvent[+T]
object StreamEvent {
  case class RequestMore(elements: Int) extends StreamEvent[Nothing]
  case object Cancel extends StreamEvent[Nothing]
  case class OnNext[T](value: T) extends StreamEvent[T]
  case object OnComplete extends StreamEvent[Nothing]
  case class OnError(cause: Throwable) extends StreamEvent[Nothing]
}

// attaches the given callback which "listens" to `cancel` events without otherwise affecting the stream
def onCancel[U](callback: ⇒ U): Res[A] =
  onEventPF { case StreamEvent.Cancel ⇒ callback }

// attaches the given callback which "listens" to `onComplete` events without otherwise affecting the stream
def onComplete[U](callback: ⇒ U): Res[A] =
  onEventPF { case StreamEvent.OnComplete ⇒ callback }

// attaches the given callback which "listens" to `onNext` events without otherwise affecting the stream
def onElement(callback: A ⇒ Unit): Res[A] =
  onEventPF { case StreamEvent.OnNext(element) ⇒ callback(element) }

// attaches the given callback which "listens" to `onError` events without otherwise affecting the stream
def onError[U](callback: Throwable ⇒ U): Res[A] =
  onEventPF { case StreamEvent.OnError(cause) ⇒ callback(cause) }

// attaches the given callback which "listens" to all stream events without otherwise affecting the stream
def onEvent(callback: StreamEvent[A] ⇒ Unit): Res[A] =
  this ~> OnEvent(callback)

// attaches the given partial callback which "listens" to the stream events it is defined for,
// ignoring all others, without otherwise affecting the stream
def onEventPF(callback: PartialFunction[StreamEvent[A], Unit]): Res[A] =
  onEvent(ev ⇒ callback.applyOrElse(ev, (_: Any) ⇒ ()))

// attaches the given callback which "listens" to `requestMore` events without otherwise affecting the stream
def onRequestMore(callback: Int ⇒ Unit): Res[A] =
  onEventPF { case StreamEvent.RequestMore(elements) ⇒ callback(elements) }

// attaches the given callback which "listens" to `onComplete` and `onError` events without otherwise affecting the stream
def onTerminate(callback: Option[Throwable] ⇒ Unit): Res[A] =
  onEventPF {
    case StreamEvent.OnComplete     ⇒ callback(None)
    case StreamEvent.OnError(cause) ⇒ callback(Some(cause))
  }

This is incredibly helpful for debugging, for example via this operation:

// debugging help: simply printlns all events passing through
def printEvent(marker: String): Res[A] = onEvent(ev ⇒ println(s"$marker: $ev"))

@patriknw patriknw changed the title from add `tee` combinator for monitoring to add `wireTap` combinator for monitoring May 7, 2014

@patriknw patriknw added 2 - pick next and removed 1 - triaged labels May 7, 2014

Member

ktoso commented May 7, 2014

I think this may be the right element to use for time measurement, right? I could work on it if so, in order to then use it for the timed sink.

Member

patriknw commented May 14, 2014

Have I understood it correctly that we will build this using a buffer (#15092) and actor consumer endpoint (#15173) or some other consumer, e.g. constructed via Duct?

Contributor

sirthias commented May 14, 2014

What's the status on this ticket?

I currently have this snippet in the akka-http-core HttpListener actor (which implements one server instance, bound to a certain port):

case StreamTcp.TcpServerBinding(localAddress, connectionStream) ⇒
  log.info("Bound to {}", endpoint)
  val httpConnectionStream = Flow(connectionStream)
    .map(httpServerPipeline)
    .onTerminate(_ ⇒ shutdown(gracePeriod = Duration.Zero))
    .toProducer(materializer)
  bindCommander ! Http.ServerBinding(localAddress, httpConnectionStream)

Note the onTerminate operation, which allows me to trigger the shutdown of the HttpListener actor when the TCP connection stream is completed by the IO layer.

Without the onTerminate helper I have to resort to something like this instead:

val httpConnectionStream = Flow(connectionStream)
  .map(httpServerPipeline)
  .tee(Duct[Http.IncomingConnection].onComplete(materializer)(_ ⇒ shutdown(gracePeriod = Duration.Zero)))
  .toProducer(materializer)

which works but appears to be a bit of overkill for simply being able to attach an onTerminate wiretap to a stream.

Member

patriknw commented May 14, 2014

I'm not sure tee will work here, because it might receive the onComplete before downstream, but you might not care, and then you could also use
Flow(httpConnectionStream).onComplete(materializer)(_ ⇒ shutdown...)
or add your own ShutdownTransformer

Contributor

sirthias commented May 14, 2014

Yes, right, the tee is actually a problem.
Consuming the httpConnectionStream myself will not work as it is the stream that I construct for the user to consume.
And writing my own ShutdownTransformer will not work because I won't get notified of error terminations, right?

So it seems I am somewhat at a loss here.

Member

patriknw commented May 14, 2014

Transformer has onError, onComplete and also cleanup. You probably want to use cleanup.

Contributor

sirthias commented May 14, 2014

Ah, cool. onError wasn't part of the earlier version I had in mind.
But of course: cleanup is the way to go. Thanks!

Collaborator

rkuhn commented May 15, 2014

So, independently of whether this is needed for HTTP right now I think that this combinator is useful; I would say “implement after more urgent things are done”.

Contributor

sirthias commented May 15, 2014

Just FTR: it's not needed for HTTP right now, because we can use a Transformer. But still I'd deem the feature valuable sugar.

Member

drewhk commented Nov 6, 2014

I guess a broadcast -> conflate (or buffer in a dropping mode) makes this no longer relevant

@drewhk drewhk closed this Nov 6, 2014

@2m 2m modified the milestones: invalid, streams-1.x Jun 16, 2015

@rkuhn rkuhn reopened this Oct 5, 2015

Collaborator

rkuhn commented Oct 5, 2015

Reviving this ticket after a change in FlowOps policy.

Member

drewhk commented Oct 5, 2015

Yeah, but this is still a rather specialized construct. Do you think it is worth putting it on FlowOps?

Member

viktorklang commented Oct 5, 2015

@drewhk It seems generally useful though? It is an excellent way of chaining Sinks onto a Flow/Source while still allowing it to be meaningfully composed.

Member

drewhk commented Oct 5, 2015

I am not sure we are talking about the same thing. The original ticket says this:

A tee produces to an external consumer but does not propagate back pressure from this consumer into the main stream; instead, it only copies items to the monitoring stream whenever that stream has also signaled demand, dropping items on that output otherwise.

I.e. this is a non-backpressured broadcast. While it is a useful pattern for sure, I am not sure about how general it is.

Member

viktorklang commented Oct 5, 2015

@drewhk Good point. I'm talking about sugar for Broadcast. (Which I consider a Generally useful thing™)

Member

drewhk commented Oct 5, 2015

Yeah I know, but that is #17827 right?

Member

viktorklang commented Oct 5, 2015

Yeah, true.
Thinking about this issue specifically, I'm not sure this feature should be made easy, since it injects side-effects and encourages the wrong kind of programming (just as I don't recommend that people use Future.onComplete/onFailure/onSuccess).

Member

drewhk commented Oct 5, 2015

I think this proposal will still accept a Sink (i.e. not so much an onComplete-style callback, but more a rate-detached operation). The difference is that it does not delay the "main arm" if this side sink is not ready; instead it just drops. So in terms of the in-line broadcast sugar, this boils down to doing an in-line broadcast with a dropping buffer on one arm.

Member

viktorklang commented Oct 5, 2015

@drewhk But if we introduce the equivalent of alsoTo(s: Sink) the question is if we shouldn't just recommend people to do: alsoTo(Flow[X].buffer(n, drop).to(yadda))
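
The "buffer(n, drop)" part of that suggestion can be sketched as a bounded buffer that evicts instead of making the producer wait. This is a toy model in plain Scala, not the Akka Streams `buffer` stage: `DropHeadBuffer`, `enqueue`, `poll`, and `dropped` are hypothetical names, and dropping the oldest element is an assumption, since the comment does not say which drop strategy is meant.

```scala
import scala.collection.mutable

// Toy model only: plain Scala, not the Akka Streams `buffer` stage.
// DropHeadBuffer and its methods are hypothetical names.
object DroppingBufferModel {

  /** Bounded buffer that discards its oldest element instead of making the producer wait. */
  final class DropHeadBuffer[T](capacity: Int) {
    require(capacity > 0, "capacity must be positive")

    private val queue = mutable.Queue.empty[T]
    private var droppedCount = 0L

    /** Never blocks and never rejects: a full buffer evicts its oldest element first. */
    def enqueue(elem: T): Unit = {
      if (queue.size == capacity) {
        queue.dequeue()
        droppedCount += 1
      }
      queue += elem
    }

    /** Called by the (possibly slow) side consumer whenever it is ready for one element. */
    def poll(): Option[T] =
      if (queue.isEmpty) None else Some(queue.dequeue())

    def dropped: Long = droppedCount
  }

  def main(args: Array[String]): Unit = {
    val buffer = new DropHeadBuffer[Int](capacity = 2)
    (1 to 5).foreach(buffer.enqueue) // the producer runs ahead of the side consumer
    println(s"dropped ${buffer.dropped} element(s)")
    println(s"side consumer then reads: ${buffer.poll()}, ${buffer.poll()}, ${buffer.poll()}")
  }
}
```

Because `enqueue` always succeeds, the arm carrying this buffer never exerts back pressure on the main stream; the price is that the side consumer may miss elements.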

Member

drewhk commented Oct 5, 2015

I think we are talking about the same thing. Given an in-line broadcast, this ticket is just a simple pattern which probably does not deserve a specific sugar method.

Member

viktorklang commented Oct 5, 2015

Yep, we agree on that :)

Member

viktorklang commented Nov 30, 2015

Since we have alsoTo now, I deem this closeable.

Collaborator

rkuhn commented Nov 30, 2015

The “not propagating back-pressure” part is not included in alsoTo, and I also think that Broadcast has the wrong termination semantics for this kind of usage, making it impossible to use alsoTo to implement wireTap. Therefore I am not yet convinced that we can close this.

Member

viktorklang commented Nov 30, 2015

@rkuhn What about alsoTo(Flow[X].buffer(n, drop).to(Sink.theBismarck()))

Collaborator

rkuhn commented Nov 30, 2015

That does not terminate once the other (i.e. the real) sink is no longer interested. Would you like your logging statement to keep the stream running when everybody else already jumped ship?

Member

viktorklang commented Nov 30, 2015

@rkuhn Now that, my friend, is a very good argument.

Collaborator

rkuhn commented Nov 30, 2015

Thinking about that: maybe wireTap should end up just being an additional argument to alsoTo that controls the “importance” of the other Sink in terms of termination. It depends on which is more discoverable.

Member

drewhk commented Nov 30, 2015

I like the tap combinator more than an additional argument.

Member

viktorklang commented Nov 30, 2015

@rkuhn @drewhk Isn't that consistent with an eagerComplete flag we have for merge?

Member

drewhk commented Nov 30, 2015

Not really. In the case of broadcast, eagerCancel would mean that if one output cancels, then the bcast cancels and stops too. In this case we don't want the cancellation of the logger, for example, to propagate. I.e. this is more like the analogue of PreferredMerge (not completely though).
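
The three cancellation behaviours discussed in this thread can be laid side by side in a small truth-table sketch. This is plain Scala that only encodes the rules as they are described in the comments above, not Akka internals; `Policy`, `BroadcastDefault`, `BroadcastEagerCancel`, `WireTap`, and `upstreamKeepsRunning` are hypothetical names.

```scala
// Toy model only: encodes the cancellation rules as described in this thread.
// All names are hypothetical; this is not how Akka implements its stages.
object TerminationModel {
  sealed trait Policy
  case object BroadcastDefault     extends Policy // upstream stops only once every output has cancelled
  case object BroadcastEagerCancel extends Policy // upstream stops as soon as any output cancels
  case object WireTap              extends Policy // upstream follows the main output; the tap has no say

  /** Does the upstream keep running, given which outputs have cancelled? */
  def upstreamKeepsRunning(policy: Policy, mainCancelled: Boolean, tapCancelled: Boolean): Boolean =
    policy match {
      case BroadcastDefault     => !(mainCancelled && tapCancelled)
      case BroadcastEagerCancel => !(mainCancelled || tapCancelled)
      case WireTap              => !mainCancelled
    }

  def main(args: Array[String]): Unit =
    for {
      policy <- List[Policy](BroadcastDefault, BroadcastEagerCancel, WireTap)
      (mainCancelled, tapCancelled) <- List((true, false), (false, true))
    } println(
      s"$policy main=$mainCancelled tap=$tapCancelled -> " +
        s"running=${upstreamKeepsRunning(policy, mainCancelled, tapCancelled)}"
    )
}
```

Only the `WireTap` row has both desired properties: the stream stops when the real sink leaves, and a logger that cancels does not take the stream down with it.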
