Conversation

@akalash
Contributor

@akalash akalash commented Apr 6, 2021

… failed

What is the purpose of the change

This PR adds an invocation of task cancellation before the clean-up method when the execution has failed, which allows sending a cancel signal to custom code in order to avoid hangs.

Brief change log


  • Invoke cancelTask before cleanUpInvoke when the invocation fails
  • Invoke declineCheckpoint instead of propagating the exception when performCheckpoint fails with an exception
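The ordering described in the first bullet can be sketched in plain Java. This is a minimal illustration of the control flow, not Flink's actual StreamTask code; the method names cancelTask and cleanUpInvoke are borrowed from the change log, and invoke stands in for the failing task body:

```java
import java.util.ArrayList;
import java.util.List;

class CancelBeforeCleanup {
    static final List<String> calls = new ArrayList<>();

    // Stand-in for the task body that fails during execution.
    static void invoke() { throw new RuntimeException("task failed"); }
    static void cancelTask() { calls.add("cancelTask"); }
    static void cleanUpInvoke() { calls.add("cleanUpInvoke"); }

    public static void main(String[] args) {
        try {
            invoke();
        } catch (RuntimeException e) {
            // The fix: signal cancellation to user code before cleaning up,
            // so loops checking a cancellation flag can exit instead of hanging.
            cancelTask();
        } finally {
            cleanUpInvoke(); // cleanup still runs, but only after the cancel signal
        }
        System.out.println(String.join(",", calls));
    }
}
```

The point of the ordering is that cleanup may block on resources the user code still holds; cancelling first gives that code a chance to release them.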

Verifying this change

This change added tests and can be verified as follows:


  • Added an integration test for a failing checkpoint in the snapshotState method (CheckpointFailureManagerITCase)

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@akalash akalash marked this pull request as draft April 6, 2021 15:27
@flinkbot
Collaborator

flinkbot commented Apr 6, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 2b29cc5 (Sat Aug 28 11:14:15 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Apr 6, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@akalash
Contributor Author

akalash commented Apr 8, 2021

@flinkbot run azure

@akalash akalash marked this pull request as ready for review April 8, 2021 07:59
@akalash
Contributor Author

akalash commented Apr 9, 2021

@flinkbot run azure

1 similar comment
@akalash
Contributor Author

akalash commented Apr 12, 2021

@flinkbot run azure

Contributor

@rkhachatryan rkhachatryan left a comment


Thanks for the fix @akalash , production code changes look good to me.

I have some concerns about the test, PTAL at the comments below.

I'd also consider putting the test into StreamTaskTest (and not in CheckpointFailureManagerITCase as the fix has nothing to do with Failure Manager).

public void testSourceFailureTriggerJobFailed() throws Exception {
    // given: Environment with failed source and no restart strategy.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(2000L);
Contributor


I guess this big interval is needed to ensure that the LegacySourceThread has actually started.
Could we use a more explicit means instead (like a future / latch / condition)?
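The latch-based signalling the reviewer suggests can be sketched in plain Java with a CountDownLatch, instead of relying on a generous checkpoint interval to hide the race. The thread and names here are illustrative stand-ins, not the test's actual code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchStartSignal {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch sourceStarted = new CountDownLatch(1);

        // Stand-in for the legacy source thread.
        Thread source = new Thread(() -> {
            sourceStarted.countDown(); // explicit "I have started" signal
            // ... the source run loop would execute here ...
        });
        source.start();

        // The test proceeds only once the source has actually started,
        // rather than assuming a fixed interval is long enough.
        boolean started = sourceStarted.await(10, TimeUnit.SECONDS);
        System.out.println("source started: " + started);
        source.join();
    }
}
```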

Contributor Author


I hadn't thought about the true purpose of this property. I will take a look and change it to a latch if possible.

Comment on lines 139 to 141
public void run(SourceContext<String> ctx) throws Exception {
    while (running) {
        ctx.collect("test");
Contributor


To use ctx.collect, synchronized (ctx.getCheckpointLock()) must be held.
But we probably don't need to emit anything here. Can we just block on some condition (like latch.await)?
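The locking pattern the reviewer describes can be sketched in plain Java. The lock object here stands in for ctx.getCheckpointLock() and collect() for ctx.collect(); in Flink's legacy source API the contract is that record emission must happen under the checkpoint lock:

```java
import java.util.ArrayList;
import java.util.List;

class CheckpointLockEmit {
    // Stand-in for ctx.getCheckpointLock().
    static final Object checkpointLock = new Object();
    static final List<String> out = new ArrayList<>();
    static volatile boolean running = true;

    // Stand-in for ctx.collect().
    static void collect(String record) { out.add(record); }

    public static void main(String[] args) {
        int emitted = 0;
        while (running && emitted < 3) {
            // Emission must hold the checkpoint lock so it cannot
            // interleave with a checkpoint of the operator state.
            synchronized (checkpointLock) {
                collect("test");
            }
            emitted++;
        }
        System.out.println(out.size());
    }
}
```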

Contributor Author


latch.await is not OK because the goal is to simulate a loop that checks the 'running' flag. But it can easily be replaced by parkNanos, for example.
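The loop the author describes can be sketched in plain Java: spin on a volatile 'running' flag and park briefly each iteration instead of emitting records. This is an illustrative stand-in, not the test's actual code (and, as noted later in the review, an interruptible wait was ultimately preferred over parkNanos):

```java
import java.util.concurrent.locks.LockSupport;

class ParkingRunLoop {
    static volatile boolean running = true;

    public static void main(String[] args) throws InterruptedException {
        Thread loop = new Thread(() -> {
            // Simulated source loop: checks the cancellation flag each pass,
            // parking ~1 ms instead of doing real work.
            while (running) {
                LockSupport.parkNanos(1_000_000L);
            }
        });
        loop.start();

        Thread.sleep(50);
        running = false; // cancel() would flip this flag
        loop.join();
        System.out.println("stopped");
    }
}
```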

private static class FailedSource extends RichParallelSourceFunction<String>
implements CheckpointedFunction {

public static final AtomicInteger INITIALIZE_TIMES = new AtomicInteger(0);
Contributor


This variable ideally needs to be reset before running the test.

Comment on lines 114 to 119
Optional<RuntimeException> throwable =
        findThrowable(jobException, RuntimeException.class);

// then: Job failed with expected exception.
assertTrue(throwable.isPresent());
assertEquals(FailedSource.SOURCE_FAILED_MSG, throwable.get().getMessage());
Contributor


These assertions seem a bit fragile to me (what if Flink wraps an exception into a RuntimeException?).
And they do not actually check the production code, but the test itself: without the fix the test would time out, and without the exception being thrown the job might have exited for some other reason.

But the setup is quite simple IMO, so I'd remove them.

WDYT?
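For context on the cause-chain search used in the assertions under discussion, here is a plain-Java stand-in for a findThrowable-style helper (Flink ships one in ExceptionUtils; this sketch only mirrors the idea): walk the cause chain and return the first throwable of the requested type.

```java
import java.util.Optional;

class FindThrowableSketch {
    // Walk the cause chain looking for the first throwable of the given type.
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        while (t != null) {
            if (type.isInstance(t)) {
                return Optional.of(type.cast(t));
            }
            t = t.getCause();
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The original failure is wrapped, as job exceptions often are.
        Exception wrapped = new Exception(new RuntimeException("source failed"));
        Optional<RuntimeException> found = findThrowable(wrapped, RuntimeException.class);
        System.out.println(found.map(Throwable::getMessage).orElse("none"));
    }
}
```

Note the fragility the reviewer points out: if the framework wraps the original error in another RuntimeException, this search matches the wrapper first and the message assertion breaks.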

Contributor Author


What you said makes sense, but the main goal was to check that the cluster fails only for the expected reason. Anyway, I will rework this place to avoid the fragility.

Contributor

@rkhachatryan rkhachatryan left a comment


Thanks for updating the PR @akalash,
it LGTM except that I'd replace parkNanos with something interruptible (please see the comment below).

Contributor

@rkhachatryan rkhachatryan left a comment


Thanks for updating the PR @akalash, LGTM.
I'd merge it once tests finish.

@rkhachatryan
Contributor

Merged into master 345bf34.
