[FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster #7099

Merged
merged 3 commits into from
Feb 17, 2019

Conversation

@jgrier jgrier commented Nov 15, 2018

What is the purpose of the change

This commit adds a JobMaster RPC endpoint that is used for global information sharing. One use case will be event time source synchronization, where it will be used to share watermarks, but there are others. It takes the form of a set of named aggregates that can be updated by a client-supplied AggregateFunction.

Note that the RPC endpoint accepts a serialized AggregateFunction in the form of a byte array. We need to do this so that we can deserialize this using the UserCodeClassLoader. The normal RpcService path does not use the UserCodeClassLoader nor is there any easy way to make it do so.
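
The byte-array round trip described above can be sketched with plain Java serialization. This is only an illustration of the idea, not the code in this PR: the class and method names (`ClassLoaderSerializationSketch`, `serialize`, `deserialize`) are hypothetical, and the sketch assumes the receiving side is handed the class loader it should resolve classes against.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ClassLoaderSerializationSketch {

    /** Client side: turn the function (or any Serializable) into a byte array. */
    public static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Server side: rebuild the object, resolving classes through the given loader. */
    @SuppressWarnings("unchecked")
    public static <T> T deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return (T) in.readObject();
        }
    }

    /** ObjectInputStream that looks classes up in a caller-supplied class loader. */
    private static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = serialize("watermark-aggregate");
        String restored =
                deserialize(payload, ClassLoaderSerializationSketch.class.getClassLoader());
        System.out.println(restored);
    }
}
```

Because the payload travels as opaque bytes, the transport never has to load the user's classes; only the final `deserialize` call needs a loader that can see them.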

This PR also includes the code/wiring necessary to expose this functionality to user functions via the StreamingRuntimeEnvironment.

The PR seems large but it is mostly wiring. To quickly assess the changes I suggest looking at the following classes:

  • GlobalAggregateManager (to understand the API)
  • RpcGlobalAggregateManager (to see the client-side RPC with the JobMaster)
  • JobMaster / JobMasterGateway (server side implementation of the above)
  • GlobalAggregateManagerITCase (for typical usage from user code)

Most of the rest of the PR is just wiring it all up.

Brief change log

  • New RPC endpoint on JobMaster to create, update, and retrieve named aggregates.
  • Updated JobMaster Tests
  • Client side exposure of above endpoint via the StreamingRuntimeEnvironment and GlobalAggregateManager classes.
  • Integration test exercising typical usage from user code.

Verifying this change

This change added tests and can be verified as follows:

  • JobMasterTest
  • GlobalAggregateManagerITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): No
  • The public API, i.e., is any changed class annotated with @Public(Evolving): Yes
  • The serializers: No
  • The runtime per-record code paths (performance sensitive): No
  • Anything that affects deployment or recovery: No
  • The S3 file system connector: No

Documentation

  • Does this pull request introduce a new feature? No
  • If yes, how is the feature documented? not applicable

public class SourceWatermark implements Serializable {

private static final long serialVersionUID = 1L;
private long timestamp;

Contributor

What does the timestamp represent? Is it when the watermark last changed or when it was last communicated by the subtask (even if it did not change, for example because the subtask is just reading a lot of data under the same watermark)? We will need a way to detect that a source subtask is idle so we can avoid waiting for it (similar to how we had to identify idleness within a subtask).

Member Author

The timestamp here is meant to represent the watermark itself -- the current low watermark for the sub-task that sent it.

I do agree, however, that we will also need to know at what time the watermark was sent so that we can ignore it if it hasn't been updated in some configurable amount of time.

Very good point.

/**
* This represents the watermark for a single source partition.
*/
public class SourceWatermark implements Serializable {

Contributor

I wonder if it should be qualified as SourceWatermark vs. just Watermark? Perhaps there are use cases for exchanging watermarks across subtasks that don't necessarily belong to a source. One such example could be operators that perform asynchronous operations. Related, do we want to allow for an identifier for the watermark so that within an application multiple independent groupings could be formed?

Contributor

This may be a bit far fetched, but can it be generalized further to something like a named counter/metric? Currently there isn't anything watermark specific here?

Contributor

One problem is that there is already a Watermark class, but I agree with Thomas' comment. In the future, not all "sources" might be actual physical sources in the pipeline.

Member Author

Yeah, my intention was to keep this very focused on the exact use case at hand -- to provide simple state sharing for watermarks in the service of the source synchronization effort. That is the reason for the very specific naming and the lack of additional features like namespaces, etc.

If we were to generalize this more it would be good to understand some other specific use cases -- and also to consider whether it's important to tackle that here or just go with the simplest interface we need for the task at hand.

@tweise @aljoscha If we do something more general what are you thinking? Something more like a hash table or a collection of namespaced hashtables? Would we need to make the key and value types generic, etc? Would we want to then distribute the entire hashtable to every sub-task?

Contributor

I could imagine scenarios where different sources have different synchronization. That could be supported with a grouping mechanism for the tasks that participate in the watermark sync. The RPC would pass the group/namespace identifier as additional parameter and only get back the watermark for that (hash table would remain internal).

Member Author

Sounds good. Will update shortly.

@StephanEwen
Contributor

I think this is a very nice feature, +1 to have this.

We have seen other use cases that need a similar mechanism, so I am wondering if we can generify this to some transient aggregator. One of those use cases would need the max across all values and is otherwise almost the same.

@jgrier
Member Author

jgrier commented Dec 3, 2018

Sorry I haven't responded to this. We had a baby boy this week so that has kept me pretty busy ;)

Okay, so I'm on board with generifying this further. @StephanEwen if we're to do a generic transient aggregator do you mean to allow the client to provide the aggregation function? In this case the API would look something like this:

/**
 * Update the aggregate and return the new value.
 *
 * @param aggregateName The name of the aggregate to update
 * @param aggregand The value to add to the aggregate
 * @param aggregationFunction The function to apply to the current aggregate and aggregand to obtain the new aggregate value
 * @return The updated aggregate
 */
CompletableFuture<Object> updateAggregate(
      String aggregateName,
      Object aggregand,
      AggregateFunction aggregationFunction);

Is something like this what you had in mind?
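
To make the proposal concrete, here is a minimal in-memory sketch of named aggregates driven by a client-supplied function, using the "max across all values" case mentioned earlier in the thread. Everything here is a stand-in for illustration: the nested `AggregateFunction` interface only mimics the shape of Flink's interface, `NamedAggregates` and `MaxFunction` are hypothetical names, and the call is synchronous rather than returning a `CompletableFuture`.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class NamedAggregatesSketch {

    /** Local stand-in shaped like an aggregate function (not the Flink interface itself). */
    public interface AggregateFunction<IN, ACC, OUT> extends Serializable {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    /** Hypothetical holder that keeps one accumulator per aggregate name. */
    public static class NamedAggregates {
        private final Map<String, Object> accumulators = new HashMap<>();

        @SuppressWarnings("unchecked")
        public <IN, ACC, OUT> OUT updateAggregate(
                String aggregateName, IN aggregand, AggregateFunction<IN, ACC, OUT> function) {
            // The first update for a name creates the accumulator.
            ACC accumulator = (ACC) accumulators.get(aggregateName);
            if (accumulator == null) {
                accumulator = function.createAccumulator();
            }
            accumulator = function.add(aggregand, accumulator);
            accumulators.put(aggregateName, accumulator);
            return function.getResult(accumulator);
        }
    }

    /** Example function: the maximum of all values seen so far. */
    public static class MaxFunction implements AggregateFunction<Long, Long, Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long createAccumulator() {
            return Long.MIN_VALUE;
        }

        @Override
        public Long add(Long value, Long accumulator) {
            return Math.max(value, accumulator);
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }
    }

    public static void main(String[] args) {
        NamedAggregates aggregates = new NamedAggregates();
        MaxFunction max = new MaxFunction();
        aggregates.updateAggregate("max-seen", 5L, max);
        aggregates.updateAggregate("max-seen", 3L, max);
        System.out.println(aggregates.updateAggregate("max-seen", 4L, max)); // prints 5
    }
}
```

The types are chosen by the caller per aggregate name, which is why the holder stores accumulators as `Object` and casts on access.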

@jgrier
Member Author

jgrier commented Dec 6, 2018

@aljoscha @tweise Can you guys comment on the above generic aggregator proposal? I'd like to keep this moving forward.

@aljoscha
Contributor

aljoscha commented Dec 7, 2018

Can we always assume that the user-jar/class loader will be available where the AggregateFunction is needed? If yes, I think this is a nice approach! (We can probably use concrete types in the interface, though)

@aljoscha
Contributor

aljoscha commented Dec 7, 2018

Also, congratulations, I guess! 🎉

@jgrier
Member Author

jgrier commented Dec 7, 2018

> Can we always assume that the user-jar/class loader will be available where the AggregateFunction is needed? If yes, I think this is a nice approach! (We can probably use concrete types in the interface, though)

@aljoscha I'm actually not sure if the user code classloader is available from the JobMaster but I would think that's reasonable since there's a 1:1 relationship between the JobMaster and a single job.

WRT concrete types in the RPC interface I'm not sure what you're thinking there. The concrete types are not known in this approach. The types are up to the user/client and can be different for each named aggregate.

@tweise
Contributor

tweise commented Dec 7, 2018

> @aljoscha @tweise Can you guys comment on the above generic aggregator proposal? I'd like to keep this moving forward.

@jgrier +1 as suggested earlier. We just need to confirm that the class of aggregationFunction is available.

@jgrier
Member Author

jgrier commented Jan 25, 2019

@tweise Have a look.. Also, I didn't ignore the input about "idleness" issues but that will be handled by the particular aggregation function used. For the example of event time source sync we will want to do this but not in general.

Contributor

@tweise tweise left a comment

@jgrier looks good! I also think the watermark and timeout specific logic can be handled with an aggregation function that retains the latest entry for each subtask ID, just like we do in the ZK based implementation.
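
A rough sketch of the kind of aggregation logic described here: keep the latest report per subtask ID and ignore subtasks that have not reported within a timeout. The names (`WatermarkSyncSketch`, `Report`, `add`, `globalWatermark`) and the timeout handling are assumptions made for illustration; they are not taken from this PR or from the ZK based implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class WatermarkSyncSketch {

    /** One report from a subtask: its current watermark and when the report was sent. */
    public static final class Report {
        final int subtaskId;
        final long watermark;
        final long sentAtMillis;

        public Report(int subtaskId, long watermark, long sentAtMillis) {
            this.subtaskId = subtaskId;
            this.watermark = watermark;
            this.sentAtMillis = sentAtMillis;
        }
    }

    /** Accumulator update: a newer report replaces the previous one for the same subtask. */
    public static Map<Integer, Report> add(Report report, Map<Integer, Report> accumulator) {
        accumulator.put(report.subtaskId, report);
        return accumulator;
    }

    /**
     * Minimum watermark over subtasks that reported within the timeout.
     * Returns Long.MAX_VALUE when no subtask is considered active.
     */
    public static long globalWatermark(
            Map<Integer, Report> accumulator, long nowMillis, long idleTimeoutMillis) {
        long min = Long.MAX_VALUE;
        for (Report report : accumulator.values()) {
            boolean active = nowMillis - report.sentAtMillis <= idleTimeoutMillis;
            if (active) {
                min = Math.min(min, report.watermark);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        Map<Integer, Report> accumulator = new HashMap<>();
        add(new Report(0, 1000L, 10_000L), accumulator);
        add(new Report(1, 400L, 1_000L), accumulator);  // last heard from long ago
        add(new Report(2, 700L, 10_200L), accumulator);

        // Subtask 1 is older than the 5 second timeout and is skipped.
        System.out.println(globalWatermark(accumulator, 10_500L, 5_000L)); // prints 700
    }
}
```

Keeping the send time next to the watermark is what lets an idle subtask be skipped instead of holding the global minimum back.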


AggregateFunction aggregateFunction = InstantiationUtil.deserializeObject(serializedAggregateFunction, userCodeLoader);

Object accumulator = accumulators.get(aggregateName);

Contributor

Needs synchronization? Or is all access to JobMaster already synchronized?

Member Author

I believe it's already all synchronized. The RpcService is implemented as a single Akka actor and thus access is already serialized.
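
The reasoning can be illustrated with a small stand-in: if every call is funneled through one thread, a plain `HashMap` needs no locking even when many callers invoke the endpoint concurrently. This sketch uses a single-thread executor in place of an actor; it is not Flink's RPC code, and `SingleThreadedEndpointSketch` and `increment` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadedEndpointSketch {

    // Only ever read or written on mainThread, so no synchronization is needed.
    private final Map<String, Long> counters = new HashMap<>();

    // Stand-in for the actor: all calls are executed one at a time, in submission order.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    /** Callable from any thread; the state change itself runs on the single main thread. */
    public CompletableFuture<Long> increment(String name) {
        return CompletableFuture.supplyAsync(
                () -> counters.merge(name, 1L, Long::sum), mainThread);
    }

    public void shutdown() {
        mainThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        SingleThreadedEndpointSketch endpoint = new SingleThreadedEndpointSketch();

        // Four caller threads issue 1000 updates each.
        Thread[] callers = new Thread[4];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    endpoint.increment("updates");
                }
            });
            callers[i].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }

        // Queued behind the 4000 earlier updates, so it observes all of them.
        System.out.println(endpoint.increment("updates").get()); // prints 4001
        endpoint.shutdown();
    }
}
```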

@jgrier
Member Author

jgrier commented Jan 31, 2019

@tweise @aljoscha @StephanEwen I think this is in a good state for final review and merge. Take a look when you get the chance please.

…to share information across source subtasks.

This will be used to implement things like event time source synchronization across sources. This functionality can be accessed from user code via the StreamingRuntimeEnvironment.
@tweise
Contributor

tweise commented Feb 1, 2019

@jgrier thanks for the update, will take a look soon.

Meanwhile, we will put this to work internally.

@tweise tweise merged commit 32c822a into apache:master Feb 17, 2019
klion26 added a commit to klion26/flink that referenced this pull request Feb 18, 2019
Summary:
This closes apache#7662.

[FLINK-11597][test] Remove legacy JobManagerActorTestUtils (apache#7700)

[FLINK-11081][rest] Support server port range

[hotfix][runtime] Fix mistake in RestfulGateway and JobMasterGateway Javadoc

Method requestOperatorBackPressureStats never returns a future that completes
with null.

[FLINK-11578][runtime] Expose MiniCluster#getDispatcherGatewayFuture for testing

[FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base

This closes apache#7697.

[FLINK-11626][build] Bump flink-shaded to 6.0

[FLINK-11628][travis] Cache maven

[FLINK-11424][metrics] Properly remove string/failing gauges

[hotfix] Reorder StandaloneJobClusterEntryPoint class members

[hotfix] Improve error message in JobID.fromHexString

[FLINK-11545] [container] Add job ID to StandaloneJobClusterConfiguration

[FLINK-11545] [container] Add null checks and reduce visibility in StandaloneJobClusterConfiguration

Fix line breaks in StandaloneJobClusterConfiguration

[FLINK-11545] [container] Parse job ID in StandaloneJobClusterConfigurationParserFactory

[pr-review] Make getJobId static

[pr-review] Add expected format and example to error message

Fix checkstyle

Don't wrap JobID.fromHexString error message in StandaloneJobClusterConfigurationParserFactory

[pr-review] Add test for short options

[FLINK-11545] [container] Pass job ID to ClassPathJobGraphRetriever

[FLINK-11545] [container] Catch Exception before exiting with error

[FLINK-11545] [container] Add docs for job-id argument

[hotfix][tests] Remove mocking from ResourceManagerJobMasterTest

[hotfix][tests] Remove mocking from ResourceManagerTaskExecutorTest

[hotfix][tests] Refactor ResourceManagerTest to use Before and After methods

[FLINK-11596][test] Remove legacy ResourceManagerTest

[FLINK-11596][tests] Add ResourceManagerTaskExecutorTest#testDisconnectTaskExecutor

[FLINK-11596][tests] Add heartbeat timeout test to ResourceManagerTest

- Add ResourceManagerTest#testHeartbeatTimeoutWithJobMaster
- Add ResourceManagerTest#testHeartbeatTimeoutWithTaskExecutor

This closes apache#7698.

[hotfix][travis] Remove stray slash

[hotfix][build] Remove hard-coded scala version

[FLINK-11154][network] Bump Netty to 4.1.32

Notable changes since 4.1.24:
- big improvements (performance, feature set) for using openSSL based
  SSL engine (useful for FLINK-9816)
- allow multiple shaded versions of the same netty artifact (as long
  as the shaded prefix is different)
- Ensure ByteToMessageDecoder.Cumulator implementations always release
- Don't re-arm timerfd each epoll_wait
- Use a non-volatile read for ensureAccessible() whenever possible to
  reduce overhead and allow better inlining.
- Do not fail on runtime when an older version of Log4J2 is on the
  classpath
- Fix leak and corruption bugs in CompositeByteBuf
- Add support for TLSv1.3
- Harden ref-counting concurrency semantics
- bug fixes
- Java 9-12 related fixes

- no license changes
- no changes in Netty's NOTICE file

[FLINK-11577][tests, runtime] Improve test coverage of stack trace sampling in TM

- Extract stack trace sampling logic to StackTraceSampleService
- Add unit tests for StackTraceSampleService

[FLINK-11577][tests] Delete obsolete test StackTraceSampleCoordinatorITCase

- Test could silently fail
- Test had no assertions
- Test is superseded by StackTraceSampleServiceTest

[FLINK-10887] [jobmaster] Add global aggregate tracking to the JobMaster (apache#7099)

This adds a JobMaster RPC endpoint that is used to share information across source subtasks.

This will be used to implement things like event time source synchronization across sources. This functionality can be accessed from user code via the StreamingRuntimeEnvironment.

init commit without migration test

Differential Revision: https://aone.alibaba-inc.com/code/D839631
5 participants