From bc8e50d99be745300f7418c58e9d30abc5469ba3 Mon Sep 17 00:00:00 2001 From: Gordon Tai Date: Wed, 24 Aug 2016 16:38:06 +0800 Subject: [PATCH 1/4] [FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail on resharding This no longer allows the Kinesis consumer to transparently handle resharding. This is a short-term workaround until we have a min-watermark notification service available in the JobManager. --- docs/dev/connectors/kinesis.md | 6 ++++ .../kinesis/FlinkKinesisConsumer.java | 2 +- .../kinesis/internals/KinesisDataFetcher.java | 36 +++++++++++++++++-- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index ce011b3469d5b..b3660608340dd 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -107,6 +107,12 @@ to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`. +**NOTE:** Currently, resharding can not be handled transparently (i.e., without failing and restarting jobs) if there are idle consumer +subtasks, which occur when the total number of shards is lower than the configured consumer parallelism. The new shards +due to resharding will still be picked up by the Kinesis consumer after the job automatically restores. This is a temporary +limitation that will be resolved in future versions. Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) +for more detail. + #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 7b1f836153a25..873849164c92b 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -210,7 +210,7 @@ public void run(SourceContext sourceContext) throws Exception { if (LOG.isInfoEnabled()) { LOG.info("Subtask {} is seeding the fetcher with restored shard {}," + - "starting state set to the restored sequence number {}" + + "starting state set to the restored sequence number {}", getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue()); } fetcher.registerNewSubscribedShardState( diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index d83ab06566319..5f059b7f3c24d 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import 
org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; @@ -231,7 +232,7 @@ public void runFetcher() throws Exception { if (LOG.isDebugEnabled()) { String logFormat = (isRestoredFromFailure) ? "Subtask {} is trying to discover initial shards ..." - : "Subtask {} is trying to discover any new shards that were created while the consumer wasn't" + + : "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " + "running due to failure ..."; LOG.debug(logFormat, indexOfThisConsumerSubtask); @@ -252,8 +253,8 @@ public void runFetcher() throws Exception { if (LOG.isInfoEnabled()) { String logFormat = (isRestoredFromFailure) ? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}" - : "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't" + - "running due to failure, starting state set as sequence number {}"; + : "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " + + "running due to failure, starting state set as sequence number {}"; LOG.info(logFormat, indexOfThisConsumerSubtask, shard.toString(), startingStateForNewShard.get()); } @@ -311,6 +312,23 @@ public void runFetcher() throws Exception { ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS))); + // FLINK-4341: + // For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark + // for subtasks that initially do not have shards and won't collect records, otherwise the downstream watermarks + // would not advance, leading to unbounded accumulating state. + + // The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard + // is to be subscribed by a subtask that initially does not have any shards, otherwise the max value watermark + // emitted at the beginning will mess up the watermarks. 
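A side note on the comment above, outside the patch itself: downstream event-time operators advance only to the minimum watermark over all input subtasks, so a source subtask that never emits anything holds every window behind it back indefinitely. The sketch below illustrates the general pattern under discussion with a hypothetical `IdleAwareSource` whose string work units stand in for Kinesis shards; it is an illustration of the idea, not connector code.

```java
import java.util.List;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Minimal sketch (not part of the patch): a parallel source that emits a
 * Long.MAX_VALUE watermark from subtasks that receive no work units, so that
 * downstream event-time operators are not held back by a silent subtask.
 */
public class IdleAwareSource extends RichParallelSourceFunction<String> {

    private final List<String> workUnits; // hypothetical stand-in for shard ids
    private volatile boolean running = true;

    public IdleAwareSource(List<String> workUnits) {
        this.workUnits = workUnits;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();

        if (subtask >= workUnits.size()) {
            // This subtask owns nothing; unblock downstream event-time operators.
            ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
            // Stay alive without emitting records until cancelled.
            while (running) {
                Thread.sleep(1000L);
            }
            return;
        }

        // Otherwise read the assigned work units (round-robin by subtask index).
        for (int i = subtask; i < workUnits.size() && running; i += parallelism) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(workUnits.get(i));
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

This emit-once-and-stay-idle behaviour is what the fetcher does below when it starts up without any subscribed shards.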
+ // + // TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager + // Please see FLINK-4341 for more detail + boolean initiallyHadNoSubscribedShards = false; + if (subscribedShardsState.size() == 0) { + initiallyHadNoSubscribedShards = true; + sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); + } + while (running) { if (LOG.isDebugEnabled()) { LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...", @@ -318,6 +336,18 @@ public void runFetcher() throws Exception { } List newShardsDueToResharding = discoverNewShardsToSubscribe(); + // deliberately fail hard if we discover new shards to subscribe but this subtask initially had no subscribed + // shards; the new shards will be picked up by this same subtask after restore as initial shards + // + // TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager + // Please see FLINK-4341 for more detail + if (newShardsDueToResharding.size() > 0 && initiallyHadNoSubscribedShards) { + throw new RuntimeException( + "Subtask " + indexOfThisConsumerSubtask + " has discovered " + newShardsDueToResharding.size() + + " new shards to subscribe, but is failing hard to avoid messing up watermarks; the new shards" + + " will be subscribed by this subtask after restore ..."); + } + for (KinesisStreamShard shard : newShardsDueToResharding) { // since there may be delay in discovering a new shard, all new shards due to // resharding should be read starting from the earliest record possible From d8d0942075aa5cbb2eedf9d7b2ce6132e52f17eb Mon Sep 17 00:00:00 2001 From: Gordon Tai Date: Sun, 28 Aug 2016 15:40:30 +0800 Subject: [PATCH 2/4] [FLINK-4341] Fully consider all cases to emit max value watermark / fail --- .../kinesis/FlinkKinesisConsumer.java | 9 +- .../kinesis/internals/KinesisDataFetcher.java | 128 +++++++++++++----- .../kinesis/internals/ShardConsumer.java | 4 + .../kinesis/proxy/KinesisProxy.java | 8 +- 4 files changed, 109 insertions(+), 40 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 873849164c92b..a62dc100c054b 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -182,7 +182,6 @@ public void open(Configuration parameters) throws Exception { // initialize sequence numbers with restored state lastStateSnapshot = sequenceNumsToRestore; - sequenceNumsToRestore = null; } else { // start fresh with empty sequence numbers if there are no snapshots to restore from. 
lastStateSnapshot = new HashMap<>(); @@ -198,7 +197,7 @@ public void run(SourceContext sourceContext) throws Exception { fetcher = new KinesisDataFetcher<>( streams, sourceContext, getRuntimeContext(), configProps, deserializer); - boolean isRestoringFromFailure = !lastStateSnapshot.isEmpty(); + boolean isRestoringFromFailure = (sequenceNumsToRestore != null); fetcher.setIsRestoringFromFailure(isRestoringFromFailure); // if we are restoring from a checkpoint, we iterate over the restored @@ -210,7 +209,7 @@ public void run(SourceContext sourceContext) throws Exception { if (LOG.isInfoEnabled()) { LOG.info("Subtask {} is seeding the fetcher with restored shard {}," + - "starting state set to the restored sequence number {}", + " starting state set to the restored sequence number {}", getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue()); } fetcher.registerNewSubscribedShardState( @@ -285,13 +284,13 @@ public HashMap snapshotState(long checkpoint } if (LOG.isDebugEnabled()) { - LOG.debug("Snapshotting state. ..."); + LOG.debug("Snapshotting state ..."); } lastStateSnapshot = fetcher.snapshotState(); if (LOG.isDebugEnabled()) { - LOG.debug("Snapshotting state. Last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}", + LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}", lastStateSnapshot.toString(), checkpointId, checkpointTimestamp); } diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 5f059b7f3c24d..2469ea1919f2f 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -151,6 +152,14 @@ public class KinesisDataFetcher { /** Thread that executed runFetcher() */ private Thread mainThread; + /** + * The current number of shards that are actively read by this fetcher. + * + * This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, + * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}. + */ + private final AtomicInteger numberOfActiveShards = new AtomicInteger(0); + private volatile boolean running = true; /** @@ -230,7 +239,7 @@ public void runFetcher() throws Exception { // 1. query for any new shards that may have been created while the Kinesis consumer was not running, // and register them to the subscribedShardState list. if (LOG.isDebugEnabled()) { - String logFormat = (isRestoredFromFailure) + String logFormat = (!isRestoredFromFailure) ? "Subtask {} is trying to discover initial shards ..." 
: "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " + "running due to failure ..."; @@ -251,7 +260,7 @@ public void runFetcher() throws Exception { : initialPosition.toSentinelSequenceNumber(); if (LOG.isInfoEnabled()) { - String logFormat = (isRestoredFromFailure) + String logFormat = (!isRestoredFromFailure) ? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}" : "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " + "running due to failure, starting state set as sequence number {}"; @@ -288,18 +297,22 @@ public void runFetcher() throws Exception { for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) { KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex); - if (LOG.isInfoEnabled()) { - LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}", - indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(), - seededShardState.getLastProcessedSequenceNum(), seededStateIndex); - } + // only start a consuming thread if the seeded subscribed shard has not been completely read already + if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { - shardConsumersExecutor.submit( - new ShardConsumer<>( - this, - seededStateIndex, - subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(), - subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum())); + if (LOG.isInfoEnabled()) { + LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}", + indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(), + seededShardState.getLastProcessedSequenceNum(), seededStateIndex); + } + + shardConsumersExecutor.submit( + new ShardConsumer<>( + this, + seededStateIndex, + subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(), + subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum())); + } } // ------------------------------------------------------------------------ @@ -314,19 +327,33 @@ public void runFetcher() throws Exception { // FLINK-4341: // For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark - // for subtasks that initially do not have shards and won't collect records, otherwise the downstream watermarks - // would not advance, leading to unbounded accumulating state. - + // for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise + // the downstream watermarks would not advance, leading to unbounded accumulating state. + // // The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard - // is to be subscribed by a subtask that initially does not have any shards, otherwise the max value watermark - // emitted at the beginning will mess up the watermarks. + // is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks + // will be messed up. + // + // There are 2 cases were we need to either emit a max value watermark, or deliberately fail hard: + // (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max + // value watermark. 
This case is encountered when 1) all previously read shards by this subtask were closed + // due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer + // was told to start from TRIM_HORIZON, or 3) there was initially no shards for this subtask to read on startup. + // (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted + // a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards + // will be subscribed by this subtask after restore as initial shards on startup. // // TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager // Please see FLINK-4341 for more detail - boolean initiallyHadNoSubscribedShards = false; - if (subscribedShardsState.size() == 0) { - initiallyHadNoSubscribedShards = true; + + boolean emittedMaxValueWatermark = false; + + if (this.numberOfActiveShards.get() == 0) { + // FLINK-4341 workaround case (a) - please see the above for details on this case + LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...", + indexOfThisConsumerSubtask); sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); + emittedMaxValueWatermark = true; } while (running) { @@ -336,16 +363,40 @@ public void runFetcher() throws Exception { } List newShardsDueToResharding = discoverNewShardsToSubscribe(); - // deliberately fail hard if we discover new shards to subscribe but this subtask initially had no subscribed - // shards; the new shards will be picked up by this same subtask after restore as initial shards + // -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards -- + // Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists + // a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards + // may not correctly reflect the discover result in the below synchronized block. This may lead to incorrect + // case determination on the current discovery attempt, but can still be correctly handled on future attempts. // - // TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager - // Please see FLINK-4341 for more detail - if (newShardsDueToResharding.size() > 0 && initiallyHadNoSubscribedShards) { - throw new RuntimeException( - "Subtask " + indexOfThisConsumerSubtask + " has discovered " + newShardsDueToResharding.size() + - " new shards to subscribe, but is failing hard to avoid messing up watermarks; the new shards" + - " will be subscribed by this subtask after restore ..."); + // Although this can be resolved by also wrapping the current shard discovery attempt within the below + // synchronized block, there will be considerable throughput performance regression as shard discovery is a + // remote call to AWS. Therefore, since the case determination is a temporary workaround for FLINK-4341, the + // race condition is tolerable as we can still eventually handle max value watermark emitting / deliberately + // failing on successive discovery attempts. 
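Read as a decision table, the case split described in the comments above depends on three inputs only: the number of shards returned by the latest discovery call, the number of shards this subtask is still actively reading (`numberOfActiveShards`), and whether the max value watermark has already been emitted. The following is a hedged, standalone sketch of that logic; the enum and method names are illustrative and do not appear in the connector.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: the FLINK-4341 case determination expressed as a pure function. */
public final class Flink4341Cases {

    enum Action {
        EMIT_MAX_WATERMARK, // case (a): nothing left to read unless a reshard happens later
        FAIL_HARD,          // case (b): new shards discovered after the max watermark was emitted
        CONTINUE            // keep consuming / keep discovering
    }

    static Action determine(int newShardsDiscovered, int activeShards, boolean emittedMaxWatermark) {
        if (newShardsDiscovered == 0 && activeShards == 0 && !emittedMaxWatermark) {
            return Action.EMIT_MAX_WATERMARK;
        } else if (newShardsDiscovered > 0 && emittedMaxWatermark) {
            return Action.FAIL_HARD;
        } else {
            return Action.CONTINUE;
        }
    }

    public static void main(String[] args) {
        AtomicInteger numberOfActiveShards = new AtomicInteger(0); // as in the fetcher

        // Idle subtask, no reshard seen yet: emit the max value watermark once.
        System.out.println(determine(0, numberOfActiveShards.get(), false)); // EMIT_MAX_WATERMARK

        // A later discovery finds new shards after the watermark was emitted: restart the job.
        System.out.println(determine(2, numberOfActiveShards.get(), true));  // FAIL_HARD
    }
}
```

Because `numberOfActiveShards` is read without holding the checkpoint lock, a stale value can at worst push the decision to a later discovery pass, which is the tolerance the NOTE above argues for.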
+ + synchronized (checkpointLock) { + if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) { + // FLINK-4341 workaround case (a) - please see the above for details on this case + LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...", + indexOfThisConsumerSubtask); + sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); + emittedMaxValueWatermark = true; + } else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) { + // FLINK-4341 workaround case (b) - please see the above for details on this case + // + // Note that in the case where on resharding this subtask ceased to read all of it's previous shards + // but new shards is also to be subscribed by this subtask immediately after, emittedMaxValueWatermark + // will be false; this allows the fetcher to continue reading the new shards without failing on such cases. + // However, due to the race condition mentioned above, we might still fall into case (a) first, and + // then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value + // watermark emitting still remains to be correct. + + LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" + + " up watermarks; the new shards will be subscribed by this subtask after restore ...", + indexOfThisConsumerSubtask, newShardsDueToResharding.size()); + throw new RuntimeException("Deliberate failure to avoid messing up watermarks"); + } } for (KinesisStreamShard shard : newShardsDueToResharding) { @@ -551,6 +602,14 @@ protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shar protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) { synchronized (checkpointLock) { subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber); + + // if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread, + // we've finished reading the shard and should determine it to be non-active + if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { + this.numberOfActiveShards.decrementAndGet(); + LOG.info("Subtask {} has reached the end of subscribed shard: {}", + indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard()); + } } } @@ -560,9 +619,16 @@ protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumbe * @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to */ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) { - synchronized (checkpointLock) { subscribedShardsState.add(newSubscribedShardState); + + // If a registered shard has initial state that is not SENTINEL_SHARD_ENDING_SEQUENCE_NUM (will be the case + // if the consumer had already finished reading a shard before we failed and restored), we determine that + // this subtask has a new active shard + if (!newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { + this.numberOfActiveShards.incrementAndGet(); + } + return subscribedShardsState.size()-1; } } diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 494f5deaf9f3c..6e24e65233a3a 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Properties; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -87,6 +88,9 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex); this.subscribedShard = checkNotNull(subscribedShard); this.lastSequenceNum = checkNotNull(lastSequenceNum); + checkArgument( + !lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()), + "Should not start a ShardConsumer if the shard has already been completely read."); this.deserializer = fetcherRef.getClonedDeserializationSchema(); diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 906689f9e211f..1113fde399848 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -201,8 +201,8 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th } if (getRecordsResult == null) { - throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + "retry" + - "attempts returned ProvisionedThroughputExceededException."); + throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + + " retry attempts returned ProvisionedThroughputExceededException."); } return getRecordsResult; @@ -245,8 +245,8 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp } if (getShardIteratorResult == null) { - throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + "retry" + - "attempts returned ProvisionedThroughputExceededException."); + throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + + " retry attempts returned ProvisionedThroughputExceededException."); } return getShardIteratorResult.getShardIterator(); } From 57b4cb70eb6f1ca4e6f8dc99777f50da7b6fb936 Mon Sep 17 00:00:00 2001 From: Gordon Tai Date: Sun, 28 Aug 2016 15:46:59 +0800 Subject: [PATCH 3/4] [FLINK-4341] Inform checkpointing must be enabled in the workaround notice --- docs/dev/connectors/kinesis.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index b3660608340dd..c54239d73fefe 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -108,10 +108,10 @@ to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from Other optional configuration keys for the consumer 
can be found in `ConsumerConfigConstants`. **NOTE:** Currently, resharding can not be handled transparently (i.e., without failing and restarting jobs) if there are idle consumer -subtasks, which occur when the total number of shards is lower than the configured consumer parallelism. The new shards -due to resharding will still be picked up by the Kinesis consumer after the job automatically restores. This is a temporary -limitation that will be resolved in future versions. Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) -for more detail. +subtasks, which occur when the total number of shards is lower than the configured consumer parallelism. The job must be +configured to enable checkpointing, so that the new shards due to resharding can be correctly picked up and consumed by the +Kinesis consumer after the job is restored. This is a temporary limitation that will be resolved in future versions. +Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for more detail. #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics From ddbd4b589296fbbe14f2992d530a3ab252a44112 Mon Sep 17 00:00:00 2001 From: Gordon Tai Date: Sun, 28 Aug 2016 21:23:32 +0800 Subject: [PATCH 4/4] [FLINK-4341] Remove unnecessary synchronized block --- .../kinesis/internals/KinesisDataFetcher.java | 57 +++++++++---------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 2469ea1919f2f..a06fdcab51da2 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -366,37 +366,36 @@ public void runFetcher() throws Exception { // -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards -- // Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists // a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards - // may not correctly reflect the discover result in the below synchronized block. This may lead to incorrect + // may not correctly reflect the discover result in the below case determination. This may lead to incorrect // case determination on the current discovery attempt, but can still be correctly handled on future attempts. // - // Although this can be resolved by also wrapping the current shard discovery attempt within the below - // synchronized block, there will be considerable throughput performance regression as shard discovery is a - // remote call to AWS. Therefore, since the case determination is a temporary workaround for FLINK-4341, the - // race condition is tolerable as we can still eventually handle max value watermark emitting / deliberately - // failing on successive discovery attempts. 
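The documentation change in the third patch above stresses that the job must run with checkpointing enabled, since the deliberate failure is only recoverable if shard sequence numbers can be restored and the job restarts on its own. A minimal sketch of that configuration follows; the interval, attempt count, and delay are placeholder values, not recommendations.

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class CheckpointingSetup {

    /**
     * Applies the job settings the workaround relies on: checkpointing, so shard sequence
     * numbers can be restored after the deliberate failure, and an automatic restart
     * strategy, so the job comes back up and re-discovers the post-reshard shards.
     */
    public static void configureForKinesisWorkaround(StreamExecutionEnvironment env) {
        env.enableCheckpointing(5000L); // placeholder: checkpoint every 5 seconds
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10_000L)); // placeholder values
    }

    private CheckpointingSetup() {}
}
```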
- - synchronized (checkpointLock) { - if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) { - // FLINK-4341 workaround case (a) - please see the above for details on this case - LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...", - indexOfThisConsumerSubtask); - sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); - emittedMaxValueWatermark = true; - } else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) { - // FLINK-4341 workaround case (b) - please see the above for details on this case - // - // Note that in the case where on resharding this subtask ceased to read all of it's previous shards - // but new shards is also to be subscribed by this subtask immediately after, emittedMaxValueWatermark - // will be false; this allows the fetcher to continue reading the new shards without failing on such cases. - // However, due to the race condition mentioned above, we might still fall into case (a) first, and - // then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value - // watermark emitting still remains to be correct. - - LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" + - " up watermarks; the new shards will be subscribed by this subtask after restore ...", - indexOfThisConsumerSubtask, newShardsDueToResharding.size()); - throw new RuntimeException("Deliberate failure to avoid messing up watermarks"); - } + // Although this can be resolved by wrapping the current shard discovery attempt with the below + // case determination within a synchronized block on the checkpoint lock for atomicity, there will be + // considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore, + // since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as + // we can still eventually handle max value watermark emitting / deliberately failing on successive + // discovery attempts. + + if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) { + // FLINK-4341 workaround case (a) - please see the above for details on this case + LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...", + indexOfThisConsumerSubtask); + sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); + emittedMaxValueWatermark = true; + } else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) { + // FLINK-4341 workaround case (b) - please see the above for details on this case + // + // Note that in the case where on resharding this subtask ceased to read all of it's previous shards + // but new shards is also to be subscribed by this subtask immediately after, emittedMaxValueWatermark + // will be false; this allows the fetcher to continue reading the new shards without failing on such cases. + // However, due to the race condition mentioned above, we might still fall into case (a) first, and + // then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value + // watermark emitting still remains to be correct. 
+ + LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" + + " up watermarks; the new shards will be subscribed by this subtask after restore ...", + indexOfThisConsumerSubtask, newShardsDueToResharding.size()); + throw new RuntimeException("Deliberate failure to avoid messing up watermarks"); } for (KinesisStreamShard shard : newShardsDueToResharding) {
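Putting the workaround's operational advice together, the sketch below configures a consumer the way the documentation notes above describe: checkpointing enabled, `TRIM_HORIZON` as the start position, and a source parallelism no greater than the stream's shard count so that no subtask starts idle and trips the fail-hard path. The stream name, region, credentials, and the parallelism of 4 are placeholders.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesisConsumerSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000L); // required for the workaround, see the doc note above

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
        consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

        DataStream<String> kinesis = env
            .addSource(new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig))
            // Keeping source parallelism at or below the stream's shard count avoids idle
            // subtasks, and therefore avoids the fail-hard path when the stream is resharded.
            .setParallelism(4);

        kinesis.print();
        env.execute("kinesis-consumer-example");
    }
}
```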