From a4cdfdcaaf15c8c60749a103d38b55e77703c2d5 Mon Sep 17 00:00:00 2001
From: cstella
Date: Fri, 12 May 2017 11:59:18 -0400
Subject: [PATCH 1/6] Upgrading spout to 1.1 for performance.

---
 .../metron-storm-kafka-override/pom.xml            | 101 ++++++++++++++++++
 .../storm/kafka/spout/internal/Timer.java          |  58 ++++++++++
 metron-platform/metron-storm-kafka/pom.xml         |   5 +
 .../kafka/flux/SimpleStormKafkaBuilder.java        |  89 ++++++++-------
 .../storm/kafka/flux/SpoutConfiguration.java       |  20 ++--
 .../kafka/flux/SpoutConfigurationTest.java         |  13 +--
 metron-platform/pom.xml                            |   1 +
 pom.xml                                            |  20 +++-
 8 files changed, 252 insertions(+), 55 deletions(-)
 create mode 100644 metron-platform/metron-storm-kafka-override/pom.xml
 create mode 100644 metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java

diff --git a/metron-platform/metron-storm-kafka-override/pom.xml b/metron-platform/metron-storm-kafka-override/pom.xml
new file mode 100644
index 0000000000..8683176241
--- /dev/null
+++ b/metron-platform/metron-storm-kafka-override/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with this
+  work for additional information regarding copyright ownership.  The ASF
+  licenses this file to You under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+  License for the specific language governing permissions and limitations
+  under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-platform</artifactId>
+        <version>0.4.0</version>
+    </parent>
+    <artifactId>metron-storm-kafka-override</artifactId>
+    <name>metron-storm-kafka-override</name>
+    <description>Components that extend the Storm/Kafka spout</description>
+    <url>https://metron.apache.org/</url>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <commons.config.version>1.10</commons.config.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka-client</artifactId>
+            <version>${global_storm_kafka_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${global_kafka_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>${global_kafka_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>
diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
new file mode 100644
index 0000000000..f9782ab97b
--- /dev/null
+++ b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.utils.Time;
+
+public class Timer {
+  private final long delay;
+  private final long period;
+  private final TimeUnit timeUnit;
+  private final long periodNanos;
+  private long start;
+
+  public Timer(long delay, long period, TimeUnit timeUnit) {
+    this.delay = delay;
+    this.period = period;
+    this.timeUnit = timeUnit;
+    this.periodNanos = timeUnit.toNanos(period);
+    this.start = System.nanoTime() + timeUnit.toNanos(delay);
+  }
+
+  public long period() {
+    return this.period;
+  }
+
+  public long delay() {
+    return this.delay;
+  }
+
+  public TimeUnit getTimeUnit() {
+    return this.timeUnit;
+  }
+
+  public boolean isExpiredResetOnTrue() {
+    boolean expired = System.nanoTime() - this.start >= this.periodNanos;
+    if(expired) {
+      this.start = System.nanoTime();
+    }
+
+    return expired;
+  }
+}
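[Reviewer note, not part of the patch] The overridden Timer keeps the storm-kafka-client contract: isExpiredResetOnTrue() reports whether the configured period has elapsed and, when it has, restarts the countdown. The main difference is that it reads System.nanoTime() directly rather than going through org.apache.storm.utils.Time. A minimal sketch of how a caller can drive it, meant to run inside a method that declares InterruptedException; the loop and printed message are illustrative only and do not appear in this patch:

    // fire at most once every two seconds, with no initial delay
    Timer timer = new Timer(0, 2, TimeUnit.SECONDS);
    for (int i = 0; i < 5; i++) {
      Thread.sleep(1000);
      if (timer.isExpiredResetOnTrue()) {
        // reaches this branch roughly every other iteration
        System.out.println("period elapsed at iteration " + i);
      }
    }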
diff --git a/metron-platform/metron-storm-kafka/pom.xml b/metron-platform/metron-storm-kafka/pom.xml
index b8e3f8d79f..5c28b3431c 100644
--- a/metron-platform/metron-storm-kafka/pom.xml
+++ b/metron-platform/metron-storm-kafka/pom.xml
@@ -30,6 +30,11 @@
         <commons.config.version>1.10</commons.config.version>
     </properties>
     <dependencies>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-storm-kafka-override</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-kafka-client</artifactId>
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
index bf5250bc23..514eaaeb50 100644
--- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
@@ -20,8 +20,10 @@
 import com.google.common.base.Joiner;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.storm.kafka.spout.*;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -30,10 +32,7 @@
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.function.Function;
 
 /**
@@ -113,11 +112,12 @@ public static Fields getFields(Iterable<FieldsConfiguration> configs) {
    * @param <K> The key type in kafka
    * @param <V> The value type in kafka
    */
-  public static class TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
+  public static class SpoutRecordTranslator<K, V> implements RecordTranslator<K, V> {
     private List<FieldsConfiguration> configurations;
-    private TupleBuilder(String topic, List<FieldsConfiguration> configurations) {
-      super(topic);
+    private Fields fields;
+    private SpoutRecordTranslator(List<FieldsConfiguration> configurations) {
       this.configurations = configurations;
+      this.fields = FieldsConfiguration.getFields(configurations);
     }
 
     /**
@@ -127,15 +127,27 @@ private TupleBuilder(String topic, List<FieldsConfiguration> configurations) {
      * @return list of tuples
      */
     @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+    public List<Object> apply(ConsumerRecord<K, V> consumerRecord) {
       Values ret = new Values();
       for(FieldsConfiguration config : configurations) {
         ret.add(config.recordExtractor.apply(consumerRecord));
      }
       return ret;
     }
+
+    @Override
+    public Fields getFieldsFor(String s) {
+      return fields;
+    }
+
+    @Override
+    public List<String> streams() {
+      return DEFAULT_STREAM;
+    }
   }
 
+  public static String DEFAULT_DESERIALIZER = ByteArrayDeserializer.class.getName();
+
   private String topic;
 
   /**
@@ -165,13 +177,39 @@ public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
                                 , List<String> fieldsConfiguration
                                 )
   {
-    super( modifyKafkaProps(kafkaProps, zkQuorum)
-         , createStreams(fieldsConfiguration, topic)
-         , createTuplesBuilder(fieldsConfiguration, topic)
-         );
+    super( getBootstrapServers(zkQuorum, kafkaProps)
+         , createDeserializer(Optional.ofNullable((String) kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)), DEFAULT_DESERIALIZER)
+         , createDeserializer(Optional.ofNullable((String) kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)), DEFAULT_DESERIALIZER)
+         , topic
+         );
+    setProp(kafkaProps);
+    setRecordTranslator(new SpoutRecordTranslator<>(FieldsConfiguration.toList(fieldsConfiguration)));
     this.topic = topic;
   }
 
+  private static <T> Class<Deserializer<T>> createDeserializer( Optional<String> deserializerClass
+                                                              , String defaultDeserializerClass
+                                                              )
+  {
+    try {
+      return (Class<Deserializer<T>>) Class.forName(deserializerClass.orElse(defaultDeserializerClass));
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to create a deserializer: " + deserializerClass.orElse(defaultDeserializerClass) + ": " + e.getMessage(), e);
+    }
+  }
+
+  private static String getBootstrapServers(String zkQuorum, Map<String, Object> kafkaProps) {
+    String brokers = (String) kafkaProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+    if(brokers == null) {
+      try {
+        return Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum));
+      } catch (Exception e) {
+        throw new IllegalStateException("Unable to find the bootstrap servers: " + e.getMessage(), e);
+      }
+    }
+    return brokers;
+  }
+
   /**
    * Get the kafka topic.  TODO: In the future, support multiple topics and regex patterns.
    * @return
@@ -202,31 +240,4 @@ public static <K, V> StormKafkaSpout<K, V> create( String topic
     return new StormKafkaSpout<>(builder);
   }
 
-  private static Map<String, Object> modifyKafkaProps(Map<String, Object> props, String zkQuorum) {
-    try {
-      if(!props.containsKey(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS)) {
-        //this isn't a putIfAbsent because I only want to pull the brokers from zk if it's absent.
-        List<String> brokers = KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum);
-        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, Joiner.on(",").join(brokers));
-      }
-      props.putIfAbsent(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
-      props.putIfAbsent(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
-
-    } catch (Exception e) {
-      throw new IllegalStateException("Unable to retrieve brokers from zookeeper: " + e.getMessage(), e);
-    }
-    return props;
-  }
-
-  private static <K, V> KafkaSpoutTuplesBuilder<K, V> createTuplesBuilder(List<String> config, String topic) {
-    TupleBuilder<K, V> tb = new TupleBuilder<>(topic, FieldsConfiguration.toList(config));
-    return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(tb).build();
-  }
-
-
-  private static KafkaSpoutStreams createStreams(List<String> config, String topic) {
-    final Fields fields = FieldsConfiguration.getFields(FieldsConfiguration.toList(config));
-    return new KafkaSpoutStreamsNamedTopics.Builder(fields, STREAM, new String[] { topic }).build();
-  }
-
 }
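[Reviewer note, not part of the patch] The new SpoutRecordTranslator declares the same output fields that the removed KafkaSpoutStreams/TupleBuilder pair declared, so bolts downstream of the spout keep reading tuple values by field name. A small sketch of the consuming side, assuming the spout was built with the usual "value" field; the bolt method and its OutputCollector named collector are illustrative and not code from this patch:

    // inside a bolt whose prepare() captured an OutputCollector named collector
    @Override
    public void execute(Tuple tuple) {
      byte[] message = tuple.getBinaryByField("value");   // field name declared by the translator
      // ... parse and emit ...
      collector.ack(tuple);
    }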
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java
index 6c0f148e7f..2a4586dac3 100644
--- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java
@@ -17,6 +17,8 @@
  */
 package org.apache.metron.storm.kafka.flux;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.metron.common.utils.ConversionUtils;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 
@@ -48,12 +50,6 @@ public enum SpoutConfiguration {
   ,FIRST_POLL_OFFSET_STRATEGY("spout.firstPollOffsetStrategy"
                              , container -> container.builder.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.valueOf(container.value.toString()))
                              )
-  /**
-   * The maximum number of retries
-   */
-  ,MAX_RETRIES("spout.maxRetries"
-              , container -> container.builder.setMaxRetries(ConversionUtils.convert(container.value, Integer.class))
-              )
   /**
    * The maximum amount of uncommitted offsets
    */
@@ -66,6 +62,12 @@
   ,OFFSET_COMMIT_PERIOD_MS("spout.offsetCommitPeriodMs"
                           , container -> container.builder.setOffsetCommitPeriodMs(ConversionUtils.convert(container.value, Long.class))
                           )
+  /**
+   * The partition refresh period in milliseconds
+   */
+  ,PARTITION_REFRESH_PERIOD_MS("spout.partitionRefreshPeriodMs"
+                              , container -> container.builder.setPartitionRefreshPeriodMs(ConversionUtils.convert(container.value, Long.class))
+                              )
   ;
   private static class Container {
     Map<String, Object> config;
@@ -131,9 +133,9 @@ public static List<String> allOptions() {
     for(SpoutConfiguration spoutConfig : SpoutConfiguration.values()) {
       ret.add(spoutConfig.key);
     }
-    ret.add(KafkaSpoutConfig.Consumer.GROUP_ID);
-    ret.add(KafkaSpoutConfig.Consumer.AUTO_COMMIT_INTERVAL_MS);
-    ret.add(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT);
+    ret.add(ConsumerConfig.GROUP_ID_CONFIG);
+    ret.add(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+    ret.add(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
     return ret;
   }
 }
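[Reviewer note, not part of the patch] With MAX_RETRIES removed and PARTITION_REFRESH_PERIOD_MS added, a spout options map for the 1.1 spout looks roughly like the sketch below. The keys are the ones defined in this enum plus the plain Kafka consumer keys that allOptions() now exposes through ConsumerConfig; the values are examples only:

    Map<String, Object> options = new HashMap<>();
    options.put("spout.firstPollOffsetStrategy", "UNCOMMITTED_EARLIEST");
    options.put("spout.offsetCommitPeriodMs", "30000");       // commit at most every 30s
    options.put("spout.partitionRefreshPeriodMs", "60000");   // rediscover partitions every 60s
    options.put("group.id", "enrichments");                   // passed through to the Kafka consumer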
diff --git a/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java b/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
index fdef69db76..c6dbd8f0e4 100644
--- a/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
+++ b/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.storm.kafka.flux;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.junit.Assert;
@@ -33,14 +34,14 @@ public class SpoutConfigurationTest {
   public void testSeparation() {
     Map<String, Object> config = new HashMap<String, Object>() {{
       put(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key, "UNCOMMITTED_EARLIEST");
-      put(SpoutConfiguration.MAX_RETRIES.key, "1000");
+      put(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key, "1000");
       put("group.id", "foobar");
     }};
     Map<String, Object> spoutConfig = SpoutConfiguration.separate(config);
     Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key));
     Assert.assertEquals(spoutConfig.get(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key), "UNCOMMITTED_EARLIEST");
-    Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.MAX_RETRIES.key));
-    Assert.assertEquals(spoutConfig.get(SpoutConfiguration.MAX_RETRIES.key), "1000");
+    Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key));
+    Assert.assertEquals(spoutConfig.get(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key), "1000");
     Assert.assertEquals(2, spoutConfig.size());
     Assert.assertEquals(1, config.size());
     Assert.assertEquals(config.get("group.id"), "foobar");
@@ -49,15 +50,15 @@ public void testSeparation() {
   @Test
   public void testBuilderCreation() {
     Map<String, Object> config = new HashMap<String, Object>() {{
-      put(SpoutConfiguration.MAX_RETRIES.key, "1000");
-      put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "foo:1234");
+      put(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key, "1000");
+      put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
       put("group.id", "foobar");
     }};
     Map<String, Object> spoutConfig = SpoutConfiguration.separate(config);
     KafkaSpoutConfig.Builder builder = new SimpleStormKafkaBuilder(config, "topic", null);
     SpoutConfiguration.configure(builder, spoutConfig);
     KafkaSpoutConfig c = builder.build();
-    Assert.assertEquals(1000, c.getMaxTupleRetries() );
+    Assert.assertEquals(1000, c.getOffsetsCommitPeriodMs() );
   }
 }
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index 1376e5ceab..cc488514d9 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -58,6 +58,7 @@
         <module>elasticsearch-shaded</module>
         <module>metron-elasticsearch</module>
         <module>metron-storm-kafka</module>
+        <module>metron-storm-kafka-override</module>
     </modules>
diff --git a/pom.xml b/pom.xml
index bb32c64249..390073cb66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,22 @@
     <repositories>
+        <repository>
+            <releases>
+                <enabled>true</enabled>
+                <updatePolicy>always</updatePolicy>
+                <checksumPolicy>warn</checksumPolicy>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+                <updatePolicy>never</updatePolicy>
+                <checksumPolicy>warn</checksumPolicy>
+            </snapshots>
+            <id>HDPPrivateReleases</id>
+            <name>HDP Private Releases</name>
+            <url>http://nexus-private.hortonworks.com/nexus/content/groups/public</url>
+            <layout>default</layout>
+        </repository>
         <repository>
             <id>clojars.org</id>
             <url>http://clojars.org/repo</url>
@@ -106,7 +122,8 @@
            but I justify it by noting that this should be able to be removed when
            we migrate to Storm 1.1.x, which properly supports Kafka 0.10.x.
       -->
-        <global_storm_kafka_version>1.0.1.2.5.0.0-1245</global_storm_kafka_version>
+        <!-- ... -->
+        <global_storm_kafka_version>1.1.0.2.6.1.0-SNAPSHOT</global_storm_kafka_version>
         <global_flux_version>${base_flux_version}</global_flux_version>
         <global_pcap_version>1.7.1</global_pcap_version>
         <global_kafka_version>0.10.0.1</global_kafka_version>
@@ -138,6 +155,7 @@
         <hdp_version>2.5.0.0</hdp_version>
         <build_number>1245</build_number>
+        <global_storm_kafka_version>1.1.0.2.6.1.0-SNAPSHOT</global_storm_kafka_version>
         <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
         <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
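[Reviewer note, not part of the patch] End to end, the pieces in this first patch compose the same way the updated SpoutConfigurationTest exercises them: build the SimpleStormKafkaBuilder from the Kafka properties, pull the spout.* keys out with separate(), and apply them with configure(). A sketch with example broker, topic, and group values:

    Map<String, Object> config = new HashMap<String, Object>() {{
      put(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key, "1000");
      put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:6667");
      put("group.id", "bro_parser");
    }};
    Map<String, Object> spoutConfig = SpoutConfiguration.separate(config);
    KafkaSpoutConfig.Builder builder = new SimpleStormKafkaBuilder(config, "bro", null);
    SpoutConfiguration.configure(builder, spoutConfig);
    KafkaSpoutConfig kafkaSpoutConfig = builder.build();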
From 7ef320d3d80324d2322943a7b95354f0ef6c3852 Mon Sep 17 00:00:00 2001
From: cstella
Date: Fri, 12 May 2017 12:09:57 -0400
Subject: [PATCH 2/6] Travis drama.

---
 .../apache/metron/parsers/topology/ParserTopologyBuilder.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index e9acbaa9f6..196c19d44b 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.parsers.topology;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
@@ -124,7 +125,7 @@ private static StormKafkaSpout<Object, Object> createKafkaSpout( String zkQuorum
     kafkaSpoutConfigOptions.putIfAbsent( SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key
                                        , KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.toString()
                                        );
-    kafkaSpoutConfigOptions.putIfAbsent( KafkaSpoutConfig.Consumer.GROUP_ID
+    kafkaSpoutConfigOptions.putIfAbsent( ConsumerConfig.GROUP_ID_CONFIG
                                        , inputTopic + "_parser"
                                        );
     if(securityProtocol.isPresent()) {

From 284c6452d08bc050fe8b1c66e5c85e4a95804cad Mon Sep 17 00:00:00 2001
From: cstella
Date: Fri, 12 May 2017 14:09:05 -0400
Subject: [PATCH 3/6] Update

---
 pom.xml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 390073cb66..de627acd6e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,8 +122,7 @@
            but I justify it by noting that this should be able to be removed when
            we migrate to Storm 1.1.x, which properly supports Kafka 0.10.x.
       -->
-        <!-- ... -->
-        <global_storm_kafka_version>1.1.0.2.6.1.0-SNAPSHOT</global_storm_kafka_version>
+        <global_storm_kafka_version>1.1.0</global_storm_kafka_version>
         <global_flux_version>${base_flux_version}</global_flux_version>
         <global_pcap_version>1.7.1</global_pcap_version>
         <global_kafka_version>0.10.0.1</global_kafka_version>

From 4246c36d7fdb94505d789c359c9115728f856e15 Mon Sep 17 00:00:00 2001
From: cstella
Date: Fri, 12 May 2017 14:48:21 -0400
Subject: [PATCH 4/6] Remove ominous warning.

---
 pom.xml | 23 -----------------------
 1 file changed, 23 deletions(-)

diff --git a/pom.xml b/pom.xml
index de627acd6e..9f622499e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,29 +99,6 @@
         <global_hadoop_version>2.7.1</global_hadoop_version>
         <global_storm_version>1.0.3</global_storm_version>
-        <!--
-            ...
-            but I justify it by noting that this should be able to be removed when
-            we migrate to Storm 1.1.x, which properly supports Kafka 0.10.x.
-        -->
         <global_storm_kafka_version>1.1.0</global_storm_kafka_version>
         <global_flux_version>${base_flux_version}</global_flux_version>
         <global_pcap_version>1.7.1</global_pcap_version>

From 3387d420c3c348ea369d65c6404493483226f105 Mon Sep 17 00:00:00 2001
From: cstella
Date: Mon, 15 May 2017 15:39:26 -0400
Subject: [PATCH 5/6] removed extraneous STREAM

---
 .../apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
index 514eaaeb50..592859ed6a 100644
--- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
@@ -46,7 +46,6 @@
  * @param <V> The kafka value type
  */
 public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V> {
-  final static String STREAM = "default";
 
   /**
    * The fields exposed by the kafka consumer.  These will show up in the Storm tuple.

From a368afb0851b7ae0aa756a2a1c451d48bd941412 Mon Sep 17 00:00:00 2001
From: cstella
Date: Mon, 15 May 2017 15:50:51 -0400
Subject: [PATCH 6/6] Adding test for Timer.

---
 .../storm/kafka/spout/internal/TimerTest.java | 36 +++++++++++++++++++
 1 file changed, 36 insertions(+)
 create mode 100644 metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java

diff --git a/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java b/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java
new file mode 100644
index 0000000000..0d49ae1636
--- /dev/null
+++ b/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class TimerTest {
+
+  @Test
+  public void testReset() throws InterruptedException {
+    Timer t = new Timer(0, 2, TimeUnit.SECONDS);
+    Thread.sleep(1000);
+    Assert.assertFalse(t.isExpiredResetOnTrue());
+    Thread.sleep(1000);
+    Assert.assertTrue(t.isExpiredResetOnTrue());
+  }
+
+}
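[Reviewer note, not part of the patch] testReset() relies on real wall-clock sleeps, so it can be sensitive on a loaded CI machine. If more coverage is wanted without adding sleeps, the delay handling can also be asserted, since a timer created with a non-zero delay cannot report expiry on its first check. A possible additional case, consistent with the Timer implementation added in patch 1/6 but not included in this series:

    @Test
    public void testDelayWindow() {
      // with a 2 second delay and a 1 second period, the very first check must be "not expired"
      Timer t = new Timer(2, 1, TimeUnit.SECONDS);
      Assert.assertFalse(t.isExpiredResetOnTrue());
    }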