From add6b5e52a0b5c0b35c1a4cd6e0a5a9c395e50b7 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 12 Sep 2016 14:15:44 +0100 Subject: [PATCH 1/4] throw exception if the Cluster object has not been intialized --- .../internals/StreamsMetadataState.java | 6 ++++ .../QueryableStateIntegrationTest.java | 35 +++++++++---------- .../internals/StreamsMetadataStateTest.java | 7 ++++ 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index eeb3bc9ffcfe0..b5167a5086ba2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopologyBuilder; @@ -105,10 +106,12 @@ public synchronized StreamsMetadata getMetadataWithKey(final String storeNam Objects.requireNonNull(storeName, "storeName can't be null"); Objects.requireNonNull(key, "key can't be null"); + final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName); if (sourceTopicsInfo == null) { return null; } + return getStreamsMetadataForKey(storeName, key, new DefaultStreamPartitioner<>(keySerializer, @@ -224,6 +227,9 @@ private class SourceTopicsInfo { private String topicWithMostPartitions; private SourceTopicsInfo(final Set sourceTopics) { + if (clusterMetadata.topics().isEmpty()) { + throw new StreamsException("StreamsMetadata is currently not available as the stream thread has not (re-)initialized yet"); + } this.sourceTopics = sourceTopics; for (String topic : sourceTopics) { final List partitions = clusterMetadata.partitionsForTopic(topic); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 04d36f102da48..b82ef58ead3dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -235,15 +235,15 @@ private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final Kafka TestUtils.waitForCondition(new TestCondition() { @Override public boolean conditionMet() { - final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); - if (metadata == null) { - return false; - } - final int index = metadata.hostInfo().port(); - final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); - final ReadOnlyKeyValueStore store; try { - store = streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore()); + final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); + if (metadata == null) { + return false; + } + final int index = metadata.hostInfo().port(); + final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); + final ReadOnlyKeyValueStore store = streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore()); + return store != null && store.get(key) != null; } catch (final IllegalStateException e) { // Kafka Streams instance may have closed but rebalance hasn't happened return false; @@ -252,7 +252,6 @@ public boolean conditionMet() { return false; } - return store != null && store.get(key) != null; } }, 30000, "waiting for metadata, store and value to be non null"); } @@ -266,15 +265,15 @@ private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final TestUtils.waitForCondition(new TestCondition() { @Override public boolean conditionMet() { - final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); - if (metadata == null) { - return false; - } - final int index = metadata.hostInfo().port(); - final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); - final ReadOnlyWindowStore store; try { - store = streamsWithKey.store(storeName, QueryableStoreTypes.windowStore()); + final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); + if (metadata == null) { + return false; + } + final int index = metadata.hostInfo().port(); + final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); + final ReadOnlyWindowStore store = streamsWithKey.store(storeName, QueryableStoreTypes.windowStore()); + return store != null && store.fetch(key, from, to) != null; } catch (final IllegalStateException e) { // Kafka Streams instance may have closed but rebalance hasn't happened return false; @@ -282,7 +281,7 @@ public boolean conditionMet() { // rebalance return false; } - return store != null && store.fetch(key, from, to) != null; + } }, 30000, "waiting for metadata, store and value to be non null"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index d110277e1ca9b..984eb00823e1f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Predicate; @@ -209,6 +210,12 @@ public Integer partition(final String key, final Object value, final int numPart assertEquals(expected, actual); } + @Test(expected = StreamsException.class) + public void shouldThrowStreamsExceptionWhenClusterIsEmpty() throws Exception { + discovery.onChange(Collections.>emptyMap(), Cluster.empty()); + discovery.getMetadataWithKey("table-one", "a", Serdes.String().serializer()); + } + @Test public void shouldGetInstanceWithKeyWithMergedStreams() throws Exception { final TopicPartition topic2P2 = new TopicPartition("topic-two", 2); From 57e855cf268236a2a3a70996474e455c20a19770 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 12 Sep 2016 19:18:00 +0100 Subject: [PATCH 2/4] add javadoc strings for exceptions --- .../org/apache/kafka/streams/KafkaStreams.java | 4 ++++ .../processor/internals/StreamsMetadataState.java | 15 +++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 4fabdf7e340d6..b7dfb60fed681 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.processor.StateStore; @@ -308,6 +309,7 @@ public Collection allMetadata() { * Note: this is a point in time view and it may change due to partition reassignment. * @param storeName the storeName to find metadata for * @return A collection containing instances of {@link StreamsMetadata} that have the provided storeName + * @throws StreamsException if streams is (re-)initializing */ public Collection allMetadataForStore(final String storeName) { validateIsRunning(); @@ -330,6 +332,7 @@ public Collection allMetadataForStore(final String storeName) { * @param keySerializer Serializer for the key * @param key type * @return The {@link StreamsMetadata} for the storeName and key + * @throws StreamsException if streams is (re-)initializing */ public StreamsMetadata metadataForKey(final String storeName, final K key, @@ -351,6 +354,7 @@ public StreamsMetadata metadataForKey(final String storeName, * @param partitioner Partitioner for the store * @param key type * @return The {@link StreamsMetadata} for the storeName and key + * @throws StreamsException if streams is (re-)initializing */ public StreamsMetadata metadataForKey(final String storeName, final K key, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index b5167a5086ba2..f9e0a719387e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -67,10 +67,11 @@ public synchronized Collection getAllMetadata() { * * @param storeName the storeName to find metadata for * @return A collection of {@link StreamsMetadata} that have the provided storeName + * @throws StreamsException if streams is (re-)initializing */ public synchronized Collection getAllMetadataForStore(final String storeName) { + validateInitialized(); Objects.requireNonNull(storeName, "storeName cannot be null"); - final Set sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName); if (sourceTopics == null) { return Collections.emptyList(); @@ -98,6 +99,7 @@ public synchronized Collection getAllMetadataForStore(final Str * @param keySerializer Serializer for the key * @param key type * @return The {@link StreamsMetadata} for the storeName and key + * @throws StreamsException if streams is (re-)initializing */ public synchronized StreamsMetadata getMetadataWithKey(final String storeName, final K key, @@ -135,6 +137,7 @@ public synchronized StreamsMetadata getMetadataWithKey(final String storeNam * @param partitioner partitioner to use to find correct partition for key * @param key type * @return The {@link StreamsMetadata} for the storeName and key + * @throws StreamsException if streams is (re-)initializing */ public synchronized StreamsMetadata getMetadataWithKey(final String storeName, final K key, @@ -221,15 +224,19 @@ private SourceTopicsInfo getSourceTopicsInfo(final String storeName) { return new SourceTopicsInfo(sourceTopics); } + private void validateInitialized() { + if (clusterMetadata.topics().isEmpty()) { + throw new StreamsException("StreamsMetadata is currently not available as the stream thread has not (re-)initialized yet"); + } + } + private class SourceTopicsInfo { private final Set sourceTopics; private int maxPartitions; private String topicWithMostPartitions; private SourceTopicsInfo(final Set sourceTopics) { - if (clusterMetadata.topics().isEmpty()) { - throw new StreamsException("StreamsMetadata is currently not available as the stream thread has not (re-)initialized yet"); - } + validateInitialized(); this.sourceTopics = sourceTopics; for (String topic : sourceTopics) { final List partitions = clusterMetadata.partitionsForTopic(topic); From 327f9a5b7a68c76806b4d6528749c47e2e1a01b9 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 14 Sep 2016 12:07:10 +0200 Subject: [PATCH 3/4] return StreamsMetadata.NOT_AVAILABLE when the cluster is empty, i.e, not intialized --- .../apache/kafka/streams/KafkaStreams.java | 11 ++++--- .../internals/StreamsMetadataState.java | 30 +++++++++++-------- .../kafka/streams/state/StreamsMetadata.java | 9 ++++++ .../internals/QueryableStoreProvider.java | 2 +- .../StreamThreadStateStoreProvider.java | 4 +-- .../internals/WrappingStoreProvider.java | 4 +-- .../internals/StreamsMetadataStateTest.java | 8 ++--- 7 files changed, 41 insertions(+), 27 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index b7dfb60fed681..bc52a20b4709a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.processor.StateStore; @@ -309,7 +308,6 @@ public Collection allMetadata() { * Note: this is a point in time view and it may change due to partition reassignment. * @param storeName the storeName to find metadata for * @return A collection containing instances of {@link StreamsMetadata} that have the provided storeName - * @throws StreamsException if streams is (re-)initializing */ public Collection allMetadataForStore(final String storeName) { validateIsRunning(); @@ -331,8 +329,8 @@ public Collection allMetadataForStore(final String storeName) { * @param key Key to use to for partition * @param keySerializer Serializer for the key * @param key type - * @return The {@link StreamsMetadata} for the storeName and key - * @throws StreamsException if streams is (re-)initializing + * @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE} + * if streams is (re-)initializing */ public StreamsMetadata metadataForKey(final String storeName, final K key, @@ -353,8 +351,8 @@ public StreamsMetadata metadataForKey(final String storeName, * @param key Key to use to for partition * @param partitioner Partitioner for the store * @param key type - * @return The {@link StreamsMetadata} for the storeName and key - * @throws StreamsException if streams is (re-)initializing + * @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE} + * if streams is (re-)initializing */ public StreamsMetadata metadataForKey(final String storeName, final K key, @@ -372,6 +370,7 @@ public StreamsMetadata metadataForKey(final String storeName, * @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)} * @param return type * @return A facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances + * @throws org.apache.kafka.streams.errors.InvalidStateStoreException if the streams are (re-)initializing */ public T store(final String storeName, final QueryableStoreType queryableStoreType) { validateIsRunning(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index f9e0a719387e9..6f9bea69af235 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopologyBuilder; @@ -67,11 +66,14 @@ public synchronized Collection getAllMetadata() { * * @param storeName the storeName to find metadata for * @return A collection of {@link StreamsMetadata} that have the provided storeName - * @throws StreamsException if streams is (re-)initializing */ public synchronized Collection getAllMetadataForStore(final String storeName) { - validateInitialized(); Objects.requireNonNull(storeName, "storeName cannot be null"); + + if (!isInitialized()) { + return Collections.emptyList(); + } + final Set sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName); if (sourceTopics == null) { return Collections.emptyList(); @@ -98,8 +100,8 @@ public synchronized Collection getAllMetadataForStore(final Str * @param key Key to use * @param keySerializer Serializer for the key * @param key type - * @return The {@link StreamsMetadata} for the storeName and key - * @throws StreamsException if streams is (re-)initializing + * @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE} + * if streams is (re-)initializing */ public synchronized StreamsMetadata getMetadataWithKey(final String storeName, final K key, @@ -108,6 +110,9 @@ public synchronized StreamsMetadata getMetadataWithKey(final String storeNam Objects.requireNonNull(storeName, "storeName can't be null"); Objects.requireNonNull(key, "key can't be null"); + if (!isInitialized()) { + return StreamsMetadata.NOT_AVAILABLE; + } final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName); if (sourceTopicsInfo == null) { @@ -136,8 +141,8 @@ public synchronized StreamsMetadata getMetadataWithKey(final String storeNam * @param key Key to use * @param partitioner partitioner to use to find correct partition for key * @param key type - * @return The {@link StreamsMetadata} for the storeName and key - * @throws StreamsException if streams is (re-)initializing + * @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE} + * if streams is (re-)initializing */ public synchronized StreamsMetadata getMetadataWithKey(final String storeName, final K key, @@ -146,6 +151,10 @@ public synchronized StreamsMetadata getMetadataWithKey(final String storeNam Objects.requireNonNull(key, "key can't be null"); Objects.requireNonNull(partitioner, "partitioner can't be null"); + if (!isInitialized()) { + return StreamsMetadata.NOT_AVAILABLE; + } + SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName); if (sourceTopicsInfo == null) { return null; @@ -224,10 +233,8 @@ private SourceTopicsInfo getSourceTopicsInfo(final String storeName) { return new SourceTopicsInfo(sourceTopics); } - private void validateInitialized() { - if (clusterMetadata.topics().isEmpty()) { - throw new StreamsException("StreamsMetadata is currently not available as the stream thread has not (re-)initialized yet"); - } + private boolean isInitialized() { + return !clusterMetadata.topics().isEmpty(); } private class SourceTopicsInfo { @@ -236,7 +243,6 @@ private class SourceTopicsInfo { private String topicWithMostPartitions; private SourceTopicsInfo(final Set sourceTopics) { - validateInitialized(); this.sourceTopics = sourceTopics; for (String topic : sourceTopics) { final List partitions = clusterMetadata.partitionsForTopic(topic); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java index 541221fcbe1d6..9602bfe10a5ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KafkaStreams; +import java.util.Collections; import java.util.Set; /** @@ -29,6 +30,14 @@ * NOTE: This is a point in time view. It may change when rebalances happen. */ public class StreamsMetadata { + /** + * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance + * operations. + */ + public final static StreamsMetadata NOT_AVAILABLE = new StreamsMetadata(new HostInfo("unavailable", -1), + Collections.emptySet(), + Collections.emptySet()); + private final HostInfo hostInfo; private final Set stateStoreNames; private final Set topicPartitions; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java index 640761ca08d13..64dac1f24b124 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java @@ -46,7 +46,7 @@ public T getStore(final String storeName, final QueryableStoreType querya allStores.addAll(storeProvider.stores(storeName, queryableStoreType)); } if (allStores.isEmpty()) { - throw new InvalidStateStoreException("Store: " + storeName + " is currently not available"); + throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); } return queryableStoreType.create( new WrappingStoreProvider(storeProviders), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index e761ed0062fed..3a50a68a7d09b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -39,14 +39,14 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread) { @Override public List stores(final String storeName, final QueryableStoreType queryableStoreType) { if (!streamThread.isInitialized()) { - throw new InvalidStateStoreException("Store: " + storeName + " is currently not available as the stream thread has not (re-)initialized yet"); + throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); } final List stores = new ArrayList<>(); for (StreamTask streamTask : streamThread.tasks().values()) { final StateStore store = streamTask.getStore(storeName); if (store != null && queryableStoreType.accepts(store)) { if (!store.isOpen()) { - throw new InvalidStateStoreException("Store: " + storeName + " isn't isOpen"); + throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); } stores.add((T) store); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java index 167211251d1b8..696b7a5fcda30 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java @@ -48,8 +48,8 @@ public List stores(final String storeName, QueryableStoreType type) { allStores.addAll(stores); } if (allStores.isEmpty()) { - throw new InvalidStateStoreException("Store " + storeName + " is currently " - + "unavailable"); + throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); + } return allStores; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index 984eb00823e1f..03280a831a58e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Predicate; @@ -210,10 +209,11 @@ public Integer partition(final String key, final Object value, final int numPart assertEquals(expected, actual); } - @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionWhenClusterIsEmpty() throws Exception { + @Test + public void shouldReturnNotAvailableWhenClusterIsEmpty() throws Exception { discovery.onChange(Collections.>emptyMap(), Cluster.empty()); - discovery.getMetadataWithKey("table-one", "a", Serdes.String().serializer()); + final StreamsMetadata result = discovery.getMetadataWithKey("table-one", "a", Serdes.String().serializer()); + assertEquals(StreamsMetadata.NOT_AVAILABLE, result); } @Test From afcd964e2d6e8e28b4c3ee85028946f6bae4c387 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 16 Sep 2016 10:31:36 +0100 Subject: [PATCH 4/4] address comments --- .../apache/kafka/streams/KafkaStreams.java | 3 ++- .../CompositeReadOnlyKeyValueStore.java | 24 +++++++++++++++---- .../CompositeReadOnlyWindowStore.java | 15 ++++++++---- .../internals/WrappingStoreProvider.java | 1 - .../CompositeReadOnlyKeyValueStoreTest.java | 21 ++++++++-------- .../CompositeReadOnlyWindowStoreTest.java | 19 +++++++++++++-- .../internals/QueryableStoreProviderTest.java | 3 ++- .../internals/ReadOnlyWindowStoreStub.java | 11 ++++++++- .../internals/WrappingStoreProviderTest.java | 5 ++-- .../StateStoreProviderStub.java | 13 +++++++++- 10 files changed, 86 insertions(+), 29 deletions(-) rename streams/src/test/java/org/apache/kafka/{streams/state/internals => test}/StateStoreProviderStub.java (79%) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index bc52a20b4709a..d88d09ef1d90d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -370,7 +370,8 @@ public StreamsMetadata metadataForKey(final String storeName, * @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)} * @param return type * @return A facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances - * @throws org.apache.kafka.streams.errors.InvalidStateStoreException if the streams are (re-)initializing + * @throws org.apache.kafka.streams.errors.InvalidStateStoreException if the streams are (re-)initializing or + * a store with storeName and queryableStoreType doesnt' exist. */ public T store(final String storeName, final QueryableStoreType queryableStoreType) { validateIsRunning(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java index 5ed1d35dee23d..5c47419a1bc25 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java @@ -15,6 +15,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -48,10 +49,15 @@ public CompositeReadOnlyKeyValueStore(final StateStoreProvider storeProvider, public V get(final K key) { final List> stores = storeProvider.stores(storeName, storeType); for (ReadOnlyKeyValueStore store : stores) { - V result = store.get(key); - if (result != null) { - return result; + try { + final V result = store.get(key); + if (result != null) { + return result; + } + } catch (InvalidStateStoreException e) { + throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); } + } return null; } @@ -61,7 +67,11 @@ public KeyValueIterator range(final K from, final K to) { final NextIteratorFunction nextIteratorFunction = new NextIteratorFunction() { @Override public KeyValueIterator apply(final ReadOnlyKeyValueStore store) { - return store.range(from, to); + try { + return store.range(from, to); + } catch (InvalidStateStoreException e) { + throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); + } } }; final List> stores = storeProvider.stores(storeName, storeType); @@ -73,7 +83,11 @@ public KeyValueIterator all() { final NextIteratorFunction nextIteratorFunction = new NextIteratorFunction() { @Override public KeyValueIterator apply(final ReadOnlyKeyValueStore store) { - return store.all(); + try { + return store.all(); + } catch (InvalidStateStoreException e) { + throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); + } } }; final List> stores = storeProvider.stores(storeName, storeType); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java index 9f58e17852ded..b33c0f0ceab3d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java @@ -15,6 +15,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -44,11 +45,15 @@ public CompositeReadOnlyWindowStore(final StateStoreProvider provider, public WindowStoreIterator fetch(final K key, final long timeFrom, final long timeTo) { final List> stores = provider.stores(storeName, windowStoreType); for (ReadOnlyWindowStore windowStore : stores) { - final WindowStoreIterator result = windowStore.fetch(key, timeFrom, timeTo); - if (!result.hasNext()) { - result.close(); - } else { - return result; + try { + final WindowStoreIterator result = windowStore.fetch(key, timeFrom, timeTo); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (InvalidStateStoreException e) { + throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); } } return new WindowStoreIterator() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java index 696b7a5fcda30..eb1bc6473870a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java @@ -49,7 +49,6 @@ public List stores(final String storeName, QueryableStoreType type) { } if (allStores.isEmpty()) { throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); - } return allStores; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index cdc5ac7a36d7d..05c32f02d7d8c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.test.StateStoreProviderStub; import org.junit.Before; import org.junit.Test; @@ -42,8 +43,8 @@ public class CompositeReadOnlyKeyValueStoreTest { @SuppressWarnings("unchecked") @Before public void before() { - final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(); - stubProviderTwo = new StateStoreProviderStub(); + final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false); + stubProviderTwo = new StateStoreProviderStub(false); stubOneUnderlying = newStoreInstance(); stubProviderOne.addStore(storeName, stubOneUnderlying); @@ -148,19 +149,19 @@ public void shouldSupportAllAcrossMultipleStores() throws Exception { } @Test(expected = InvalidStateStoreException.class) - public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnGet() throws Exception { - noStores().get("anything"); + public void shouldThrowInvalidStoreExceptionDuringRebalance() throws Exception { + rebalancing().get("anything"); } @Test(expected = InvalidStateStoreException.class) - public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnRange() throws Exception { - noStores().range("anything", "something"); + public void shouldThrowInvalidStoreExceptionOnRangeDuringRebalance() throws Exception { + rebalancing().range("anything", "something"); } @Test(expected = InvalidStateStoreException.class) - public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnAll() throws Exception { - noStores().all(); + public void shouldThrowInvalidStoreExceptionOnAllDuringRebalance() throws Exception { + rebalancing().all(); } @Test @@ -192,8 +193,8 @@ public long approximateNumEntries() { assertEquals(Long.MAX_VALUE, theStore.approximateNumEntries()); } - private CompositeReadOnlyKeyValueStore noStores() { - return new CompositeReadOnlyKeyValueStore<>(new WrappingStoreProvider(Collections.emptyList()), + private CompositeReadOnlyKeyValueStore rebalancing() { + return new CompositeReadOnlyKeyValueStore<>(new WrappingStoreProvider(Collections.singletonList(new StateStoreProviderStub(true))), QueryableStoreTypes.keyValueStore(), storeName); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java index 825c1e8f42bf5..d098429e28a83 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java @@ -15,8 +15,10 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.test.StateStoreProviderStub; import org.junit.Before; import org.junit.Test; @@ -42,8 +44,8 @@ public class CompositeReadOnlyWindowStoreTest { @Before public void before() { - stubProviderOne = new StateStoreProviderStub(); - stubProviderTwo = new StateStoreProviderStub(); + stubProviderOne = new StateStoreProviderStub(false); + stubProviderTwo = new StateStoreProviderStub(false); underlyingWindowStore = new ReadOnlyWindowStoreStub<>(); stubProviderOne.addStore(storeName, underlyingWindowStore); @@ -103,6 +105,19 @@ public void shouldNotGetValuesFromOtherStores() throws Exception { assertEquals(Collections.singletonList(new KeyValue<>(1L, "my-value")), results); } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowInvalidStateStoreExceptionOnRebalance() throws Exception { + final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>(new StateStoreProviderStub(true), QueryableStoreTypes.windowStore(), "foo"); + store.fetch("key", 1, 10); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowInvalidStateStoreExceptionIfFetchThrows() throws Exception { + underlyingWindowStore.setOpen(false); + underlyingWindowStore.fetch("key", 1, 10); + } + static List> toList(final Iterator> iterator) { final List> results = new ArrayList<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java index 276930ffd2f64..3660e8eb2a0b6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.state.NoOpWindowStore; import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.test.StateStoreProviderStub; import org.junit.Before; import org.junit.Test; @@ -33,7 +34,7 @@ public class QueryableStoreProviderTest { @Before public void before() { - final StateStoreProviderStub theStoreProvider = new StateStoreProviderStub(); + final StateStoreProviderStub theStoreProvider = new StateStoreProviderStub(false); theStoreProvider.addStore(keyValueStore, new StateStoreTestUtils.NoOpReadOnlyStore<>()); theStoreProvider.addStore(windowStore, new NoOpWindowStore()); storeProvider = diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index 5b88eb8efb334..2082e00cba0bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -15,6 +15,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; @@ -32,9 +33,13 @@ public class ReadOnlyWindowStoreStub implements ReadOnlyWindowStore, StateStore { private final Map> data = new HashMap<>(); + private boolean open = true; @Override public WindowStoreIterator fetch(final K key, final long timeFrom, final long timeTo) { + if (!open) { + throw new InvalidStateStoreException("Store is not open"); + } final List> results = new ArrayList<>(); for (long now = timeFrom; now <= timeTo; now++) { final Map kvMap = data.get(now); @@ -79,7 +84,11 @@ public boolean persistent() { @Override public boolean isOpen() { - return false; + return open; + } + + public void setOpen(final boolean open) { + this.open = open; } private class TheWindowStoreIterator implements WindowStoreIterator { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index 710557ea91235..708e1534dcc1e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; +import org.apache.kafka.test.StateStoreProviderStub; import org.junit.Before; import org.junit.Test; @@ -37,8 +38,8 @@ public class WrappingStoreProviderTest { @Before public void before() { - final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(); - final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(); + final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false); + final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false); stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java similarity index 79% rename from streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java rename to streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java index 3712990005d90..f17777fab3e5b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java +++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java @@ -12,10 +12,12 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.apache.kafka.streams.state.internals; +package org.apache.kafka.test; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.QueryableStoreType; +import org.apache.kafka.streams.state.internals.StateStoreProvider; import java.util.Collections; import java.util.HashMap; @@ -25,10 +27,19 @@ public class StateStoreProviderStub implements StateStoreProvider { private final Map stores = new HashMap<>(); + private final boolean throwException; + + public StateStoreProviderStub(final boolean throwException) { + + this.throwException = throwException; + } @SuppressWarnings("unchecked") @Override public List stores(final String storeName, final QueryableStoreType queryableStoreType) { + if (throwException) { + throw new InvalidStateStoreException("store is unavailable"); + } if (stores.containsKey(storeName) && queryableStoreType.accepts(stores.get(storeName))) { return (List) Collections.singletonList(stores.get(storeName)); }