From 9fd358eb33b0f3f0d61a8a72890354eb11c5fa1c Mon Sep 17 00:00:00 2001 From: Aaron Dossett Date: Fri, 31 Jul 2015 10:34:44 -0500 Subject: [PATCH] STORM-938: Add tick tuples to HiveBolt for time-based flushing --- external/storm-hive/README.md | 3 +- .../org/apache/storm/hive/bolt/HiveBolt.java | 39 ++++++++++++++----- .../apache/storm/hive/common/HiveOptions.java | 11 ++++++ .../apache/storm/hive/bolt/TestHiveBolt.java | 12 +++--- 4 files changed, 49 insertions(+), 16 deletions(-) diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md index b16c390c691..629ec54abce 100644 --- a/external/storm-hive/README.md +++ b/external/storm-hive/README.md @@ -76,9 +76,10 @@ HiveOptions params |withBatchSize| Max number of events written to Hive in a single Hive transaction| Integer. default 15000| |withCallTimeout| (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. | Integer. default 10000| |withHeartBeatInterval| (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.| Integer. default 240 | -|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. |Boolean. defalut true | +|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. |Boolean. default true | |withKerberosPrinicipal| Kerberos user principal for accessing secure Hive | String| |withKerberosKeytab| Kerberos keytab for accessing secure Hive | String | +|withTickTupleInterval| (In seconds) If > 0 then the Hive Bolt will periodically flush transaction batches. Enabling this is recommended to avoid tuple timeouts while waiting for a batch to fill up.| Integer. default 0| diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java index ec0ade2103f..e20d31fb8a0 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java @@ -23,6 +23,8 @@ import backtype.storm.tuple.Tuple; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.utils.TupleUtils; +import backtype.storm.Config; import org.apache.storm.hive.common.HiveWriter; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hive.hcatalog.streaming.*; @@ -99,16 +101,24 @@ public void prepare(Map conf, TopologyContext topologyContext, OutputCollector c @Override public void execute(Tuple tuple) { try { - List partitionVals = options.getMapper().mapPartitions(tuple); - HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options); - HiveWriter writer = getOrCreateWriter(endPoint); - if(timeToSendHeartBeat.compareAndSet(true, false)) { - enableHeartBeatOnAllWriters(); + boolean forceFlush = false; + if (TupleUtils.isTick(tuple)) { + LOG.debug("TICK received! current batch status [" + tupleBatch.size() + "/" + options.getBatchSize() + "]"); + forceFlush = true; } - writer.write(options.getMapper().mapRecord(tuple)); - - tupleBatch.add(tuple); - if(tupleBatch.size() >= options.getBatchSize()) { + else { + List partitionVals = options.getMapper().mapPartitions(tuple); + HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options); + HiveWriter writer = getOrCreateWriter(endPoint); + if (timeToSendHeartBeat.compareAndSet(true, false)) { + enableHeartBeatOnAllWriters(); + } + writer.write(options.getMapper().mapRecord(tuple)); + tupleBatch.add(tuple); + if (tupleBatch.size() >= options.getBatchSize()) + forceFlush = true; + } + if(forceFlush && !tupleBatch.isEmpty()) { flushAllWriters(true); LOG.info("acknowledging tuples after writers flushed "); for(Tuple t : tupleBatch) @@ -174,6 +184,17 @@ public void cleanup() { LOG.info("Hive Bolt stopped"); } + @Override + public Map getComponentConfiguration() { + Map conf = super.getComponentConfiguration(); + if (conf == null) + conf = new Config(); + + if (options.getTickTupleInterval() > 0) + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, options.getTickTupleInterval()); + + return conf; + } private void setupHeartBeatTimer() { if(options.getHeartBeatInterval()>0) { diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java index d3162949aa3..3df16004a34 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java @@ -39,6 +39,7 @@ public class HiveOptions implements Serializable { protected Boolean autoCreatePartitions = true; protected String kerberosPrincipal; protected String kerberosKeytab; + protected Integer tickTupleInterval = 0; public HiveOptions(String metaStoreURI,String databaseName,String tableName,HiveMapper mapper) { this.metaStoreURI = metaStoreURI; @@ -47,6 +48,12 @@ public HiveOptions(String metaStoreURI,String databaseName,String tableName,Hive this.mapper = mapper; } + public HiveOptions withTickTupleInterval(Integer tickInterval) + { + this.tickTupleInterval = tickInterval; + return this; + } + public HiveOptions withTxnsPerBatch(Integer txnsPerBatch) { this.txnsPerBatch = txnsPerBatch; return this; @@ -143,4 +150,8 @@ public String getKerberosPrincipal() { public String getKerberosKeytab() { return kerberosKeytab; } + + public Integer getTickTupleInterval() { + return tickTupleInterval; + } } diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java index b83b960f2a7..0350c6ed4dd 100644 --- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java @@ -20,7 +20,6 @@ import backtype.storm.Config; import backtype.storm.task.GeneralTopologyContext; -import backtype.storm.task.IOutputCollector; import backtype.storm.task.OutputCollector; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; @@ -95,7 +94,7 @@ public class TestHiveBolt { public TemporaryFolder dbFolder = new TemporaryFolder(); @Mock - private IOutputCollector collector; + private OutputCollector collector; private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class); @@ -150,7 +149,7 @@ public void testWithByteArrayIdandMessage() .withTxnsPerBatch(2) .withBatchSize(2); bolt = new HiveBolt(hiveOptions); - bolt.prepare(config,null,new OutputCollector(collector)); + bolt.prepare(config,null,collector); Integer id = 100; String msg = "test-123"; String city = "sunnyvale"; @@ -183,7 +182,7 @@ public void testWithoutPartitions() .withBatchSize(2) .withAutoCreatePartitions(false); bolt = new HiveBolt(hiveOptions); - bolt.prepare(config,null,new OutputCollector(collector)); + bolt.prepare(config,null,collector); Integer id = 100; String msg = "test-123"; String city = "sunnyvale"; @@ -217,7 +216,7 @@ public void testWithTimeformat() .withTxnsPerBatch(2) .withBatchSize(1); bolt = new HiveBolt(hiveOptions); - bolt.prepare(config,null,new OutputCollector(collector)); + bolt.prepare(config,null,collector); Integer id = 100; String msg = "test-123"; Date d = new Date(); @@ -249,6 +248,7 @@ public void testData() bolt = new HiveBolt(hiveOptions); bolt.prepare(config, null, new OutputCollector(collector)); Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA"); + //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA"); bolt.execute(tuple1); verify(collector).ack(tuple1); @@ -270,7 +270,7 @@ public void testJsonWriter() .withTxnsPerBatch(2) .withBatchSize(1); bolt = new HiveBolt(hiveOptions); - bolt.prepare(config,null,new OutputCollector(collector)); + bolt.prepare(config,null,collector); Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA"); //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA"); bolt.execute(tuple1);