From 40f63eb9c1775bd292a5d028412fb784b7854701 Mon Sep 17 00:00:00 2001 From: Gitomain Date: Tue, 12 Jun 2018 20:54:07 +0200 Subject: [PATCH] KAFKA-6782: fix restoration of aborted messages for GlobalStateStore and GlobalKTable (#4900) Reviewers: Matthias J. Sax , Bill Bejeck , Guozhang Wang --- .gitignore | 1 + kafka | 1 + .../internals/GlobalStateManagerImpl.java | 2 +- .../GlobalKTableEOSIntegrationTest.java | 390 ++++++++++++++++++ .../GlobalKTableIntegrationTest.java | 66 +-- .../utils/IntegrationTestUtils.java | 35 +- 6 files changed, 429 insertions(+), 66 deletions(-) create mode 160000 kafka create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java diff --git a/.gitignore b/.gitignore index 04f8feed0ad7..fe191eed44b3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ dist *classes +*.class target/ build/ build_eclipse/ diff --git a/kafka b/kafka new file mode 160000 index 000000000000..cc43e77bbbfa --- /dev/null +++ b/kafka @@ -0,0 +1 @@ +Subproject commit cc43e77bbbfad71883011186de55603c936cbcd1 diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 4fd7a591eb66..79088d988062 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -271,8 +271,8 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, if (record.key() != null) { restoreRecords.add(KeyValue.pair(record.key(), record.value())); } - offset = globalConsumer.position(topicPartition); } + offset = globalConsumer.position(topicPartition); stateRestoreAdapter.restoreAll(restoreRecords); stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); restoreCount += 
restoreRecords.size(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java new file mode 100644 index 000000000000..f7c0e55c05e0 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import kafka.utils.MockTime; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.GlobalKTable; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +@Category({IntegrationTest.class}) +public class GlobalKTableEOSIntegrationTest { + private static final int NUM_BROKERS = 1; + private static final Properties 
BROKER_CONFIG; + static { + BROKER_CONFIG = new Properties(); + BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1); + BROKER_CONFIG.put("transaction.state.log.min.isr", 1); + } + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = + new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG); + + private static volatile int testNo = 0; + private final MockTime mockTime = CLUSTER.time; + private final KeyValueMapper keyMapper = new KeyValueMapper() { + @Override + public Long apply(final String key, final Long value) { + return value; + } + }; + private final ValueJoiner joiner = new ValueJoiner() { + @Override + public String apply(final Long value1, final String value2) { + return value1 + "+" + value2; + } + }; + private final String globalStore = "globalStore"; + private final Map results = new HashMap<>(); + private StreamsBuilder builder; + private Properties streamsConfiguration; + private KafkaStreams kafkaStreams; + private String globalTableTopic; + private String streamTopic; + private GlobalKTable globalTable; + private KStream stream; + private ForeachAction foreachAction; + + @Before + public void before() throws InterruptedException { + testNo++; + builder = new StreamsBuilder(); + createTopics(); + streamsConfiguration = new Properties(); + final String applicationId = "globalTableTopic-table-eos-test-" + testNo; + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + 
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); + globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), + Materialized.>as(globalStore) + .withKeySerde(Serdes.Long()) + .withValueSerde(Serdes.String())); + final Consumed stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); + stream = builder.stream(streamTopic, stringLongConsumed); + foreachAction = new ForeachAction() { + @Override + public void apply(final String key, final String value) { + results.put(key, value); + } + }; + } + + @After + public void whenShuttingDown() throws IOException { + if (kafkaStreams != null) { + kafkaStreams.close(); + } + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + @Test + public void shouldKStreamGlobalKTableLeftJoin() throws Exception { + final KStream streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner); + streamTableJoin.foreach(foreachAction); + produceInitialGlobalTableValues(); + startStreams(); + produceTopicValues(streamTopic); + + final Map expected = new HashMap<>(); + expected.put("a", "1+A"); + expected.put("b", "2+B"); + expected.put("c", "3+C"); + expected.put("d", "4+D"); + expected.put("e", "5+null"); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return results.equals(expected); + } + }, 30000L, "waiting for initial values"); + + + produceGlobalTableValues(); + + final ReadOnlyKeyValueStore replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return "J".equals(replicatedStore.get(5L)); + } + }, 30000, "waiting for data in replicated store"); + produceTopicValues(streamTopic); + + expected.put("a", "1+F"); + expected.put("b", "2+G"); + expected.put("c", "3+H"); + expected.put("d", "4+I"); + expected.put("e", "5+J"); + + 
TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return results.equals(expected); + } + }, 30000L, "waiting for final values"); + } + + @Test + public void shouldKStreamGlobalKTableJoin() throws Exception { + final KStream streamTableJoin = stream.join(globalTable, keyMapper, joiner); + streamTableJoin.foreach(foreachAction); + produceInitialGlobalTableValues(); + startStreams(); + produceTopicValues(streamTopic); + + final Map expected = new HashMap<>(); + expected.put("a", "1+A"); + expected.put("b", "2+B"); + expected.put("c", "3+C"); + expected.put("d", "4+D"); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return results.equals(expected); + } + }, 30000L, "waiting for initial values"); + + + produceGlobalTableValues(); + + final ReadOnlyKeyValueStore replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return "J".equals(replicatedStore.get(5L)); + } + }, 30000, "waiting for data in replicated store"); + + produceTopicValues(streamTopic); + + expected.put("a", "1+F"); + expected.put("b", "2+G"); + expected.put("c", "3+H"); + expected.put("d", "4+I"); + expected.put("e", "5+J"); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return results.equals(expected); + } + }, 30000L, "waiting for final values"); + } + + @Test + public void shouldRestoreTransactionalMessages() throws Exception { + produceInitialGlobalTableValues(); + + startStreams(); + + final Map expected = new HashMap<>(); + expected.put(1L, "A"); + expected.put(2L, "B"); + expected.put(3L, "C"); + expected.put(4L, "D"); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + ReadOnlyKeyValueStore store = null; + try { + store = kafkaStreams.store(globalStore, 
QueryableStoreTypes.keyValueStore()); + } catch (InvalidStateStoreException ex) { + return false; + } + Map result = new HashMap<>(); + Iterator> it = store.all(); + while (it.hasNext()) { + KeyValue kv = it.next(); + result.put(kv.key, kv.value); + } + return result.equals(expected); + } + }, 30000L, "waiting for initial values"); + } + + @Test + public void shouldNotRestoreAbortedMessages() throws Exception { + produceAbortedMessages(); + produceInitialGlobalTableValues(); + produceAbortedMessages(); + + startStreams(); + + final Map expected = new HashMap<>(); + expected.put(1L, "A"); + expected.put(2L, "B"); + expected.put(3L, "C"); + expected.put(4L, "D"); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + ReadOnlyKeyValueStore store = null; + try { + store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + } catch (InvalidStateStoreException ex) { + return false; + } + Map result = new HashMap<>(); + Iterator> it = store.all(); + while (it.hasNext()) { + KeyValue kv = it.next(); + result.put(kv.key, kv.value); + } + return result.equals(expected); + } + }, 30000L, "waiting for initial values"); + } + + private void createTopics() throws InterruptedException { + streamTopic = "stream-" + testNo; + globalTableTopic = "globalTable-" + testNo; + CLUSTER.createTopics(streamTopic); + CLUSTER.createTopic(globalTableTopic, 2, 1); + } + + private void startStreams() { + kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); + kafkaStreams.start(); + } + + private void produceTopicValues(final String topic) throws Exception { + IntegrationTestUtils.produceKeyValuesSynchronously( + topic, + Arrays.asList( + new KeyValue<>("a", 1L), + new KeyValue<>("b", 2L), + new KeyValue<>("c", 3L), + new KeyValue<>("d", 4L), + new KeyValue<>("e", 5L)), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + StringSerializer.class, + LongSerializer.class, + new Properties()), + mockTime); + } 
+ + private void produceAbortedMessages() throws Exception { + final Properties properties = new Properties(); + properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); + properties.put(ProducerConfig.RETRIES_CONFIG, 1); + IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp( + globalTableTopic, Arrays.asList( + new KeyValue<>(1L, "A"), + new KeyValue<>(2L, "B"), + new KeyValue<>(3L, "C"), + new KeyValue<>(4L, "D") + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + LongSerializer.class, + StringSerializer.class, + properties), + mockTime.milliseconds()); + } + + private void produceInitialGlobalTableValues() throws Exception { + produceInitialGlobalTableValues(true); + } + + private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception { + final Properties properties = new Properties(); + if (enableTransactions) { + properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); + properties.put(ProducerConfig.RETRIES_CONFIG, 1); + } + IntegrationTestUtils.produceKeyValuesSynchronously( + globalTableTopic, + Arrays.asList( + new KeyValue<>(1L, "A"), + new KeyValue<>(2L, "B"), + new KeyValue<>(3L, "C"), + new KeyValue<>(4L, "D") + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + LongSerializer.class, + StringSerializer.class, + properties), + mockTime, + enableTransactions); + } + + private void produceGlobalTableValues() throws Exception { + IntegrationTestUtils.produceKeyValuesSynchronously( + globalTableTopic, + Arrays.asList( + new KeyValue<>(1L, "F"), + new KeyValue<>(2L, "G"), + new KeyValue<>(3L, "H"), + new KeyValue<>(4L, "I"), + new KeyValue<>(5L, "J")), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + LongSerializer.class, + StringSerializer.class, + new Properties()), + mockTime); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 8c6a30a5972f..900e65276ee9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -18,7 +18,6 @@ import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; @@ -28,7 +27,6 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.ForeachAction; @@ -52,23 +50,16 @@ import java.io.IOException; import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.Properties; @Category({IntegrationTest.class}) public class GlobalKTableIntegrationTest { private static final int NUM_BROKERS = 1; - private static final Properties BROKER_CONFIG; - static { - BROKER_CONFIG = new Properties(); - BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1); - BROKER_CONFIG.put("transaction.state.log.min.isr", 1); - } @ClassRule public static final EmbeddedKafkaCluster CLUSTER = - new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG); + new EmbeddedKafkaCluster(NUM_BROKERS); private static volatile int testNo = 0; private final MockTime mockTime = CLUSTER.time; @@ -229,46 +220,14 @@ public boolean conditionMet() { } }, 30000L, "waiting for final values"); } - - @Test - public void 
shouldRestoreTransactionalMessages() throws Exception { - produceInitialGlobalTableValues(true); - startStreams(); - - final Map expected = new HashMap<>(); - expected.put(1L, "A"); - expected.put(2L, "B"); - expected.put(3L, "C"); - expected.put(4L, "D"); - - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - ReadOnlyKeyValueStore store = null; - try { - store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); - } catch (InvalidStateStoreException ex) { - return false; - } - Map result = new HashMap<>(); - Iterator> it = store.all(); - while (it.hasNext()) { - KeyValue kv = it.next(); - result.put(kv.key, kv.value); - } - return result.equals(expected); - } - }, 30000L, "waiting for initial values"); - System.out.println("no failed test"); - } - + private void createTopics() throws InterruptedException { streamTopic = "stream-" + testNo; globalTableTopic = "globalTable-" + testNo; CLUSTER.createTopics(streamTopic); CLUSTER.createTopic(globalTableTopic, 2, 1); } - + private void startStreams() { kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); kafkaStreams.start(); @@ -292,29 +251,20 @@ private void produceTopicValues(final String topic) throws Exception { } private void produceInitialGlobalTableValues() throws Exception { - produceInitialGlobalTableValues(false); - } - - private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception { - Properties properties = new Properties(); - if (enableTransactions) { - properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); - properties.put(ProducerConfig.RETRIES_CONFIG, 1); - } IntegrationTestUtils.produceKeyValuesSynchronously( globalTableTopic, Arrays.asList( new KeyValue<>(1L, "A"), new KeyValue<>(2L, "B"), new KeyValue<>(3L, "C"), - new KeyValue<>(4L, "D")), + new KeyValue<>(4L, "D") + ), TestUtils.producerConfig( CLUSTER.bootstrapServers(), LongSerializer.class, - StringSerializer.class, - 
properties), - mockTime, - enableTransactions); + StringSerializer.class + ), + mockTime); } private void produceGlobalTableValues() throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 86cb331956c0..2ab6639ce05a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -180,16 +180,38 @@ public static void produceKeyValuesSynchronouslyWithTimestamp(final Strin producer.flush(); } } + + public static void produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic, + final Collection> records, + final Properties producerConfig, + final Long timestamp) + throws ExecutionException, InterruptedException { + try (final Producer producer = new KafkaProducer<>(producerConfig)) { + producer.initTransactions(); + for (final KeyValue record : records) { + producer.beginTransaction(); + final Future f = producer + .send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value)); + f.get(); + producer.abortTransaction(); + } + } + } - public static void produceValuesSynchronously( - final String topic, final Collection records, final Properties producerConfig, final Time time) + public static void produceValuesSynchronously(final String topic, + final Collection records, + final Properties producerConfig, + final Time time) throws ExecutionException, InterruptedException { IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false); } - public static void produceValuesSynchronously( - final String topic, final Collection records, final Properties producerConfig, final Time time, final boolean enableTransactions) - throws ExecutionException, InterruptedException { + public static void 
produceValuesSynchronously(final String topic, + final Collection records, + final Properties producerConfig, + final Time time, + final boolean enableTransactions) + throws ExecutionException, InterruptedException { final Collection> keyedRecords = new ArrayList<>(); for (final V value : records) { final KeyValue kv = new KeyValue<>(null, value); @@ -241,10 +263,9 @@ public static List> waitUntilMinRecordsReceived(fina public static List> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords) throws InterruptedException { - return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); } - + /** * Wait until enough data (key-value records) has been consumed. *