6 changes: 6 additions & 0 deletions docs/dev/connectors/kinesis.md
@@ -107,6 +107,12 @@ to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from

Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.

**NOTE:** Currently, resharding cannot be handled transparently (i.e., without failing and restarting jobs) if there are idle consumer
subtasks, which occur when the total number of shards is lower than the configured consumer parallelism. The job must have
checkpointing enabled so that shards created by resharding can be correctly picked up and consumed by the
Kinesis consumer after the job is restored. This is a temporary limitation that will be resolved in future versions.
Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for more details.
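
A minimal sketch of such a job, assuming the usual Flink streaming setup; the stream name, region, and checkpoint interval below are
placeholders, and configuration key names may differ slightly across connector versions:

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpointing must be enabled so that shards created by resharding are
// picked up and consumed after the job is restored
env.enableCheckpointing(5000);

Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

DataStream<String> kinesis = env.addSource(
    new FlinkKinesisConsumer<>("kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
```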

#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics

With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
FlinkKinesisConsumer.java
@@ -182,7 +182,6 @@ public void open(Configuration parameters) throws Exception {

// initialize sequence numbers with restored state
lastStateSnapshot = sequenceNumsToRestore;
sequenceNumsToRestore = null;
} else {
// start fresh with empty sequence numbers if there are no snapshots to restore from.
lastStateSnapshot = new HashMap<>();
@@ -198,7 +197,7 @@ public void run(SourceContext<T> sourceContext) throws Exception {
fetcher = new KinesisDataFetcher<>(
streams, sourceContext, getRuntimeContext(), configProps, deserializer);

boolean isRestoringFromFailure = !lastStateSnapshot.isEmpty();
boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
fetcher.setIsRestoringFromFailure(isRestoringFromFailure);

// if we are restoring from a checkpoint, we iterate over the restored
@@ -210,7 +209,7 @@ public void run(SourceContext<T> sourceContext) throws Exception {

if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
"starting state set to the restored sequence number {}" +
" starting state set to the restored sequence number {}",
getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue());
}
fetcher.registerNewSubscribedShardState(
@@ -285,13 +284,13 @@ public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpoint
}

if (LOG.isDebugEnabled()) {
LOG.debug("Snapshotting state. ...");
LOG.debug("Snapshotting state ...");
}

lastStateSnapshot = fetcher.snapshotState();

if (LOG.isDebugEnabled()) {
LOG.debug("Snapshotting state. Last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
}

KinesisDataFetcher.java
@@ -19,6 +19,7 @@

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
@@ -45,6 +46,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

@@ -150,6 +152,14 @@ public class KinesisDataFetcher<T> {
/** Thread that executed runFetcher() */
private Thread mainThread;

/**
* The current number of shards that are actively read by this fetcher.
*
* This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
* and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
*/
private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);

private volatile boolean running = true;

/**
@@ -229,9 +239,9 @@ public void runFetcher() throws Exception {
// 1. query for any new shards that may have been created while the Kinesis consumer was not running,
// and register them to the subscribedShardState list.
if (LOG.isDebugEnabled()) {
String logFormat = (isRestoredFromFailure)
String logFormat = (!isRestoredFromFailure)
? "Subtask {} is trying to discover initial shards ..."
: "Subtask {} is trying to discover any new shards that were created while the consumer wasn't" +
: "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " +
"running due to failure ...";

LOG.debug(logFormat, indexOfThisConsumerSubtask);
@@ -250,10 +260,10 @@ public void runFetcher() throws Exception {
: initialPosition.toSentinelSequenceNumber();

if (LOG.isInfoEnabled()) {
String logFormat = (isRestoredFromFailure)
String logFormat = (!isRestoredFromFailure)
? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}"
: "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't" +
"running due to failure, starting state set as sequence number {}";
: "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " +
"running due to failure, starting state set as sequence number {}";

LOG.info(logFormat, indexOfThisConsumerSubtask, shard.toString(), startingStateForNewShard.get());
}
@@ -287,18 +297,22 @@ public void runFetcher() throws Exception {
for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);

if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
}
// only start a consuming thread if the seeded subscribed shard has not been completely read already
if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {

shardConsumersExecutor.submit(
new ShardConsumer<>(
this,
seededStateIndex,
subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
}

shardConsumersExecutor.submit(
new ShardConsumer<>(
this,
seededStateIndex,
subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
}
}

// ------------------------------------------------------------------------
@@ -311,13 +325,79 @@ public void runFetcher() throws Exception {
ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));

// FLINK-4341:
// For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark
// for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise
// the downstream watermarks would not advance, leading to unbounded accumulating state.
//
// The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard
// is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks
// will be messed up.
//
// There are 2 cases where we need to either emit a max value watermark, or deliberately fail hard:
// (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
// value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
// due to resharding, 2) this subtask was initially only subscribed to closed shards while the consumer
// was told to start from TRIM_HORIZON, or 3) there were initially no shards for this subtask to read on startup.
// (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
// a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
// will be subscribed by this subtask after restore as initial shards on startup.
//
// TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager
// Please see FLINK-4341 for more detail

boolean emittedMaxValueWatermark = false;

if (this.numberOfActiveShards.get() == 0) {
// FLINK-4341 workaround case (a) - please see the above for details on this case
LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
indexOfThisConsumerSubtask);
sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
emittedMaxValueWatermark = true;
}

while (running) {
if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
indexOfThisConsumerSubtask);
}
List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();

// -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
// Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
// a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
// may not correctly reflect the discovery result in the case determination below. This may lead to incorrect
// case determination on the current discovery attempt, but can still be correctly handled on future attempts.
//
// Although this can be resolved by wrapping the current shard discovery attempt with the below
// case determination within a synchronized block on the checkpoint lock for atomicity, there will be
// considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore,
// since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as
// we can still eventually handle max value watermark emitting / deliberately failing on successive
// discovery attempts.

if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
// FLINK-4341 workaround case (a) - please see the above for details on this case
LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
indexOfThisConsumerSubtask);
sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
emittedMaxValueWatermark = true;
} else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) {
// FLINK-4341 workaround case (b) - please see the above for details on this case
//
// Note that in the case where, on resharding, this subtask ceased to read all of its previous shards
// but new shards are also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
// will be false; this allows the fetcher to continue reading the new shards without failing on such cases.
// However, due to the race condition mentioned above, we might still fall into case (a) first, and
// then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value
// watermark emitting still remains correct.

LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
" up watermarks; the new shards will be subscribed by this subtask after restore ...",
indexOfThisConsumerSubtask, newShardsDueToResharding.size());
throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
}

for (KinesisStreamShard shard : newShardsDueToResharding) {
// since there may be delay in discovering a new shard, all new shards due to
// resharding should be read starting from the earliest record possible
@@ -521,6 +601,14 @@ protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shar
protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
synchronized (checkpointLock) {
subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);

// if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
// we've finished reading the shard and should determine it to be non-active
if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
this.numberOfActiveShards.decrementAndGet();
LOG.info("Subtask {} has reached the end of subscribed shard: {}",
indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
}
}
}

@@ -530,9 +618,16 @@ protected void updateState(int shardStateIndex, SequenceNumbe
* @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to
*/
public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {

synchronized (checkpointLock) {
subscribedShardsState.add(newSubscribedShardState);

// If a registered shard's initial state is not SENTINEL_SHARD_ENDING_SEQUENCE_NUM (the sentinel marks shards that
// the consumer had already finished reading before a failure and restore), we determine that
// this subtask has a new active shard
if (!newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
this.numberOfActiveShards.incrementAndGet();
}

return subscribedShardsState.size()-1;
}
}
ShardConsumer.java
@@ -36,6 +36,7 @@
import java.util.List;
import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
@@ -87,6 +88,9 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
this.subscribedShard = checkNotNull(subscribedShard);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
checkArgument(
!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
"Should not start a ShardConsumer if the shard has already been completely read.");

this.deserializer = fetcherRef.getClonedDeserializationSchema();

KinesisProxy.java
@@ -201,8 +201,8 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th
}

if (getRecordsResult == null) {
throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + "retry" +
"attempts returned ProvisionedThroughputExceededException.");
throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
" retry attempts returned ProvisionedThroughputExceededException.");
}

return getRecordsResult;
@@ -245,8 +245,8 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
}

if (getShardIteratorResult == null) {
throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + "retry" +
"attempts returned ProvisionedThroughputExceededException.");
throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
" retry attempts returned ProvisionedThroughputExceededException.");
}
return getShardIteratorResult.getShardIterator();
}