From f09b7e55461194c2c03aedf569a739795cfc87ee Mon Sep 17 00:00:00 2001 From: Sanket Date: Tue, 4 Aug 2015 01:19:21 -0500 Subject: [PATCH] STORM-828 removing unnecessary config information from HdfsBolt --- .../apache/storm/hdfs/bolt/CSVFileBolt.java | 32 +++++++++++++++++++ .../org/apache/storm/hdfs/bolt/HdfsBolt.java | 24 ++++++++++++++ .../storm/hdfs/bolt/SequenceFileBolt.java | 20 ++++++++++++ .../apache/storm/hdfs/bolt/TSVFileBolt.java | 32 +++++++++++++++++++ .../storm/hdfs/bolt/HdfsFileTopology.java | 22 +++---------- .../storm/hdfs/bolt/SequenceFileTopology.java | 23 +------------ 6 files changed, 113 insertions(+), 40 deletions(-) create mode 100644 external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/CSVFileBolt.java create mode 100644 external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/TSVFileBolt.java diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/CSVFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/CSVFileBolt.java new file mode 100644 index 00000000000..0a17a17f74f --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/CSVFileBolt.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.bolt; + +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.RecordFormat; +import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; +import org.apache.storm.hdfs.common.rotation.MoveFileAction; + +public class CSVFileBolt extends HdfsBolt { + private static String fileExtension = ".csv"; + + public CSVFileBolt(String sourceDir, String destDir) { + super(sourceDir, destDir, fileExtension); + } +} diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java index dcb09e7d666..97a8477772a 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java @@ -25,10 +25,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.format.RecordFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; +import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.apache.storm.hdfs.common.rotation.MoveFileAction; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +49,25 @@ public class HdfsBolt extends AbstractHdfsBolt{ private transient FSDataOutputStream out; private RecordFormat format; private long offset = 0; + private static String defaultSourceDir = "/tmp/source"; + private static String defaultDestDir = "/tmp/dest"; + private static String defaultFileExtension = ".txt"; + + + public HdfsBolt() { + this(defaultSourceDir, defaultDestDir, defaultFileExtension); + } + + public HdfsBolt(String defaultSourceDir, String defaultDestDir, String fileExtension) { + this.withRotationPolicy(new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES)) + .withConfigKey("hdfs.config") + .withRecordFormat(new DelimitedRecordFormat().withRecordDelimiter("|")) + .withFileNameFormat(new DefaultFileNameFormat() + .withPath(defaultSourceDir) + .withExtension(fileExtension)) + .withSyncPolicy(new CountSyncPolicy(1000)) + .addRotationAction(new MoveFileAction().toDestination(defaultDestDir)); + } public HdfsBolt withFsUrl(String fsUrl){ this.fsUrl = fsUrl; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java index baf4df030ae..a7e55755368 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java @@ -24,10 +24,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.DefaultSequenceFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.format.SequenceFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; +import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; +import org.apache.storm.hdfs.common.rotation.MoveFileAction; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +50,23 @@ public class SequenceFileBolt extends AbstractHdfsBolt { private String compressionCodec = "default"; private transient CompressionCodecFactory codecFactory; + private static String sourceDir = "/tmp/source"; + private static String destDir = "/tmp/dest"; public SequenceFileBolt() { + this(sourceDir,destDir); + } + + public SequenceFileBolt(String sourceDir, String destDir) { + this.withFileNameFormat(new DefaultFileNameFormat() + .withPath(sourceDir) + .withExtension(".seq")) + .withSequenceFormat(new DefaultSequenceFormat("timestamp", "sentence")) + .withRotationPolicy(new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB)) + .withSyncPolicy(new CountSyncPolicy(1000)) + .withCompressionType(SequenceFile.CompressionType.RECORD) + .withCompressionCodec("deflate") + .addRotationAction(new MoveFileAction().toDestination(destDir)); } public SequenceFileBolt withCompressionCodec(String codec){ diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/TSVFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/TSVFileBolt.java new file mode 100644 index 00000000000..3921d6fcbb6 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/TSVFileBolt.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.bolt; + +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.RecordFormat; +import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; +import org.apache.storm.hdfs.common.rotation.MoveFileAction; + +public class TSVFileBolt extends HdfsBolt { + private static String fileExtension = ".txt"; + + public TSVFileBolt(String sourceDir, String destDir) { + super(sourceDir, destDir, fileExtension); + } +} diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java index 32f1f2d3e88..263324a2b8b 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java @@ -58,19 +58,11 @@ public class HdfsFileTopology { public static void main(String[] args) throws Exception { Config config = new Config(); config.setNumWorkers(1); + String sourceDir = "/tmp/foo"; + String destDir = "tmp/dest"; SentenceSpout spout = new SentenceSpout(); - // sync the filesystem after every 1k tuples - SyncPolicy syncPolicy = new CountSyncPolicy(1000); - - // rotate files when they reach 5MB - FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES); - - FileNameFormat fileNameFormat = new DefaultFileNameFormat() - .withPath("/tmp/foo/") - .withExtension(".txt"); - // use "|" instead of "," for field delimiter RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter("|"); @@ -81,14 +73,8 @@ public static void main(String[] args) throws Exception { in.close(); config.put("hdfs.config", yamlConf); - HdfsBolt bolt = new HdfsBolt() - .withConfigKey("hdfs.config") - .withFsUrl(args[0]) - .withFileNameFormat(fileNameFormat) - .withRecordFormat(format) - .withRotationPolicy(rotationPolicy) - .withSyncPolicy(syncPolicy) - .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/")); + HdfsBolt bolt = new TSVFileBolt(sourceDir,destDir) + .withFsUrl(args[0]); TopologyBuilder builder = new TopologyBuilder(); diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java index 2351cd359ae..81e62d6f91d 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java @@ -56,29 +56,8 @@ public static void main(String[] args) throws Exception { SentenceSpout spout = new SentenceSpout(); - // sync the filesystem after every 1k tuples - SyncPolicy syncPolicy = new CountSyncPolicy(1000); - - // rotate files when they reach 5MB - FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); - - FileNameFormat fileNameFormat = new DefaultFileNameFormat() - .withPath("/tmp/source/") - .withExtension(".seq"); - - // create sequence format instance. - DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence"); - SequenceFileBolt bolt = new SequenceFileBolt() - .withFsUrl(args[0]) - .withFileNameFormat(fileNameFormat) - .withSequenceFormat(format) - .withRotationPolicy(rotationPolicy) - .withSyncPolicy(syncPolicy) - .withCompressionType(SequenceFile.CompressionType.RECORD) - .withCompressionCodec("deflate") - .addRotationAction(new MoveFileAction().toDestination("/tmp/dest/")); - + .withFsUrl(args[0]);