From b56c30db45337242b194f1b0217d168d6ee2b733 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Sat, 27 Jan 2018 19:22:07 +0100 Subject: [PATCH 1/2] STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of using enable.auto.commit --- docs/storm-kafka-client.md | 42 +----- .../apache/storm/kafka/spout/KafkaSpout.java | 57 ++++---- .../storm/kafka/spout/KafkaSpoutConfig.java | 132 +++++++++--------- .../kafka/spout/KafkaSpoutConfigTest.java | 17 ++- .../KafkaSpoutMessagingGuaranteeTest.java | 55 ++++++-- .../storm/annotation/InterfaceStability.java | 54 +++++++ 6 files changed, 221 insertions(+), 136 deletions(-) create mode 100644 storm-core/src/jvm/org/apache/storm/annotation/InterfaceStability.java diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md index 0a12910da52..2992dd6697d 100644 --- a/docs/storm-kafka-client.md +++ b/docs/storm-kafka-client.md @@ -152,18 +152,15 @@ of Java generics. The deserializers can be specified via the consumer propertie There are a few key configs to pay attention to. `setFirstPollOffsetStrategy` allows you to set where to start consuming data from. This is used both in case of failure recovery and starting the spout -for the first time. Allowed values include +for the first time. The allowed values are listed in the [FirstPollOffsetStrategy javadocs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html). - * `EARLIEST` means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits - * `LATEST` means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits - * `UNCOMMITTED_EARLIEST` (DEFAULT) means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `EARLIEST`. - * `UNCOMMITTED_LATEST` means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `LATEST`. +`setProcessingGuarantee` lets you configure what processing guarantees the spout will provide. This affects how soon consumed offsets can be committed, and the frequency of commits. See the [ProcessingGuarantee javadoc](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.html) for details. `setRecordTranslator` allows you to modify how the spout converts a Kafka Consumer Record into a Tuple, and which stream that tuple will be published into. By default the "topic", "partition", "offset", "key", and "value" will be emitted to the "default" stream. If you want to output entries to different streams based on the topic, storm provides `ByTopicRecordTranslator`. See below for more examples on how to use these. -`setProp` and `setProps` can be used to set KafkaConsumer properties. The list of these properties can be found in the KafkaConsumer configuration documentation on the [Kafka website](http://kafka.apache.org/documentation.html#consumerconfigs). +`setProp` and `setProps` can be used to set KafkaConsumer properties. The list of these properties can be found in the KafkaConsumer configuration documentation on the [Kafka website](http://kafka.apache.org/documentation.html#consumerconfigs). Note that KafkaConsumer autocommit is unsupported. The KafkaSpoutConfig constructor will throw an exception if the "enable.auto.commit" property is set, and the consumer used by the spout will always have that property set to false. You can configure similar behavior to autocommit through the `setProcessingGuarantee` method on the KafkaSpoutConfig builder. ### Usage Examples @@ -324,7 +321,7 @@ When selecting a kafka client version, you should ensure - # Kafka Spout Performance Tuning -The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217). +The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [setOffsetCommitPeriodMs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setOffsetCommitPeriodMs-long-) and [setMaxUncommittedOffsets](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setMaxUncommittedOffsets-int-) methods. * "offset.commit.period.ms" controls how often the spout commits to Kafka * "max.uncommitted.offsets" controls how many offsets can be pending commit before another poll can take place @@ -334,7 +331,7 @@ The [Kafka consumer config] (http://kafka.apache.org/documentation.html#consumer * “fetch.min.bytes” * “fetch.max.wait.ms” -* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184) +* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [setPollTimeoutMs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setPollTimeoutMs-long-) method.
Depending on the structure of your Kafka cluster, distribution of the data, and availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation on Kafka parameter tuning. @@ -348,35 +345,6 @@ Currently the Kafka spout has has the following default values, which have been * max.uncommitted.offsets = 10000000
-# Processing Guarantees - -The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when an offset is committed to Kafka. This is -conceptually equivalent to marking the tuple with the `ConsumerRecord` for that offset as being successfully processed -because the tuple won't get re-emitted in case of failure or time out. - -For the AT_LEAST_ONCE and AT_MOST_ONCE processing guarantees the spout controls when the commit happens. -When the guarantee is NONE Kafka controls when the commit happens. - -* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once) - and acked. If a tuple fails or times out it will be re-emitted. A tuple can be processed more than once if for instance - the ack gets lost. - -* AT_MOST_ONCE - Offsets will be committed to Kafka right after being polled but before being emitted to the downstream - components of the topology. Offsets are processed at most once because tuples that fail or timeout won't be retried. - -* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties - "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens - it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times. - This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown. - -To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows: - -```java -KafkaSpoutConfig kafkaConf = KafkaSpoutConfig - .builder(String bootstrapServers, String ... topics) - .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE) -``` - # Tuple Tracking By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 361882047f9..27940d08415 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -54,6 +54,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee; import org.apache.storm.kafka.spout.internal.CommitMetadata; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; @@ -91,7 +92,7 @@ public class KafkaSpout extends BaseRichSpout { private transient KafkaSpoutRetryService retryService; // Handles tuple events (emit, ack etc.) private transient KafkaTupleListener tupleListener; - // timer == null if processing guarantee is none or at-most-once + // timer == null only if the processing guarantee is at-most-once private transient Timer commitTimer; // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() @@ -135,8 +136,8 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect tupleListener = kafkaSpoutConfig.getTupleListener(); - if (isAtLeastOnceProcessing()) { - // Only used if the spout should commit an offset to Kafka only after the corresponding tuple has been acked. + if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) { + // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); } refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); @@ -321,24 +322,28 @@ private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMeta @Override public void nextTuple() { try { - if (refreshSubscriptionTimer.isExpiredResetOnTrue()) { - kafkaSpoutConfig.getSubscription().refreshAssignment(); - } + if (refreshSubscriptionTimer.isExpiredResetOnTrue()) { + kafkaSpoutConfig.getSubscription().refreshAssignment(); + } - if (shouldCommit()) { + if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) { + if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(kafkaConsumer.assignment()); + } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) { + commitFetchedOffsetsAsync(kafkaConsumer.assignment()); } + } - PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo(); - if (pollablePartitionsInfo.shouldPoll()) { - try { - setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo)); - } catch (RetriableException e) { - LOG.error("Failed to poll from kafka.", e); - } + PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo(); + if (pollablePartitionsInfo.shouldPoll()) { + try { + setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo)); + } catch (RetriableException e) { + LOG.error("Failed to poll from kafka.", e); } + } - emitIfWaitingNotEmitted(); + emitIfWaitingNotEmitted(); } catch (InterruptException e) { throwKafkaConsumerInterruptedException(); } @@ -350,10 +355,6 @@ private void throwKafkaConsumerInterruptedException() { throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted")); } - private boolean shouldCommit() { - return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode - } - private PollablePartitionsInfo getPollablePartitionsInfo() { if (isWaitingToEmit()) { LOG.debug("Not polling. Tuples waiting to be emitted."); @@ -546,6 +547,15 @@ private boolean isEmitTuple(List tuple) { return tuple != null || kafkaSpoutConfig.isEmitNullTuples(); } + private void commitFetchedOffsetsAsync(Set assignedPartitions) { + Map offsetsToCommit = new HashMap<>(); + for (TopicPartition tp : assignedPartitions) { + offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp))); + } + kafkaConsumer.commitAsync(offsetsToCommit, null); + LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); + } + private void commitOffsetsForAckedTuples(Set assignedPartitions) { // Find offsets that are ready to be committed for every assigned topic partition final Map assignedOffsetManagers = new HashMap<>(); @@ -576,11 +586,10 @@ private void commitOffsetsForAckedTuples(Set assignedPartitions) long committedOffset = tpOffset.getValue().offset(); if (position < committedOffset) { /* - * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, - * lots of (more than max.poll.records) later messages were acked, and the failed message then gets acked. - * The consumer may only be part way through "catching up" to where it was when it went back to retry the failed tuple. - * Skip the consumer forward to the committed offset and drop the current waiting to emit list, - * since it'll likely contain committed offsets. + * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more + * than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be + * part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward + * to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets. */ LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]", position, committedOffset); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 094ae03cf67..cbe52a7ffe1 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.storm.Config; +import org.apache.storm.annotation.InterfaceStability; import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; import org.apache.storm.tuple.Fields; import org.slf4j.Logger; @@ -97,7 +98,7 @@ public class KafkaSpoutConfig implements Serializable { * @param builder The Builder to construct the KafkaSpoutConfig from */ public KafkaSpoutConfig(Builder builder) { - setAutoCommitMode(builder); + setKafkaPropsForProcessingGuarantee(builder); this.kafkaProps = builder.kafkaProps; this.subscription = builder.subscription; this.translator = builder.translator; @@ -120,23 +121,26 @@ public KafkaSpoutConfig(Builder builder) { /** * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment. - * By default this parameter is set to UNCOMMITTED_EARLIEST. If the strategy is set to: - *
- *
    - *
  • EARLIEST - the kafka spout polls records starting in the first offset of the partition, regardless - * of previous commits. This setting only takes effect on topology deployment.
  • - *
  • LATEST - the kafka spout polls records with offsets greater than the last offset in the partition, - * regardless of previous commits. This setting only takes effect on topology deployment.
  • - *
  • UNCOMMITTED_EARLIEST - the kafka spout polls records from the last committed offset, if any. If no offset has been - * committed it behaves as EARLIEST.
  • - *
  • UNCOMMITTED_LATEST - the kafka spout polls records from the last committed offset, if any. If no offset has been - * committed it behaves as LATEST.
  • - *
+ * By default this parameter is set to UNCOMMITTED_EARLIEST. */ public enum FirstPollOffsetStrategy { + /** + * The kafka spout polls records starting in the first offset of the partition, regardless of previous commits. This setting only + * takes effect on topology deployment + */ EARLIEST, + /** + * The kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits. This + * setting only takes effect on topology deployment + */ LATEST, + /** + * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST + */ UNCOMMITTED_EARLIEST, + /** + * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST + */ UNCOMMITTED_LATEST; @Override @@ -147,28 +151,30 @@ public String toString() { /** * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed, - * i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE the spout controls when - * the commit happens. When the guarantee is NONE Kafka controls when the commit happens. - * - *
    - *
  • AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once) - * and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance - * the ack gets lost.
  • - *
    - *
  • AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted - * to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it - * won't retry tuples that fail or timeout after the commit to Kafka has been done.
  • - *
    - *
  • NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties - * "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens - * it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times. - * This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
  • - *
+ * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE. + * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval. + * NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep. */ + @InterfaceStability.Unstable public enum ProcessingGuarantee { + /** + * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or + * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined + * interval. + */ AT_LEAST_ONCE, + /** + * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream + * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by + * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done + */ AT_MOST_ONCE, - NONE, + /** + * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may + * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the + * spout to control when commits occur. Commits asynchronously on the defined interval. + */ + NO_GUARANTEE, } public static class Builder { @@ -532,7 +538,8 @@ public Builder setPollTimeoutMs(long pollTimeoutMs) { /** * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s. * - *

This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}. + *

This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE} or + * {@link ProcessingGuarantee#NO_GUARANTEE}. * * @param offsetCommitPeriodMs time in ms */ @@ -729,46 +736,45 @@ private static Builder setStringDeserializers(Builder builder) { + private static void setKafkaPropsForProcessingGuarantee(Builder builder) { if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { - LOG.warn("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually." - + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee." - + " This will be treated as an error in the next major release." - + " For now the spout will be configured to behave like it would have in pre-1.2.0 releases."); + LOG.warn("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee." + + "This will be treated as an error in the next major release."); final boolean enableAutoCommit = Boolean.parseBoolean(builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).toString()); - if(enableAutoCommit) { - builder.processingGuarantee = ProcessingGuarantee.NONE; + if (enableAutoCommit) { + builder.processingGuarantee = ProcessingGuarantee.NO_GUARANTEE; } else { builder.processingGuarantee = ProcessingGuarantee.AT_LEAST_ONCE; } } - if (builder.processingGuarantee == ProcessingGuarantee.NONE) { - builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - } else { - String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); - if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) { - if (autoOffsetResetPolicy == null) { - /* - If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured - for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range - error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer - requests an offset that was deleted. - */ - builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) { - LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'." - + " Some messages may be skipped."); - } - } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) { - if (autoOffsetResetPolicy != null - && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) { - LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'." - + " Some messages may be processed more than once."); - } + String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) { + if (autoOffsetResetPolicy == null) { + /* + * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured + * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range + * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer + * requests an offset that was deleted. + */ + LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) { + LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'." + + " Some messages may be skipped."); + } + } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) { + if (autoOffsetResetPolicy != null + && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) { + LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'." + + " Some messages may be processed more than once."); } - builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); } + LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); } /** diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java index b701be93890..91a7be6043f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -32,10 +32,15 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.hamcrest.CoreMatchers; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; public class KafkaSpoutConfigTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testBasic() { KafkaSpoutConfig conf = KafkaSpoutConfig.builder("localhost:1234", "topic").build(); @@ -94,7 +99,7 @@ public void testCanConfigureWithExplicitTrueBooleanAutoCommitMode() { .build(); assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee", - conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NONE)); + conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)); } @Test @@ -114,7 +119,7 @@ public void testCanConfigureWithExplicitTrueStringAutoCommitMode() { .build(); assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee", - conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NONE)); + conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)); } @Test @@ -233,4 +238,12 @@ public void testMetricsTimeBucketSizeInSecs() { assertEquals(100, conf.getMetricsTimeBucketSizeInSecs()); } + + @Test + public void testThrowsIfEnableAutoCommitIsSet() { + expectedException.expect(IllegalStateException.class); + KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) + .build(); + } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index ae7612a26a0..a20f7069448 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -17,6 +17,7 @@ package org.apache.storm.kafka.spout; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; @@ -24,6 +25,7 @@ import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -37,6 +39,8 @@ import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.spout.SpoutOutputCollector; @@ -45,11 +49,18 @@ import org.apache.storm.utils.Time.SimulatedTime; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.InOrder; +import org.mockito.runners.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class KafkaSpoutMessagingGuaranteeTest { + @Captor + private ArgumentCaptor> commitCapture; + private final TopologyContext contextMock = mock(TopologyContext.class); private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); private final Map conf = new HashMap<>(); @@ -108,10 +119,10 @@ public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception } @Test - public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception { + public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception { //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) - .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) .build(); doTestModeDisregardsMaxUncommittedOffsets(spoutConfig); } @@ -152,10 +163,10 @@ public void testAtMostOnceModeCannotReplayTuples() throws Exception { } @Test - public void testAnyTimesModeCannotReplayTuples() throws Exception { - //When tuple tracking is enabled, the spout must not replay tuples in any-times mode + public void testNoGuaranteeModeCannotReplayTuples() throws Exception { + //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) - .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) .setTupleTrackingEnforced(true) .build(); doTestModeCannotReplayTuples(spoutConfig); @@ -176,7 +187,7 @@ private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig spout.ack(msgIdCaptor.getValue()); - Time.advanceTime(spoutConfig.getOffsetsCommitPeriodMs()); + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs()); spout.nextTuple(); @@ -195,13 +206,37 @@ public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { } @Test - public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception { - //When tuple tracking is enabled, the spout must not commit acked tuples in any-times mode because committing is managed by the consumer + public void testNoGuaranteeModeCommitsPolledTuples() throws Exception { + //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) - .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) .setTupleTrackingEnforced(true) .build(); - doTestModeDoesNotCommitAckedTuples(spoutConfig); + + try (SimulatedTime time = new SimulatedTime()) { + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); + + spout.nextTuple(); + + when(consumerMock.position(partition)).thenReturn(1L); + + ArgumentCaptor msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture()); + assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue())); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs()); + + spout.nextTuple(); + + verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class)); + + Map commit = commitCapture.getValue(); + assertThat(commit.containsKey(partition), is(true)); + assertThat(commit.get(partition).offset(), is(1L)); + } } } diff --git a/storm-core/src/jvm/org/apache/storm/annotation/InterfaceStability.java b/storm-core/src/jvm/org/apache/storm/annotation/InterfaceStability.java new file mode 100644 index 00000000000..d05ae75ad55 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/annotation/InterfaceStability.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +/** + * Annotation to inform users of how much to rely on a particular package, + * class or method not changing over time. + * + */ +@InterfaceStability.Evolving +public class InterfaceStability { + /** + * Can evolve while retaining compatibility for minor release boundaries.; + * can break compatibility only at major release (ie. at m.0). + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + public @interface Stable {}; + + /** + * Evolving, but can break compatibility at minor release (i.e. m.x) + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + public @interface Evolving {}; + + /** + * No guarantee is provided as to reliability or stability across any + * level of release granularity. + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + public @interface Unstable {}; +} + From 03915f0aefa49c3115e9b66ec3f98b0914dc9433 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Sat, 27 Jan 2018 15:15:45 +0100 Subject: [PATCH 2/2] STORM-2913: Add metadata to at-most-once and at-least-once commits --- .../apache/storm/kafka/spout/KafkaSpout.java | 87 ++++++------------ .../storm/kafka/spout/KafkaSpoutConfig.java | 4 +- .../spout/internal/CommitMetadataManager.java | 91 +++++++++++++++++++ .../kafka/spout/KafkaSpoutConfigTest.java | 8 -- .../KafkaSpoutMessagingGuaranteeTest.java | 75 +++++++++------ 5 files changed, 166 insertions(+), 99 deletions(-) create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 27940d08415..4464e6866a3 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -23,12 +23,8 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -55,7 +51,7 @@ import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee; -import org.apache.storm.kafka.spout.internal.CommitMetadata; +import org.apache.storm.kafka.spout.internal.CommitMetadataManager; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; import org.apache.storm.kafka.spout.internal.OffsetManager; @@ -75,7 +71,6 @@ public class KafkaSpout extends BaseRichSpout { //Initial delay for the commit and subscription refresh timers public static final long TIMER_DELAY_MS = 500; private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); - private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); // Storm protected SpoutOutputCollector collector; @@ -107,8 +102,7 @@ public class KafkaSpout extends BaseRichSpout { // Triggers when a subscription should be refreshed private transient Timer refreshSubscriptionTimer; private transient TopologyContext context; - // Metadata information to commit to Kafka. It is unique per spout per topology. - private transient String commitMetadata; + private transient CommitMetadataManager commitMetadataManager; private transient KafkaOffsetMetric kafkaOffsetMetric; public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { @@ -145,7 +139,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect offsetManagers = new HashMap<>(); emitted = new HashSet<>(); waitingToEmit = new HashMap<>(); - setCommitMetadata(context); + commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee()); tupleListener.open(conf, context); if (canRegisterMetrics()) { @@ -160,7 +154,7 @@ private void registerMetric() { kafkaOffsetMetric = new KafkaOffsetMetric(new Supplier() { @Override public Object get() { - return offsetManagers; + return Collections.unmodifiableMap(offsetManagers); } }, new Supplier() { @Override @@ -181,16 +175,6 @@ private boolean canRegisterMetrics() { return true; } - private void setCommitMetadata(TopologyContext context) { - try { - commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata( - context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())); - } catch (JsonProcessingException e) { - LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e); - throw new RuntimeException(e); - } - } - private boolean isAtLeastOnceProcessing() { return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE; } @@ -228,8 +212,8 @@ private void initialize(Collection partitions) { retryService.retainAll(partitions); /* - * Emitted messages for partitions that are no longer assigned to this spout can't - * be acked and should not be retried, hence remove them from emitted collection. + * Emitted messages for partitions that are no longer assigned to this spout can't be acked and should not be retried, hence + * remove them from emitted collection. */ Iterator msgIdIterator = emitted.iterator(); while (msgIdIterator.hasNext()) { @@ -265,7 +249,7 @@ private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) { if (committedOffset != null) { // offset was previously committed for this consumer group and topic-partition, either by this or another topology. - if (isOffsetCommittedByThisTopology(newTp, committedOffset)) { + if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp, committedOffset, Collections.unmodifiableMap(offsetManagers))) { // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply. kafkaConsumer.seek(newTp, committedOffset.offset()); } else { @@ -293,31 +277,6 @@ private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) { } } - /** - * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. This info is used to decide if - * {@link FirstPollOffsetStrategy} should be applied - * - * @param tp topic-partition - * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka - * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise - */ - private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset) { - try { - if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).hasCommitted()) { - return true; - } - - final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class); - return committedMetadata.getTopologyId().equals(context.getStormId()); - } catch (IOException e) { - LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit " - + "for this topic-partition was done using an earlier version of Storm. " - + "Defaulting to behavior compatible with earlier version", committedOffset); - LOG.trace("", e); - return false; - } - } - // ======== Next Tuple ======= @Override public void nextTuple() { @@ -328,9 +287,12 @@ public void nextTuple() { if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) { if (isAtLeastOnceProcessing()) { - commitOffsetsForAckedTuples(kafkaConsumer.assignment()); + commitOffsetsForAckedTuples(kafkaConsumer.assignment()); } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) { - commitFetchedOffsetsAsync(kafkaConsumer.assignment()); + Map offsetsToCommit = + createFetchedOffsetsMetadata(kafkaConsumer.assignment()); + kafkaConsumer.commitAsync(offsetsToCommit, null); + LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); } } @@ -424,7 +386,10 @@ private ConsumerRecords pollKafkaBroker(PollablePartitionsInfo pollablePar numPolledRecords); if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) { //Commit polled records immediately to ensure delivery is at-most-once. - kafkaConsumer.commitSync(); + Map offsetsToCommit = + createFetchedOffsetsMetadata(kafkaConsumer.assignment()); + kafkaConsumer.commitSync(offsetsToCommit); + LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); } return consumerRecords; } finally { @@ -497,11 +462,14 @@ private boolean emitOrRetryTuple(ConsumerRecord record) { LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); - if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset) - && committedOffset.offset() > record.offset()) { + if (isAtLeastOnceProcessing() + && committedOffset != null + && committedOffset.offset() > record.offset() + && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) { // Ensures that after a topology with this id is started, the consumer fetch // position never falls behind the committed offset (STORM-2844) - throw new IllegalStateException("Attempting to emit a message that has already been committed."); + throw new IllegalStateException("Attempting to emit a message that has already been committed." + + " This should never occur when using the at-least-once processing guarantee."); } final List tuple = kafkaSpoutConfig.getTranslator().apply(record); @@ -547,13 +515,12 @@ private boolean isEmitTuple(List tuple) { return tuple != null || kafkaSpoutConfig.isEmitNullTuples(); } - private void commitFetchedOffsetsAsync(Set assignedPartitions) { + private Map createFetchedOffsetsMetadata(Set assignedPartitions) { Map offsetsToCommit = new HashMap<>(); for (TopicPartition tp : assignedPartitions) { - offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp))); + offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata())); } - kafkaConsumer.commitAsync(offsetsToCommit, null); - LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); + return offsetsToCommit; } private void commitOffsetsForAckedTuples(Set assignedPartitions) { @@ -567,7 +534,7 @@ private void commitOffsetsForAckedTuples(Set assignedPartitions) final Map nextCommitOffsets = new HashMap<>(); for (Map.Entry tpOffset : assignedOffsetManagers.entrySet()) { - final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadata); + final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata()); if (nextCommitOffset != null) { nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset); } @@ -778,6 +745,6 @@ public boolean shouldPoll() { @VisibleForTesting KafkaOffsetMetric getKafkaOffsetMetric() { - return kafkaOffsetMetric; + return kafkaOffsetMetric; } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index cbe52a7ffe1..7aa836cf8ad 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -758,7 +758,7 @@ private static void setKafkaPropsForProcessingGuarantee(Builder builder) { * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer * requests an offset that was deleted. */ - LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing", + LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) { @@ -772,7 +772,7 @@ private static void setKafkaPropsForProcessingGuarantee(Builder builder) { + " Some messages may be processed more than once."); } } - LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit", + LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java new file mode 100644 index 00000000000..a63619c6e6f --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java @@ -0,0 +1,91 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.internal; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Map; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee; +import org.apache.storm.task.TopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generates and reads commit metadata. + */ +public final class CommitMetadataManager { + + private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class); + // Metadata information to commit to Kafka. It is unique per spout instance. + private final String commitMetadata; + private final ProcessingGuarantee processingGuarantee; + private final TopologyContext context; + + /** + * Create a manager with the given context. + */ + public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) { + this.context = context; + try { + commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata( + context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())); + this.processingGuarantee = processingGuarantee; + } catch (JsonProcessingException e) { + LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e); + throw new RuntimeException(e); + } + } + + /** + * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. + * + * @param tp The topic partition the commit metadata belongs to. + * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka + * @param offsetManagers The offset managers. + * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise + */ + public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset, + Map offsetManagers) { + try { + if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE + && offsetManagers.containsKey(tp) + && offsetManagers.get(tp).hasCommitted()) { + return true; + } + + final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class); + return committedMetadata.getTopologyId().equals(context.getStormId()); + } catch (IOException e) { + LOG.warn("Failed to deserialize expected commit metadata [{}]." + + " This error is expected to occur once per partition, if the last commit to each partition" + + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. " + + "Defaulting to behavior compatible with earlier version", committedOffset); + LOG.trace("", e); + return false; + } + } + + public String getCommitMetadata() { + return commitMetadata; + } + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java index 91a7be6043f..90e906b5a8f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -238,12 +238,4 @@ public void testMetricsTimeBucketSizeInSecs() { assertEquals(100, conf.getMetricsTimeBucketSizeInSecs()); } - - @Test - public void testThrowsIfEnableAutoCommitIsSet() { - expectedException.expect(IllegalStateException.class); - KafkaSpoutConfig.builder("localhost:1234", "topic") - .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) - .build(); - } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index a20f7069448..082cc58fd4c 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -21,9 +21,9 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.inOrder; @@ -36,13 +36,16 @@ import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.CommitMetadataManager; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -51,6 +54,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.runners.MockitoJUnitRunner; @@ -60,7 +64,7 @@ public class KafkaSpoutMessagingGuaranteeTest { @Captor private ArgumentCaptor> commitCapture; - + private final TopologyContext contextMock = mock(TopologyContext.class); private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); private final Map conf = new HashMap<>(); @@ -85,11 +89,18 @@ public void testAtMostOnceModeCommitsBeforeEmit() throws Exception { spout.nextTuple(); + when(consumerMock.position(partition)).thenReturn(1L); + //The spout should have emitted the tuple, and must have committed it before emit InOrder inOrder = inOrder(consumerMock, collectorMock); inOrder.verify(consumerMock).poll(anyLong()); - inOrder.verify(consumerMock).commitSync(); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList()); + + CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE); + Map committedOffsets = commitCapture.getValue(); + assertThat(committedOffsets.get(partition).offset(), is(0L)); + assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata())); } private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig spoutConfig) { @@ -172,39 +183,44 @@ public void testNoGuaranteeModeCannotReplayTuples() throws Exception { doTestModeCannotReplayTuples(spoutConfig); } - private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig spoutConfig) { + @Test + public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { + //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted + KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .setTupleTrackingEnforced(true) + .build(); try (SimulatedTime time = new SimulatedTime()) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); + reset(consumerMock); ArgumentCaptor msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture()); assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue())); spout.ack(msgIdCaptor.getValue()); - + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs()); - + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.>>emptyMap())); + spout.nextTuple(); - - verify(consumerMock, never()).commitSync(any(Map.class)); + + verify(consumerMock, never()).commitSync(argThat(new ArgumentMatcher>() { + @Override + public boolean matches(Object arg) { + Map castArg = (Map) arg; + return !castArg.containsKey(partition); + } + })); } } - @Test - public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { - //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted - KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) - .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) - .setTupleTrackingEnforced(true) - .build(); - doTestModeDoesNotCommitAckedTuples(spoutConfig); - } - @Test public void testNoGuaranteeModeCommitsPolledTuples() throws Exception { //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked @@ -212,30 +228,31 @@ public void testNoGuaranteeModeCommitsPolledTuples() throws Exception { .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) .setTupleTrackingEnforced(true) .build(); - + try (SimulatedTime time = new SimulatedTime()) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); - + when(consumerMock.position(partition)).thenReturn(1L); ArgumentCaptor msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture()); assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue())); - + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs()); - + spout.nextTuple(); - + verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class)); - - Map commit = commitCapture.getValue(); - assertThat(commit.containsKey(partition), is(true)); - assertThat(commit.get(partition).offset(), is(1L)); + + CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE); + Map committedOffsets = commitCapture.getValue(); + assertThat(committedOffsets.get(partition).offset(), is(1L)); + assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata())); } }