Skip to content

[FLINK-7637] [kinesis] Fix at-least-once guarantee in FlinkKinesisProducer#4871

Closed
tzulitai wants to merge 7 commits intoapache:masterfrom
tzulitai:FLINK-7637
Closed

[FLINK-7637] [kinesis] Fix at-least-once guarantee in FlinkKinesisProducer#4871
tzulitai wants to merge 7 commits intoapache:masterfrom
tzulitai:FLINK-7637

Conversation

@tzulitai
Copy link
Copy Markdown
Contributor

What is the purpose of the change

Prior to this PR, there is no flushing of KPL outstanding records on checkpoints in the FlinkKinesisProducer. Likewise to the at-least-once issue on the Flink Kafka producer before, this may lead to data loss if there are asynchronous failing records after a checkpoint which the records was part of was completed.

Brief change log

  • Fix at-least-once in the Kinesis producer by properly flushing on checkpoints.
  • Minor fixes (last 2 commits) that cleans up the code.

Verifying this change

New unit tests are added to FlinkKinesisProducerTest to verify at-least-once.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? n/a

…ducer

Prior to this commit, there is no flushing of KPL outstanding records on
checkpoints in the FlinkKinesisProducer. Likewise to the at-least-once
issue on the Flink Kafka producer before, this may lead to data loss if
there are asynchronous failing records after a checkpoint which the
records was part of was completed.
if (this.producer == null) {
throw new RuntimeException("Kinesis producer has been closed");
}
if (thrownException != null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it would be easier to review the code, if refactor (like extracting this code to a method) was in separate commit then "real" "production" changes. Especially if those production changes are pretty minimal in term of number of changes line codes :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will keep that in mind for the future 👍

break;
}
}
flushSync(kp);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this kp local variable? Without it, there would be no need to passe KinesisProducer as a param to flushSync, because flushSync() could just use this.producer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, will change.

/**
* Check if there are any asynchronous exceptions. If so, rethrow the exception.
*/
private void checkAndPropagateAsyncError() throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am assuming that during this moving - copy/pasting - there were no changes in the code? (Btw, that's another reason why having refactors in separate commits makes reviewing easier ;) )

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, only a refactoring of the code to a separate method.


/**
* Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
* it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you forget to set the timeout?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, will add.

*/
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testAtLeastOnceProducer() throws Throwable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this test is good enough:

  1. It is not testing the code that it should :( it is using overriden version of the FlinkKinesisProducer - DummyFlinkKinesisProducer can hide bugs in the real implementation.
  2. Implementing it as a unit test with mocks, doesn't test for out integration with Kinesis. You made some assumption how at-least-once should be implemented, you implemented it in production code and here you are repeating the same code using the same assumptions :(

However looking at Kafka tests instability I'm not sure which approach is worse... Unless those are not tests instabilities but bugs in our code, which Kafka's ITCases are triggering from time to time - this mockito based test would not discover such bugs.

Copy link
Copy Markdown
Contributor Author

@tzulitai tzulitai Oct 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pnowojski I can see your point. Regarding your concerns:

For 1.: I think it is still ok, since the DummyFlinkKinesisProducer only overrides the getKinesisProducer method to implement a mock producer. Also, while the snapshotState method is overriden, I'm only overriding it to inject sync-point latches. The flushing behaviour of the snapshotState's implementation should still be guarded by this test.
For 2.: The lack of integration tests with Kinesis has always been an issue. There simply is no simple way to implement IT tests for that.

What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still not convinced on the value of such tests (reverse implementing the production code), but I will not press it since:

There simply is no simple way to implement IT tests for that.

try {
testHarness.snapshot(123L, 123L);
} catch (Exception e) {
// the next invoke should rethrow the async exception
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the comment refers to invoke, which is probably copy-pasted form above

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, will fix.

snapshotThread.sync();
} catch (Exception e) {
// the next invoke should rethrow the async exception
e.printStackTrace();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover printing.

checkAndPropagateAsyncError();

flushSync(producer);
if (producer.getOutstandingRecordsCount() > 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if records are added by another thread between the calls of flushSync() and producer.getOutstandingRecordsCount()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bowenli86 I don't think that would happen. Records are added to the producer only in invoke, which is guaranteed to not be executed concurrently with snapshotState.

@tzulitai
Copy link
Copy Markdown
Contributor Author

@pnowojski @aljoscha @bowenli86 thanks a lot for the reviews.
I've either addressed them with the follow up commits or left comments.

@bowenli86
Copy link
Copy Markdown
Member

LGTM 👍

@tzulitai
Copy link
Copy Markdown
Contributor Author

tzulitai commented Oct 25, 2017

Thanks.

I made one last change: allow direct import of non-shaded guava in FlinkKinesisProducerTest.
The reason for this is that the Kinesis API directly exposes Guava, so we can't use the Flink shaded dependencies. We already allow direct guava import in FlinkKinesisProducer, so I'm following the same approach here.

Will proceed to merge if Travis gives green.
cc @aljoscha in case you want a final pass before that happens!

@aljoscha
Copy link
Copy Markdown
Contributor

Thanks for the heads-up but I think this looks good!

@tzulitai
Copy link
Copy Markdown
Contributor Author

Merging ..

@asfgit asfgit closed this in 073b82c Oct 25, 2017
tzulitai added a commit to tzulitai/flink that referenced this pull request Oct 25, 2017
…ducer

Prior to this commit, there is no flushing of KPL outstanding records on
checkpoints in the FlinkKinesisProducer. Likewise to the at-least-once
issue on the Flink Kafka producer before, this may lead to data loss if
there are asynchronous failing records after a checkpoint which the
records was part of was completed.

This closes apache#4871.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants