From adfdc145fc4fed15a71d60ff9c5cc4afbfa9b03f Mon Sep 17 00:00:00 2001
From: Civitaspo <civitaspo@gmail.com>
Date: Sat, 12 Sep 2015 18:52:27 +0900
Subject: [PATCH 1/4] prepare v0.2.0

---
 .gitignore                                    |   3 +
 build.gradle                                  |  10 +-
 lib/embulk/output/hdfs.rb                     |   2 +-
 .../org/embulk/output/HdfsOutputPlugin.java   | 219 ------------------
 .../output/hdfs/HdfsFileOutputPlugin.java     |  93 ++++++++
 .../embulk/output/TestHdfsOutputPlugin.java   |   5 -
 .../output/hdfs/TestHdfsFileOutputPlugin.java |   5 +
 7 files changed, 107 insertions(+), 230 deletions(-)
 delete mode 100644 src/main/java/org/embulk/output/HdfsOutputPlugin.java
 create mode 100644 src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
 delete mode 100644 src/test/java/org/embulk/output/TestHdfsOutputPlugin.java
 create mode 100644 src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java

diff --git a/.gitignore b/.gitignore
index e45d049..eab43f9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,6 @@
 /classpath/
 build/
 .idea
+*.iml
+.ruby-version
+
diff --git a/build.gradle b/build.gradle
index 55aab84..7be9252 100644
--- a/build.gradle
+++ b/build.gradle
@@ -12,7 +12,7 @@ configurations {
     provided
 }
 
-version = "0.1.2"
+version = "0.2.0"
 
 sourceCompatibility = 1.7
 targetCompatibility = 1.7
@@ -22,7 +22,7 @@ dependencies {
     provided "org.embulk:embulk-core:0.7.0"
     // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION"
     compile 'org.apache.hadoop:hadoop-client:2.6.0'
-    compile 'com.google.guava:guava:14.0'
+    compile 'com.google.guava:guava:15.0'
     testCompile "junit:junit:4.+"
 }
@@ -57,9 +57,9 @@ task gemspec {
 Gem::Specification.new do |spec|
   spec.name          = "${project.name}"
   spec.version       = "${project.version}"
-  spec.authors       = ["takahiro.nakayama"]
-  spec.summary       = %[Hdfs output plugin for Embulk]
-  spec.description   = %[Dumps records to Hdfs.]
+  spec.authors       = ["Civitaspo"]
+  spec.summary       = %[Hdfs file output plugin for Embulk]
+  spec.description   = %[Stores files on Hdfs.]
   spec.email         = ["civitaspo@gmail.com"]
   spec.licenses      = ["MIT"]
   spec.homepage      = "https://github.com/civitaspo/embulk-output-hdfs"
diff --git a/lib/embulk/output/hdfs.rb b/lib/embulk/output/hdfs.rb
index c544569..68c8745 100644
--- a/lib/embulk/output/hdfs.rb
+++ b/lib/embulk/output/hdfs.rb
@@ -1,3 +1,3 @@
 Embulk::JavaPlugin.register_output(
-  "hdfs", "org.embulk.output.HdfsOutputPlugin",
+  "hdfs", "org.embulk.output.hdfs.HdfsFileOutputPlugin",
   File.expand_path('../../../../classpath', __FILE__))
diff --git a/src/main/java/org/embulk/output/HdfsOutputPlugin.java b/src/main/java/org/embulk/output/HdfsOutputPlugin.java
deleted file mode 100644
index bfb264b..0000000
--- a/src/main/java/org/embulk/output/HdfsOutputPlugin.java
+++ /dev/null
@@ -1,219 +0,0 @@
-package org.embulk.output;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.embulk.config.*;
-import org.embulk.spi.Buffer;
-import org.embulk.spi.Exec;
-import org.embulk.spi.FileOutputPlugin;
-import org.embulk.spi.TransactionalFileOutput;
-import org.jruby.embed.ScriptingContainer;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.Map;
-
-public class HdfsOutputPlugin implements FileOutputPlugin
-{
-    private static final Logger logger = Exec.getLogger(HdfsOutputPlugin.class);
-
-    public interface PluginTask extends Task
-    {
-        @Config("config_files")
-        @ConfigDefault("[]")
-        public List<String> getConfigFiles();
-
-        @Config("config")
-        @ConfigDefault("{}")
-        public Map<String, String> getConfig();
-
-        @Config("sequence_format")
-        @ConfigDefault("\"%03d.%02d\"")
-        public String getSequenceFormat();
-
-        @Config("output_path")
-        @ConfigDefault("\"/tmp/embulk.output.hdfs_output.%Y%m%d_%s\"")
-        public String getOutputPath();
-
-        @Config("working_path")
-        @ConfigDefault("\"/tmp/embulk.working.hdfs_output.%Y%m%d_%s\"")
-        public String getWorkingPath();
-
-    }
-
-    @Override
-    public ConfigDiff transaction(ConfigSource config,
-                                  int taskCount,
-                                  FileOutputPlugin.Control control)
-    {
-        PluginTask task = config.loadConfig(PluginTask.class);
-        return resume(task.dump(), taskCount, control);
-    }
-
-    @Override
-    public ConfigDiff resume(TaskSource taskSource,
-                             int taskCount,
-                             FileOutputPlugin.Control control)
-    {
-        control.run(taskSource);
-        return Exec.newConfigDiff();
-    }
-
-
-    @Override
-    public void cleanup(TaskSource taskSource,
-                        int taskCount,
-                        List<TaskReport> successTaskReports)
-    {
-    }
-
-    @Override
-    public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
-    {
-        PluginTask task = taskSource.loadTask(PluginTask.class);
-
-        Configuration configuration = getHdfsConfiguration(task);
-        FileSystem fs = getFs(configuration);
-        String workingPath = strftime(task.getWorkingPath());
-        String outputPath = strftime(task.getOutputPath());
-        return new TransactionalHdfsFileOutput(task, fs, workingPath, outputPath, taskIndex);
-    }
-
-    private Configuration getHdfsConfiguration(final PluginTask task)
-    {
-        Configuration configuration = new Configuration();
-
-        List<String> configFiles = task.getConfigFiles();
-        for (Object configFile : configFiles) {
-            configuration.addResource(configFile.toString());
-        }
-
-        for (Map.Entry<String, String> entry : task.getConfig().entrySet()) {
-            configuration.set(entry.getKey(), entry.getValue());
-        }
-
-        return configuration;
-    }
-
-    private FileSystem getFs(final Configuration configuration) {
-        try {
-            FileSystem fs = FileSystem.get(configuration);
-            return fs;
-        }
-        catch (IOException e) {
-            logger.error(e.getMessage());
-            throw Throwables.propagate(e);
-        }
-    }
-
-    private String strftime(final String path)
-    {
-        // strftime
-        ScriptingContainer jruby = new ScriptingContainer();
-        Object result = jruby.runScriptlet("Time.now.strftime('" + path + "')");
-        return result.toString();
-    }
-
-    static class TransactionalHdfsFileOutput implements TransactionalFileOutput
-    {
-        private final int taskIndex;
-        private final FileSystem fs;
-        private final String workingPath;
-        private final String outputPath;
-        private final String sequenceFormat;
-
-        private int fileIndex = 0;
-        private int callCount = 0;
-        private Path currentPath = null;
-        private OutputStream currentStream = null;
-
-        public TransactionalHdfsFileOutput(PluginTask task, FileSystem fs, String workingPath, String outputPath, int taskIndex)
-        {
-            this.taskIndex = taskIndex;
-            this.fs = fs;
-            this.workingPath = workingPath;
-            this.outputPath = outputPath;
-            this.sequenceFormat = task.getSequenceFormat();
-        }
-
-        public void nextFile() {
-            closeCurrentStream();
-            currentPath = new Path(workingPath + '/' + String.format(sequenceFormat, taskIndex, fileIndex));
-            try {
-                if (fs.exists(currentPath)) {
-                    throw new IllegalAccessException(currentPath.toString() + " already exists.");
-                }
-                currentStream = fs.create(currentPath);
-                logger.info("Uploading '{}'", currentPath.toString());
-            }
-            catch (IOException | IllegalAccessException e) {
-                logger.error(e.getMessage());
-                throw Throwables.propagate(e);
-            }
-            fileIndex++;
-        }
-
-        @Override
-        public void add(Buffer buffer) {
-            if (currentStream == null) {
-                throw new IllegalStateException("nextFile() must be called before add()");
-            }
-            try {
-                logger.debug("#add called {} times for taskIndex {}", callCount, taskIndex);
-                currentStream.write(buffer.array(), buffer.offset(), buffer.limit());
-                callCount++;
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            } finally {
-                buffer.release();
-            }
-        }
-
-        @Override
-        public void finish() {
-            closeCurrentStream();
-        }
-
-        @Override
-        public void close() {
-            closeCurrentStream();
-        }
-
-        @Override
-        public void abort() {
-        }
-
-        @Override
-        public TaskReport commit() {
-            try {
-                fs.rename(new Path(workingPath), new Path(outputPath));
-                logger.info("rename {} => {}", workingPath, outputPath);
-            } catch (IOException e) {
-                logger.error(e.getMessage());
-                throw Throwables.propagate(e);
-            }
-
-            TaskReport report = Exec.newTaskReport();
-            report.set("files", currentPath);
-            return report;
-        }
-
-        private void closeCurrentStream() {
-            try {
-                if (currentStream != null) {
-                    currentStream.close();
-                    currentStream = null;
-                }
-
-                callCount = 0;
-            } catch (IOException e) {
-                logger.error(e.getMessage());
-                throw Throwables.propagate(e);
-            }
-        }
-    }
-}
diff --git a/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java b/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
new file mode 100644
index 0000000..8ba1f4b
--- /dev/null
+++ b/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
@@ -0,0 +1,93 @@
+package org.embulk.output.hdfs;
+
+import java.util.List;
+import com.google.common.base.Optional;
+import org.embulk.config.TaskReport;
+import org.embulk.config.Config;
+import org.embulk.config.ConfigDefault;
+import org.embulk.config.ConfigDiff;
+import org.embulk.config.ConfigSource;
+import org.embulk.config.Task;
+import org.embulk.config.TaskSource;
+import org.embulk.spi.Exec;
+import org.embulk.spi.FileOutputPlugin;
+import org.embulk.spi.TransactionalFileOutput;
+
+public class HdfsFileOutputPlugin
+        implements FileOutputPlugin
+{
+    public interface PluginTask
+            extends Task
+    {
+        // configuration option 1 (required integer)
+        @Config("option1")
+        public int getOption1();
+
+        // configuration option 2 (optional string, null is not allowed)
+        @Config("option2")
+        @ConfigDefault("\"myvalue\"")
+        public String getOption2();
+
+        // configuration option 3 (optional string, null is allowed)
+        @Config("option3")
+        @ConfigDefault("null")
+        public Optional<String> getOption3();
+
+        // usually, run() method needs to write multiple files because size of a file
+        // can be very large. So, file name will be:
+        //
+        //     path_prefix + String.format(sequence_format, taskIndex, sequenceCounterInRunMethod) + file_ext
+        //
+
+        //@Config("path_prefix")
+        //public String getPathPrefix();
+
+        //@Config("file_ext")
+        //public String getFileNameExtension();
+
+        //@Config("sequence_format")
+        //@ConfigDefault("\"%03d.%02d.\"")
+        //public String getSequenceFormat();
+    }
+
+    @Override
+    public ConfigDiff transaction(ConfigSource config, int taskCount,
+                                  FileOutputPlugin.Control control)
+    {
+        PluginTask task = config.loadConfig(PluginTask.class);
+
+        // retryable (idempotent) output:
+        // return resume(task.dump(), taskCount, control);
+
+        // non-retryable (non-idempotent) output:
+        control.run(task.dump());
+        return Exec.newConfigDiff();
+    }
+
+    @Override
+    public ConfigDiff resume(TaskSource taskSource,
+                             int taskCount,
+                             FileOutputPlugin.Control control)
+    {
+        throw new UnsupportedOperationException("hdfs output plugin does not support resuming");
+    }
+
+    @Override
+    public void cleanup(TaskSource taskSource,
+                        int taskCount,
+                        List<TaskReport> successTaskReports)
+    {
+    }
+
+    @Override
+    public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
+    {
+        PluginTask task = taskSource.loadTask(PluginTask.class);
+
+        // Write your code here :)
+        throw new UnsupportedOperationException("HdfsFileOutputPlugin.open method is not implemented yet");
+
+        // See LocalFileOutputPlugin as an example implementation:
+        // https://github.com/embulk/embulk/blob/master/embulk-standards/src/main/java/org/embulk/standards/LocalFileOutputPlugin.java
+    }
+}
diff --git a/src/test/java/org/embulk/output/TestHdfsOutputPlugin.java b/src/test/java/org/embulk/output/TestHdfsOutputPlugin.java
deleted file mode 100644
index 2f0fc08..0000000
--- a/src/test/java/org/embulk/output/TestHdfsOutputPlugin.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.embulk.output;
-
-public class TestHdfsOutputPlugin
-{
-}
diff --git a/src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java b/src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java
new file mode 100644
index 0000000..6bcd8e2
--- /dev/null
+++ b/src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java
@@ -0,0 +1,5 @@
+package org.embulk.output.hdfs;
+
+public class TestHdfsFileOutputPlugin
+{
+}

From 46067d5f74bd23c61446323df07b1693933444a1 Mon Sep 17 00:00:00 2001
From: Civitaspo <civitaspo@gmail.com>
Date: Sat, 12 Sep 2015 20:42:34 +0900
Subject: [PATCH 2/4] create prototype.

TODO:
- README
- dogfooding
---
 .../output/hdfs/HdfsFileOutputPlugin.java     | 185 ++++++++++++++----
 1 file changed, 146 insertions(+), 39 deletions(-)

diff --git a/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java b/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
index 8ba1f4b..af2c938 100644
--- a/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
+++ b/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
@@ -1,7 +1,14 @@
 package org.embulk.output.hdfs;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.List;
-import com.google.common.base.Optional;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.embulk.config.TaskReport;
 import org.embulk.config.Config;
 import org.embulk.config.ConfigDefault;
@@ -9,45 +16,48 @@
 import org.embulk.config.ConfigSource;
 import org.embulk.config.Task;
 import org.embulk.config.TaskSource;
+import org.embulk.spi.Buffer;
 import org.embulk.spi.Exec;
 import org.embulk.spi.FileOutputPlugin;
 import org.embulk.spi.TransactionalFileOutput;
+import org.jruby.embed.ScriptingContainer;
+import org.slf4j.Logger;
 
 public class HdfsFileOutputPlugin
         implements FileOutputPlugin
 {
+    private static final Logger logger = Exec.getLogger(HdfsFileOutputPlugin.class);
+
     public interface PluginTask
             extends Task
     {
-        // configuration option 1 (required integer)
-        @Config("option1")
-        public int getOption1();
-
-        // configuration option 2 (optional string, null is not allowed)
-        @Config("option2")
-        @ConfigDefault("\"myvalue\"")
-        public String getOption2();
-
-        // configuration option 3 (optional string, null is allowed)
-        @Config("option3")
-        @ConfigDefault("null")
-        public Optional<String> getOption3();
-
-        // usually, run() method needs to write multiple files because size of a file
-        // can be very large. So, file name will be:
-        //
-        //     path_prefix + String.format(sequence_format, taskIndex, sequenceCounterInRunMethod) + file_ext
-        //
-
-        //@Config("path_prefix")
-        //public String getPathPrefix();
-
-        //@Config("file_ext")
-        //public String getFileNameExtension();
-
-        //@Config("sequence_format")
-        //@ConfigDefault("\"%03d.%02d.\"")
-        //public String getSequenceFormat();
+        @Config("config_files")
+        @ConfigDefault("[]")
+        public List<String> getConfigFiles();
+
+        @Config("config")
+        @ConfigDefault("{}")
+        public Map<String, String> getConfig();
+
+        @Config("path_prefix")
+        public String getPathPrefix();
+
+        @Config("file_ext")
+        public String getFileNameExtension();
+
+        @Config("sequence_format")
+        @ConfigDefault("\"%03d.%02d.\"")
+        public String getSequenceFormat();
+
+        @Config("rewind_seconds")
+        @ConfigDefault("0")
+        public int getRewindSeconds();
+
+        // this parameter is experimental.
+        @Config("overwrite")
+        @ConfigDefault("false")
+        public boolean getOverwrite();
+
     }
 
     @Override
@@ -56,10 +66,6 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
     {
         PluginTask task = config.loadConfig(PluginTask.class);
 
-        // retryable (idempotent) output:
-        // return resume(task.dump(), taskCount, control);
-
-        // non-retryable (non-idempotent) output:
         control.run(task.dump());
         return Exec.newConfigDiff();
     }
@@ -82,12 +88,113 @@ public void cleanup(TaskSource taskSource,
     @Override
     public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
     {
-        PluginTask task = taskSource.loadTask(PluginTask.class);
+        final PluginTask task = taskSource.loadTask(PluginTask.class);
+
+        final String pathPrefix = strftime(task.getPathPrefix(), task.getRewindSeconds());
+        final String pathSuffix = task.getFileNameExtension();
+        final String sequenceFormat = task.getSequenceFormat();
+
+        return new TransactionalFileOutput()
+        {
+            private final List<String> hdfsFileNames = new ArrayList<>();
+            private int fileIndex = 0;
+            private OutputStream output = null;
+
+            @Override
+            public void nextFile()
+            {
+                closeCurrentStream();
+                Path path = new Path(pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + pathSuffix);
+                try {
+                    FileSystem fs = getFs(task);
+                    output = fs.create(path, task.getOverwrite());
+                    logger.info("Uploading '{}'", path);
+                }
+                catch (IOException e) {
+                    logger.error(e.getMessage());
+                    throw new RuntimeException(e);
+                }
+                hdfsFileNames.add(path.toString());
+                fileIndex++;
+            }
+
+            @Override
+            public void add(Buffer buffer)
+            {
+                try {
+                    output.write(buffer.array(), buffer.offset(), buffer.limit());
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                finally {
+                    buffer.release();
+                }
+            }
+
+            @Override
+            public void finish()
+            {
+                closeCurrentStream();
+            }
+
+            @Override
+            public void close()
+            {
+                closeCurrentStream();
+            }
+
+            @Override
+            public void abort()
+            {
+
+            }
+
+            @Override
+            public TaskReport commit()
+            {
+                TaskReport report = Exec.newTaskReport();
+                report.set("hdfs_file_names", hdfsFileNames);
+                return report;
+            }
+
+            private void closeCurrentStream()
+            {
+                if (output != null) {
+                    try {
+                        output.close();
+                        output = null;
+                    }
+                    catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        };
+    }
+
+    private static FileSystem getFs(final PluginTask task)
+            throws IOException
+    {
+        Configuration configuration = new Configuration();
 
-        // Write your code here :)
-        throw new UnsupportedOperationException("HdfsFileOutputPlugin.open method is not implemented yet");
+        for (Object configFile : task.getConfigFiles()) {
+            configuration.addResource(configFile.toString());
+        }
+        configuration.reloadConfiguration();
 
-        // See LocalFileOutputPlugin as an example implementation:
-        // https://github.com/embulk/embulk/blob/master/embulk-standards/src/main/java/org/embulk/standards/LocalFileOutputPlugin.java
+        for (Map.Entry<String, String> entry : task.getConfig().entrySet()) {
+            configuration.set(entry.getKey(), entry.getValue());
+        }
+
+        return FileSystem.get(configuration);
+    }
+
+    private String strftime(final String raw, final int rewind_seconds)
+    {
+        ScriptingContainer jruby = new ScriptingContainer();
+        Object resolved = jruby.runScriptlet(
+                String.format("(Time.now - %s).strftime('%s')", String.valueOf(rewind_seconds), raw));
+        return resolved.toString();
     }
 }

From b9cb7e27e8e581986b42d9f14359ffcef84891d6 Mon Sep 17 00:00:00 2001
From: Civitaspo <civitaspo@gmail.com>
Date: Sun, 20 Sep 2015 14:54:55 +0900
Subject: [PATCH 3/4] dress up

---
 src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java b/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
index af2c938..432bf8d 100644
--- a/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
+++ b/src/main/java/org/embulk/output/hdfs/HdfsFileOutputPlugin.java
@@ -53,7 +53,6 @@ public interface PluginTask
         @ConfigDefault("0")
         public int getRewindSeconds();
 
-        // this parameter is experimental.
         @Config("overwrite")
         @ConfigDefault("false")
         public boolean getOverwrite();
@@ -147,7 +146,6 @@ public void close()
             @Override
             public void abort()
             {
-
             }
 
             @Override

From 635022fdcfd66585c2d588ee5123de955f3a6d92 Mon Sep 17 00:00:00 2001
From: Civitaspo <civitaspo@gmail.com>
Date: Sun, 20 Sep 2015 14:55:10 +0900
Subject: [PATCH 4/4] update readme

---
 README.md | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index f83d40d..031149e 100644
--- a/README.md
+++ b/README.md
@@ -1,11 +1,11 @@
-# Hdfs output plugin for Embulk
+# Hdfs file output plugin for Embulk
 
 A File Output Plugin for Embulk to write HDFS.
 
 ## Overview
 
 * **Plugin type**: file output
-* **Load all or nothing**: no
+* **Load all or nothing**: yes
 * **Resume supported**: no
 * **Cleanup supported**: no
 
@@ -13,8 +13,12 @@ A File Output Plugin for Embulk to write HDFS.
 
 - **config_files** list of paths to Hadoop's configuration files (array of strings, default: `[]`)
 - **config** overwrites configuration parameters (hash, default: `{}`)
-- **output_path** the path finally stored files. (string, default: `"/tmp/embulk.output.hdfs_output.%Y%m%d_%s"`)
-- **working_path** the path temporary stored files. (string, default: `"/tmp/embulk.working.hdfs_output.%Y%m%d_%s"`)
+- **path_prefix** prefix of target files (string, required)
+- **file_ext** suffix of target files (string, required)
+- **sequence_format** format for the sequence part of target files (string, default: `'%03d.%02d.'`)
+- **rewind_seconds** when you use a date format in the path_prefix property (like `/tmp/embulk/%Y-%m-%d/out`), it is interpreted using the current time minus this many seconds (int, default: `0`)
+- **overwrite** overwrite files when the same filenames already exist (boolean, default: `false`)
+  - *caution*: even if this property is `true`, it does not guarantee idempotence. To make a run idempotent, remove the output files before or after running.
 
 ## Example
 
@@ -24,14 +28,13 @@ out:
   type: hdfs
   config_files:
     - /etc/hadoop/conf/core-site.xml
     - /etc/hadoop/conf/hdfs-site.xml
-    - /etc/hadoop/conf/mapred-site.xml
-    - /etc/hadoop/conf/yarn-site.xml
   config:
     fs.defaultFS: 'hdfs://hdp-nn1:8020'
-    dfs.replication: 1
-    mapreduce.client.submit.file.replication: 1
     fs.hdfs.impl: 'org.apache.hadoop.hdfs.DistributedFileSystem'
     fs.file.impl: 'org.apache.hadoop.fs.LocalFileSystem'
+  path_prefix: '/tmp/embulk/hdfs_output/%Y-%m-%d/out'
+  file_ext: 'txt'
+  overwrite: true
   formatter:
     type: csv
     encoding: UTF-8
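
Supplementary illustration (not part of the patch series): PATCH 2/4 assembles each output file name as strftime(path_prefix, rewind_seconds) + String.format(sequence_format, taskIndex, fileIndex) + file_ext. The standalone sketch below reproduces that naming scheme; like the plugin, it assumes JRuby's ScriptingContainer is on the classpath, and the class name and sample values are made up for the example.

    import org.jruby.embed.ScriptingContainer;

    // Illustrative sketch of the naming scheme introduced in PATCH 2/4:
    //   strftime(path_prefix, rewind_seconds) + format(sequence_format, taskIndex, fileIndex) + file_ext
    public class FileNameSketch
    {
        public static void main(String[] args)
        {
            String pathPrefix = "/tmp/embulk/hdfs_output/%Y-%m-%d/out"; // path_prefix
            int rewindSeconds = 86400;                                  // rewind_seconds (one day)
            String sequenceFormat = "%03d.%02d.";                       // sequence_format (default)
            String fileExt = "txt";                                     // file_ext
            int taskIndex = 0;                                          // embulk task index
            int fileIndex = 0;                                          // per-task file counter

            // Resolve the date placeholders the same way the plugin does:
            // delegate strftime to embedded JRuby, rewound by rewind_seconds.
            ScriptingContainer jruby = new ScriptingContainer();
            String resolved = jruby.runScriptlet(
                    String.format("(Time.now - %d).strftime('%s')", rewindSeconds, pathPrefix)).toString();

            // With rewind_seconds: 86400, a run on 2015-09-21 prints e.g.
            // /tmp/embulk/hdfs_output/2015-09-20/out000.00.txt
            String fileName = resolved + String.format(sequenceFormat, taskIndex, fileIndex) + fileExt;
            System.out.println(fileName);
        }
    }

This also shows why `overwrite: true` alone cannot make a re-run idempotent: a run started after midnight, or with a different rewind_seconds, resolves to a different date directory and leaves the earlier run's files in place, which is exactly what the caution in the README warns about.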