Skip to content

Conversation

@tisonkun
Copy link
Member

What is the purpose of the change

This pull request is a rebase on master of #10185 .

We generally enrich JobClient API as described in FLIP-74 as well as let ClusterClient#submitJob returns a CompletableFuture of JobClient.

For now I think we may or may not introduce dedicated tests because the only implementation is a thin wrapper of ClusterClient. Maybe we can defer the test set until other implementation comes because testing a wrapper gains us little.

Another tricky thing is about lifecycle management a.k.a. whether or not close ClusterClient on ClusterClientJobClientAdapter closed. Currently I use a boolean parameter moveOwnship for explicitly setting, but still looking for other solution.

Verifying this change

This change is already covered by existing tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes, it enriches JobClient which is to be public API).
  • If yes, how is the feature documented? (JavaDocs)

cc @aljoscha @kl0u

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 25, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit e661746 (Wed Dec 04 15:10:14 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 25, 2019

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build

@aljoscha aljoscha self-assigned this Nov 25, 2019
@aljoscha aljoscha requested review from aljoscha and kl0u November 25, 2019 11:21
Copy link
Contributor

@kl0u kl0u left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @tisonkun , thanks a lot for the work. As an overarching comment, I noticed that you add a flag moveOwnership to the ClusterClientJobClientAdapter and this also bubbles up till the ClusterClient.submitJob().

I am wondering if this is needed, and from a conceptual point of view, I lean towards the no. As a first point, I noticed that the only places that this is set to false are the ClientUtils.submitJob... which are mainly used in tests. And second, why not closing the ClusterClient always, when the JobClient closes (which is the responsibility of the call-site), and just let the user decide when to close the job client.

If the user wants to do something with the JobClient, he/she can create a new one (although we still need to figure out how to "retrieve" a jobClient).

WDYT?

@kl0u
Copy link
Contributor

kl0u commented Nov 25, 2019

Also we could simply return CompletableFuture<Void> from the JobClient.cancel() method, instead of moving classes around. On this, I do not have a strong opinion, but do you think that there is any particular reason why returning an Acknowledge instead of Void?

@kl0u kl0u self-assigned this Nov 25, 2019
@tisonkun
Copy link
Member Author

Hi @tisonkun , thanks a lot for the work. As an overarching comment, I noticed that you add a flag moveOwnership to the ClusterClientJobClientAdapter and this also bubbles up till the ClusterClient.submitJob().

I am wondering if this is needed, and from a conceptual point of view, I lean towards the no. As a first point, I noticed that the only places that this is set to false are the ClientUtils.submitJob... which are mainly used in tests. And second, why not closing the ClusterClient always, when the JobClient closes (which is the responsibility of the call-site), and just let the user decide when to close the job client.

If the user wants to do something with the JobClient, he/she can create a new one (although we still need to figure out how to "retrieve" a jobClient).

WDYT?

Yes I also tend not to do so. At that moment I was a bit delirious for thinking about whether or not close cluster client on job client closed :/

for shutting down things, I think it is still a configurable action whether or not we close cluster client on job client close because cluster client "spawns" job client and maybe we call submitJob multiple times within one cluster client(normally for job management platform). Neither we want to spawn cluster client per job nor we want to close the shared cluster client on job client closed. The point here is "who is responsible for closing cluster client? job client or the caller?"

I push a commit daf85a7 for customizing actions on closed for define such manner while considering code quality. Does it make sense to you?

@tisonkun
Copy link
Member Author

tisonkun commented Nov 26, 2019

Also we could simply return CompletableFuture<Void> from the JobClient.cancel() method, instead of moving classes around. On this, I do not have a strong opinion, but do you think that there is any particular reason why returning an Acknowledge instead of Void?

I've ever thought of Void. Void should work well atm. My concern is

  1. If we keep in mind the possibility that a rpc based implementation of JobClient, for current implementation akka doesn't allow null message. Although we don't stick to use akka as rpc implementation, a non-null unit value is better than null representing Void.
  2. Unfortunately Java doesn't have a builtin non-null unit value so that many of Java projects have to implement their own. I think Acknowledge its Flink's unit value which is reasonable to move into flink-core.

As for this point, I don't stick to using Acknowledge and could just using Void if you ask or better, share some advantages of Void.

Copy link
Contributor

@kl0u kl0u left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @tisonkun ! Thanks for the work.

I left some comments and some additional remarks are:

  1. I would say to not move the JobStatus class from the runtime to the core but rather create a copy of the enum and put in the core, with a method that maps the runtime JobStatus to a core JobStatus. The JobClient will return a core JobStatus.
  2. I would not move the Acknowledge to the core but make the corresponding methods of the JobClient return a Void.

Copy link
Contributor

@kl0u kl0u left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the work @tisonkun ! I left some minor comments. After integrating them and having a green light on Travis, feel free to merge.

}

@Override
public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(ClassLoader classLoader) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use Optional here? It seems OptionalFailure is a bit strange because it does not behave like an Optional, it's more like an Either type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think it's better to just return a CompletableFuture<Map<String, Object>>. We can fail the future if there is any failure in the map but otherwise just return a complete map. I don't think it's useful for the user to have to do all the unpacking. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use Map<String, OptionalFailure<Object>> as type of accumulators in JobExecutionResult & ClusterClient already. The investigation how we treat of such unpack things is independent and out of the scope here in my opinion. Shall we open a ticket for take it into consideration and prevent this one to be too complex?

@tisonkun
Copy link
Member Author

Address comment. @aljoscha please take a look about the update as well as my reply above.

@tisonkun
Copy link
Member Author

cherry-pick from slack channel. feel free to react wherever you like.

Sorry but when rebasing I cannot convince myself about why we introduce a flink-core variant of JobStatus? ClusterClient will return runtime JobStatus while JobClient returns JobStatus. It doesn’t make sense to me for introducing such different.
Runtime version JobStatus doesn’t depend on anything inside runtime but a self-contained enum. Shall we add it into o.a.f.api.common? Different from ClosureCleaner which could be used by connectors I think JobStatus is previously totally internal concept that should not breaks user setups and dependencies if we move it.

I’ve pushed a set of commits that we all agree on. The remain problem is about getJobStatus and getAccumulator

for getJobStatus the main concern is about where JobStatus stays and whether we introduce a variant of JobStatus. My opinion is above.
for getAccumulator the main concern is about whether Flink does unpack job for the user. I think we can do so, but maybe in another pass of pull request so that we firstly move forward this set under consensus.
So my idea is that we commit this set of commit as part 1 of FLINK-14762 and I start a new pull request refactor getAccumulator and then implement its JobClient interface. While let’s align about JobStatus .
Another coin about JobStatus is that we already display this sort of status on WebUI so it is reasonable to be core/common api(at least it is effectively user-facing).

@tisonkun
Copy link
Member Author

tison 10:48 AM
I narrow the change set to only unwrap accumulator inside client codes. Here is the diff 6d8f1af
So the remain concern from my side is about core variant of JobStatus . I will be ok if you can describe how we deal with these two JobStatus in the future.

@aljoscha
Copy link
Contributor

This looks good for JobStatus now!

@aljoscha
Copy link
Contributor

I think this is good to merge now! 💐

@tisonkun
Copy link
Member Author

travis fails unstably on a known issue https://issues.apache.org/jira/browse/FLINK-14894 which cannot be reproduced locally

merging now...

thanks for your review!

@tisonkun tisonkun closed this in 3898a4b Nov 29, 2019
@tisonkun tisonkun deleted the FLINK-14762 branch November 29, 2019 13:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants