
[FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip non-deserializable records #5269

Closed
wants to merge 4 commits

Conversation

tzulitai
Contributor

What is the purpose of the change

This PR is based on #5268, which includes fixes to harden Kinesis unit tests. Only the last commit is relevant.

In the past, we allowed the Flink Kafka Consumer to skip corrupted Kafka records that cannot be deserialized. In real-world pipelines, it is entirely normal for this to happen.

This PR adds the same functionality to the Flink Kinesis Consumer.

Brief change log

  • Clarify in the Javadoc of KinesisDeserializationSchema that null can be returned if a message cannot be deserialized (see the sketch after this list).
  • If the record is null in KinesisDataFetcher::emitRecordAndUpdateState(...), do not collect any output for the record.
  • Add KinesisDataFetcherTest::testSkipCorruptedRecord() to verify feature.
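
For illustration, here is a minimal sketch (not part of this PR) of a user-provided KinesisDeserializationSchema that returns null for payloads it cannot parse, so that the consumer skips those records. The class name and the long-parsing payload format are made up for the example; the interface shape assumed here is deserialize(...) plus getProducedType().

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

/**
 * Hypothetical schema that parses each Kinesis record payload as a UTF-8 encoded long.
 * Returning null for a non-parseable payload makes the consumer skip that record.
 */
public class LongOrSkipDeserializationSchema implements KinesisDeserializationSchema<Long> {

	@Override
	public Long deserialize(
			byte[] recordValue,
			String partitionKey,
			String seqNum,
			long approxArrivalTimestamp,
			String stream,
			String shardId) throws IOException {
		try {
			return Long.parseLong(new String(recordValue, StandardCharsets.UTF_8).trim());
		} catch (NumberFormatException e) {
			// Corrupted or unexpected payload: return null so that the record is skipped.
			return null;
		}
	}

	@Override
	public TypeInformation<Long> getProducedType() {
		return BasicTypeInfo.LONG_TYPE_INFO;
	}
}
```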

Verifying this change

The added KinesisDataFetcherTest::testSkipCorruptedRecord() test verifies this change.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@@ -484,7 +484,10 @@ protected Properties getConsumerConfiguration() {
	 */
	protected final void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
		synchronized (checkpointLock) {
-			sourceContext.collectWithTimestamp(record, recordTimestamp);
+			if (record != null) {
+				sourceContext.collectWithTimestamp(record, recordTimestamp);
Contributor

Are we silently skipping the record or do we log somewhere that a record was invalid?

Contributor Author

We currently do not have a log for that.
I'll add a warning log if the record is null.
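
As a rough sketch of what the guarded emit with a warning could look like (an illustration of the discussion above, not the exact merged code; the shard-state update is elided):

```java
protected final void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
	synchronized (checkpointLock) {
		if (record != null) {
			sourceContext.collectWithTimestamp(record, recordTimestamp);
		} else {
			// Sketch only: warn so that skipped records are visible in the logs.
			LOG.warn("Skipping non-deserializable record; the shard state sequence number is still advanced.");
		}
		// ... advance the shard state to lastSequenceNumber as before ...
	}
}
```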

for (int i = 0; i < numShards; i++) {
	fetcher.emitRecordAndUpdateState("record-" + i, 10L, i, new SequenceNumber("seq-num-1"));
	assertEquals(new SequenceNumber("seq-num-1"), testShardStates.get(i).getLastProcessedSequenceNum());
	assertEquals(new StreamRecord<>("record-" + i, 10L), sourceContext.removeLatestOutput());
Contributor

indentation

Contributor Author

Will fix.

Contributor

@zentol left a comment

Changes look good but I'm concerned about the visibility of this behavior.

@tzulitai
Contributor Author

@zentol
The contract is that if KinesisDeserializationSchema.deserialize() returns null, that record will be skipped. I agree it makes sense to add a warning log when this happens.

…nesisDataFetcherTest and related classes

The previous implementation of the TestableKinesisDataFetcher was
confusing in various ways, making it hard to reuse for other
tests. This commit contains the following cleanups:

- Remove confusing mocks of source context and checkpoint lock. We now
  allow users of the TestableKinesisDataFetcher to provide a source
  context, which should provide the checkpoint lock.
- Remove override of emitRecordAndUpdateState(). Strictly speaking, that
  method should be final. It was previously overridden to allow
  verifying how many records were output by the fetcher. That
  verification would be better implemented within a mock source context.
- Properly parameterize the output type for the
  TestableKinesisDataFetcher.
- Remove use of PowerMockito in KinesisDataFetcherTest.
- Use CheckedThreads to properly capture any exceptions in fetcher /
  consumer threads in unit tests (see the sketch after this list).
- Use assertEquals / assertNull instead of assertTrue wherever
  appropriate.
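
As referenced in the list above, here is a minimal sketch (not taken from this PR) of the CheckedThread pattern; runFetcher() is a hypothetical stand-in for whatever the test drives in the background thread.

```java
import org.apache.flink.core.testutils.CheckedThread;

// Inside a test method declared "throws Exception":
CheckedThread fetcherThread = new CheckedThread() {
	@Override
	public void go() throws Exception {
		runFetcher(); // any exception thrown here is captured by CheckedThread
	}
};
fetcherThread.start();

// ... interact with the fetcher from the test thread ...

// sync() waits for the thread to finish and rethrows any captured exception,
// so a failure in the background thread fails the test instead of being swallowed.
fetcherThread.sync();
```
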
Prior to this commit, several unit tests in KinesisDataFetcherTest
relied on sleeps to wait until a certain operation had happened before
the test could pass.

This commit removes those sleeps and replaces the test behaviours with
OneShotLatches.
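
A minimal sketch (not taken from this PR) of the sleep-to-latch replacement: the test thread blocks on a OneShotLatch that the worker thread triggers once the awaited operation has happened. startConsuming() is a hypothetical stand-in for that operation, and the fragment is assumed to live in a test method declared "throws Exception".

```java
import org.apache.flink.core.testutils.OneShotLatch;

final OneShotLatch consumingStarted = new OneShotLatch();

Thread workerThread = new Thread(() -> {
	startConsuming();
	consumingStarted.trigger(); // signal that the awaited operation has happened
});
workerThread.start();

// Instead of Thread.sleep(...), block deterministically until the signal arrives.
consumingStarted.await();
```
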
This commit acknowledges that null can be returned from the
deserialization schema, if the message cannot be deserialized. If null
is returned for a Kinesis record, no output is produced for that record,
while the sequence number in the shard state is still advanced so that
the record is effectively accounted as processed.
@tzulitai
Contributor Author

tzulitai commented Jan 31, 2018

@zentol I've rebased, and added a warning when skipping records. Could you have another quick look?

@zentol
Contributor

zentol commented Jan 31, 2018

looks good, +1

@tzulitai
Contributor Author

Thanks, merging ..

tzulitai added a commit to tzulitai/flink that referenced this pull request Feb 1, 2018
This commit acknowledges that null can be returned from the
deserialization schema, if the message cannot be deserialized. If null
is returned for a Kinesis record, no output is produced for that record,
while the sequence number in the shard state is still advanced so that
the record is effectively accounted as processed.

This closes apache#5269.
tzulitai added a commit to tzulitai/flink that referenced this pull request Feb 2, 2018
asfgit closed this in 3067777 on Feb 6, 2018