From 6f9c199c6a453773c768a9700f2286326079afdf Mon Sep 17 00:00:00 2001 From: Sachin Pasalkar Date: Thu, 16 Feb 2017 01:12:02 +0530 Subject: [PATCH 1/3] STORM-2358: Update storm hdfs spout to remove specific implementation handling As part of this change we have removed the reader-specific handling in the code for TextFileReader & SequenceFileReader, and also made AbstractFileReader public --- external/storm-hdfs/README.md | 4 +- .../spout/{HdfsSpout.java => HDFSSpout.java} | 84 +++++++------------ ...{TestHdfsSpout.java => TestHDFSSpout.java} | 53 ++++++------ 3 files changed, 56 insertions(+), 85 deletions(-) rename external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/{HdfsSpout.java => HDFSSpout.java} (90%) rename external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/{TestHdfsSpout.java => TestHDFSSpout.java} (92%) diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md index c7ab7ca4bde..5f22aa6e063 100644 --- a/external/storm-hdfs/README.md +++ b/external/storm-hdfs/README.md @@ -504,7 +504,7 @@ The following example creates an HDFS spout that reads text files from HDFS path ```java // Instantiate spout to read text files -HdfsSpout textReaderSpout = new HdfsSpout().setReaderType("text") +HDFSSpout textReaderSpout = new HDFSSpout().setReaderType(TextFileReader.class) .withOutputFields(TextFileReader.defaultFields) .setHdfsUri("hdfs://localhost:54310") // reqd .setSourceDir("/data/in") // reqd @@ -540,7 +540,7 @@ Only methods mentioned in **bold** are required. | Method | Alternative config name (deprecated) | Default | Description | |----------------------------|--------------------------------------|-------------|-------------| -| **.setReaderType()** |~~hdfsspout.reader.type~~ | | Determines which file reader to use. Set to 'seq' for reading sequence files or 'text' for text files. Set to a fully qualified class name if using a custom file reader class (that implements interface org.apache.storm.hdfs.spout.FileReader)| +| **.setReaderType()** |~~hdfsspout.reader.type~~ | | Determines which file reader to use. Set to 'org.apache.storm.hdfs.spout.SequenceFileReader' for reading sequence files, to 'org.apache.storm.hdfs.spout.TextFileReader' for text files, or to a custom file reader class (one that implements the interface org.apache.storm.hdfs.spout.FileReader)| | **.withOutputFields()** | | | Sets the names for the output fields for the spout. The number of fields depends upon the reader being used. For convenience, built-in reader types expose a static member called `defaultFields` that can be used for setting this.| | **.setHdfsUri()** |~~hdfsspout.hdfs~~ | | HDFS URI for the hdfs Name node. Example: hdfs://namenodehost:8020| | **.setSourceDir()** |~~hdfsspout.source.dir~~ | | HDFS directory from where to read files. E.g.
/data/inputdir| diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HDFSSpout.java similarity index 90% rename from external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java rename to external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HDFSSpout.java index b7627f24178..4fd0e585273 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HDFSSpout.java @@ -45,11 +45,11 @@ import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; -public class HdfsSpout extends BaseRichSpout { +public class HDFSSpout extends BaseRichSpout { // user configurable private String hdfsUri; // required - private String readerType; // required + private Class readerType; // required private Fields outputFields; // required private String sourceDir; // required @@ -76,7 +76,7 @@ public class HdfsSpout extends BaseRichSpout { private String outputStreamName= null; // other members - private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); + private static final Logger LOG = LoggerFactory.getLogger(HDFSSpout.class); private ProgressTracker tracker = null; @@ -105,79 +105,79 @@ public class HdfsSpout extends BaseRichSpout { private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs - public HdfsSpout() { + public HDFSSpout() { } - public HdfsSpout setHdfsUri(String hdfsUri) { + public HDFSSpout setHdfsUri(String hdfsUri) { this.hdfsUri = hdfsUri; return this; } - public HdfsSpout setReaderType(String readerType) { + public HDFSSpout setReaderType(Class readerType) { this.readerType = readerType; return this; } - public HdfsSpout setSourceDir(String sourceDir) { + public HDFSSpout setSourceDir(String sourceDir) { this.sourceDir = sourceDir; return this; } - public HdfsSpout setArchiveDir(String archiveDir) { + public HDFSSpout setArchiveDir(String archiveDir) { this.archiveDir = archiveDir; return this; } - public HdfsSpout setBadFilesDir(String badFilesDir) { + public HDFSSpout setBadFilesDir(String badFilesDir) { this.badFilesDir = badFilesDir; return this; } - public HdfsSpout setLockDir(String lockDir) { + public HDFSSpout setLockDir(String lockDir) { this.lockDir = lockDir; return this; } - public HdfsSpout setCommitFrequencyCount(int commitFrequencyCount) { + public HDFSSpout setCommitFrequencyCount(int commitFrequencyCount) { this.commitFrequencyCount = commitFrequencyCount; return this; } - public HdfsSpout setCommitFrequencySec(int commitFrequencySec) { + public HDFSSpout setCommitFrequencySec(int commitFrequencySec) { this.commitFrequencySec = commitFrequencySec; return this; } - public HdfsSpout setMaxOutstanding(int maxOutstanding) { + public HDFSSpout setMaxOutstanding(int maxOutstanding) { this.maxOutstanding = maxOutstanding; return this; } - public HdfsSpout setLockTimeoutSec(int lockTimeoutSec) { + public HDFSSpout setLockTimeoutSec(int lockTimeoutSec) { this.lockTimeoutSec = lockTimeoutSec; return this; } - public HdfsSpout setClocksInSync(boolean clocksInSync) { + public HDFSSpout setClocksInSync(boolean clocksInSync) { this.clocksInSync = clocksInSync; return this; } - public HdfsSpout setIgnoreSuffix(String ignoreSuffix) { + public HDFSSpout setIgnoreSuffix(String ignoreSuffix) { this.ignoreSuffix = ignoreSuffix; return this; } /** Output field names. 
Number of fields depends upon the reader type */ - public HdfsSpout withOutputFields(String... fields) { + public HDFSSpout withOutputFields(String... fields) { outputFields = new Fields(fields); return this; } /** set key name under which HDFS options are placed. (similar to HDFS bolt). * default key name is 'hdfs.config' */ - public HdfsSpout withConfigKey(String configKey) { + public HDFSSpout withConfigKey(String configKey) { this.configKey = configKey; return this; } @@ -185,7 +185,7 @@ public HdfsSpout withConfigKey(String configKey) { /** * Set output stream name */ - public HdfsSpout withOutputStream(String streamName) { + public HDFSSpout withOutputStream(String streamName) { this.outputStreamName = streamName; return this; } @@ -409,9 +409,14 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect // Reader type config if ( readerType==null && conf.containsKey(Configs.READER_TYPE) ) { - readerType = conf.get(Configs.READER_TYPE).toString(); + String className = (String) conf.get(Configs.READER_TYPE); + try { + readerType = (Class) Class.forName(className); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to instantiate " + className, e); + } } - checkValidReader(readerType); + // -- source dir config if ( sourceDir==null && conf.containsKey(Configs.SOURCE_DIR) ) { @@ -531,22 +536,6 @@ private String getDefaultLockDir(Path sourceDirPath) { return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR; } - private static void checkValidReader(String readerType) { - if ( readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ) ) - return; - try { - Class<?> classType = Class.forName(readerType); - classType.getConstructor(FileSystem.class, Path.class, Map.class); - return; - } catch (ClassNotFoundException e) { - LOG.error(readerType + " not found in classpath.", e); - throw new IllegalArgumentException(readerType + " not found in classpath.", e); - } catch (NoSuchMethodException e) { - LOG.error(readerType + " is missing the expected constructor for Readers.", e); - throw new IllegalArgumentException(readerType + " is missing the expected constructor for Readers."); - } - } - @Override public void ack(Object msgId) { LOG.trace("Ack received for msg {} on spout {}", msgId, spoutId); @@ -696,18 +685,10 @@ private boolean hasExpired(long lastModifyTime) { */ private FileReader createFileReader(Path file) throws IOException { - if ( readerType.equalsIgnoreCase(Configs.SEQ) ) { - return new SequenceFileReader(this.hdfs, file, conf); - } - if ( readerType.equalsIgnoreCase(Configs.TEXT) ) { - return new TextFileReader(this.hdfs, file, conf); - } try { - Class<?> clsType = Class.forName(readerType); - Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class); + Constructor<?> constructor = readerType.getConstructor(FileSystem.class, Path.class, Map.class); return (FileReader) constructor.newInstance(this.hdfs, file, conf); } catch (Exception e) { - LOG.error(e.getMessage(), e); throw new RuntimeException("Unable to instantiate " + readerType + " reader", e); } } @@ -722,19 +703,10 @@ private FileReader createFileReader(Path file) */ private FileReader createFileReader(Path file, String offset) throws IOException { - if ( readerType.equalsIgnoreCase(Configs.SEQ) ) { - return new SequenceFileReader(this.hdfs, file, conf, offset); - } - if ( readerType.equalsIgnoreCase(Configs.TEXT) ) { - return new TextFileReader(this.hdfs, file, conf, offset); - } - try { - Class<?> clsType =
Class.forName(readerType); - Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class, String.class); + Constructor<?> constructor = readerType.getConstructor(FileSystem.class, Path.class, Map.class, String.class); return (FileReader) constructor.newInstance(this.hdfs, file, conf, offset); } catch (Exception e) { - LOG.error(e.getMessage(), e); throw new RuntimeException("Unable to instantiate " + readerType, e); } } diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHDFSSpout.java similarity index 92% rename from external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java rename to external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHDFSSpout.java index f60cbf3e315..cd87f5fc053 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHDFSSpout.java @@ -59,7 +59,7 @@ import org.apache.storm.hdfs.common.HdfsUtils.Pair; -public class TestHdfsSpout { +public class TestHDFSSpout { @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @@ -69,7 +69,7 @@ public class TestHDFSSpout { private Path badfiles; - public TestHdfsSpout() { + public TestHDFSSpout() { } static MiniDFSCluster.Builder builder; @@ -117,7 +117,7 @@ public void testSimpleText_noACK() throws IOException { Path file2 = new Path(source.toString() + "/file2.txt"); createTextFile(file2, 5); - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HDFSSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1); @@ -139,7 +139,7 @@ public void testSimpleText_ACK() throws IOException { Path file2 = new Path(source.toString() + "/file2.txt"); createTextFile(file2, 5); - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HDFSSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1); @@ -165,13 +165,13 @@ public void testResumeAbandoned_Text_NoAck() throws Exception { final Integer lockExpirySec = 1; - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HDFSSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1000); // effectively disable commits based on time spout.setLockTimeoutSec(lockExpirySec); - HdfsSpout spout2 = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HDFSSpout spout2 = makeSpout(TextFileReader.class); spout2.setCommitFrequencyCount(1); spout2.setCommitFrequencySec(1000); // effectively disable commits based on time spout2.setLockTimeoutSec(lockExpirySec); @@ -222,13 +222,13 @@ public void testResumeAbandoned_Seq_NoAck() throws Exception { final Integer lockExpirySec = 1; - HdfsSpout spout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields); + HDFSSpout spout = makeSpout(SequenceFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1000); // effectively disable commits based on time spout.setLockTimeoutSec(lockExpirySec); - HdfsSpout spout2 = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields); + HDFSSpout spout2 = makeSpout(SequenceFileReader.class); spout2.setCommitFrequencyCount(1); spout2.setCommitFrequencySec(1000); // effectively disable commits based on time spout2.setLockTimeoutSec(lockExpirySec); @@ -278,7 +278,7 @@ private void
checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) } List<String> actual = new ArrayList<>(); - for (Pair<HdfsSpout.MessageId, List<Object>> item : collector.items) { + for (Pair<HDFSSpout.MessageId, List<Object>> item : collector.items) { actual.add(item.getValue().get(0).toString()); } Assert.assertEquals(expected, actual); @@ -344,7 +344,7 @@ public void testMultipleFileConsumption_Ack() throws Exception { createTextFile(file1, 5); - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HDFSSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1); @@ -398,14 +398,14 @@ public void testMultipleFileConsumption_Ack() throws Exception { Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); } - private static <T> T getField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { - Field readerFld = HdfsSpout.class.getDeclaredField(fieldName); + private static <T> T getField(HDFSSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { + Field readerFld = HDFSSpout.class.getDeclaredField(fieldName); readerFld.setAccessible(true); return (T) readerFld.get(spout); } - private static boolean getBoolField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { - Field readerFld = HdfsSpout.class.getDeclaredField(fieldName); + private static boolean getBoolField(HDFSSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { + Field readerFld = HDFSSpout.class.getDeclaredField(fieldName); readerFld.setAccessible(true); return readerFld.getBoolean(spout); } @@ -426,7 +426,7 @@ public void testSimpleSequenceFile() throws IOException { createSeqFile(fs, file2, 5); - HdfsSpout spout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields); + HDFSSpout spout = makeSpout(SequenceFileReader.class); Map conf = getCommonConfigs(); openSpout(spout, 0, conf); @@ -454,7 +454,7 @@ public void testReadFailures() throws Exception { Assert.assertEquals(2, listDir(source).size()); // 2) run spout - HdfsSpout spout = makeSpout(MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields); + HDFSSpout spout = makeSpout(MockTextFailingReader.class); Map conf = getCommonConfigs(); openSpout(spout, 0, conf); @@ -476,7 +476,7 @@ public void testLocking() throws Exception { // 0) config spout to log progress in lock file for each tuple - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HDFSSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1000); // effectively disable commits based on time @@ -527,7 +527,7 @@ public void testLockLoggingFreqCount() throws Exception { // 0) config spout to log progress in lock file for each tuple - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HDFSSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(2); // 1 lock log entry every 2 tuples spout.setCommitFrequencySec(1000); // Effectively disable commits based on time @@ -554,7 +554,7 @@ public void testLockLoggingFreqSec() throws Exception { createTextFile(file1, 10); // 0) config spout to log progress in lock file for each tuple - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HDFSSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(0); // disable it spout.setCommitFrequencySec(2); // log every 2 sec @@ -594,9 +594,8 @@ private Map getCommonConfigs() { return conf; } - private HdfsSpout
makeSpout(String readerType, String[] outputFields) { - HdfsSpout spout = new HdfsSpout().withOutputFields(outputFields) - .setReaderType(readerType) + private HDFSSpout makeSpout(Class readerType) { + HDFSSpout spout = new HDFSSpout().setReaderType(readerType) .setHdfsUri(hdfsCluster.getURI().toString()) .setSourceDir(source.toString()) .setArchiveDir(archive.toString()) @@ -605,7 +604,7 @@ private HdfsSpout makeSpout(String readerType, String[] outputFields) { return spout; } - private void openSpout(HdfsSpout spout, int spoutId, Map conf) { + private void openSpout(HDFSSpout spout, int spoutId, Map conf) { MockCollector collector = new MockCollector(); spout.open(conf, new MockTopologyContext(spoutId), collector); } @@ -620,7 +619,7 @@ private void openSpout(HdfsSpout spout, int spoutId, Map conf) { * fN - fail, item number: N */ - private List<String> runSpout(HdfsSpout spout, String... cmds) { + private List<String> runSpout(HDFSSpout spout, String... cmds) { MockCollector collector = (MockCollector) spout.getCollector(); for(String cmd : cmds) { if(cmd.startsWith("r")) { @@ -634,12 +633,12 @@ private List<String> runSpout(HdfsSpout spout, String... cmds) { } else if(cmd.startsWith("a")) { int n = Integer.parseInt(cmd.substring(1)); - Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n); + Pair<HDFSSpout.MessageId, List<Object>> item = collector.items.get(n); spout.ack(item.getKey()); } else if(cmd.startsWith("f")) { int n = Integer.parseInt(cmd.substring(1)); - Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n); + Pair<HDFSSpout.MessageId, List<Object>> item = collector.items.get(n); spout.fail(item.getKey()); } } @@ -684,7 +683,7 @@ private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws static class MockCollector extends SpoutOutputCollector { //comma separated offsets public ArrayList<String> lines; - public ArrayList<Pair<HdfsSpout.MessageId, List<Object>>> items; + public ArrayList<Pair<HDFSSpout.MessageId, List<Object>>> items; public MockCollector() { super(null); From 45c06d6b625ca85304db623e43a954bb203655d5 Mon Sep 17 00:00:00 2001 From: Sachin Pasalkar Date: Fri, 17 Feb 2017 13:20:49 +0530 Subject: [PATCH 2/3] As per comments from arunmahadevan --- .../apache/storm/hdfs/spout/HDFSSpout.java | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HDFSSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HDFSSpout.java index 4fd0e585273..f68c45419f6 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HDFSSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HDFSSpout.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.storm.Config; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -49,7 +50,8 @@ public class HDFSSpout extends BaseRichSpout { // user configurable private String hdfsUri; // required - private Class readerType; // required + private String readerType; + private Class readerTypeClass; // required private Fields outputFields; // required private String sourceDir; // required @@ -112,9 +114,17 @@ public HDFSSpout setHdfsUri(String hdfsUri) { this.hdfsUri = hdfsUri; return this; } - - public HDFSSpout setReaderType(Class readerType) { - this.readerType = readerType; + /** + * @deprecated use {@link #setReaderType(Class)} instead.
+ */ + @Deprecated + public HDFSSpout setReaderType(String readerType) { + this.readerType = readerType; + return this; + } + + public HDFSSpout setReaderType(Class readerTypeClass) { + this.readerTypeClass = readerTypeClass; return this; } @@ -411,7 +421,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect if ( readerType==null && conf.containsKey(Configs.READER_TYPE) ) { String className = (String) conf.get(Configs.READER_TYPE); try { - readerType = (Class) Class.forName(className); + readerTypeClass = (Class) Class.forName(className); } catch (ClassNotFoundException e) { throw new RuntimeException("Unable to instantiate " + className, e); } @@ -686,7 +696,10 @@ private boolean hasExpired(long lastModifyTime) { private FileReader createFileReader(Path file) throws IOException { try { - Constructor<?> constructor = readerType.getConstructor(FileSystem.class, Path.class, Map.class); + if(StringUtils.isNotBlank(readerType)){ + readerTypeClass = (Class) Class.forName(readerType); + } + Constructor<?> constructor = readerTypeClass.getConstructor(FileSystem.class, Path.class, Map.class); return (FileReader) constructor.newInstance(this.hdfs, file, conf); } catch (Exception e) { throw new RuntimeException("Unable to instantiate " + readerType + " reader", e); } @@ -704,7 +717,10 @@ private FileReader createFileReader(Path file) */ private FileReader createFileReader(Path file, String offset) throws IOException { try { - Constructor<?> constructor = readerType.getConstructor(FileSystem.class, Path.class, Map.class, String.class); + if(StringUtils.isNotBlank(readerType)){ + readerTypeClass = (Class) Class.forName(readerType); + } + Constructor<?> constructor = readerTypeClass.getConstructor(FileSystem.class, Path.class, Map.class, String.class); return (FileReader) constructor.newInstance(this.hdfs, file, conf, offset); } catch (Exception e) { throw new RuntimeException("Unable to instantiate " + readerType, e); From 7b444dae48ab498e0dc6c23436003302bc677361 Mon Sep 17 00:00:00 2001 From: Sachin Pasalkar Date: Fri, 17 Feb 2017 14:25:23 +0530 Subject: [PATCH 3/3] Reverted as per comments --- external/storm-hdfs/README.md | 2 +- .../spout/{HDFSSpout.java => HdfsSpout.java} | 38 ++++++------ ...{TestHDFSSpout.java => TestHdfsSpout.java} | 58 +++++++++---------- 3 files changed, 49 insertions(+), 49 deletions(-) rename external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/{HDFSSpout.java => HdfsSpout.java} (96%) rename external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/{TestHDFSSpout.java => TestHdfsSpout.java} (93%) diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md index 5f22aa6e063..0b3368ce7ba 100644 --- a/external/storm-hdfs/README.md +++ b/external/storm-hdfs/README.md @@ -504,7 +504,7 @@ The following example creates an HDFS spout that reads text files from HDFS path ```java // Instantiate spout to read text files -HDFSSpout textReaderSpout = new HDFSSpout().setReaderType(TextFileReader.class) +HdfsSpout textReaderSpout = new HdfsSpout().setReaderType(TextFileReader.class) .withOutputFields(TextFileReader.defaultFields) .setHdfsUri("hdfs://localhost:54310") // reqd .setSourceDir("/data/in") // reqd diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HDFSSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java similarity index 96% rename from external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HDFSSpout.java rename to
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index f68c45419f6..8914c9a41c8 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HDFSSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -46,7 +46,7 @@ import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; -public class HDFSSpout extends BaseRichSpout { +public class HdfsSpout extends BaseRichSpout { // user configurable private String hdfsUri; // required @@ -78,7 +78,7 @@ public class HDFSSpout extends BaseRichSpout { private String outputStreamName= null; // other members - private static final Logger LOG = LoggerFactory.getLogger(HDFSSpout.class); + private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); private ProgressTracker tracker = null; @@ -107,10 +107,10 @@ public class HDFSSpout extends BaseRichSpout { private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs - public HDFSSpout() { + public HdfsSpout() { } - public HDFSSpout setHdfsUri(String hdfsUri) { + public HdfsSpout setHdfsUri(String hdfsUri) { this.hdfsUri = hdfsUri; return this; } @@ -118,76 +118,76 @@ public HDFSSpout setHdfsUri(String hdfsUri) { * @deprecated use {@link #setReaderType(Class)} instead. */ @Deprecated - public HDFSSpout setReaderType(String readerType) { + public HdfsSpout setReaderType(String readerType) { this.readerType = readerType; return this; } - public HDFSSpout setReaderType(Class readerTypeClass) { + public HdfsSpout setReaderType(Class readerTypeClass) { this.readerTypeClass = readerTypeClass; return this; } - public HDFSSpout setSourceDir(String sourceDir) { + public HdfsSpout setSourceDir(String sourceDir) { this.sourceDir = sourceDir; return this; } - public HDFSSpout setArchiveDir(String archiveDir) { + public HdfsSpout setArchiveDir(String archiveDir) { this.archiveDir = archiveDir; return this; } - public HDFSSpout setBadFilesDir(String badFilesDir) { + public HdfsSpout setBadFilesDir(String badFilesDir) { this.badFilesDir = badFilesDir; return this; } - public HDFSSpout setLockDir(String lockDir) { + public HdfsSpout setLockDir(String lockDir) { this.lockDir = lockDir; return this; } - public HDFSSpout setCommitFrequencyCount(int commitFrequencyCount) { + public HdfsSpout setCommitFrequencyCount(int commitFrequencyCount) { this.commitFrequencyCount = commitFrequencyCount; return this; } - public HDFSSpout setCommitFrequencySec(int commitFrequencySec) { + public HdfsSpout setCommitFrequencySec(int commitFrequencySec) { this.commitFrequencySec = commitFrequencySec; return this; } - public HDFSSpout setMaxOutstanding(int maxOutstanding) { + public HdfsSpout setMaxOutstanding(int maxOutstanding) { this.maxOutstanding = maxOutstanding; return this; } - public HDFSSpout setLockTimeoutSec(int lockTimeoutSec) { + public HdfsSpout setLockTimeoutSec(int lockTimeoutSec) { this.lockTimeoutSec = lockTimeoutSec; return this; } - public HDFSSpout setClocksInSync(boolean clocksInSync) { + public HdfsSpout setClocksInSync(boolean clocksInSync) { this.clocksInSync = clocksInSync; return this; } - public HDFSSpout setIgnoreSuffix(String ignoreSuffix) { + public HdfsSpout setIgnoreSuffix(String ignoreSuffix) { this.ignoreSuffix = ignoreSuffix; return this; } /** Output field names. Number of fields depends upon the reader type */ - public HDFSSpout withOutputFields(String... fields) { + public HdfsSpout withOutputFields(String... 
fields) { outputFields = new Fields(fields); return this; } /** set key name under which HDFS options are placed. (similar to HDFS bolt). * default key name is 'hdfs.config' */ - public HDFSSpout withConfigKey(String configKey) { + public HdfsSpout withConfigKey(String configKey) { this.configKey = configKey; return this; } @@ -195,7 +195,7 @@ public HDFSSpout withConfigKey(String configKey) { /** * Set output stream name */ - public HDFSSpout withOutputStream(String streamName) { + public HdfsSpout withOutputStream(String streamName) { this.outputStreamName = streamName; return this; } diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHDFSSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java similarity index 93% rename from external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHDFSSpout.java rename to external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java index cd87f5fc053..264ccca4be6 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHDFSSpout.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java @@ -59,7 +59,7 @@ import org.apache.storm.hdfs.common.HdfsUtils.Pair; -public class TestHDFSSpout { +public class TestHdfsSpout { @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @@ -69,7 +69,7 @@ public class TestHDFSSpout { private Path badfiles; - public TestHDFSSpout() { + public TestHdfsSpout() { } static MiniDFSCluster.Builder builder; @@ -95,7 +95,7 @@ public static void teardownClass() throws IOException { @Before public void setup() throws Exception { - baseFolder = tempFolder.newFolder("hdfsspout"); + baseFolder = tempFolder.newFolder("HdfsSpout"); source = new Path(baseFolder.toString() + "/source"); fs.mkdirs(source); archive = new Path(baseFolder.toString() + "/archive"); @@ -117,7 +117,7 @@ public void testSimpleText_noACK() throws IOException { Path file2 = new Path(source.toString() + "/file2.txt"); createTextFile(file2, 5); - HDFSSpout spout = makeSpout(TextFileReader.class); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1); @@ -139,7 +139,7 @@ public void testSimpleText_ACK() throws IOException { Path file2 = new Path(source.toString() + "/file2.txt"); createTextFile(file2, 5); - HDFSSpout spout = makeSpout(TextFileReader.class); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1); @@ -165,13 +165,13 @@ public void testResumeAbandoned_Text_NoAck() throws Exception { final Integer lockExpirySec = 1; - HDFSSpout spout = makeSpout(TextFileReader.class); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1000); // effectively disable commits based on time spout.setLockTimeoutSec(lockExpirySec); - HDFSSpout spout2 = makeSpout(TextFileReader.class); + HdfsSpout spout2 = makeSpout(TextFileReader.class); spout2.setCommitFrequencyCount(1); spout2.setCommitFrequencySec(1000); // effectively disable commits based on time spout2.setLockTimeoutSec(lockExpirySec); @@ -222,13 +222,13 @@ public void testResumeAbandoned_Seq_NoAck() throws Exception { final Integer lockExpirySec = 1; - HDFSSpout spout = makeSpout(SequenceFileReader.class); + HdfsSpout spout = makeSpout(SequenceFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1000); // effectively disable commits based on time 
spout.setLockTimeoutSec(lockExpirySec); - HDFSSpout spout2 = makeSpout(SequenceFileReader.class); + HdfsSpout spout2 = makeSpout(SequenceFileReader.class); spout2.setCommitFrequencyCount(1); spout2.setCommitFrequencySec(1000); // effectively disable commits based on time spout2.setLockTimeoutSec(lockExpirySec); @@ -278,7 +278,7 @@ private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) } List<String> actual = new ArrayList<>(); - for (Pair<HDFSSpout.MessageId, List<Object>> item : collector.items) { + for (Pair<HdfsSpout.MessageId, List<Object>> item : collector.items) { actual.add(item.getValue().get(0).toString()); } Assert.assertEquals(expected, actual); @@ -344,7 +344,7 @@ public void testMultipleFileConsumption_Ack() throws Exception { createTextFile(file1, 5); - HDFSSpout spout = makeSpout(TextFileReader.class); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1); @@ -398,14 +398,14 @@ public void testMultipleFileConsumption_Ack() throws Exception { Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); } - private static <T> T getField(HDFSSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { - Field readerFld = HDFSSpout.class.getDeclaredField(fieldName); + private static <T> T getField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { + Field readerFld = HdfsSpout.class.getDeclaredField(fieldName); readerFld.setAccessible(true); return (T) readerFld.get(spout); } - private static boolean getBoolField(HDFSSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { - Field readerFld = HDFSSpout.class.getDeclaredField(fieldName); + private static boolean getBoolField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { - Field readerFld = HdfsSpout.class.getDeclaredField(fieldName); readerFld.setAccessible(true); return readerFld.getBoolean(spout); } @@ -414,9 +414,9 @@ private static boolean getBoolField(HDFSSpout spout, String fieldName) throws No @Test public void testSimpleSequenceFile() throws IOException { //1) create a couple files to consume - source = new Path("/tmp/hdfsspout/source"); + source = new Path("/tmp/HdfsSpout/source"); fs.mkdirs(source); - archive = new Path("/tmp/hdfsspout/archive"); + archive = new Path("/tmp/HdfsSpout/archive"); fs.mkdirs(archive); Path file1 = new Path(source + "/file1.seq"); createSeqFile(fs, file1, 5); Path file2 = new Path(source + "/file2.seq"); createSeqFile(fs, file2, 5); - HDFSSpout spout = makeSpout(SequenceFileReader.class); + HdfsSpout spout = makeSpout(SequenceFileReader.class); Map conf = getCommonConfigs(); openSpout(spout, 0, conf); @@ -454,7 +454,7 @@ public void testReadFailures() throws Exception { Assert.assertEquals(2, listDir(source).size()); // 2) run spout - HDFSSpout spout = makeSpout(MockTextFailingReader.class); + HdfsSpout spout = makeSpout(MockTextFailingReader.class); Map conf = getCommonConfigs(); openSpout(spout, 0, conf); @@ -476,7 +476,7 @@ public void testLocking() throws Exception { // 0) config spout to log progress in lock file for each tuple - HDFSSpout spout = makeSpout(TextFileReader.class); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1000); // effectively disable commits based on time @@ -527,7 +527,7 @@ public void testLockLoggingFreqCount() throws Exception { // 0) config spout to log progress in lock file for each tuple - HDFSSpout spout = makeSpout(TextFileReader.class); +
HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(2); // 1 lock log entry every 2 tuples spout.setCommitFrequencySec(1000); // Effectively disable commits based on time @@ -554,7 +554,7 @@ public void testLockLoggingFreqSec() throws Exception { createTextFile(file1, 10); // 0) config spout to log progress in lock file for each tuple - HDFSSpout spout = makeSpout(TextFileReader.class); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(0); // disable it spout.setCommitFrequencySec(2); // log every 2 sec @@ -594,8 +594,8 @@ private Map getCommonConfigs() { return conf; } - private HDFSSpout makeSpout(Class readerType) { - HDFSSpout spout = new HDFSSpout().setReaderType(readerType) + private HdfsSpout makeSpout(Class readerType) { + HdfsSpout spout = new HdfsSpout().setReaderType(readerType) .setHdfsUri(hdfsCluster.getURI().toString()) .setSourceDir(source.toString()) .setArchiveDir(archive.toString()) @@ -604,7 +604,7 @@ private HDFSSpout makeSpout(Class readerType) { return spout; } - private void openSpout(HDFSSpout spout, int spoutId, Map conf) { + private void openSpout(HdfsSpout spout, int spoutId, Map conf) { MockCollector collector = new MockCollector(); spout.open(conf, new MockTopologyContext(spoutId), collector); } @@ -619,7 +619,7 @@ private void openSpout(HDFSSpout spout, int spoutId, Map conf) { * fN - fail, item number: N */ - private List<String> runSpout(HDFSSpout spout, String... cmds) { + private List<String> runSpout(HdfsSpout spout, String... cmds) { MockCollector collector = (MockCollector) spout.getCollector(); for(String cmd : cmds) { if(cmd.startsWith("r")) { @@ -633,12 +633,12 @@ private List<String> runSpout(HDFSSpout spout, String... cmds) { } else if(cmd.startsWith("a")) { int n = Integer.parseInt(cmd.substring(1)); - Pair<HDFSSpout.MessageId, List<Object>> item = collector.items.get(n); + Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n); spout.ack(item.getKey()); } else if(cmd.startsWith("f")) { int n = Integer.parseInt(cmd.substring(1)); - Pair<HDFSSpout.MessageId, List<Object>> item = collector.items.get(n); + Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n); spout.fail(item.getKey()); } } @@ -683,7 +683,7 @@ private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws static class MockCollector extends SpoutOutputCollector { //comma separated offsets public ArrayList<String> lines; - public ArrayList<Pair<HDFSSpout.MessageId, List<Object>>> items; + public ArrayList<Pair<HdfsSpout.MessageId, List<Object>>> items; public MockCollector() { super(null);
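Taken together, the series replaces the string-based reader selection ('text' / 'seq' / fully qualified class name) with a class-based setter, keeps the string form as a deprecated fallback resolved via Class.forName(), and drops the TextFileReader/SequenceFileReader special cases from createFileReader() in favor of purely reflective construction. A minimal usage sketch of the resulting API follows. It assumes the post-PATCH-3 state of the code (class named HdfsSpout, class-based setReaderType with the string overload deprecated); the topology name, parallelism, and LocalCluster run below are illustrative additions, not part of the patch.

```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.hdfs.spout.HdfsSpout;
import org.apache.storm.hdfs.spout.TextFileReader;
import org.apache.storm.topology.TopologyBuilder;

public class HdfsSpoutReaderTypeDemo {
  public static void main(String[] args) throws Exception {
    // Class-based style introduced by this series: a bad reader type now
    // fails at compile time instead of inside open()/createFileReader().
    HdfsSpout textReaderSpout = new HdfsSpout()
        .setReaderType(TextFileReader.class)            // was: setReaderType("text")
        .withOutputFields(TextFileReader.defaultFields)
        .setHdfsUri("hdfs://localhost:54310")           // reqd
        .setSourceDir("/data/in")                       // reqd
        .setArchiveDir("/data/done")                    // reqd
        .setBadFilesDir("/data/badfiles");              // reqd

    // The deprecated string form still works and is resolved with
    // Class.forName() inside createFileReader():
    //   new HdfsSpout().setReaderType("org.apache.storm.hdfs.spout.TextFileReader")
    // Either way, a custom reader must implement
    // org.apache.storm.hdfs.spout.FileReader and expose the constructors the
    // spout looks up reflectively:
    //   (FileSystem, Path, Map) and (FileSystem, Path, Map, String).

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("hdfsSpout", textReaderSpout, 1);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("hdfs-spout-demo", new Config(), builder.createTopology());
    Thread.sleep(30_000);   // let the spout drain /data/in
    cluster.shutdown();
  }
}
```

Passing the Class rather than a String also lets the spout cache the resolved constructor target up front, which is why patch 1 could delete checkValidReader() outright: the classpath check that method performed is subsumed by the compiler (class form) or by the Class.forName() call in createFileReader() (deprecated string form).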