Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] mapConcat context mapping strategies #28712

Closed
wants to merge 10 commits into from

Conversation

seglo
Copy link
Member

@seglo seglo commented Mar 10, 2020

Motivation

Akka Streams SourceWithContext and FlowWithContext allows the user to propagate a context object per element in the stream. This generally includes metadata about the element such as source offset data or correlation IDs. For stages with 1:1 element mapping it’s easy to propagate contexts on behalf of the user to the transformed element. For stages that have 1:0 or 1:many transformations there is no one right way to propagate the context.

A common use case for contexts is to include positional/offset data from the source element’s system (i.e. a queue). In the case of Alpakka Kafka, there are sources that can propagate partition offset information in the context for each element, which can then be used to commit that an element has been fully processed. The default implementation of mapConcat today will propagate the same input context to all output elements, but with this strategy we don’t know if an element represents the last element from a previous 1:many transformation, so we can’t say the source offset has been fully processed until we wait for the next offset. If we wait for the next offset then commits will always be at least one element behind.

This document proposes a design that lets the user choose a strategy in situations where they’re using contexts and flatMap-like stages such as mapConcat and flatMapConcat.

Context Mapping Strategies

Same

Map the input context of the input element to all output elements on behalf of the user. This is the current behaviour of mapConcat and flatMapConcat and can remain the default behaviour when a strategy is not provided.

First

Allow the user to transform the context of the first element only given the input element and input context, and output element. All other elements will receive the input element’s context.

Last

Allow the user to transform the context of the last element only given the input element, input context, output element, and index of output element. All other elements will receive the input element’s context.

The Last strategy can also be used to implement an “Only” strategy, when only one output element exists.

Iterate

Allow the user to transform the context of each output element. For each output element the user can transform the context given the input element, input context, output element, output element index, and whether there is a next (hasNext) element.

The Iterate strategy is a low-level strategy that lets you map to an output context that can satisfy any 1:many use case.

The Iterate strategy can be used to recreate the Same, First, and/or Last strategy. It can also be used to implement an “Only” strategy, when only one output element exists.

Iterate or Empty

The iterate strategy, including a way to transform an “empty” use case when there are no output elements. This allows the user to push an output element and output context that represents a 1:0 mapping use case.

PR

This PR is an experimental branch demonstrating the above context mapping strategies in mapConcat and its variants.

@akka-ci akka-ci added validating PR is currently being validated by Jenkins needs-attention Indicates a PR validation failure (set by CI infrastructure) and removed validating PR is currently being validated by Jenkins labels Mar 10, 2020
@akka-ci
Copy link

akka-ci commented Mar 10, 2020

Test FAILed.

@seglo seglo added the wip Work in progress PR, not ready for merge yet. label Mar 11, 2020
/**
* INTERNAL API
*/
@InternalApi private[akka] final class StatefulMapConcatWithContext[In, Ctx, Out](
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly a copy of StatefulMapConcat. Maybe they could be merged, if it makes sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't it just be using statefulMapConcat?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could, but I didn't update statefulMapConcat to use the new custom stage. A new stage is required to execute the UDFs at the right time.

Copy link
Member

@jrudolph jrudolph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good so far, @seglo! Some comments:

  • For a strategy to be included, it would be nice to havea basic example or explanation about why it is important and general enough to be used. The contextMapping in the first example seems somewhat useful but hard to say how generally useful it is.
  • You can already do whatever you want by temporarily dropping out of the WithContext API. So, whatever API we come up it needs to be better (for whatever metric) than what you can do with just via. E.g. in the example from the test
        val contextMapping =
        ContextMapStrategy.iterate(fn = (_: String, inCtx: OffsetContext, _: String, index: Int, hasNext: Boolean) => {
          if (index == 0 && hasNext) OffsetContext(inCtx.offset, First)
          else if (index == 0 && !hasNext) OffsetContext(inCtx.offset, Only)
          else if (!hasNext) OffsetContext(inCtx.offset, Last)
          else OffsetContext(inCtx.offset, Within)
        })
		// ...
        .mapConcat(
          { str =>
            List(1, 2, 3).map(i => s"$str-$i")
          },
          contextMapping
        )

can for the concrete case also be written as

.via(Flow[(String, OffsetContext)].mapConcat {
          case (in, OffsetContext(ctx, _)) =>
            Vector(
              s"$in-1" -> OffsetContext(ctx, First),
              s"$in-2" -> OffsetContext(ctx, Within),
              s"$in-3" -> OffsetContext(ctx, Last))
        })

or more generally as

        .via(Flow[(String, OffsetContext)].mapConcat {
          case (in, OffsetContext(ctx, _)) =>
            val result = List(1, 2, 3).map(i => s"$in-$i")

            if (result.isEmpty) Vector.empty
            else if (result.size == 1) result.map(_ -> OffsetContext(ctx, Only))
            else {
              (result.head -> OffsetContext(ctx, First)) +:
              result.drop(1).dropRight(1).map(_ -> OffsetContext(ctx, Within)) :+
              (result.last -> OffsetContext(ctx, Last))
            }
        })

I guess the two main benefits of the new approach would be:

  1. separation of concerns
  2. being able to reuse the context mapping strategies

It's nice that the API seems to cover most cases you could come up with but does it really need that flexibility? (I really don't know, maybe it is needed). One benefit would probably also be that the suggested API could work almost the same API-wise for flatMapConcat for which it would be much harder to change a given non-context usage into a WithContext usage manually (e.g. just doing something on the last element is pretty hard to do as you need to look one element into the future).

