From ad9b4d61f53e4b0a0f6c0cf7b384bba67854fc3f Mon Sep 17 00:00:00 2001
From: ambud
Date: Fri, 23 Dec 2016 10:14:27 -0800
Subject: [PATCH] HBase lookup caching STORM-2204

---
 external/storm-hbase/README.md                     |   8 ++
 external/storm-hbase/pom.xml                       |   4 +
 .../storm/hbase/bolt/AbstractHBaseBolt.java        |   2 +-
 .../storm/hbase/bolt/HBaseLookupBolt.java          | 121 ++++++++++++------
 4 files changed, 97 insertions(+), 38 deletions(-)

diff --git a/external/storm-hbase/README.md b/external/storm-hbase/README.md
index fd4d0ade7d1..9e729573f34 100644
--- a/external/storm-hbase/README.md
+++ b/external/storm-hbase/README.md
@@ -170,6 +170,14 @@ The `HBaseLookupBolt` will use the mapper to get rowKey to lookup for. It will u
 figure out which columns to include in the result and it will leverage the `HBaseRowToStormValueMapper`
 to get the values to be emitted by the bolt.
 
+In addition, the `HBaseLookupBolt` supports bolt-side caching of HBase lookup results in an in-memory LRU cache backed by Guava (for Storm 1.x). Caching is controlled through three topology configuration keys:
+
+`hbase.cache.enable` - enables caching (default `false`)
+
+`hbase.cache.ttl.seconds` - time-to-live of a cache entry, in seconds (default `300`)
+
+`hbase.cache.size` - maximum number of entries held in the cache (default `1000`)
+
 You can look at an example topology LookupWordCount.java under `src/test/java`.
 ## Example: Persistent Word Count
 A runnable example can be found in the `src/test/java` directory.
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index d8c3a40be3f..1fb3c38beb7 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -87,5 +87,9 @@
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
index 76a0f8a3f53..5ede597c499 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
@@ -37,7 +37,7 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractHBaseBolt.class);
 
-    protected OutputCollector collector;
+    protected transient OutputCollector collector;
 
     protected transient HBaseClient hBaseClient;
     protected String tableName;
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
index 58ef674ae55..b535be757e3 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
@@ -22,14 +22,22 @@ import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.TupleUtils;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.Validate;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
 import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
 import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,51 +48,90 @@
  *
  */
 public class HBaseLookupBolt extends AbstractHBaseBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupBolt.class);
-
-    private HBaseValueMapper rowToTupleMapper;
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupBolt.class);
 
-    private HBaseProjectionCriteria projectionCriteria;
+    private HBaseValueMapper rowToTupleMapper;
 
-    public HBaseLookupBolt(String tableName, HBaseMapper mapper, HBaseValueMapper rowToTupleMapper){
-        super(tableName, mapper);
-        Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be null");
-        this.rowToTupleMapper = rowToTupleMapper;
-    }
+    private HBaseProjectionCriteria projectionCriteria;
+    private transient LoadingCache<byte[], Result> cache;
+    private transient boolean cacheEnabled;
 
-    public HBaseLookupBolt withConfigKey(String configKey){
-        this.configKey = configKey;
-        return this;
-    }
+    public HBaseLookupBolt(String tableName, HBaseMapper mapper, HBaseValueMapper rowToTupleMapper) {
+        super(tableName, mapper);
+        Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be null");
+        this.rowToTupleMapper = rowToTupleMapper;
+    }
 
-    public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) {
-        this.projectionCriteria = projectionCriteria;
-        return this;
-    }
+    public HBaseLookupBolt withConfigKey(String configKey) {
+        this.configKey = configKey;
+        return this;
+    }
+
+    public Object getOrDefault(@SuppressWarnings("rawtypes") Map map, String property, Object defaultValue) {
+        Object conf = map.get(property);
+        if (conf == null) {
+            conf = defaultValue;
+        }
+        return conf;
+    }
 
-    @Override
-    public void execute(Tuple tuple) {
-        if (TupleUtils.isTick(tuple)) {
-            collector.ack(tuple);
-            return;
-        }
-        byte[] rowKey = this.mapper.rowKey(tuple);
-        Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria);
+    @SuppressWarnings({ "rawtypes" })
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
+        super.prepare(map, topologyContext, collector);
+        cacheEnabled = Boolean.parseBoolean(getOrDefault(map, "hbase.cache.enable", "false").toString());
+
+        int cacheTTL = Integer.parseInt(getOrDefault(map, "hbase.cache.ttl.seconds", "300").toString());
+        int maxCacheSize = Integer.parseInt(getOrDefault(map, "hbase.cache.size", "1000").toString());
+        if (cacheEnabled) {
+            cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize).expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
+                    .build(new CacheLoader<byte[], Result>() {
 
-        try {
-            Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
-            for(Values values : rowToTupleMapper.toValues(tuple, result)) {
-                this.collector.emit(tuple, values);
+            @Override
+            public Result load(byte[] rowKey) throws Exception {
+                Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cache miss for key:" + new String(rowKey));
+                }
+                return hBaseClient.batchGet(Lists.newArrayList(get))[0];
             }
-            this.collector.ack(tuple);
-        } catch (Exception e) {
-            this.collector.reportError(e);
-            this.collector.fail(tuple);
-        }
+
+            });
+        }
     }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-        rowToTupleMapper.declareOutputFields(outputFieldsDeclarer);
+    public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) {
+        this.projectionCriteria = projectionCriteria;
+        return this;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        if (TupleUtils.isTick(tuple)) {
+            collector.ack(tuple);
+            return;
         }
+        byte[] rowKey = this.mapper.rowKey(tuple);
+        Result result = null;
+        try {
+            if (cacheEnabled) {
+                result = cache.get(rowKey);
+            } else {
+                Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria);
+                result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
+            }
+            for (Values values : rowToTupleMapper.toValues(tuple, result)) {
+                this.collector.emit(tuple, values);
+            }
+            this.collector.ack(tuple);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(tuple);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        rowToTupleMapper.declareOutputFields(outputFieldsDeclarer);
+    }
 }
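
A minimal sketch of how a topology might turn the cache on. Only the three hbase.cache.* keys and the builder methods come from this patch; the spout, row-key field, column family, and table name are assumptions borrowed from the module's LookupWordCount test topology, so adjust them to your own setup.

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.hbase.bolt.HBaseLookupBolt;
import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.hbase.topology.WordCountValueMapper; // test helper, assumed on the classpath
import org.apache.storm.hbase.topology.WordSpout;            // test helper, assumed on the classpath
import org.apache.storm.topology.TopologyBuilder;

public class CachedLookupExample {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();

        // HBase connection settings, resolved by the bolt through withConfigKey().
        Map<String, Object> hbConf = new HashMap<String, Object>();
        conf.put("hbase.conf", hbConf);

        // The three keys introduced by this patch; prepare() parses them via getOrDefault().
        conf.put("hbase.cache.enable", "true");
        conf.put("hbase.cache.ttl.seconds", "60"); // expire entries a minute after load
        conf.put("hbase.cache.size", "500");       // LRU-evict beyond 500 entries

        // Row key taken from the "word" tuple field; project only the "cf" column family.
        SimpleHBaseMapper mapper = new SimpleHBaseMapper().withRowKeyField("word");
        HBaseProjectionCriteria projection = new HBaseProjectionCriteria();
        projection.addColumnFamily("cf");

        HBaseLookupBolt lookupBolt = new HBaseLookupBolt("WordCount", mapper, new WordCountValueMapper())
                .withConfigKey("hbase.conf")
                .withProjectionCriteria(projection);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-spout", new WordSpout());
        builder.setBolt("lookup-bolt", lookupBolt, 1).shuffleGrouping("word-spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("lookup-with-cache", conf, builder.createTopology());
    }
}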
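For reference, the caching pattern itself outside Storm: a self-contained Guava sketch in which a String-keyed loader stands in for the constructGetRequests/batchGet call, showing the same maximumSize plus expireAfterWrite configuration. One caveat worth double-checking in review: the bolt's real cache is keyed by byte[], and Guava compares keys with equals(), which for Java arrays means reference equality, so repeated lookups only hit the cache when the same key array instance is presented again.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class LoadingCacheSketch {
    public static void main(String[] args) throws Exception {
        final AtomicInteger loads = new AtomicInteger();

        // Same shape as the bolt's cache: size-bounded LRU plus per-entry TTL.
        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                .maximumSize(1000)                       // hbase.cache.size
                .expireAfterWrite(300, TimeUnit.SECONDS) // hbase.cache.ttl.seconds
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String rowKey) {
                        // Stand-in for the HBase Get issued on a cache miss.
                        loads.incrementAndGet();
                        return "row-for-" + rowKey;
                    }
                });

        cache.get("alpha"); // miss: load() runs
        cache.get("alpha"); // hit: served from memory, load() skipped
        System.out.println("loads = " + loads.get()); // prints: loads = 1
    }
}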