From 4cda7c059c0be1250708a8895aeff7db3784c565 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 23 May 2018 16:53:16 -0500 Subject: [PATCH 1/2] STORM-3061: Update version of hbase --- examples/storm-hbase-examples/README.md | 62 +++++++++++++++++++ .../hbase/topology/PersistentWordCount.java | 2 +- .../storm/hbase/trident/WordCountTrident.java | 2 +- external/storm-hbase/pom.xml | 40 ++---------- .../storm/hbase/bolt/AbstractHBaseBolt.java | 6 +- .../state/HBaseKeyValueStateProvider.java | 2 +- .../storm/hbase/trident/state/HBaseState.java | 4 +- .../hbase/state/HBaseClientTestUtil.java | 15 ++++- pom.xml | 2 +- 9 files changed, 91 insertions(+), 44 deletions(-) create mode 100644 examples/storm-hbase-examples/README.md diff --git a/examples/storm-hbase-examples/README.md b/examples/storm-hbase-examples/README.md new file mode 100644 index 00000000000..bedf8e6186e --- /dev/null +++ b/examples/storm-hbase-examples/README.md @@ -0,0 +1,62 @@ +# Storm HBase Integration Example + +This is a very simple set of topologies that show how to use the storm-hbase package for accessign HBase from storm. + +## HBase Setup + +First you need an instance of HBase that is setup and running. If you have one already you can skip to setting up the table, if not download a copy from http://archive.apache.org/dist/hbase/1.4.4/ and untar the result into a directory you want to run it in. Then follow the instructiuons from https://hbase.apache.org/0.94/book/quickstart.html to setup a standalone HBase instance. Be aware that when you run `start-hbase.sh` an instance of zookeeper will also be started. If you are testing using a single node storm cluster you can skip running zookeeper yourself as the hbase zookeeper instance will work. + +Before we can run the topology we need to setup a table in HBase to store the results. + +First launch the hbase shell + +``` +hbase shell +``` + +Next create a table called WordCount with a single column family called cf. + +``` +create 'WordCount', 'cf' +``` + +## PersistentWordCount + +PersistentWordCount is a very simple topology that performs a word count and stores the results in HBase. + +### Run The Topology + +``` +storm jar storm-hbase-examples-${STORM_VERSION}.jar org.apache.storm.hbase.topology.PersistentWordCount ${HBASE_HOME} +``` + +In this `${STORM_VERSION}` should be the version of storm you are running, and `${HBASE_HOME}` is where your installed version fo HBase is. `${HBASE_HOME}` is mostly to get the config started. Please refer to the documentation for storm-hbase for more information on how to configure your topology. + +### Verify The Results + +If you want to see the results of the topology live you can run the command + +``` +storm jar storm-hbase-examples-${STORM_VERSION}.jar org.apache.storm.hbase.topology.WordCountClient ${HBASE_HOME} +``` + +## WordCountTrident + +WordCountTrident does essentially the same as PersistentWordCount but using Trident instead. + +### Run The Trident Topology + +``` +storm jar storm-hbase-examples-${STORM_VERSION}.jar org.apache.storm.hbase.trident.WordCountTrident ${HBASE_HOME} +``` + +### Verify The Trident Results + +Verifying the results for the trident topology is a little more difficult. The data is stored in the same format as PersistentWordCount, but the keys are different. To verify that it is working you can look at the logs and you will occasionally see log messages like + +``` +o.a.s.h.t.PrintFunction Thread-16-b-1-executor[7, 7] [INFO] [storm, 10] +``` + +Or you can use the `hbase shell` to `scan 'WordCount'` and see that the values are being updated consistently. + diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java index 65de9221084..015ce85c1f9 100644 --- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java +++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java @@ -33,7 +33,7 @@ public class PersistentWordCount { public static void main(String[] args) throws Exception { Config config = new Config(); - Map hbConf = new HashMap(); + Map hbConf = new HashMap<>(); if (args.length > 0) { hbConf.put("hbase.rootdir", args[0]); } diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java index 4861765ccdc..ee7cee0f509 100644 --- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java +++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java @@ -84,7 +84,7 @@ public static void main(String[] args) throws Exception { if (args.length == 2) { topoName = args[1]; } else if (args.length > 2) { - System.out.println("Usage: TridentFileTopology [topology name]"); + System.out.println("Usage: WordCountTrident [topology name]"); return; } conf.setNumWorkers(3); diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml index e32f3c3c906..04e8ba077e3 100644 --- a/external/storm-hbase/pom.xml +++ b/external/storm-hbase/pom.xml @@ -42,31 +42,15 @@ ${project.version} ${provided.scope} - - org.apache.hbase - hbase-server - ${hbase.version} - - - org.slf4j - slf4j-log4j12 - - - org.apache.zookeeper - zookeeper - - - - jdk.tools - jdk.tools - - - org.apache.hbase hbase-client ${hbase.version} + + log4j + log4j + org.slf4j slf4j-log4j12 @@ -82,17 +66,6 @@ - - org.apache.hadoop - hadoop-hdfs - ${hdfs.version} - - - org.slf4j - slf4j-log4j12 - - - com.github.ben-manes.caffeine caffeine @@ -118,10 +91,7 @@ org.mockito mockito-core - - - org.hamcrest - java-hamcrest + test diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java index 0ff60b3b6d6..bb3e93ce20f 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java @@ -13,6 +13,7 @@ package org.apache.storm.hbase.bolt; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.Validate; @@ -51,7 +52,8 @@ public void prepare(Map topoConf, TopologyContext topologyContex Map conf = (Map) topoConf.get(this.configKey); if (conf == null) { - throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'"); + LOG.warn("HBase configuration not found using key '" + this.configKey + "'"); + conf = Collections.emptyMap(); } if (conf.get("hbase.rootdir") == null) { @@ -63,7 +65,7 @@ public void prepare(Map topoConf, TopologyContext topologyContex //heck for backward compatibility, we need to pass TOPOLOGY_AUTO_CREDENTIALS to hbase conf //the conf instance is instance of persistentMap so making a copy. - Map hbaseConfMap = new HashMap(conf); + Map hbaseConfMap = new HashMap<>(conf); hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, topoConf.get(Config.TOPOLOGY_AUTO_CREDENTIALS)); this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName); } diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java index 89adae3b9b9..491bdb4dc22 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java @@ -24,9 +24,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.storm.Config; import org.apache.storm.hbase.common.HBaseClient; import org.apache.storm.state.DefaultStateSerializer; diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java index a2164023d72..0f81678709a 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java @@ -14,6 +14,7 @@ import com.google.common.collect.Lists; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -60,6 +61,7 @@ protected void prepare() { if (conf == null) { LOG.info("HBase configuration not found using key '" + options.configKey + "'"); LOG.info("Using HBase config from first hbase-site.xml found on classpath."); + conf = Collections.emptyMap(); } else { if (conf.get("hbase.rootdir") == null) { LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults."); @@ -71,7 +73,7 @@ protected void prepare() { //heck for backward compatibility, we need to pass TOPOLOGY_AUTO_CREDENTIALS to hbase conf //the conf instance is instance of persistentMap so making a copy. - Map hbaseConfMap = new HashMap(conf); + Map hbaseConfMap = new HashMap<>(conf); hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS)); this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, options.tableName); diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java index 24ac8c5fef9..189713fd9e9 100644 --- a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; import org.apache.storm.hbase.common.ColumnList; import org.apache.storm.hbase.common.HBaseClient; @@ -341,7 +342,7 @@ static class MockedResultScanner implements ResultScanner { } @Override - public Result next() throws IOException { + public Result next() { if (results.size() <= position) { return null; } @@ -349,7 +350,7 @@ public Result next() throws IOException { } @Override - public Result[] next(int nbRows) throws IOException { + public Result[] next(int nbRows) { List bulkResult = new ArrayList<>(); for (int i = 0; i < nbRows; i++) { Result result = next(); @@ -364,7 +365,17 @@ public Result[] next(int nbRows) throws IOException { @Override public void close() { + //NO-OP + } + @Override + public boolean renewLease() { + return true; + } + + @Override + public ScanMetrics getScanMetrics() { + return null; } @Override diff --git a/pom.xml b/pom.xml index b5f3a5ef2dc..77de9a329b3 100644 --- a/pom.xml +++ b/pom.xml @@ -292,7 +292,7 @@ 0.14.0 2.6.1 ${hadoop.version} - 1.1.12 + 1.4.4 3.0.3 3.1.0 2.3 From 8242af7d64291c48be7c682b8d4ba569843d5df9 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Tue, 19 Jun 2018 14:09:50 -0500 Subject: [PATCH 2/2] STORM-3061: Addressed review comments --- examples/storm-hbase-examples/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/storm-hbase-examples/README.md b/examples/storm-hbase-examples/README.md index bedf8e6186e..83c6f3da627 100644 --- a/examples/storm-hbase-examples/README.md +++ b/examples/storm-hbase-examples/README.md @@ -1,10 +1,10 @@ # Storm HBase Integration Example -This is a very simple set of topologies that show how to use the storm-hbase package for accessign HBase from storm. +This is a very simple set of topologies that show how to use the storm-hbase package for accessing HBase from storm. ## HBase Setup -First you need an instance of HBase that is setup and running. If you have one already you can skip to setting up the table, if not download a copy from http://archive.apache.org/dist/hbase/1.4.4/ and untar the result into a directory you want to run it in. Then follow the instructiuons from https://hbase.apache.org/0.94/book/quickstart.html to setup a standalone HBase instance. Be aware that when you run `start-hbase.sh` an instance of zookeeper will also be started. If you are testing using a single node storm cluster you can skip running zookeeper yourself as the hbase zookeeper instance will work. +First you need an instance of HBase that is setup and running. If you have one already you can start setting up the table using hbase shell, if not download a copy from http://archive.apache.org/dist/hbase/1.4.4/ and untar the result into a directory you want to run it in. Then follow the instructions from https://hbase.apache.org/0.94/book/quickstart.html to setup a standalone HBase instance. Be aware that when you run `start-hbase.sh` an instance of zookeeper will also be started. If you are testing using a single node storm cluster you can skip running zookeeper yourself as the hbase zookeeper instance will work. Before we can run the topology we need to setup a table in HBase to store the results. @@ -30,7 +30,7 @@ PersistentWordCount is a very simple topology that performs a word count and sto storm jar storm-hbase-examples-${STORM_VERSION}.jar org.apache.storm.hbase.topology.PersistentWordCount ${HBASE_HOME} ``` -In this `${STORM_VERSION}` should be the version of storm you are running, and `${HBASE_HOME}` is where your installed version fo HBase is. `${HBASE_HOME}` is mostly to get the config started. Please refer to the documentation for storm-hbase for more information on how to configure your topology. +In this `${STORM_VERSION}` should be the version of storm you are running, and `${HBASE_HOME}` is where your installed version of HBase is. `${HBASE_HOME}` is mostly to get the config started. Please refer to the documentation for storm-hbase for more information on how to configure your topology. ### Verify The Results