[FLINK-19001] Add data stream api interoperability #133

Merged
merged 23 commits into apache:master on Aug 26, 2020

@igalshilman igalshilman commented Aug 19, 2020

Add DataStream API interoperability

This PR adds an extension to the Stateful Functions SDK that allows embedding Stateful Functions applications into a DataStream program.

This integration allows:

  • defining DataStreams as StateFun ingresses
  • binding one or more stateful functions
  • binding remote RequestReply functions
  • and finally obtaining StateFun egresses as DataStreams.

Here is a short example snippet of how to insert a StateFun pipeline into a regular DataStream pipeline. (full example)

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  
  DataStream<String> namesStream = ... ;  

  DataStream<RoutableMessage> names =
        namesStream
            .map(
                name ->
                    RoutableMessageBuilder.builder()
                        .withTargetAddress(GREET, name)
                        .withMessageBody(name)
                        .build());

  ...
  StatefulFunctionEgressStreams out =
        StatefulFunctionDataStreamBuilder.builder("example")
            .withDataStreamAsIngress(names)
            .withFunctionProvider(GREET, unused -> new MyFunction())
            .withRequestReplyRemoteFunction(
                requestReplyFunctionBuilder(
                        REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
                    .withPersistedState("seen_count")
                    .withMaxRequestDuration(Duration.ofSeconds(15))
                    .withMaxNumBatchRequests(500))
            .withEgressId(GREETINGS)
            .withConfiguration(statefunConfig)
            .build(env);
  ...

  • A RoutableMessage is the entry point to the StateFun pipeline.
  • One or more DataStream<RoutableMessage> can then be registered as ingresses.
  • StateFun delivers the payload associated with each RoutableMessage to the appropriate stateful function instance.
  • Egress ids have to be defined explicitly, and each egress can be collected as a DataStream from StatefulFunctionEgressStreams.
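
To make the routing described in the bullets above concrete, here is a toy model in plain Java. It does not use the StateFun or Flink APIs, and every name in it (`ToyRouter`, `Address`, `Message`, `Greeter`) is hypothetical. It only illustrates the idea that a message carries a target address (function type plus id) and that each distinct address gets its own function instance with its own state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

public class ToyRouter {

    /** Hypothetical stand-in for a target address: function type plus instance id. */
    public static final class Address {
        final String type;
        final String id;

        public Address(String type, String id) {
            this.type = type;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Address)) return false;
            Address other = (Address) o;
            return type.equals(other.type) && id.equals(other.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, id);
        }
    }

    /** Hypothetical stand-in for a routable message: a target address and a payload. */
    public static final class Message {
        final Address target;
        final String body;

        public Message(String type, String id, String body) {
            this.target = new Address(type, id);
            this.body = body;
        }
    }

    /** Toy function that keeps per-instance state: how many messages it has seen. */
    public static final class Greeter {
        private int seen;

        public String invoke(String name) {
            seen++;
            return "Hello " + name + " #" + seen;
        }
    }

    private final Map<String, Supplier<Greeter>> providers = new HashMap<>();
    private final Map<Address, Greeter> instances = new HashMap<>();

    /** Binds a function type to a provider that creates instances on demand. */
    public void bind(String type, Supplier<Greeter> provider) {
        providers.put(type, provider);
    }

    /** Delivers the payload to the instance addressed by the message, creating it if needed. */
    public String deliver(Message message) {
        Supplier<Greeter> provider = providers.get(message.target.type);
        if (provider == null) {
            throw new IllegalArgumentException("no function bound for type " + message.target.type);
        }
        Greeter instance = instances.computeIfAbsent(message.target, unused -> provider.get());
        return instance.invoke(message.body);
    }

    public static void main(String[] args) {
        ToyRouter router = new ToyRouter();
        router.bind("greet", Greeter::new);
        System.out.println(router.deliver(new Message("greet", "alice", "alice")));
        System.out.println(router.deliver(new Message("greet", "bob", "bob")));
        System.out.println(router.deliver(new Message("greet", "alice", "alice")));
    }
}
```

In this toy, the second message addressed to `alice` reaches the same `Greeter` instance as the first, so its counter advances, while `bob` gets a fresh instance.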

The following is a short description of the changes (not in commit order):

  • Added a statefun-flink/statefun-flink-datastream module for the new SDK.
  • Added a new example under examples/
  • Restructured the translation logic so that it can be used both from the DataStream API and from the regular StateFun SDK.

This builder is intended to be used by the DataStream embedded users,
and it exposes only the RoutableMessage interface.

This commit adds the ability to construct a Sources representation
from an already existing list of DataStreams that was obtained
by the embedded users.

This module holds the builder classes, but it is here mainly for
packaging reasons. We need to provide a jar-with-dependencies,
and we cannot use statefun-distribution since it also contains
the connectors that users don't need.

flink-streaming-java should always have scope provided. It wasn't
explicitly needed before, because this artifact was not part of any
jar-with-dependencies.

Since the move to 1.11, Flink executors have been refactored
out of flink-streaming-java and moved into flink-clients,
which is a required dependency now.

This commit serves multiple purposes:
1) Since the providers are also initialized during planning,
making the HTTP client lazily initialized improves
planning time.
2) It allows making the provider serializable.

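The two points above can be illustrated with a minimal plain-Java sketch. It is not the code from this PR: `LazyClientProvider` and `FakeHttpClient` are hypothetical stand-ins, and the sketch only assumes standard Java serialization. The client is held in a `transient` field and created on first use, so constructing and serializing the provider never creates a client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyClientProvider implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Hypothetical stand-in for an expensive, non-serializable HTTP client. */
    public static final class FakeHttpClient {
        private static int created = 0;

        FakeHttpClient() {
            created++;
        }

        public String call(String endpoint) {
            return "called " + endpoint;
        }
    }

    private final String endpoint;

    // transient: skipped by Java serialization, rebuilt lazily after deserialization.
    private transient FakeHttpClient client;

    public LazyClientProvider(String endpoint) {
        this.endpoint = endpoint;
    }

    // Not thread-safe; good enough for a single-threaded sketch.
    private FakeHttpClient client() {
        if (client == null) {
            client = new FakeHttpClient();
        }
        return client;
    }

    public String invoke() {
        return client().call(endpoint);
    }

    /** Number of clients created so far in this JVM. */
    public static int clientsCreated() {
        return FakeHttpClient.created;
    }

    /** Serializes and deserializes the provider, as shipping it to a worker would. */
    public static LazyClientProvider roundTrip(LazyClientProvider provider) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(provider);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyClientProvider) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazyClientProvider planned = new LazyClientProvider("http://localhost:5000/statefun");
        LazyClientProvider shipped = roundTrip(planned);
        System.out.println("clients after planning: " + clientsCreated());
        System.out.println(shipped.invoke());
        System.out.println("clients after first call: " + clientsCreated());
    }
}
```

The design choice here is that the cost of building the client is paid only where a call is actually made, and the non-serializable client never has to travel with the provider.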
FunctionType is a very simple value type that is now also
used by the DataStream API bridge (which requires Java serialization).

This commit makes the internal class HttpFunctionSpec Java
serializable, so that it can be used from the statefun-datastream-api.
If at any point we decide to add a member that is not trivially serializable
to this class, we can undo this change and provide a serializable
"twin" in the DataStream API.

This commit adds a serializable HTTP function provider
that delegates to the real HTTP function provider at runtime.

This commit adds a request-reply remote function builder,
to be used as a public API.
This class delegates its methods to the non-public (internal)
HttpSpec.Builder class.

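The delegation described above can be sketched in plain Java as follows. This is an illustration of the pattern only, not the classes from this PR: `RequestReplyBuilderSketch`, `InternalSpec`, `PublicBuilder`, `forEndpoint`, and `describe` are hypothetical names, and the default values are arbitrary. The public builder owns an instance of the internal builder and forwards each call to it, so the internal type never appears in the public method signatures.

```java
import java.net.URI;
import java.time.Duration;

public class RequestReplyBuilderSketch {

    /** Hypothetical stand-in for an internal (non-public) spec class. */
    static final class InternalSpec {
        final URI endpoint;
        final Duration maxRequestDuration;
        final int maxNumBatchRequests;

        private InternalSpec(URI endpoint, Duration maxRequestDuration, int maxNumBatchRequests) {
            this.endpoint = endpoint;
            this.maxRequestDuration = maxRequestDuration;
            this.maxNumBatchRequests = maxNumBatchRequests;
        }

        @Override
        public String toString() {
            return endpoint + " " + maxRequestDuration.getSeconds() + "s " + maxNumBatchRequests;
        }

        /** Internal builder; the defaults are arbitrary values chosen for this sketch. */
        static final class Builder {
            private final URI endpoint;
            private Duration maxRequestDuration = Duration.ofMinutes(1);
            private int maxNumBatchRequests = 1000;

            Builder(URI endpoint) {
                this.endpoint = endpoint;
            }

            Builder withMaxRequestDuration(Duration duration) {
                this.maxRequestDuration = duration;
                return this;
            }

            Builder withMaxNumBatchRequests(int maxNumBatchRequests) {
                this.maxNumBatchRequests = maxNumBatchRequests;
                return this;
            }

            InternalSpec build() {
                return new InternalSpec(endpoint, maxRequestDuration, maxNumBatchRequests);
            }
        }
    }

    /** Public facade: every method forwards to the internal builder. */
    public static final class PublicBuilder {
        private final InternalSpec.Builder delegate;

        private PublicBuilder(URI endpoint) {
            this.delegate = new InternalSpec.Builder(endpoint);
        }

        public static PublicBuilder forEndpoint(URI endpoint) {
            return new PublicBuilder(endpoint);
        }

        public PublicBuilder withMaxRequestDuration(Duration duration) {
            delegate.withMaxRequestDuration(duration);
            return this;
        }

        public PublicBuilder withMaxNumBatchRequests(int maxNumBatchRequests) {
            delegate.withMaxNumBatchRequests(maxNumBatchRequests);
            return this;
        }

        /** Summarizes the spec the delegate would build, without exposing the internal type. */
        public String describe() {
            return delegate.build().toString();
        }
    }

    public static void main(String[] args) {
        PublicBuilder builder =
                PublicBuilder.forEndpoint(URI.create("http://localhost:5000/statefun"))
                        .withMaxRequestDuration(Duration.ofSeconds(15))
                        .withMaxNumBatchRequests(500);
        System.out.println(builder.describe());
    }
}
```

Keeping the internal builder behind a facade like this leaves room to change the internal class later without breaking callers of the public builder.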
@tzulitai tzulitai self-requested a review August 20, 2020 06:36
@sjwiesman
Contributor

#docs

@igalshilman
Contributor Author

This will be followed up in the docs sprint, thanks for the reminder.

@tzulitai tzulitai merged commit fb66deb into apache:master Aug 26, 2020