* output element [[Out]], index, and if the there is a next output element [[Out]]. This strategy can be used to
* satisfy all 1:many use cases.
*/
def iterate[In, Ctx, Out](fn: (In, Ctx, Out, Int, Boolean) => Ctx): Iterate[In, Ctx, Out] = Iterate(fn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API-wise, usages might look nicer if all the parameters would be put into a wrapper object.

* output element [[Out]], index, and if the there is a next output element [[Out]]. This strategy can be used to
* satisfy all 1:many use cases.
*/
def iterate[In, Ctx, Out](fn: (In, Ctx, Out, Int, Boolean) => Ctx): Iterate[In, Ctx, Out] = Iterate(fn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the index really needed? Otherwise, it could be isFirst: Boolean and isLast: Boolean if that's what the index would be used for usually.

@seglo
Copy link
Member Author

seglo commented Mar 11, 2020

Thanks for the feedback @jrudolph

For a strategy to be included, it would be nice to havea basic example or explanation about why it is important and general enough to be used. The contextMapping in the first example seems somewhat useful but hard to say how generally useful it is.

I could have elaborated on this further in the Motivation section. I first started thinking about this as a result of a request from Cloudflow for Alpakka Kafka. To enable at-least-once delivery guarantees in situations where users use mapConcat they have to know when all the output elements have been produced so that the consumed offset they were transformed from can be committed back to Kafka. Currently they always commit an offset when they encounter the next offset, which means they are always one offset behind when committing.

If an API exists for context mapping then we could provide a default strategy in Alpakka Kafka (or the user could implement their own) that can be re-used to indicate that the context of the last output element is the last element, then a commit could be performed.

You're right this could be handled without contexts at all at an app level, but this would be a useful way to separate this concern from the rest of the user's application code. There are use cases where this specific use case breaks down, such as when elements arrive out of order, but I think that's an edge case that's the responsibility of the user to deal with.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a good direction.

As @jrudolph points out, flatMapConcat would be important to try out as it doesn't offer these workarounds. The hasNext knowledge is not as easily available for it.

It might make sense to come up with other predefined strategies. Eg. Option based which map the context to Some for the first or last element and None for all others (that strategy would request for a Scala/Java distinction).

emptyFn: Option[(In, Ctx) => (Out, Ctx)] = None)

/**
* Passthrough the same context [[Ctx] of the input element [[In]] to all output elements [[Out]].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do [[ links work for type parameters? Ctx is missing one closing bracket.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. I'll check.

@seglo
Copy link
Member Author

seglo commented Mar 12, 2020

As @jrudolph points out, flatMapConcat would be important to try out as it doesn't offer these workarounds. The hasNext knowledge is not as easily available for it.

Yes, that's a good point. I focused on mapConcat first because it was an easier use case, but could still demonstrate the proposed API. I'll implement flatMapConcat.

raboof and others added 4 commits March 28, 2020 10:50
This extends the ContextMapStrategy idea to mapAsyncUnordered. This
might help motivate the feature:

Previously, a big limitation of FlowWithContext is that it was somewhat
unclear what kinds of operations were allowed on streams in order not to
violate the expectations of the sinks that use that context.

For example, the current approach for dealing with Kafka commits in
contexts would allow filtering, but not reordering of the elements.

By putting this information in the type of the ContextMapStrategy,
components that care about the offsets (like Alpakka Kafka) can provide
a strategy (or multiple) that allows only those operations that are safe
for them.

Components that provide context-aware flows but don't care about the
offsets themselves (such as generic alpakka components) can indicate
their behaviour in the type of the strategy they accept.

The test shows 2 possible Kafka strategies, one allowing iteration (but
not reordering) and one allowing reordering (but not filtering), and 2
example streams showing how this can be composed in a typesafe way.

This still doesn't cover flatMapConcat. For flatMapConcat, there are some
trade-offs: i.e. in the Kafka case, if you absolutely want all input elements
to be committed, AFAICS we would have to delay each element (until we know
whether the next event is the next element or the stream completion), which is
quite a cost. If you're OK with skipping some commits for filtered-out
input elements (which I'd say is typically reasonable), you don't have to
incur this cost. Signalling whether this is necessary could also be done
from the mapping strategy, and we could provide 'strict' and 'loose' strategies
corresponding the the different behaviours. That seems to fit rather nicely.

(I think there is another option that would only delay the commits, also fit
flatMapMerge and reordering (even at the same time), while only possibly missing
commits at the 'end' of the stream - but let's not get into the woods too far
yet ;))
…xtended' into seglo/streamcontext-flatmap-strategy-merge
@akka-ci akka-ci added validating PR is currently being validated by Jenkins and removed needs-attention Indicates a PR validation failure (set by CI infrastructure) labels Apr 2, 2020
@akka-ci akka-ci added needs-attention Indicates a PR validation failure (set by CI infrastructure) and removed validating PR is currently being validated by Jenkins labels Apr 2, 2020
@akka-ci
Copy link

akka-ci commented Apr 2, 2020

Test FAILed.

1 similar comment
@akka-ci
Copy link

akka-ci commented Apr 2, 2020

Test FAILed.

@ennru ennru mentioned this pull request Apr 6, 2020
26 tasks
@seglo
Copy link
Member Author

seglo commented Apr 14, 2020

This will take on a new direction. I'll use some aspects of this work in a new proposal.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
needs-attention Indicates a PR validation failure (set by CI infrastructure) t:stream wip Work in progress PR, not ready for merge yet.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants