From cdbde7e2d15ae5f9a75e4ec5c0ad94bfc9bca1d8 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 24 Mar 2016 11:57:28 +0900 Subject: [PATCH] STORM-1654 HBaseBolt creates tick tuples with no interval when we don't set flushIntervalSecs * set 'default' flush interval seconds (1s) to HBaseBolt * since taking half of message timeout secs doesn't work --- .../storm/hbase/bolt/AbstractHBaseBolt.java | 1 - .../org/apache/storm/hbase/bolt/HBaseBolt.java | 18 +++--------------- 2 files changed, 3 insertions(+), 16 deletions(-) diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java index 3546f758c98..3bba0e74653 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java @@ -43,7 +43,6 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt { protected HBaseMapper mapper; protected String configKey; protected int batchSize = 15000; - protected int flushIntervalSecs = 0; public AbstractHBaseBolt(String tableName, HBaseMapper mapper) { Validate.notEmpty(tableName, "Table name can not be blank or null"); diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java index 2a48f10283c..e30a4f6f318 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java @@ -20,7 +20,6 @@ import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.TupleUtils; -import org.apache.storm.Config; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; import org.apache.storm.hbase.bolt.mapper.HBaseMapper; @@ -40,10 +39,12 @@ */ public class HBaseBolt extends AbstractHBaseBolt { private static final Logger LOG = LoggerFactory.getLogger(HBaseBolt.class); + private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1; boolean writeToWAL = true; List batchMutations; List tupleBatch; + int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS; public HBaseBolt(String tableName, HBaseMapper mapper) { super(tableName, mapper); @@ -73,20 +74,7 @@ public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) { @Override public Map getComponentConfiguration() { - Map conf = super.getComponentConfiguration(); - if (conf == null) { - conf = new Config(); - } - - if (conf.containsKey("topology.message.timeout.secs") && flushIntervalSecs == 0) { - Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString()); - flushIntervalSecs = (int)(Math.floor(topologyTimeout / 2)); - LOG.debug("Setting flush interval to [{}] based on topology.message.timeout.secs", flushIntervalSecs); - } - - LOG.info("Enabling tick tuple with interval [{}]", flushIntervalSecs); - conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs); - return conf; + return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs); }