From 6e75016c45c602c874086dea26324ca413f0c141 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Tue, 14 Feb 2017 21:31:45 +0100 Subject: [PATCH] STORM-2250: Kafka spout refactoring to increase modularity and testability. Also support nanoseconds in Storm time simulation --- external/storm-kafka-client/pom.xml | 9 +- .../apache/storm/kafka/spout/KafkaSpout.java | 159 ++------- .../kafka/spout/internal/OffsetManager.java | 157 ++++++++ .../storm/kafka/spout/internal/Timer.java | 7 +- .../spout/ByTopicRecordTranslatorTest.java | 2 +- .../spout/DefaultRecordTranslatorTest.java | 2 +- .../kafka/spout/KafkaSpoutConfigTest.java | 4 +- .../kafka/spout/KafkaSpoutRebalanceTest.java | 82 ++--- .../spout/SingleTopicKafkaSpoutTest.java | 334 ++++++++++-------- .../KafkaSpoutTopologyMainNamedTopics.java | 6 +- .../KafkaSpoutTopologyMainWildcardTopics.java | 2 +- pom.xml | 1 - .../src/jvm/org/apache/storm/utils/Time.java | 146 +++++--- 13 files changed, 524 insertions(+), 387 deletions(-) create mode 100755 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index 97ed35919a0..0fdb64d85e9 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -77,7 +77,13 @@ org.hamcrest - hamcrest-all + hamcrest-core + 1.3 + test + + + org.hamcrest + hamcrest-library 1.3 test @@ -90,7 +96,6 @@ org.slf4j log4j-over-slf4j - ${log4j-over-slf4j.version} test diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index b96f3f9e1a5..f8a576c1161 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -25,16 +25,13 @@ import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -48,6 +45,7 @@ import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.OffsetManager; import org.apache.storm.kafka.spout.internal.Timer; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -58,19 +56,19 @@ public class KafkaSpout extends BaseRichSpout { private static final long serialVersionUID = 4151921085047987154L; + //Initial delay for the commit and subscription refresh timers + public static final long TIMER_DELAY_MS = 500; private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); - private static final Comparator OFFSET_COMPARATOR = new OffsetComparator(); // Storm protected SpoutOutputCollector collector; // Kafka private final KafkaSpoutConfig kafkaSpoutConfig; - private final KafkaConsumerFactory kafkaConsumerFactory; + private KafkaConsumerFactory kafkaConsumerFactory; private transient KafkaConsumer kafkaConsumer; private transient boolean consumerAutoCommitMode; - // Bookkeeping private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure @@ -78,7 +76,7 @@ public class KafkaSpout extends BaseRichSpout { private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() - transient Map acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate. Not used if it's AutoCommitMode + private transient Map acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate private transient Set emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode private transient Iterator> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if it's AutoCommitMode @@ -87,13 +85,13 @@ public class KafkaSpout extends BaseRichSpout { public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { - this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault()); + this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>()); } //This constructor is here for testing KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig, KafkaConsumerFactory kafkaConsumerFactory) { - this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration this.kafkaConsumerFactory = kafkaConsumerFactory; + this.kafkaSpoutConfig = kafkaSpoutConfig; } @Override @@ -114,9 +112,9 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect retryService = kafkaSpoutConfig.getRetryService(); if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually - commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); + commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); } - refreshSubscriptionTimer = new Timer(500, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); + refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); acked = new HashMap<>(); emitted = new HashSet<>(); @@ -198,7 +196,7 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { private void setAcked(TopicPartition tp, long fetchOffset) { // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off if (!consumerAutoCommitMode && !acked.containsKey(tp)) { - acked.put(tp, new OffsetEntry(tp, fetchOffset)); + acked.put(tp, new OffsetManager(tp, fetchOffset)); } } @@ -290,7 +288,7 @@ private void doSeekRetriableTopicPartitions() { if (offsetAndMeta != null) { kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle } else { - kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1); // Seek to last committed offset + kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1); // Seek to last committed offset } } } @@ -347,7 +345,7 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord record) { private void commitOffsetsForAckedTuples() { // Find offsets that are ready to be committed for every topic partition final Map nextCommitOffsets = new HashMap<>(); - for (Map.Entry tpOffset : acked.entrySet()) { + for (Map.Entry tpOffset : acked.entrySet()) { final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(); if (nextCommitOffset != null) { nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset); @@ -360,9 +358,14 @@ private void commitOffsetsForAckedTuples() { LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets); // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop - for (Map.Entry tpOffset : acked.entrySet()) { - final OffsetEntry offsetEntry = tpOffset.getValue(); - offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey())); + for (Map.Entry tpOffset : nextCommitOffsets.entrySet()) { + //Update the OffsetManager for each committed partition, and update numUncommittedOffsets + final TopicPartition tp = tpOffset.getKey(); + final OffsetManager offsetManager = acked.get(tp); + long numCommittedOffsets = offsetManager.commit(tpOffset.getValue()); + numUncommittedOffsets -= numCommittedOffsets; + LOG.debug("[{}] uncommitted offsets across all topic partitions", + numUncommittedOffsets); } } else { LOG.trace("No offsets to commit. {}", this); @@ -483,127 +486,7 @@ public Map getComponentConfiguration () { private String getTopicsString() { return kafkaSpoutConfig.getSubscription().getTopicsString(); } +} - // ======= Offsets Commit Management ========== - - private static class OffsetComparator implements Comparator { - public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) { - return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1; - } - } - - /** - * This class is not thread safe - */ - class OffsetEntry { - private final TopicPartition tp; - private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. - * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ - private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1 - private final NavigableSet ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); // acked messages sorted by ascending order of offset - - public OffsetEntry(TopicPartition tp, long initialFetchOffset) { - this.tp = tp; - this.initialFetchOffset = initialFetchOffset; - this.committedOffset = initialFetchOffset - 1; - LOG.debug("Instantiated {}", this); - } - - public void add(KafkaSpoutMessageId msgId) { // O(Log N) - ackedMsgs.add(msgId); - } - - /** - * An offset is only committed when all records with lower offset have - * been acked. This guarantees that all offsets smaller than the - * committedOffset have been delivered. - * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit. - */ - public OffsetAndMetadata findNextCommitOffset() { - boolean found = false; - long currOffset; - long nextCommitOffset = committedOffset; - KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata - - for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap - if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit - found = true; - nextCommitMsg = currAckedMsg; - nextCommitOffset = currOffset; - } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search - LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset); - break; - } else { - //Received a redundant ack. Ignore and continue processing. - LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]", - tp, currOffset, committedOffset); - } - } - - OffsetAndMetadata nextCommitOffsetAndMetadata = null; - if (found) { - nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); - LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); - } else { - LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); - } - LOG.trace("{}", this); - return nextCommitOffsetAndMetadata; - } - - /** - * Marks an offset has committed. This method has side effects - it sets the internal state in such a way that future - * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any. - * - * @param committedOffset offset to be marked as committed - */ - public void commit(OffsetAndMetadata committedOffset) { - long numCommittedOffsets = 0; - if (committedOffset != null) { - final long oldCommittedOffset = this.committedOffset; - numCommittedOffsets = committedOffset.offset() - this.committedOffset; - this.committedOffset = committedOffset.offset(); - for (Iterator iterator = ackedMsgs.iterator(); iterator.hasNext(); ) { - if (iterator.next().offset() <= committedOffset.offset()) { - iterator.remove(); - } else { - break; - } - } - numUncommittedOffsets-= numCommittedOffsets; - LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions", - oldCommittedOffset + 1, this.committedOffset, numCommittedOffsets, tp, numUncommittedOffsets); - } else { - LOG.debug("Committed [{}] offsets for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions", - numCommittedOffsets, tp, numUncommittedOffsets); - } - LOG.trace("{}", this); - } - - long getCommittedOffset() { - return committedOffset; - } - - public boolean isEmpty() { - return ackedMsgs.isEmpty(); - } - public boolean contains(ConsumerRecord record) { - return contains(new KafkaSpoutMessageId(record)); - } - - public boolean contains(KafkaSpoutMessageId msgId) { - return ackedMsgs.contains(msgId); - } - @Override - public String toString() { - return "OffsetEntry{" + - "topic-partition=" + tp + - ", fetchOffset=" + initialFetchOffset + - ", committedOffset=" + committedOffset + - ", ackedMsgs=" + ackedMsgs + - '}'; - } - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java new file mode 100755 index 00000000000..4ce04718f27 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -0,0 +1,157 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.internal; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.NavigableSet; +import java.util.TreeSet; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutMessageId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages acked and committed offsets for a TopicPartition. This class is not thread safe + */ +public class OffsetManager { + + private static final Comparator OFFSET_COMPARATOR = new OffsetComparator(); + private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class); + private final TopicPartition tp; + /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. + * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ + private final long initialFetchOffset; + // Last offset committed to Kafka. Initially it is set to fetchOffset - 1 + private long committedOffset; + // Acked messages sorted by ascending order of offset + private final NavigableSet ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); + + public OffsetManager(TopicPartition tp, long initialFetchOffset) { + this.tp = tp; + this.initialFetchOffset = initialFetchOffset; + this.committedOffset = initialFetchOffset - 1; + LOG.debug("Instantiated {}", this); + } + + public void add(KafkaSpoutMessageId msgId) { // O(Log N) + ackedMsgs.add(msgId); + } + + /** + * An offset is only committed when all records with lower offset have been + * acked. This guarantees that all offsets smaller than the committedOffset + * have been delivered. + * + * @return the next OffsetAndMetadata to commit, or null if no offset is + * ready to commit. + */ + public OffsetAndMetadata findNextCommitOffset() { + boolean found = false; + long currOffset; + long nextCommitOffset = committedOffset; + KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata + + for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap + if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit + found = true; + nextCommitMsg = currAckedMsg; + nextCommitOffset = currOffset; + } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search + LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset); + break; + } else { + //Received a redundant ack. Ignore and continue processing. + LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]", + tp, currOffset, committedOffset); + } + } + + OffsetAndMetadata nextCommitOffsetAndMetadata = null; + if (found) { + nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); + LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); + } else { + LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); + } + LOG.trace("{}", this); + return nextCommitOffsetAndMetadata; + } + + /** + * Marks an offset has committed. This method has side effects - it sets the + * internal state in such a way that future calls to + * {@link #findNextCommitOffset()} will return offsets greater than the + * offset specified, if any. + * + * @param committedOffset offset to be marked as committed + * @return Number of offsets committed in this commit + */ + public long commit(OffsetAndMetadata committedOffset) { + long preCommitCommittedOffsets = this.committedOffset; + long numCommittedOffsets = committedOffset.offset() - this.committedOffset; + this.committedOffset = committedOffset.offset(); + for (Iterator iterator = ackedMsgs.iterator(); iterator.hasNext();) { + if (iterator.next().offset() <= committedOffset.offset()) { + iterator.remove(); + } else { + break; + } + } + LOG.trace("{}", this); + + LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].", + preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp); + + return numCommittedOffsets; + } + + public long getCommittedOffset() { + return committedOffset; + } + + public boolean isEmpty() { + return ackedMsgs.isEmpty(); + } + + public boolean contains(ConsumerRecord record) { + return contains(new KafkaSpoutMessageId(record)); + } + + public boolean contains(KafkaSpoutMessageId msgId) { + return ackedMsgs.contains(msgId); + } + + @Override + public String toString() { + return "OffsetManager{" + + "topic-partition=" + tp + + ", fetchOffset=" + initialFetchOffset + + ", committedOffset=" + committedOffset + + ", ackedMsgs=" + ackedMsgs + + '}'; + } + + private static class OffsetComparator implements Comparator { + + @Override + public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) { + return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1; + } + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java index d51104df82f..2a2e1cb69c1 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java @@ -18,6 +18,7 @@ package org.apache.storm.kafka.spout.internal; import java.util.concurrent.TimeUnit; +import org.apache.storm.utils.Time; public class Timer { private final long delay; @@ -41,7 +42,7 @@ public Timer(long delay, long period, TimeUnit timeUnit) { this.timeUnit = timeUnit; periodNanos = timeUnit.toNanos(period); - start = System.nanoTime() + timeUnit.toNanos(delay); + start = Time.nanoTime() + timeUnit.toNanos(delay); } public long period() { @@ -65,9 +66,9 @@ public TimeUnit getTimeUnit() { * otherwise. */ public boolean isExpiredResetOnTrue() { - final boolean expired = System.nanoTime() - start > periodNanos; + final boolean expired = Time.nanoTime() - start >= periodNanos; if (expired) { - start = System.nanoTime(); + start = Time.nanoTime(); } return expired; } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java index fd53b1504c2..1e4b43b6dd4 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java @@ -17,7 +17,7 @@ */ package org.apache.storm.kafka.spout; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.util.Arrays; import java.util.HashSet; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java index f4275e49d10..681953d1f77 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java @@ -17,7 +17,7 @@ */ package org.apache.storm.kafka.spout; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.util.Arrays; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java index 08220dd207e..57e01205d69 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -17,7 +17,9 @@ */ package org.apache.storm.kafka.spout; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.util.HashMap; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index 68fd4a6fe8a..b882b6759d3 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -44,6 +45,8 @@ import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -55,20 +58,18 @@ public class KafkaSpoutRebalanceTest { @Captor private ArgumentCaptor> commitCapture; - private TopologyContext contextMock; - private SpoutOutputCollector collectorMock; - private Map conf; + private final long offsetCommitPeriodMs = 2_000; + private final TopologyContext contextMock = mock(TopologyContext.class); + private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + private final Map conf = new HashMap<>(); private KafkaConsumer consumerMock; - private KafkaConsumerFactory consumerFactoryMock; + private KafkaConsumerFactory consumerFactory; @Before public void setUp() { MockitoAnnotations.initMocks(this); - contextMock = mock(TopologyContext.class); - collectorMock = mock(SpoutOutputCollector.class); - conf = new HashMap<>(); consumerMock = mock(KafkaConsumer.class); - consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock; + consumerFactory = (kafkaSpoutConfig) -> consumerMock; } //Returns messageIds in order of emission @@ -93,9 +94,9 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti Map>> secondPartitionRecords = new HashMap<>(); secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value"))); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(firstPartitionRecords)) - .thenReturn(new ConsumerRecords(secondPartitionRecords)) - .thenReturn(new ConsumerRecords(Collections.emptyMap())); + .thenReturn(new ConsumerRecords(firstPartitionRecords)) + .thenReturn(new ConsumerRecords(secondPartitionRecords)) + .thenReturn(new ConsumerRecords(Collections.emptyMap())); //Emit the messages spout.nextTuple(); @@ -109,7 +110,7 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti //Now rebalance consumerRebalanceListener.onPartitionsRevoked(assignedPartitions); consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition)); - + List emittedMessageIds = new ArrayList<>(); emittedMessageIds.add(messageIdForRevokedPartition.getValue()); emittedMessageIds.add(messageIdForAssignedPartition.getValue()); @@ -119,47 +120,48 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti @Test public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them - KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10), consumerFactoryMock); - String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; - TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); - TopicPartition assignedPartition = new TopicPartition(topic, 2); - - //Emit a message on each partition and revoke the first partition - List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); - - //Ack both emitted tuples - spout.ack(emittedMessageIds.get(0)); - spout.ack(emittedMessageIds.get(1)); - - //Ensure the commit timer has expired - Thread.sleep(510); - - //Make the spout commit any acked tuples - spout.nextTuple(); - //Verify that it only committed the message on the assigned partition - verify(consumerMock).commitSync(commitCapture.capture()); - - Map commitCaptureMap = commitCapture.getValue(); - assertThat(commitCaptureMap, hasKey(assignedPartition)); - assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked))); + try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, this.offsetCommitPeriodMs), consumerFactory); + String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; + TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); + TopicPartition assignedPartition = new TopicPartition(topic, 2); + + //Emit a message on each partition and revoke the first partition + List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + + //Ack both emitted tuples + spout.ack(emittedMessageIds.get(0)); + spout.ack(emittedMessageIds.get(1)); + + //Ensure the commit timer has expired + Time.advanceTime(offsetCommitPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Make the spout commit any acked tuples + spout.nextTuple(); + //Verify that it only committed the message on the assigned partition + verify(consumerMock, times(1)).commitSync(commitCapture.capture()); + + Map commitCaptureMap = commitCapture.getValue(); + assertThat(commitCaptureMap, hasKey(assignedPartition)); + assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked))); + } } - + @Test public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class); - KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10, retryServiceMock), consumerFactoryMock); + KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10, retryServiceMock), consumerFactory); String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); TopicPartition assignedPartition = new TopicPartition(topic, 2); - + //Emit a message on each partition and revoke the first partition List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); - + //Fail both emitted tuples spout.fail(emittedMessageIds.get(0)); spout.fail(emittedMessageIds.get(1)); - + //Check that only the tuple on the currently assigned partition is retried verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0)); verify(retryServiceMock).schedule(emittedMessageIds.get(1)); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index c5e4e31a43d..fdc97347b60 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -17,6 +17,8 @@ */ package org.apache.storm.kafka.spout; +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig; + import info.batey.kafka.unit.KafkaUnitRule; import kafka.producer.KeyedMessage; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -28,21 +30,39 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; -import static org.junit.Assert.*; - import java.util.Map; import java.util.stream.IntStream; -import static org.mockito.Mockito.*; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.HashMap; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; public class SingleTopicKafkaSpoutTest { private class SpoutContext { + public KafkaSpout spout; public SpoutOutputCollector collector; public SpoutContext(KafkaSpout spout, - SpoutOutputCollector collector) { + SpoutOutputCollector collector) { this.spout = spout; this.collector = collector; } @@ -51,190 +71,206 @@ public SpoutContext(KafkaSpout spout, @Rule public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(); - void populateTopicData(String topicName, int msgCount) { + @Captor + private ArgumentCaptor> commitCapture; + + private final TopologyContext topologyContext = mock(TopologyContext.class); + private final Map conf = new HashMap<>(); + private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); + private final long commitOffsetPeriodMs = 2_000; + private KafkaConsumer consumerSpy; + private KafkaConsumerFactory consumerFactory; + private KafkaSpout spout; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaPort(), commitOffsetPeriodMs); + this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); + this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy; + this.spout = new KafkaSpout<>(spoutConfig, consumerFactory); + } + + private void populateTopicData(String topicName, int msgCount) { kafkaUnitRule.getKafkaUnit().createTopic(topicName); IntStream.range(0, msgCount).forEach(value -> { KeyedMessage keyedMessage = new KeyedMessage<>( - topicName, Integer.toString(value), - Integer.toString(value)); + topicName, Integer.toString(value), + Integer.toString(value)); kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage); }); } - SpoutContext initializeSpout(int msgCount) { + private void initializeSpout(int msgCount) { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); - int kafkaPort = kafkaUnitRule.getKafkaPort(); - - TopologyContext topology = mock(TopologyContext.class); - SpoutOutputCollector collector = mock(SpoutOutputCollector.class); - Map conf = mock(Map.class); - - KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(kafkaPort)); - spout.open(conf, topology, collector); + spout.open(conf, topologyContext, collector); spout.activate(); - return new SpoutContext(spout, collector); } + /* - * Asserts that the next possible offset to commit or the committed offset is the provided offset. - * An offset that is ready to be committed is not guarenteed to be already committed. + * Asserts that commitSync has been called once, + * that there are only commits on one topic, + * and that the committed offset covers messageCount messages */ - private void assertOffsetCommitted(int offset, KafkaSpout.OffsetEntry entry) { - - boolean currentOffsetMatch = entry.getCommittedOffset() == offset; - OffsetAndMetadata nextOffset = entry.findNextCommitOffset(); - boolean nextOffsetMatch = nextOffset != null && nextOffset.offset() == offset; - assertTrue("Next offset: " + - entry.findNextCommitOffset() + - " OR current offset: " + - entry.getCommittedOffset() + - " must equal desired offset: " + - offset, - currentOffsetMatch | nextOffsetMatch); + private void verifyAllMessagesCommitted(long messageCount) { + verify(consumerSpy, times(1)).commitSync(commitCapture.capture()); + Map commits = commitCapture.getValue(); + assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1)); + OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue(); + assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1)); } @Test public void shouldContinueWithSlowDoubleAcks() throws Exception { - int messageCount = 20; - SpoutContext context = initializeSpout(messageCount); - - //play 1st tuple - ArgumentCaptor messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class); - context.spout.nextTuple(); - verify(context.collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture()); - context.spout.ack(messageIdToDoubleAck.getValue()); - - IntStream.range(0, messageCount/2).forEach(value -> { - context.spout.nextTuple(); - }); - - context.spout.ack(messageIdToDoubleAck.getValue()); - - IntStream.range(0, messageCount).forEach(value -> { - context.spout.nextTuple(); - }); - - ArgumentCaptor remainingIds = ArgumentCaptor.forClass(Object.class); - - verify(context.collector, times(messageCount)).emit( - eq(SingleTopicKafkaSpoutConfiguration.STREAM), + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = 20; + initializeSpout(messageCount); + + //play 1st tuple + ArgumentCaptor messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture()); + spout.ack(messageIdToDoubleAck.getValue()); + + //Emit some more messages + IntStream.range(0, messageCount / 2).forEach(value -> { + spout.nextTuple(); + }); + + spout.ack(messageIdToDoubleAck.getValue()); + + //Emit any remaining messages + IntStream.range(0, messageCount).forEach(value -> { + spout.nextTuple(); + }); + + //Verify that all messages are emitted, ack all the messages + ArgumentCaptor messageIds = ArgumentCaptor.forClass(Object.class); + verify(collector, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyObject(), - remainingIds.capture()); - remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack); + messageIds.capture()); + messageIds.getAllValues().iterator().forEachRemaining(spout::ack); - context.spout.acked.values().forEach(item -> { - assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item); - }); + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } } @Test public void shouldEmitAllMessages() throws Exception { - int messageCount = 10; - SpoutContext context = initializeSpout(messageCount); - - - IntStream.range(0, messageCount).forEach(value -> { - context.spout.nextTuple(); - ArgumentCaptor messageId = ArgumentCaptor.forClass(Object.class); - verify(context.collector).emit( + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = 10; + initializeSpout(messageCount); + + //Emit all messages and check that they are emitted. Ack the messages too + IntStream.range(0, messageCount).forEach(value -> { + spout.nextTuple(); + ArgumentCaptor messageId = ArgumentCaptor.forClass(Object.class); + verify(collector).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, - Integer.toString(value), - Integer.toString(value))), - messageId.capture()); - context.spout.ack(messageId.getValue()); - reset(context.collector); - }); - - context.spout.acked.values().forEach(item -> { - assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item); - }); + Integer.toString(value), + Integer.toString(value))), + messageId.capture()); + spout.ack(messageId.getValue()); + reset(collector); + }); + + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } } @Test public void shouldReplayInOrderFailedMessages() throws Exception { - int messageCount = 10; - SpoutContext context = initializeSpout(messageCount); - - //play and ack 1 tuple - ArgumentCaptor messageIdAcked = ArgumentCaptor.forClass(Object.class); - context.spout.nextTuple(); - verify(context.collector).emit(anyObject(), anyObject(), messageIdAcked.capture()); - context.spout.ack(messageIdAcked.getValue()); - reset(context.collector); - - //play and fail 1 tuple - ArgumentCaptor messageIdFailed = ArgumentCaptor.forClass(Object.class); - context.spout.nextTuple(); - verify(context.collector).emit(anyObject(), anyObject(), messageIdFailed.capture()); - context.spout.fail(messageIdFailed.getValue()); - reset(context.collector); - - //pause so that failed tuples will be retried - Thread.sleep(200); - - - //allow for some calls to nextTuple() to fail to emit a tuple - IntStream.range(0, messageCount + 5).forEach(value -> { - context.spout.nextTuple(); - }); - - ArgumentCaptor remainingMessageIds = ArgumentCaptor.forClass(Object.class); - - //1 message replayed, messageCount - 2 messages emitted for the first time - verify(context.collector, times(messageCount - 1)).emit( + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = 10; + initializeSpout(messageCount); + + //play and ack 1 tuple + ArgumentCaptor messageIdAcked = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyObject(), anyObject(), messageIdAcked.capture()); + spout.ack(messageIdAcked.getValue()); + reset(collector); + + //play and fail 1 tuple + ArgumentCaptor messageIdFailed = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyObject(), anyObject(), messageIdFailed.capture()); + spout.fail(messageIdFailed.getValue()); + reset(collector); + + //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait. + IntStream.range(0, messageCount).forEach(value -> { + spout.nextTuple(); + }); + + ArgumentCaptor remainingMessageIds = ArgumentCaptor.forClass(Object.class); + //All messages except the first acked message should have been emitted + verify(collector, times(messageCount - 1)).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyObject(), remainingMessageIds.capture()); - remainingMessageIds.getAllValues().iterator().forEachRemaining(context.spout::ack); + remainingMessageIds.getAllValues().iterator().forEachRemaining(spout::ack); - context.spout.acked.values().forEach(item -> { - assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item); - }); + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } } @Test public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception { - int messageCount = 10; - SpoutContext context = initializeSpout(messageCount); - - - //play 1st tuple - ArgumentCaptor messageIdToFail = ArgumentCaptor.forClass(Object.class); - context.spout.nextTuple(); - verify(context.collector).emit(anyObject(), anyObject(), messageIdToFail.capture()); - reset(context.collector); - - //play 2nd tuple - ArgumentCaptor messageIdToAck = ArgumentCaptor.forClass(Object.class); - context.spout.nextTuple(); - verify(context.collector).emit(anyObject(), anyObject(), messageIdToAck.capture()); - reset(context.collector); - - //ack 2nd tuple - context.spout.ack(messageIdToAck.getValue()); - //fail 1st tuple - context.spout.fail(messageIdToFail.getValue()); - - //pause so that failed tuples will be retried - Thread.sleep(200); - - //allow for some calls to nextTuple() to fail to emit a tuple - IntStream.range(0, messageCount + 5).forEach(value -> { - context.spout.nextTuple(); - }); - - ArgumentCaptor remainingIds = ArgumentCaptor.forClass(Object.class); - //1 message replayed, messageCount - 2 messages emitted for the first time - verify(context.collector, times(messageCount - 1)).emit( + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = 10; + initializeSpout(messageCount); + + //play 1st tuple + ArgumentCaptor messageIdToFail = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyObject(), anyObject(), messageIdToFail.capture()); + reset(collector); + + //play 2nd tuple + ArgumentCaptor messageIdToAck = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyObject(), anyObject(), messageIdToAck.capture()); + reset(collector); + + //ack 2nd tuple + spout.ack(messageIdToAck.getValue()); + //fail 1st tuple + spout.fail(messageIdToFail.getValue()); + + //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait. + IntStream.range(0, messageCount).forEach(value -> { + spout.nextTuple(); + }); + + ArgumentCaptor remainingIds = ArgumentCaptor.forClass(Object.class); + //All messages except the first acked message should have been emitted + verify(collector, times(messageCount - 1)).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyObject(), remainingIds.capture()); - remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack); + remainingIds.getAllValues().iterator().forEachRemaining(spout::ack); - context.spout.acked.values().forEach(item -> { - assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item); - }); + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } } -} \ No newline at end of file +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java index 2aeeb9571e9..e305c8afc54 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java @@ -50,9 +50,9 @@ public static void main(String[] args) throws Exception { protected void runMain(String[] args) throws Exception { if (args.length == 0) { - submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig()); + submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig()); } else { - submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig()); + submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig()); } } @@ -82,7 +82,7 @@ protected Config getConfig() { return config; } - protected StormTopology getTopolgyKafkaSpout() { + protected StormTopology getTopologyKafkaSpout() { final TopologyBuilder tp = new TopologyBuilder(); tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1); tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()) diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java index d0376e68c6b..f811c7ab73d 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java @@ -37,7 +37,7 @@ public static void main(String[] args) throws Exception { new KafkaSpoutTopologyMainWildcardTopics().runMain(args); } - protected StormTopology getTopolgyKafkaSpout() { + protected StormTopology getTopologyKafkaSpout() { final TopologyBuilder tp = new TopologyBuilder(); tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1); tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM); diff --git a/pom.xml b/pom.xml index 13c4c3548cf..6d3543e7a1d 100644 --- a/pom.xml +++ b/pom.xml @@ -359,7 +359,6 @@ junit junit - ${junit.version} test diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java index c5c6b6a7dcc..0401829ad0e 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Time.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java @@ -24,14 +24,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +/** + * This class implements time simulation support. When time simulation is enabled, methods on this class will use fixed time. + * When time simulation is disabled, methods will pass through to relevant java.lang.System/java.lang.Thread calls. + * Methods using units higher than nanoseconds will pass through to System.currentTimeMillis(). Methods supporting nanoseconds will pass through to System.nanoTime(). + */ public class Time { private static final Logger LOG = LoggerFactory.getLogger(Time.class); private static AtomicBoolean simulating = new AtomicBoolean(false); - private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0); - private static volatile Map threadSleepTimes; + private static AtomicLong autoAdvanceNanosOnSleep = new AtomicLong(0); + private static volatile Map threadSleepTimesNanos; private static final Object sleepTimesLock = new Object(); - private static AtomicLong simulatedCurrTimeMs; + private static AtomicLong simulatedCurrTimeNanos; public static class SimulatedTime implements AutoCloseable { @@ -39,13 +43,13 @@ public SimulatedTime() { this(null); } - public SimulatedTime(Number ms) { + public SimulatedTime(Number advanceTimeMs) { synchronized(Time.sleepTimesLock) { Time.simulating.set(true); - Time.simulatedCurrTimeMs = new AtomicLong(0); - Time.threadSleepTimes = new ConcurrentHashMap<>(); - if (ms != null) { - Time.autoAdvanceOnSleep.set(ms.longValue()); + Time.simulatedCurrTimeNanos = new AtomicLong(0); + Time.threadSleepTimesNanos = new ConcurrentHashMap<>(); + if (advanceTimeMs != null) { + Time.autoAdvanceNanosOnSleep.set(millisToNanos(advanceTimeMs.longValue())); } LOG.warn("AutoCloseable Simulated Time Starting..."); } @@ -55,8 +59,8 @@ public SimulatedTime(Number ms) { public void close() { synchronized(Time.sleepTimesLock) { Time.simulating.set(false); - Time.autoAdvanceOnSleep.set(0); - Time.threadSleepTimes = null; + Time.autoAdvanceNanosOnSleep.set(0); + Time.threadSleepTimesNanos = null; LOG.warn("AutoCloseable Simulated Time Ending..."); } } @@ -66,8 +70,8 @@ public void close() { public static void startSimulating() { synchronized(Time.sleepTimesLock) { Time.simulating.set(true); - Time.simulatedCurrTimeMs = new AtomicLong(0); - Time.threadSleepTimes = new ConcurrentHashMap<>(); + Time.simulatedCurrTimeNanos = new AtomicLong(0); + Time.threadSleepTimesNanos = new ConcurrentHashMap<>(); LOG.warn("Simulated Time Starting..."); } } @@ -76,8 +80,8 @@ public static void startSimulating() { public static void stopSimulating() { synchronized(Time.sleepTimesLock) { Time.simulating.set(false); - Time.autoAdvanceOnSleep.set(0); - Time.threadSleepTimes = null; + Time.autoAdvanceNanosOnSleep.set(0); + Time.threadSleepTimesNanos = null; LOG.warn("Simulated Time Ending..."); } } @@ -88,44 +92,66 @@ public static boolean isSimulating() { public static void sleepUntil(long targetTimeMs) throws InterruptedException { if(simulating.get()) { - try { - synchronized(sleepTimesLock) { - if (threadSleepTimes == null) { + simulatedSleepUntilNanos(millisToNanos(targetTimeMs)); + } else { + long sleepTimeMs = targetTimeMs - currentTimeMillis(); + if(sleepTimeMs>0) { + Thread.sleep(sleepTimeMs); + } + } + } + + public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedException { + if(simulating.get()) { + simulatedSleepUntilNanos(targetTimeNanos); + } else { + long sleepTimeNanos = targetTimeNanos-nanoTime(); + long sleepTimeMs = nanosToMillis(sleepTimeNanos); + int sleepTimeNanosSansMs = (int)(sleepTimeNanos%1_000_000); + if(sleepTimeNanos>0) { + Thread.sleep(sleepTimeMs, sleepTimeNanosSansMs); + } + } + } + + private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException { + try { + synchronized (sleepTimesLock) { + if (threadSleepTimesNanos == null) { + LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); + throw new InterruptedException(); + } + threadSleepTimesNanos.put(Thread.currentThread(), new AtomicLong(targetTimeNanos)); + } + while (simulatedCurrTimeNanos.get() < targetTimeNanos) { + synchronized (sleepTimesLock) { + if (threadSleepTimesNanos == null) { LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); throw new InterruptedException(); } - threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs)); } - while(simulatedCurrTimeMs.get() < targetTimeMs) { - synchronized(sleepTimesLock) { - if (threadSleepTimes == null) { - LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); - throw new InterruptedException(); - } - } - long autoAdvance = autoAdvanceOnSleep.get(); - if (autoAdvance > 0) { - advanceTime(autoAdvance); - } - Thread.sleep(10); + long autoAdvance = autoAdvanceNanosOnSleep.get(); + if (autoAdvance > 0) { + advanceTimeNanos(autoAdvance); } - } finally { - synchronized(sleepTimesLock) { - if (simulating.get() && threadSleepTimes != null) { - threadSleepTimes.remove(Thread.currentThread()); - } + Thread.sleep(10); + } + } finally { + synchronized (sleepTimesLock) { + if (simulating.get() && threadSleepTimesNanos != null) { + threadSleepTimesNanos.remove(Thread.currentThread()); } } - } else { - long sleepTime = targetTimeMs-currentTimeMillis(); - if(sleepTime>0) - Thread.sleep(sleepTime); } } public static void sleep(long ms) throws InterruptedException { sleepUntil(currentTimeMillis()+ms); } + + public static void sleepNanos(long nanos) throws InterruptedException { + sleepUntilNanos(nanoTime() + nanos); + } public static void sleepSecs (long secs) throws InterruptedException { if (secs > 0) { @@ -133,14 +159,30 @@ public static void sleepSecs (long secs) throws InterruptedException { } } + public static long nanoTime() { + if (simulating.get()) { + return simulatedCurrTimeNanos.get(); + } else { + return System.nanoTime(); + } + } + public static long currentTimeMillis() { if(simulating.get()) { - return simulatedCurrTimeMs.get(); + return nanosToMillis(simulatedCurrTimeNanos.get()); } else { return System.currentTimeMillis(); } } + public static long nanosToMillis(long nanos) { + return nanos/1_000_000; + } + + public static long millisToNanos(long millis) { + return millis*1_000_000; + } + public static long secsToMillis (int secs) { return 1000*(long) secs; } @@ -162,9 +204,17 @@ public static long deltaMs(long timeInMilliseconds) { } public static void advanceTime(long ms) { - if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode"); - if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument"); - long newTime = simulatedCurrTimeMs.addAndGet(ms); + advanceTimeNanos(millisToNanos(ms)); + } + + public static void advanceTimeNanos(long nanos) { + if (!simulating.get()) { + throw new IllegalStateException("Cannot simulate time unless in simulation mode"); + } + if (nanos < 0) { + throw new IllegalArgumentException("advanceTime only accepts positive time as an argument"); + } + long newTime = simulatedCurrTimeNanos.addAndGet(nanos); LOG.debug("Advanced simulated time to {}", newTime); } @@ -173,11 +223,13 @@ public static void advanceTimeSecs(long secs) { } public static boolean isThreadWaiting(Thread t) { - if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode"); + if(!simulating.get()) { + throw new IllegalStateException("Must be in simulation mode"); + } AtomicLong time; synchronized(sleepTimesLock) { - time = threadSleepTimes.get(t); + time = threadSleepTimesNanos.get(t); } - return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue(); + return !t.isAlive() || time!=null && nanoTime() < time.longValue(); } }