[FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip records
This commit acknowledges that null can be returned from the
deserialization schema, if the message cannot be deserialized. If null
is returned for a Kinesis record, no output is produced for that record,
while the sequence number in the shard state is still advanced so that
the record is effectively accounted as processed.
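
For illustration only (not part of this commit), a minimal sketch of a user-defined deserialization schema that opts into this skipping behavior. The class name SkipOnFailureSchema and the assumed record format (UTF-8 encoded longs) are hypothetical:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

// Hypothetical schema: expects each Kinesis record to be a UTF-8 encoded long.
public class SkipOnFailureSchema implements KinesisDeserializationSchema<Long> {

	@Override
	public Long deserialize(byte[] recordValue, String partitionKey, String seqNum,
			long approxArrivalTimestamp, String stream, String shardId) throws IOException {
		try {
			return Long.valueOf(new String(recordValue, StandardCharsets.UTF_8).trim());
		} catch (NumberFormatException e) {
			// Returning null tells the consumer to skip this record;
			// the shard's sequence number is still advanced.
			return null;
		}
	}

	@Override
	public TypeInformation<Long> getProducedType() {
		return TypeInformation.of(Long.class);
	}
}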
tzulitai committed Jan 10, 2018
1 parent 547d19f commit 8d2b086
Showing 4 changed files with 65 additions and 3 deletions.
@@ -484,7 +484,10 @@ protected KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
 	 */
 	protected final void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
 		synchronized (checkpointLock) {
-			sourceContext.collectWithTimestamp(record, recordTimestamp);
+			if (record != null) {
+				sourceContext.collectWithTimestamp(record, recordTimestamp);
+			}
+
 			updateState(shardStateIndex, lastSequenceNumber);
 		}
 	}
@@ -33,15 +33,18 @@
 public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
 	/**
-	 * Deserializes a Kinesis record's bytes.
+	 * Deserializes a Kinesis record's bytes. If the record cannot be deserialized, {@code null}
+	 * may be returned. This informs the Flink Kinesis Consumer to process the Kinesis record
+	 * without producing any output for it, i.e. effectively "skipping" the record.
 	 *
 	 * @param recordValue the record's value as a byte array
 	 * @param partitionKey the record's partition key at the time of writing
 	 * @param seqNum the sequence number of this record in the Kinesis shard
 	 * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
 	 * @param stream the name of the Kinesis stream that this record was sent to
 	 * @param shardId the identifier of the shard the record was sent to
-	 * @return the deserialized message as an Java object
+	 *
+	 * @return the deserialized message as a Java object ({@code null} if the message cannot be deserialized).
 	 * @throws IOException
 	 */
 	T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException;
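
For context, a hedged sketch of wiring such a schema into the consumer. The stream name, region, and SkipOnFailureSchema (from the sketch above) are illustrative assumptions; credentials configuration is omitted for brevity:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisSkipExample {
	public static void main(String[] args) throws Exception {
		Properties consumerConfig = new Properties();
		consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // assumed region

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Records that deserialize to null are skipped (no output is produced),
		// while their sequence numbers are still advanced in the shard state.
		DataStream<Long> values = env.addSource(
			new FlinkKinesisConsumer<>("my-stream", new SkipOnFailureSchema(), consumerConfig));

		values.print();
		env.execute("kinesis-skip-example");
	}
}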
@@ -37,9 +37,11 @@
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -86,6 +88,56 @@ public void testIfNoShardsAreFoundShouldThrowException() throws Exception {
 		fetcher.runFetcher(); // this should throw RuntimeException
 	}
 
+	@Test
+	public void testSkipCorruptedRecord() throws Exception {
+		final String stream = "fakeStream";
+		final int numShards = 3;
+
+		final LinkedList<KinesisStreamShardState> testShardStates = new LinkedList<>();
+		final TestSourceContext<String> sourceContext = new TestSourceContext<>();
+
+		final TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
+			Collections.singletonList(stream),
+			sourceContext,
+			TestUtils.getStandardProperties(),
+			new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+			1,
+			0,
+			new AtomicReference<>(),
+			testShardStates,
+			new HashMap<>(),
+			FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(Collections.singletonMap(stream, numShards)));
+
+		// FlinkKinesisConsumer is responsible for setting up the fetcher before it can be run;
+		// run the consumer until it reaches the point where the fetcher starts to run
+		final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(TestUtils.getStandardProperties(), fetcher, 1, 0);
+
+		CheckedThread consumerThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				consumer.run(new TestSourceContext<>());
+			}
+		};
+		consumerThread.start();
+
+		fetcher.waitUntilRun();
+		consumer.cancel();
+		consumerThread.sync();
+
+		assertEquals(numShards, testShardStates.size());
+
+		for (int i = 0; i < numShards; i++) {
+			fetcher.emitRecordAndUpdateState("record-" + i, 10L, i, new SequenceNumber("seq-num-1"));
+			assertEquals(new SequenceNumber("seq-num-1"), testShardStates.get(i).getLastProcessedSequenceNum());
+			assertEquals(new StreamRecord<>("record-" + i, 10L), sourceContext.removeLatestOutput());
+		}
+
+		// emitting a null (i.e., a corrupt record) should not produce any output, but should still update the shard state
+		fetcher.emitRecordAndUpdateState(null, 10L, 1, new SequenceNumber("seq-num-2"));
+		assertEquals(new SequenceNumber("seq-num-2"), testShardStates.get(1).getLastProcessedSequenceNum());
+		assertEquals(null, sourceContext.removeLatestOutput()); // no output should have been collected
+	}
+
 	@Test
 	public void testStreamToLastSeenShardStateIsCorrectlySetWhenNotRestoringFromFailure() throws Exception {
 		List<String> fakeStreams = new LinkedList<>();
@@ -58,6 +58,10 @@ public Object getCheckpointLock() {
 	@Override
 	public void close() {}
 
+	public StreamRecord<T> removeLatestOutput() {
+		return collectedOutputs.poll();
+	}
+
 	public ConcurrentLinkedQueue<StreamRecord<T>> getCollectedOutputs() {
 		return collectedOutputs;
 	}