From 06f7f66db3bee17257e7610ff67178fb4ddab49d Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Tue, 13 Feb 2018 16:56:12 -0500 Subject: [PATCH 1/4] STORM-2946: Upgrade to HBase 2.0 --- .../storm/hbase/topology/WordCountClient.java | 10 +++- external/storm-autocreds/pom.xml | 15 ++++++ .../storm/hbase/security/AutoHBaseNimbus.java | 1 + .../storm/hbase/common/HBaseClient.java | 51 +++++++++++++------ .../org/apache/storm/hbase/common/Utils.java | 11 ++-- .../hbase/trident/state/HBaseMapState.java | 25 ++++++--- .../trident/windowing/HBaseWindowsStore.java | 23 +++++---- .../windowing/HBaseWindowsStoreFactory.java | 7 ++- .../hbase/state/HBaseClientTestUtil.java | 11 ++++ .../storm/flux/examples/WordCountClient.java | 7 ++- pom.xml | 2 +- 11 files changed, 124 insertions(+), 39 deletions(-) diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java index 33ce45091e9..1e0db07c6e4 100644 --- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java +++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java @@ -15,13 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hbase.topology; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; /** @@ -33,11 +38,12 @@ public class WordCountClient { public static void main(String[] args) throws Exception { Configuration config = HBaseConfiguration.create(); - if(args.length > 0){ + if(args.length > 0) { config.set("hbase.rootdir", args[0]); } - HTable table = new HTable(config, "WordCount"); + Connection con = ConnectionFactory.createConnection(config); + Table table = con.getTable(TableName.valueOf("WordCount")); for (String word : WordSpout.words) { diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml index 0e4a1cf69a4..312b5c976f6 100644 --- a/external/storm-autocreds/pom.xml +++ b/external/storm-autocreds/pom.xml @@ -86,6 +86,21 @@ + + org.apache.hbase + hbase-server + ${hbase.version} + + + org.slf4j + slf4j-log4j12 + + + org.apache.zookeeper + zookeeper + + + org.apache.hive.hcatalog hive-hcatalog-streaming diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java index bd1e03a08d9..ce4cb317645 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +//import org.apache.hadoop.hbase.security.token. import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java index ad5160c3a67..fc81a091d1a 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java @@ -26,7 +26,13 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -37,6 +43,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; import org.apache.storm.hbase.security.HBaseSecurityUtil; @@ -49,12 +56,14 @@ public class HBaseClient implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class); - private HTable table; + private Table table; + private Connection connection; public HBaseClient(Map map , final Configuration configuration, final String tableName) { try { UserProvider provider = HBaseSecurityUtil.login(map, configuration); - this.table = Utils.getTable(provider, configuration, tableName); + this.connection = ConnectionFactory.createConnection(configuration, provider.getCurrent()); + this.table = Utils.getTable(this.connection, provider, configuration, tableName); } catch (Exception e) { throw new RuntimeException("HBase bolt preparation failed: " + e.getMessage(), e); } @@ -64,22 +73,34 @@ public List constructMutationReq(byte[] rowKey, ColumnList cols, Durab List mutations = Lists.newArrayList(); if (cols.hasColumns()) { + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY); + Put put = new Put(rowKey); put.setDurability(durability); for (ColumnList.Column col : cols.getColumns()) { - if (col.getTs() > 0) { - put.add( - col.getFamily(), - col.getQualifier(), - col.getTs(), - col.getValue() - ); - } else { - put.add( - col.getFamily(), - col.getQualifier(), - col.getValue() - ); + try { + if (col.getTs() > 0) { + put.add( + builder.setFamily(col.getFamily()) + .setQualifier(col.getQualifier()) + .setTimestamp(col.getTs()) + .setValue(col.getValue()) + .setType(Cell.Type.Put) + .setRow(rowKey) + .build() + ); + } else { + put.add( + builder.setFamily(col.getFamily()) + .setQualifier(col.getQualifier()) + .setType(Cell.Type.Put) + .setValue(col.getValue()) + .setRow(rowKey) + .build() + ); + } + } catch (IOException e) { + throw new RuntimeException("HBase Put failed", e); } } mutations.add(put); diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java index 981d4ff46b0..ef8e8e21a34 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java @@ -18,7 +18,10 @@ package org.apache.storm.hbase.common; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; @@ -37,7 +40,7 @@ public class Utils { private Utils(){} - public static HTable getTable(UserProvider provider, Configuration config, String tableName) + public static Table getTable(Connection connection, UserProvider provider, Configuration config, String tableName) throws IOException, InterruptedException { UserGroupInformation ugi; if (provider != null) { @@ -77,9 +80,9 @@ public static HTable getTable(UserProvider provider, Configuration config, Strin } - return ugi.doAs(new PrivilegedExceptionAction() { - @Override public HTable run() throws IOException { - return new HTable(config, tableName); + return ugi.doAs(new PrivilegedExceptionAction() { + @Override public Table run() throws IOException { + return connection.getTable(TableName.valueOf(tableName)); } }); } diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java index 36077fca6b8..b3398024414 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java @@ -28,12 +28,18 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.storm.hbase.common.Utils; import org.apache.storm.hbase.security.HBaseSecurityUtil; @@ -77,7 +83,8 @@ public class HBaseMapState implements IBackingMap { private Options options; private Serializer serializer; - private HTable table; + private Table table; + private Connection connection; /** * Constructor. @@ -107,7 +114,8 @@ public HBaseMapState(final Options options, Map map, int partitionNum) { try { UserProvider provider = HBaseSecurityUtil.login(map, hbConfig); - this.table = Utils.getTable(provider, hbConfig, options.tableName); + this.connection = ConnectionFactory.createConnection(hbConfig, provider.getCurrent()); + this.table = Utils.getTable(this.connection, provider, hbConfig, options.tableName); } catch (Exception e) { throw new RuntimeException("HBase bolt preparation failed: " + e.getMessage(), e); } @@ -245,6 +253,7 @@ public List multiGet(List> keys) { @Override public void multiPut(List> keys, List values) { List puts = new ArrayList(keys.size()); + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY); for (int i = 0; i < keys.size(); i++) { byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i)); String qualifier = this.options.mapMapper.qualifier(keys.get(i)); @@ -252,9 +261,13 @@ public void multiPut(List> keys, List values) { new Object[]{this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i)))}); Put put = new Put(hbaseKey); T val = values.get(i); - put.add(this.options.columnFamily.getBytes(), - qualifier.getBytes(), - this.serializer.serialize(val)); + try { + put.add(builder.setFamily(this.options.columnFamily.getBytes()) + .setQualifier(qualifier.getBytes()) + .setValue(this.serializer.serialize(val)).build()); + } catch (IOException e) { + throw new FailedException("IOException while writing to HBase", e); + } puts.add(put); } @@ -263,7 +276,7 @@ public void multiPut(List> keys, List values) { } catch (InterruptedIOException e) { throw new FailedException("Interrupted while writing to HBase", e); } catch (RetriesExhaustedWithDetailsException e) { - throw new FailedException("Retries exhaused while writing to HBase", e); + throw new FailedException("Retries exhausted while writing to HBase", e); } catch (IOException e) { throw new FailedException("IOException while writing to HBase", e); } diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java index 702a790c5fd..bd5537387de 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java @@ -19,9 +19,12 @@ package org.apache.storm.hbase.trident.windowing; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -50,21 +53,23 @@ public class HBaseWindowsStore implements WindowsStore { private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class); public static final String UTF_8 = "utf-8"; - private final ThreadLocal threadLocalHtable; + private final ThreadLocal
threadLocalHtable; private final ThreadLocal threadLocalWindowKryoSerializer; - private final Queue htables = new ConcurrentLinkedQueue<>(); + private final Queue
htables = new ConcurrentLinkedQueue<>(); private final byte[] family; private final byte[] qualifier; + private Connection connection; - public HBaseWindowsStore(final Map topoConf, final Configuration config, final String tableName, byte[] family, byte[] qualifier) { + public HBaseWindowsStore(final Map topoConf, final Configuration config, final String tableName, byte[] family, byte[] qualifier) throws IOException { this.family = family; this.qualifier = qualifier; + this.connection = ConnectionFactory.createConnection(config); - threadLocalHtable = new ThreadLocal() { + threadLocalHtable = new ThreadLocal
() { @Override - protected HTable initialValue() { + protected Table initialValue() { try { - HTable hTable = new HTable(config, tableName); + Table hTable = connection.getTable(TableName.valueOf(tableName)); htables.add(hTable); return hTable; } catch (IOException e) { @@ -82,7 +87,7 @@ protected WindowKryoSerializer initialValue() { } - private HTable htable() { + private Table htable() { return threadLocalHtable.get(); } @@ -259,7 +264,7 @@ public void removeAll(Collection keys) { @Override public void shutdown() { // close all the created hTable instances - for (HTable htable : htables) { + for (Table htable : htables) { try { htable.close(); } catch (IOException e) { diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java index f0c88053aed..c6d2ef0106e 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java @@ -23,6 +23,7 @@ import org.apache.storm.trident.windowing.WindowsStore; import org.apache.storm.trident.windowing.WindowsStoreFactory; +import java.io.IOException; import java.util.Map; /** @@ -49,7 +50,11 @@ public WindowsStore create(Map topoConf) { configuration.set(entry.getKey(), entry.getValue().toString()); } } - return new HBaseWindowsStore(topoConf, configuration, tableName, family, qualifier); + try { + return new HBaseWindowsStore(topoConf, configuration, tableName, family, qualifier); + } catch (IOException e) { + throw new RuntimeException("Failed to connect to HBase.", e); + } } } diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java index c4fa7c1117d..bd226232574 100644 --- a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; import org.apache.storm.hbase.common.ColumnList; import org.apache.storm.hbase.common.HBaseClient; @@ -369,6 +370,16 @@ public void close() { } + @Override + public boolean renewLease() { + return true; + } + + @Override + public ScanMetrics getScanMetrics() { + return null; + } + @Override public Iterator iterator() { return results.iterator(); diff --git a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java index 6e732ae3765..5f4aee8ef41 100644 --- a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java +++ b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java @@ -23,9 +23,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; /** @@ -61,7 +65,8 @@ public static void main(String[] args) throws Exception { System.exit(1); } - HTable table = new HTable(config, "WordCount"); + Connection con = ConnectionFactory.createConnection(config); + Table table = con.getTable(TableName.valueOf("WordCount")); String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; for (String word : words) { diff --git a/pom.xml b/pom.xml index cb893764a5e..2e491614d8e 100644 --- a/pom.xml +++ b/pom.xml @@ -279,7 +279,7 @@ 0.14.0 2.6.1 ${hadoop.version} - 1.1.12 + 2.0.0-beta-1 3.0.3 3.1.0 2.3 From 74439146a49aecf42fc129138e641c5d77ec0d76 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Tue, 13 Feb 2018 18:52:54 -0500 Subject: [PATCH 2/4] STORM-2946: checkstyle cleanup --- .../apache/storm/hbase/security/AutoHBaseNimbus.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java index ce4cb317645..6e9046747df 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java @@ -18,27 +18,26 @@ package org.apache.storm.hbase.security; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.net.InetAddress; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; -//import org.apache.hadoop.hbase.security.token. import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.storm.Config; import org.apache.storm.common.AbstractHadoopNimbusPluginAutoCreds; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; -import java.net.InetAddress; -import java.util.Map; + import static org.apache.storm.hbase.security.HBaseSecurityUtil.HBASE_CREDENTIALS; import static org.apache.storm.hbase.security.HBaseSecurityUtil.HBASE_KEYTAB_FILE_KEY; From 6ebcf13440a7532e03b897ca9d1a6ddcf94e6ab7 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 21 Feb 2018 12:50:29 -0500 Subject: [PATCH 3/4] STORM-2946: add null check for user --- .../java/org/apache/storm/hbase/common/HBaseClient.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java index fc81a091d1a..da1e90fe6a0 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java @@ -62,7 +62,11 @@ public class HBaseClient implements Closeable { public HBaseClient(Map map , final Configuration configuration, final String tableName) { try { UserProvider provider = HBaseSecurityUtil.login(map, configuration); - this.connection = ConnectionFactory.createConnection(configuration, provider.getCurrent()); + if(provider != null) { + this.connection = ConnectionFactory.createConnection(configuration, provider.getCurrent()); + } else { + this.connection = ConnectionFactory.createConnection(configuration); + } this.table = Utils.getTable(this.connection, provider, configuration, tableName); } catch (Exception e) { throw new RuntimeException("HBase bolt preparation failed: " + e.getMessage(), e); From cd97a89382f91d63a15e72ea973c76a35d04a8ad Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 21 Feb 2018 16:41:37 -0500 Subject: [PATCH 4/4] STORM-2946: fix config logic in HBase trident example --- .../org/apache/storm/hbase/trident/WordCountTrident.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java index c81512efb9c..a8f13fc8108 100644 --- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java +++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.storm.Config; +import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; @@ -38,6 +39,8 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; +import java.util.HashMap; + public class WordCountTrident { public static StormTopology buildTopology(String hbaseRoot){ Fields fields = new Fields("word", "count"); @@ -88,10 +91,14 @@ public static void main(String[] args) throws Exception { if (args.length == 2) { topoName = args[1]; - } else if (args.length > 2) { + } else if (args.length == 0 || args.length > 2) { System.out.println("Usage: TridentFileTopology [topology name]"); return; } + String configKey = "hbase.conf"; + HashMap hbConf = new HashMap(); + hbConf.put("hbase.rootdir", args[0]); + conf.put(configKey, hbConf); conf.setNumWorkers(3); StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0])); }