Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip non-deserializable records #5269

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis.internals;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand Down Expand Up @@ -195,7 +196,7 @@ public KinesisDataFetcher(List<String> streams,
KinesisProxy.create(configProps));
}

/** This constructor is exposed for testing purposes. */
@VisibleForTesting
protected KinesisDataFetcher(List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
Object checkpointLock,
Expand Down Expand Up @@ -498,9 +499,16 @@ protected KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
* when the shard state was registered.
* @param lastSequenceNumber the last sequence number value to update
*/
protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
protected final void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
synchronized (checkpointLock) {
sourceContext.collectWithTimestamp(record, recordTimestamp);
if (record != null) {
sourceContext.collectWithTimestamp(record, recordTimestamp);
} else {
LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.",
lastSequenceNumber,
subscribedShardsState.get(shardStateIndex).getStreamShardHandle());
}

updateState(shardStateIndex, lastSequenceNumber);
}
}
Expand All @@ -515,7 +523,7 @@ protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shar
* when the shard state was registered.
* @param lastSequenceNumber the last sequence number value to update
*/
protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
protected final void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
synchronized (checkpointLock) {
subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);

Expand Down Expand Up @@ -597,7 +605,8 @@ public static boolean isThisSubtaskShouldSubscribeTo(StreamShardHandle shard,
return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
}

private static ExecutorService createShardConsumersThreadPool(final String subtaskName) {
@VisibleForTesting
protected ExecutorService createShardConsumersThreadPool(final String subtaskName) {
return Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,18 @@
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

/**
* Deserializes a Kinesis record's bytes.
* Deserializes a Kinesis record's bytes. If the record cannot be deserialized, {@code null}
* may be returned. This informs the Flink Kinesis Consumer to process the Kinesis record
* without producing any output for it, i.e. effectively "skipping" the record.
*
* @param recordValue the record's value as a byte array
* @param partitionKey the record's partition key at the time of writing
* @param seqNum the sequence number of this record in the Kinesis shard
* @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
* @param stream the name of the Kinesis stream that this record was sent to
* @param shardId The identifier of the shard the record was sent to
* @return the deserialized message as an Java object
*
* @return the deserialized message as a Java object ({@code null} if the message cannot be deserialized).
* @throws IOException
*/
T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException;
Expand Down