From 0e676f5aea453690085686181722ff9575e8e40b Mon Sep 17 00:00:00 2001 From: Thomas Weise Date: Tue, 30 Jan 2018 17:44:44 -0800 Subject: [PATCH 1/4] [FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer --- .../kinesis/FlinkKinesisConsumer.java | 22 ++++++- .../kinesis/internals/KinesisDataFetcher.java | 24 ++++++-- .../kinesis/util/KinesisShardAssigner.java | 57 ++++++++++++++++++ .../FlinkKinesisConsumerMigrationTest.java | 5 +- .../internals/KinesisDataFetcherTest.java | 59 ++++++++++++++++++- .../testutils/TestableKinesisDataFetcher.java | 1 + 6 files changed, 155 insertions(+), 13 deletions(-) create mode 100644 flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 7d97b7d8a405d..c96ef77287452 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -43,6 +43,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisShardAssigner; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; @@ -93,6 +94,11 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction imple /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */ private final KinesisDeserializationSchema deserializer; + /** + * The function that determines which subtask a shard should be assigned to. 
+ */ + private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER; + // ------------------------------------------------------------------------ // Runtime state // ------------------------------------------------------------------------ @@ -192,6 +198,14 @@ public FlinkKinesisConsumer(List streams, KinesisDeserializationSchema entry : sequenceNumsToRestore.entrySet()) { // sequenceNumsToRestore is the restored global union state; // should only snapshot shards that actually belong to us - + int hashCode = shardAssigner.assign( + KinesisDataFetcher.convertToStreamShardHandle(entry.getKey().getShardMetadata()), + getRuntimeContext().getNumberOfParallelSubtasks()); if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo( - KinesisDataFetcher.convertToStreamShardHandle(entry.getKey().getShardMetadata()), + hashCode, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask())) { @@ -384,7 +400,7 @@ protected KinesisDataFetcher createFetcher( Properties configProps, KinesisDeserializationSchema deserializationSchema) { - return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema); + return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner); } @VisibleForTesting diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 4d03ba8d7e774..f7c4ebd409365 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisShardAssigner; import org.apache.flink.util.InstantiationUtil; import com.amazonaws.services.kinesis.model.HashKeyRange; @@ -78,6 +79,9 @@ @Internal public class KinesisDataFetcher { + public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER = (shard, subtasks) -> shard.hashCode(); + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); // ------------------------------------------------------------------------ @@ -97,6 +101,11 @@ public class KinesisDataFetcher { */ private final KinesisDeserializationSchema deserializationSchema; + /** + * The function that determines which subtask a shard should be assigned to. 
+ */ + private final KinesisShardAssigner shardAssigner; + // ------------------------------------------------------------------------ // Consumer metrics // ------------------------------------------------------------------------ @@ -184,13 +193,15 @@ public KinesisDataFetcher(List streams, SourceFunction.SourceContext sourceContext, RuntimeContext runtimeContext, Properties configProps, - KinesisDeserializationSchema deserializationSchema) { + KinesisDeserializationSchema deserializationSchema, + KinesisShardAssigner kinesisShardToSubTaskIndexFn) { this(streams, sourceContext, sourceContext.getCheckpointLock(), runtimeContext, configProps, deserializationSchema, + kinesisShardToSubTaskIndexFn, new AtomicReference<>(), new ArrayList<>(), createInitialSubscribedStreamsToLastDiscoveredShardsState(streams), @@ -204,6 +215,7 @@ protected KinesisDataFetcher(List streams, RuntimeContext runtimeContext, Properties configProps, KinesisDeserializationSchema deserializationSchema, + KinesisShardAssigner shardAssigner, AtomicReference error, List subscribedShardsState, HashMap subscribedStreamsToLastDiscoveredShardIds, @@ -216,6 +228,7 @@ protected KinesisDataFetcher(List streams, this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks(); this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask(); this.deserializationSchema = checkNotNull(deserializationSchema); + this.shardAssigner = checkNotNull(shardAssigner); this.kinesis = checkNotNull(kinesis); this.consumerMetricGroup = runtimeContext.getMetricGroup() @@ -453,7 +466,8 @@ public List discoverNewShardsToSubscribe() throws Interrupted for (String stream : streamsWithNewShards) { List newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream); for (StreamShardHandle newShard : newShardsOfStream) { - if (isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) { + int hashCode = shardAssigner.assign(newShard, totalNumberOfConsumerSubtasks); + if (isThisSubtaskShouldSubscribeTo(hashCode, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) { newShardsToSubscribe.add(newShard); } } @@ -596,14 +610,14 @@ private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup /** * Utility function to determine whether a shard should be subscribed by this consumer subtask. 
* - * @param shard the shard to determine + * @param shardHash hash code for the shard * @param totalNumberOfConsumerSubtasks total number of consumer subtasks * @param indexOfThisConsumerSubtask index of this consumer subtask */ - public static boolean isThisSubtaskShouldSubscribeTo(StreamShardHandle shard, + public static boolean isThisSubtaskShouldSubscribeTo(int shardHash, int totalNumberOfConsumerSubtasks, int indexOfThisConsumerSubtask) { - return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask; + return (Math.abs(shardHash % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask; } @VisibleForTesting diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java new file mode 100644 index 0000000000000..089cdd66f0a53 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; + +import java.io.Serializable; + +/** + * Utility to map Kinesis shards to Flink subtask indices. + */ +public interface KinesisShardAssigner extends Serializable { + /** + * Returns the index of the target subtask that a specific Kafka partition should be + * assigned to. For return values outside the subtask range, modulus operation will + * be applied automatically, hence it is also valid to just return a hash code. + * + *

<p>The resulting distribution of shards has the following contract:
+ * <ul>
+ *     <li>1. Uniform distribution across subtasks</li>
+ *     <li>2. Deterministic, calls for a given shard always return same index.</li>
+ * </ul>
+ *
+ * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
+ * contract to filter out partitions that they should not subscribe to, guaranteeing
+ * that all partitions of a single topic will always be assigned to some subtask in a
+ * uniformly distributed manner.
+ *
+ * <p>Kinesis and the consumer support dynamic re-sharding and shard IDs, while sequential,
+ * cannot be assumed to be consecutive. There is no perfect generic default assignment function.
+ * Default subtask index assignment, which is based on hash code, may result in skew,
+ * with some subtasks having many shards assigned and others none.
+ *
+ * <p>
It is recommended to monitor the shard distribution and adjust assignment appropriately. + * Custom implementation may optimize the hash function or use static overrides to limit skew. + * + * @param shard the shard to determine + * @param numParallelSubtasks total number of subtasks + * @return index or hash code + */ + int assign(StreamShardHandle shard, int numParallelSubtasks); +} diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java index fd2d8805bb7c5..75f584e3527a6 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.connectors.kinesis; -import com.amazonaws.services.kinesis.model.SequenceNumberRange; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.core.testutils.OneShotLatch; @@ -42,8 +41,8 @@ import org.apache.flink.streaming.util.migration.MigrationTestUtil; import org.apache.flink.streaming.util.migration.MigrationVersion; +import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; - import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -419,7 +418,7 @@ public TestFetcher( HashMap testStateSnapshot, List testInitialDiscoveryShards) { - super(streams, sourceContext, runtimeContext, configProps, deserializationSchema); + super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER); this.testStateSnapshot = testStateSnapshot; this.testInitialDiscoveryShards = testInitialDiscoveryShards; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java index 1620366e5d82f..eba31bd355e22 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java @@ -33,13 +33,15 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext; import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisShardAssigner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.TestLogger; import org.junit.Test; +import org.powermock.reflect.Whitebox; import 
java.util.Collections; import java.util.HashMap; @@ -644,4 +646,57 @@ public RuntimeContext getRuntimeContext() { return context; } } + + + // ---------------------------------------------------------------------- + // Tests shard distribution with custom hash function + // ---------------------------------------------------------------------- + + @Test + public void testShardToSubtaskMappingWithCustomHashFunction() throws Exception { + + int totalCountOfSubtasks = 10; + int shardCount = 3; + + for (int i = 0; i < 2; i++) { + + final int hash = i; + final KinesisShardAssigner allShardsSingleSubtaskFn = (shard, subtasks) -> hash; + Map streamToShardCount = new HashMap<>(); + List fakeStreams = new LinkedList<>(); + fakeStreams.add("fakeStream"); + streamToShardCount.put("fakeStream", shardCount); + + for (int j = 0; j < totalCountOfSubtasks; j++) { + + int subtaskIndex = j; + // subscribe with default hashing + final TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + fakeStreams, + new TestSourceContext<>(), + new Properties(), + new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()), + totalCountOfSubtasks, + subtaskIndex, + new AtomicReference<>(), + new LinkedList<>(), + KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams), + FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount)); + Whitebox.setInternalState(fetcher, "shardAssigner", allShardsSingleSubtaskFn); // override hashing + List shards = fetcher.discoverNewShardsToSubscribe(); + fetcher.shutdownFetcher(); + + String msg = String.format("for hash=%d, subtask=%d", hash, subtaskIndex); + if (j == i) { + assertEquals(msg, shardCount, shards.size()); + } else { + assertEquals(msg, 0, shards.size()); + } + } + + } + + } + } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java index 93f3d386059f8..b7cfb2d32d1db 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java @@ -68,6 +68,7 @@ public TestableKinesisDataFetcher( getMockedRuntimeContext(fakeTotalCountOfSubtasks, fakeIndexOfThisSubtask), fakeConfiguration, deserializationSchema, + DEFAULT_SHARD_ASSIGNER, thrownErrorUnderTest, subscribedShardsStateUnderTest, subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, From 9a7aaef6399b9a7c8ddf20fb88e18db43d30dcef Mon Sep 17 00:00:00 2001 From: Thomas Weise Date: Sat, 3 Feb 2018 22:33:20 -0800 Subject: [PATCH 2/4] Address review comments. 
--- docs/dev/connectors/kinesis.md | 5 ++++ .../kinesis/FlinkKinesisConsumer.java | 19 +++++++++++++-- .../kinesis/internals/KinesisDataFetcher.java | 5 ++-- .../kinesis/util/KinesisShardAssigner.java | 24 ++++++++----------- 4 files changed, 34 insertions(+), 19 deletions(-) diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index 9bcd70a753440..4ee9c6b03d463 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -119,6 +119,11 @@ then some consumer subtasks will simply be idle and wait until it gets assigned new shards (i.e., when the streams are resharded to increase the number of shards for higher provisioned Kinesis service throughput). +Also note that the assignment of shards to subtasks may not be optimal when +shard IDs are not consecutive (as result of dynamic re-sharding in Kinesis). +For cases where skew in the assignment leads to significant imbalanced consumption, +a custom implementation of `KinesisShardAssigner` can be set on the consumer. + ### Configuring Starting Position The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index c96ef77287452..db806c84c5e87 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -69,6 +70,15 @@ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.

 *
+ * <p>Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, while sequential,
+ * cannot be assumed to be consecutive. There is no perfect generic default assignment function.
+ * Default shard to subtask assignment, which is based on hash code, may result in skew,
+ * with some subtasks having many shards assigned and others none.
+ *
+ * <p>
It is recommended to monitor the shard distribution and adjust assignment appropriately. + * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the + * hash function or use static overrides to limit skew. + * * @param the type of data emitted */ @PublicEvolving @@ -202,8 +212,13 @@ public KinesisShardAssigner getShardAssigner() { return shardAssigner; } - public void setShardAssigner(KinesisShardAssigner kinesisShardToSubTaskIndexFn) { - this.shardAssigner = checkNotNull(kinesisShardToSubTaskIndexFn, "function can not be null"); + /** + * Provide a custom assigner to influence how shards are distributed over subtasks. + * @param shardAssigner + */ + public void setShardAssigner(KinesisShardAssigner shardAssigner) { + this.shardAssigner = checkNotNull(shardAssigner, "function can not be null"); + ClosureCleaner.clean(shardAssigner, true); } // ------------------------------------------------------------------------ diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index f7c4ebd409365..4fb24099e1054 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -81,7 +81,6 @@ public class KinesisDataFetcher { public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER = (shard, subtasks) -> shard.hashCode(); - private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); // ------------------------------------------------------------------------ @@ -194,14 +193,14 @@ public KinesisDataFetcher(List streams, RuntimeContext runtimeContext, Properties configProps, KinesisDeserializationSchema deserializationSchema, - KinesisShardAssigner kinesisShardToSubTaskIndexFn) { + KinesisShardAssigner shardAssigner) { this(streams, sourceContext, sourceContext.getCheckpointLock(), runtimeContext, configProps, deserializationSchema, - kinesisShardToSubTaskIndexFn, + shardAssigner, new AtomicReference<>(), new ArrayList<>(), createInitialSubscribedStreamsToLastDiscoveredShardsState(streams), diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java index 089cdd66f0a53..bd33c05953e03 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java @@ -17,41 +17,37 @@ package org.apache.flink.streaming.connectors.kinesis.util; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; import java.io.Serializable; /** - * Utility to map Kinesis shards to Flink subtask indices. + * Utility to map Kinesis shards to Flink subtask indices. Users can implement this interface to optimize + * distribution of shards over subtasks. 
See {@link #assign(StreamShardHandle, int)} for details. */ +@PublicEvolving public interface KinesisShardAssigner extends Serializable { + /** - * Returns the index of the target subtask that a specific Kafka partition should be + * Returns the index of the target subtask that a specific shard should be * assigned to. For return values outside the subtask range, modulus operation will * be applied automatically, hence it is also valid to just return a hash code. * - *

<p>The resulting distribution of shards has the following contract:
+ * <p>The resulting distribution of shards should have the following contract:
 * <ul>
 *     <li>1. Uniform distribution across subtasks</li>
 *     <li>2. Deterministic, calls for a given shard always return same index.</li>
 * </ul>
 *
 * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
- * contract to filter out partitions that they should not subscribe to, guaranteeing
- * that all partitions of a single topic will always be assigned to some subtask in a
+ * contract to filter out shards that they should not subscribe to, guaranteeing
+ * that each shard of a stream will always be assigned to one subtask in a
 * uniformly distributed manner.
 *
- * <p>Kinesis and the consumer support dynamic re-sharding and shard IDs, while sequential,
- * cannot be assumed to be consecutive. There is no perfect generic default assignment function.
- * Default subtask index assignment, which is based on hash code, may result in skew,
- * with some subtasks having many shards assigned and others none.
- *
- * <p>
It is recommended to monitor the shard distribution and adjust assignment appropriately. - * Custom implementation may optimize the hash function or use static overrides to limit skew. - * * @param shard the shard to determine * @param numParallelSubtasks total number of subtasks - * @return index or hash code + * @return target index, if index falls outside of the range, modulus operation will be applied */ int assign(StreamShardHandle shard, int numParallelSubtasks); } From 137a7615c7d3e7c3b5e0bde6843aa9c582b82a77 Mon Sep 17 00:00:00 2001 From: Thomas Weise Date: Tue, 6 Feb 2018 22:31:20 -0800 Subject: [PATCH 3/4] Move KinesisShardAssigner to parent package. --- .../streaming/connectors/kinesis/FlinkKinesisConsumer.java | 1 - .../connectors/kinesis/{util => }/KinesisShardAssigner.java | 2 +- .../connectors/kinesis/internals/KinesisDataFetcher.java | 2 +- .../connectors/kinesis/internals/KinesisDataFetcherTest.java | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) rename flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/{util => }/KinesisShardAssigner.java (97%) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index db806c84c5e87..407a5a9552450 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -44,7 +44,6 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; -import org.apache.flink.streaming.connectors.kinesis.util.KinesisShardAssigner; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisShardAssigner.java similarity index 97% rename from flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java rename to flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisShardAssigner.java index bd33c05953e03..76e3cd661612d 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisShardAssigner.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kinesis.util; +package org.apache.flink.streaming.connectors.kinesis; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 4fb24099e1054..945f396c8b87a 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants; import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter; @@ -34,7 +35,6 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; -import org.apache.flink.streaming.connectors.kinesis.util.KinesisShardAssigner; import org.apache.flink.util.InstantiationUtil; import com.amazonaws.services.kinesis.model.HashKeyRange; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java index eba31bd355e22..104aad31d3061 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java @@ -22,6 +22,7 @@ import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; @@ -33,7 +34,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext; import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher; -import org.apache.flink.streaming.connectors.kinesis.util.KinesisShardAssigner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; From aad286ebd8066b4f113683d2d1a9ec2d37688228 Mon Sep 17 00:00:00 2001 From: Thomas Weise Date: 
Wed, 7 Feb 2018 09:46:13 -0800 Subject: [PATCH 4/4] Add test for KinesisDataFetcher.isThisSubtaskShouldSubscribeTo --- .../kinesis/internals/KinesisDataFetcherTest.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java index 104aad31d3061..7854d039dd5f2 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java @@ -55,6 +55,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -699,4 +700,14 @@ public void testShardToSubtaskMappingWithCustomHashFunction() throws Exception { } + @Test + public void testIsThisSubtaskShouldSubscribeTo() { + assertTrue(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(0, 2, 0)); + assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(1, 2, 0)); + assertTrue(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(2, 2, 0)); + assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(0, 2, 1)); + assertTrue(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(1, 2, 1)); + assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(2, 2, 1)); + } + }
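For reference, a minimal end-to-end sketch of how the new hook is used once the series is applied. The stream name, region, and suffix-based strategy are assumptions for illustration, and credentials are expected to come from the default provider chain:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class CustomShardAssignerJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

        FlinkKinesisConsumer<String> consumer =
            new FlinkKinesisConsumer<>("myStream", new SimpleStringSchema(), consumerConfig);

        // Route each shard by the numeric suffix of its shard ID rather than by
        // StreamShardHandle.hashCode(); the consumer applies Math.abs(value % parallelism),
        // so any int return value is acceptable.
        consumer.setShardAssigner((shard, numParallelSubtasks) -> {
            String shardId = shard.getShard().getShardId();
            return (int) Long.parseLong(shardId.substring(shardId.indexOf('-') + 1));
        });

        env.addSource(consumer).print();
        env.execute("kinesis-consumer-with-custom-shard-assigner");
    }
}

Because KinesisShardAssigner extends Serializable and setShardAssigner runs the ClosureCleaner, a non-capturing lambda like the one above can be shipped with the job graph as-is.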