From a6fc550a9cb9b189b590e719be327aa2af0317de Mon Sep 17 00:00:00 2001 From: Roshan Naik Date: Fri, 6 Jan 2017 16:45:05 -0800 Subject: [PATCH] Adding support for named output stream in HdfsSpout --- external/storm-hdfs/README.md | 3 ++- .../apache/storm/hdfs/spout/HdfsSpout.java | 24 +++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md index 4f74e2522a7..66be6f1ac28 100644 --- a/external/storm-hdfs/README.md +++ b/external/storm-hdfs/README.md @@ -537,7 +537,8 @@ Only methods mentioned in **bold** are required. | .setLockTimeoutSec() |~~hdfsspout.lock.timeout.sec~~ | 5 minutes | Duration of inactivity after which a lock file is considered to be abandoned and ready for another spout to take ownership | | .setClocksInSync() |~~hdfsspout.clocks.insync~~ | true | Indicates whether clocks on the storm machines are in sync (using services like NTP). Used for detecting stale locks. | | .withConfigKey() | | | Optional setting. Overrides the default key name ('hdfs.config', see below) used for specifying HDFS client configs. | -| .setHdfsClientSettings() |~~hdfs.config~~ (unless changed via withConfigKey)| | Set it to a Map of Key/value pairs indicating the HDFS settings to be used. For example, keytab and principle could be set using this. See section **Using keytabs on all worker hosts** under HDFS bolt below.| +| .setHdfsClientSettings() |~~hdfs.config~~ (unless changed via withConfigKey)| | Set it to a Map of Key/value pairs indicating the HDFS settings to be used. For example, keytab and principle could be set using this. See section **Using keytabs on all worker hosts** under HDFS bolt below.| +| .withOutputStream() | | | Name of output stream. If set, the tuples will be emitted to the specified stream. 
Else tuples will be emitted to the default output stream | --- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index 6623a3e6fb6..b7627f24178 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -73,6 +73,8 @@ public class HdfsSpout extends BaseRichSpout { private String inprogress_suffix = ".inprogress"; // not configurable to prevent change between topology restarts private String ignoreSuffix = ".ignore"; + private String outputStreamName= null; + // other members private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); @@ -180,6 +182,14 @@ public HdfsSpout withConfigKey(String configKey) { return this; } + /** + * Set output stream name + */ + public HdfsSpout withOutputStream(String streamName) { + this.outputStreamName = streamName; + return this; + } + public Path getLockDirPath() { return lockDirPath; } @@ -345,7 +355,12 @@ private static void releaseLockAndLog(FileLock fLock, String spoutId) { protected void emitData(List tuple, MessageId id) { LOG.trace("Emitting - {}", id); - this.collector.emit(tuple, id); + + if ( outputStreamName==null ) + collector.emit( tuple, id ); + else + collector.emit( outputStreamName, tuple, id ); + inflight.put(id, tuple); } @@ -775,8 +790,13 @@ private Path renameCompletedFile(Path file) throws IOException { return newFile; } + @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(outputFields); + if (outputStreamName!=null) { + declarer.declareStream(outputStreamName, outputFields); + } else { + declarer.declare(outputFields); + } } static class MessageId implements Comparable {