Add new graph element: contextBypass #15957

Closed
sirthias opened this issue Sep 23, 2014 · 16 comments
Labels
1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted t:stream

Comments

@sirthias
Contributor

During the HTTP work we have discovered that we repeatedly build this stream graph setup:

----> incoming Elements ----> split ----------
                                |             |
                                |             |
                                |             |
                                V             V
                             Context     Some inner flow
                                |             |
                                |             |
                                |             |
                                V             |
<---- outgoing Elements <---- "zip" <---------

This setup appears to be of general utility and not in any way specific to HTTP. Here is what its API could look like:

type =>>[A, B] = ProcessorFlow[A, B]

def contextBypass[A, B, C](inner: A =>> B): (Option[A], C) =>> (Option[B], C)

The logic of the produced flow would be like this:

  • If the incoming element is (Some(a), ctx) the option is unpacked and its value sent through the inner flow. The ctx is split off and sent through the bypass, where it is zipped with the corresponding element coming in from the inner flow to form a (Some(b), ctx) tuple.
  • If the incoming element is (None, ctx) the ctx is sent through the bypass without the inner flow seeing an element. The outer flow would then produce a (None, ctx) element at the respective position in the element stream.
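The two cases above can be sketched without any streaming library. This is only an illustration under stated assumptions: the inner flow is modelled as a plain function over iterators (the hypothetical `InnerFlow` alias) that emits exactly one output per input, and `ContextBypassSketch` is an invented name, not part of any proposed API.

```scala
object ContextBypassSketch {
  // Assumption of this sketch: a one-to-one inner flow, modelled as a function over iterators.
  type InnerFlow[A, B] = Iterator[A] => Iterator[B]

  def contextBypass[A, B, C](inner: InnerFlow[A, B])(in: List[(Option[A], C)]): List[(Option[B], C)] = {
    // Only the unpacked Some(a) values are sent through the inner flow.
    val results: Iterator[B] = inner(in.iterator.collect { case (Some(a), _) => a })
    // The contexts bypass the inner flow and are re-attached in input order.
    in.map {
      case (Some(_), ctx) => (Some(results.next()), ctx) // throws if the inner flow emits too few elements
      case (None, ctx)    => (None, ctx)                 // the inner flow never sees this element
    }
  }
}
```

For example, with an inner flow that maps `i` to `s"#$i"`, the input `List((Some(1), "a"), (None, "b"), (Some(2), "c"))` yields `List((Some("#1"), "a"), (None, "b"), (Some("#2"), "c"))`.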

Having contextBypass available would significantly simplify the HTTP stream definition on the server-side as well as on the client side.

@drewhk
Member

drewhk commented Sep 23, 2014

An alternative, maybe auxiliary, option is to have a project-unproject element pair that works somewhat like lenses, maybe looking like this (draft):

someCaseClasses.project(<thing that takes off an int>, <thing that puts back an int>).map(_ + 1).filter(_.isPrime).unproject()

where the final stream is a stream of case classes.

@sirthias
Contributor Author

Interesting. What would the API signature for project and unproject look like?

@drewhk
Member

drewhk commented Sep 23, 2014

I have no idea yet, I just dumped the idea here so we don't forget when discussing this ticket.

drewhk added the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Sep 23, 2014
drewhk added this to the streams-1.0 milestone Sep 23, 2014
@jrudolph
Member

@drewhk's use case, if written as a single function, could look like this:

def projectedMap[T, U, A, B](project: T => A, unproject: (T, B) => U, f: ProcessorFlow[A, B]): ProcessorFlow[T, U]

E.g.

case class Person(name: String, age: Int)
val fixName: ProcessorFlow[String, String] = ...

projectedMap(_.name, (p, n) => p.copy(name = n), fixName): ProcessorFlow[Person, Person]

In our case:

val requestWithContext: (Ctx, HttpRequest) = ...
val userHandler: ProcessorFlow[HttpRequest, HttpResponse] = ...

projectedMap(_._2, { case ((ctx, _), response) => (ctx, response) }, userHandler): ProcessorFlow[(Ctx, HttpRequest), (Ctx, HttpResponse)]

An unordered variant with this signature would also be useful:

def projectedMapUnordered[T, U, A, B](project: T => A, unproject: (T, B) => U, f: A => Future[B]): ProcessorFlow[T, U]

where f is a function returning a Future and the ordering of the output elements is not required to match the ordering of the input elements.
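The ordered projectedMap proposed above can be illustrated with plain collections. This is a minimal sketch, assuming the inner flow emits one output per input and modelling it as a function over iterators instead of a ProcessorFlow; `ProjectedMapSketch`, the body of `fixName`, and the sample data are invented for illustration.

```scala
object ProjectedMapSketch {
  final case class Person(name: String, age: Int)

  // Assumption: `f` stands in for a one-to-one ProcessorFlow[A, B].
  def projectedMap[T, U, A, B](
      project: T => A,
      unproject: (T, B) => U,
      f: Iterator[A] => Iterator[B]
  )(in: List[T]): List[U] = {
    // Project each element, run the projections through the inner flow ...
    val outputs = f(in.iterator.map(project)).toList
    // ... and put each result back into the element it came from.
    // zip silently truncates if the inner flow is not one-to-one.
    in.zip(outputs).map { case (t, b) => unproject(t, b) }
  }

  // Hypothetical inner flow: trims and capitalizes a name.
  val fixName: Iterator[String] => Iterator[String] = _.map(_.trim.capitalize)

  val fixed: List[Person] =
    projectedMap[Person, Person, String, String](_.name, (p, n) => p.copy(name = n), fixName)(
      List(Person(" ada", 36), Person("alan ", 41))
    )
}
```

The type arguments are spelled out at the call site because the compiler cannot infer `T` from `_.name` alone when all three arguments share one parameter list.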

@drewhk
Member

drewhk commented Sep 23, 2014

The API is one thing; my proposal differs from the graph one in that it lifts AST nodes automatically without needing a bcast-zip pair, and can therefore also handle one-to-many and many-to-one cases, like filter or mapSeq. This needs some internal support, though.

@sirthias
Contributor Author

Endre, I have no idea what you are talking about... :)
But it does sound nice! :)

@drewhk
Member

drewhk commented Sep 23, 2014

Never mind, we will discuss it when the time comes, I am not sure I understand it either ;)

@aruediger
Contributor

Is this still valid?

We currently have methodBypassFanout + ResponseParsingMerge in HttpClient but I think a more general "context bypass" could be useful for things like redirections.

@sirthias
Contributor Author

Yes, I think this ticket is still valid.
Not sure that the signature I proposed in the description above really is "the one", but it feels like there is a general construct here that is worthwhile providing.

@13h3r
Member

13h3r commented Jan 25, 2015

To me this looks like a very useful prebuilt combinator. A lot of cases map directly to this flow.
I am not sure about the API details. What about something like this:

def shortcut[A, B](shortcutCondition: A => Boolean, shortcut: A => B)(inner: Flow[A, B]): Flow[A, B]

drewhk modified the milestones: streams-2.0, streams-backlog Dec 8, 2015
rkuhn removed the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Mar 9, 2016
rkuhn added the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Mar 23, 2016
@johanandren
Member

Should be solved by WithContext, please re-open if not.

@jphelp32

@johanandren, I may not be completely understanding the original request here. Or perhaps I'm conflating it w/my own use case that I'm struggling with. But I don't think the WithContext implementation solves the problem. From the original ask, the "innerFlow" maps A to B. Correct me if I'm wrong, but in order for WithContext to work, A and B both would have to be capable of holding the context internally. But I don't think that's really a possibility in most cases.

@raboof
Member

raboof commented Jul 18, 2019

From the original ask, the "innerFlow" maps A to B. Correct me if I'm wrong, but in order for WithContext to work, A and B both would have to be capable of holding the context internally. But I don't think that's really a possibility in most cases

I think it works something like this:

case class AWithContext(payload: A, context: MyContext)
val src: Source[AWithContext, NotUsed] = ???
val innerFlow: Flow[A, B, NotUsed] = ???

src
  .asSourceWithContext(awc => awc.context)
  .map(_.payload)
  .via(innerFlow)

Does that make sense?

@jrudolph
Member

.via(innerFlow)

I think there is no via on SourceWithContext that would work like that. With the via that is there, the passed in flow needs to deal with context manually. The problem is that Flow is too general as it allows many-inputs to many-outputs and there's no way generally to figure out the association between inputs and outputs. I guess we could provide a via where you could specify how inputs and outputs map, e.g. one input to one output.

@jphelp32

jphelp32 commented Jul 18, 2019

Here is what I was looking for when I came here, and also when I first read about WithContext. I don't know whether this is the same as the original ask:

case class AWithContext(payload: A, context: MyContext)
case class BWithContext(payload: B, context: MyContext)
val src: Source[AWithContext] = ???
val innerFlow: Flow[A => B] = ???
val downstreamFlow: Flow[BWithContext] = ???

src
  .via(innerFlow.asFlowWithContext(_.payload)(???))  // <-- this is what I can't figure out
  .via(downstreamFlow)

This is close to the same use case as PassThroughFlow, which I found in akka-contrib and alpakka and which, yes, I understand suffers from the one-to-one limitation you mention above, @jrudolph. My hope was that WithContext was actually solving that limitation. I hear you saying that it does not. But does it then serve the same purpose as PassThroughFlow? I guess I'm not understanding the use case for WithContext. (The documentation is a bit thin.)

@jrudolph
Member

.via(innerFlow.asFlowWithContext(_.payload)(???)) // <-- this is what I can't figure out

This only helps if your inner flow explicitly supports the context (not as a tuple, but as some custom wrapped type).

I didn't even know about PassThroughFlow, but indeed that looks like an implementation of that special via that would do the automatic propagation (though it doesn't have any safeguards in place if elements don't match at all). We could do a bit better and e.g. fail if there are more outputs than outstanding inputs, or if the flow completes before all inputs have seen an output.
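The two safeguards mentioned here can be sketched with plain iterators. This is not how PassThroughFlow or any Akka API is implemented. It is a minimal model, assuming the inner flow is a lazy function over iterators, and `CheckedPassThroughSketch` and `passThrough` are hypothetical names.

```scala
import scala.collection.mutable

object CheckedPassThroughSketch {
  def passThrough[A, B, Ctx](inner: Iterator[A] => Iterator[B])(in: Iterator[(A, Ctx)]): Iterator[(B, Ctx)] = {
    // Contexts of inputs the inner flow has pulled but not yet answered.
    val outstanding = mutable.Queue.empty[Ctx]
    val outputs = inner(in.map { case (a, ctx) => outstanding.enqueue(ctx); a })

    new Iterator[(B, Ctx)] {
      def hasNext: Boolean = {
        val more = outputs.hasNext
        // Safeguard 2: the inner flow completed while inputs were still waiting for an output.
        if (!more && outstanding.nonEmpty)
          throw new IllegalStateException(s"inner flow completed with ${outstanding.size} unanswered input(s)")
        more
      }

      def next(): (B, Ctx) = {
        val b = outputs.next()
        // Safeguard 1: an output arrived although no input is outstanding.
        if (outstanding.isEmpty)
          throw new IllegalStateException("inner flow emitted more outputs than outstanding inputs")
        (b, outstanding.dequeue())
      }
    }
  }
}
```

In this model a one-to-one inner flow such as `_.map(_ * 2)` passes through cleanly, a dropping flow such as `_.filter(_ > 1)` fails once it completes, and a duplicating flow fails on the first surplus output. A dropped element can still lead to one mismatched pairing before the failure is detected, so these checks catch count mismatches only, not wrong associations.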


9 participants