From 482c2259ef20f436fcc2dfe1a329b54cbc733db7 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 28 Mar 2017 09:40:04 +0100 Subject: [PATCH 1/2] set internal.leave.group.on.close to false in StreamsConfig --- .../java/org/apache/kafka/streams/StreamsConfig.java | 1 + .../java/org/apache/kafka/streams/StreamsConfigTest.java | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index d2ba063d94263..015a2e8629367 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -402,6 +402,7 @@ public class StreamsConfig extends AbstractConfig { tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); + tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false); CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index d345cbda2299a..444322d076b9f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; +import org.hamcrest.CoreMatchers; import org.junit.Before; import org.junit.Test; @@ -36,6 +37,7 @@ import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -256,6 +258,13 @@ public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws streamsConfig.getRestoreConsumerConfigs("client"); } + @Test + public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception { + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map consumerConfigs = streamsConfig.getConsumerConfigs(null, "group", "client"); + assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.equalTo(false)); + } + static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) { From 25116c2c6b744a2ea836d9ed87ba61dd44d02485 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 12 Apr 2017 11:32:58 +0100 Subject: [PATCH 2/2] constant for leave.group config --- .../kafka/streams/integration/GlobalKTableIntegrationTest.java | 2 +- .../kafka/streams/integration/InternalTopicIntegrationTest.java | 2 +- .../apache/kafka/streams/integration/JoinIntegrationTest.java | 2 +- .../integration/KStreamAggregationDedupIntegrationTest.java | 2 +- .../streams/integration/KStreamAggregationIntegrationTest.java | 2 +- .../streams/integration/KStreamKTableJoinIntegrationTest.java | 2 +- .../kafka/streams/integration/KStreamRepartitionJoinTest.java | 2 +- .../KStreamsFineGrainedAutoResetIntegrationTest.java | 2 +- .../streams/integration/KTableKTableJoinIntegrationTest.java | 2 +- .../streams/integration/QueryableStateIntegrationTest.java | 2 +- .../kafka/streams/integration/RegexSourceIntegrationTest.java | 2 +- .../apache/kafka/streams/integration/ResetIntegrationTest.java | 2 +- .../kafka/streams/integration/utils/IntegrationTestUtils.java | 1 + 13 files changed, 13 insertions(+), 12 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 2712279d79b6b..94b576dae60ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -98,7 +98,7 @@ public void before() throws InterruptedException { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put("internal.leave.group.on.close", true); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalOne, globalStore); stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream); table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 8ba50f8759839..0ff1d32f85755 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -90,7 +90,7 @@ public void before() { streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put("internal.leave.group.on.close", true); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); } private Properties getTopicConfigProperties(final String changelog) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index b77623bbe53fa..8d95fad45d2d6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -129,7 +129,7 @@ public static void setupConfigsAndUtils() throws Exception { STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - STREAMS_CONFIG.put("internal.leave.group.on.close", true); + STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), 30000, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 0bd4f91680258..372b89c048e53 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -97,7 +97,7 @@ public void before() throws InterruptedException { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); - streamsConfiguration.put("internal.leave.group.on.close", true); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); KeyValueMapper mapper = MockKeyValueMapper.SelectValueMapper(); stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index b02f9f954671c..303ec8a1064bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -106,7 +106,7 @@ public void before() throws InterruptedException { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put("internal.leave.group.on.close", true); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); final KeyValueMapper mapper = MockKeyValueMapper.SelectValueMapper(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java index 7bc0f4b3240df..18063216a888f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java @@ -92,7 +92,7 @@ public void before() throws InterruptedException { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put("internal.leave.group.on.close", true); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index d4590ccfeba58..d3ab176fff510 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -98,7 +98,7 @@ public void before() throws InterruptedException { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); - streamsConfiguration.put("internal.leave.group.on.close", true); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput); streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java index bcf1f7825d369..cff5f43460b2c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java @@ -134,7 +134,7 @@ public void setUp() throws Exception { Properties props = new Properties(); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put("internal.leave.group.on.close", true); + props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration = StreamsTestUtils.getStreamsConfig( "testAutoOffsetId", diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java index 590cab01527de..efdd9a0857a4b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java @@ -76,7 +76,7 @@ public static void beforeTest() throws Exception { streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfig.put("internal.leave.group.on.close", true); + streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); final Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index f158d535648f7..1e16f436fa955 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -137,7 +137,7 @@ public void before() throws IOException, InterruptedException { streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // override this to make the rebalances happen quickly - streamsConfiguration.put("internal.leave.group.on.close", true); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); stringComparator = new Comparator>() { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 75a6c0fea326a..5647b1ef246de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -115,7 +115,7 @@ public static void startKafkaCluster() throws Exception { @Before public void setUp() { final Properties properties = new Properties(); - properties.put("internal.leave.group.on.close", true); + properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test", CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 659ca4c0d6b4c..afc299fec7698 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -273,7 +273,7 @@ private Properties prepareTest() throws Exception { streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put("internal.leave.group.on.close", true); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 1ca6bd4dab71e..c31647cdab5ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -53,6 +53,7 @@ public class IntegrationTestUtils { public static final long DEFAULT_TIMEOUT = 30 * 1000L; + public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close"; /** * Returns up to `maxMessages` message-values from the topic.